diff options
Diffstat (limited to 'src/payload-processor/processor')
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 1 | ||||
-rw-r--r-- | src/payload-processor/processor/db.go | 5 | ||||
-rw-r--r-- | src/payload-processor/processor/mock_db.go | 8 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 26 | ||||
-rw-r--r-- | src/payload-processor/processor/processor_test.go | 6 |
5 files changed, 28 insertions, 18 deletions
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 411300d..384c88b 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -11,7 +11,6 @@ go_library( visibility = ["//visibility:public"], deps = [ "//lib", - "@com_github_google_uuid//:uuid", "@com_github_jmoiron_sqlx//:sqlx", "@com_github_segmentio_kafka_go//:kafka-go", "@org_uber_go_mock//gomock", diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go index 45ec484..2200937 100644 --- a/src/payload-processor/processor/db.go +++ b/src/payload-processor/processor/db.go @@ -9,7 +9,7 @@ import ( //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE type DB interface { - InsertFile(ctx context.Context, file lib.File) error + InsertFile(ctx context.Context, file lib.File, user_id string) error } type DBImpl struct { @@ -22,7 +22,8 @@ func NewDB(db *sqlx.DB) DB { return &DBImpl{db: db} } -func (db DBImpl) InsertFile(ctx context.Context, file lib.File) error { +func (db DBImpl) InsertFile(ctx context.Context, file lib.File, user_id string) error { + file.User_id = user_id _, err := db.db.NamedExecContext(ctx, "INSERT INTO private.file (user_id, absolute_path, contents, timestamp) VALUES (:user_id, :absolute_path, :contents, :timestamp)", file) return err } diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go index 1b283bd..f6347d7 100644 --- a/src/payload-processor/processor/mock_db.go +++ b/src/payload-processor/processor/mock_db.go @@ -41,15 +41,15 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder { } // InsertFile mocks base method. -func (m *MockDB) InsertFile(ctx context.Context, file lib.File) error { +func (m *MockDB) InsertFile(ctx context.Context, file lib.File, user_id string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InsertFile", ctx, file) + ret := m.ctrl.Call(m, "InsertFile", ctx, file, user_id) ret0, _ := ret[0].(error) return ret0 } // InsertFile indicates an expected call of InsertFile. -func (mr *MockDBMockRecorder) InsertFile(ctx, file any) *gomock.Call { +func (mr *MockDBMockRecorder) InsertFile(ctx, file, user_id any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file, user_id) } diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index ab128cb..1203fea 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -12,7 +12,6 @@ import ( "time" "github.com/Baitinq/fs-tracer-backend/lib" - "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) @@ -70,17 +69,21 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { break } + user_id, err := getHeaderValue(m.Headers, "user_id") + if err != nil { + log.Fatal("failed to get user_id from headers:", err) + } + // TODO: Remove after testing if string(m.Value) == "" { m.Value = []byte(fmt.Sprintf(`{ - "user_id": "%s", "absolute_path": "/home/user/file.txt", "contents": "Hello, World!", "timestamp": "%s" - }`, uuid.New(), time.Now().Format(time.RFC3339))) + }`, time.Now().Format(time.RFC3339))) } - err = p.handleMessage(ctx, m) + err = p.handleMessage(ctx, m, string(user_id)) if err != nil { log.Println("failed to handle message:", err) p.handleError(ctx, m, err) @@ -90,8 +93,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { } } -func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { - fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) +func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error { + fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id) var file lib.File err := json.Unmarshal(m.Value, &file) @@ -99,7 +102,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { return err } - err = p.db.InsertFile(ctx, file) + err = p.db.InsertFile(ctx, file, user_id) if err != nil { return err } @@ -113,3 +116,12 @@ func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) p.kafka_reader.CommitMessages(ctx, m) } } + +func getHeaderValue(headers []kafka.Header, key string) ([]byte, error) { + for _, header := range headers { + if header.Key == key { + return header.Value, nil + } + } + return []byte{}, fmt.Errorf("Header %s not found", key) +} diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go index 39e965e..4a55e48 100644 --- a/src/payload-processor/processor/processor_test.go +++ b/src/payload-processor/processor/processor_test.go @@ -20,7 +20,6 @@ func TestProcessMessage(t *testing.T) { message := []byte(` { - "user_id": "1", "absolute_path": "/tmp/file.txt", "contents": "hello world", "timestamp": "2021-01-01T00:00:00Z" @@ -30,13 +29,12 @@ func TestProcessMessage(t *testing.T) { ctx := context.Background() mockdb.EXPECT().InsertFile(ctx, lib.File{ - User_id: "1", Absolute_path: "/tmp/file.txt", Contents: "hello world", Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), - }).Return(nil) + }, "USER_ID").Return(nil) - err := processor.handleMessage(ctx, kafka.Message{Value: message}) + err := processor.handleMessage(ctx, kafka.Message{Value: message}, "USER_ID") require.NoError(t, err) } |