about summary refs log tree commit diff
path: root/src/payload-processor/processor
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/processor')
-rw-r--r--src/payload-processor/processor/BUILD.bazel1
-rw-r--r--src/payload-processor/processor/db.go5
-rw-r--r--src/payload-processor/processor/mock_db.go8
-rw-r--r--src/payload-processor/processor/processor.go26
-rw-r--r--src/payload-processor/processor/processor_test.go6
5 files changed, 28 insertions, 18 deletions
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index 411300d..384c88b 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -11,7 +11,6 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//lib",
-        "@com_github_google_uuid//:uuid",
         "@com_github_jmoiron_sqlx//:sqlx",
         "@com_github_segmentio_kafka_go//:kafka-go",
         "@org_uber_go_mock//gomock",
diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go
index 45ec484..2200937 100644
--- a/src/payload-processor/processor/db.go
+++ b/src/payload-processor/processor/db.go
@@ -9,7 +9,7 @@ import (
 
 //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE
 type DB interface {
-	InsertFile(ctx context.Context, file lib.File) error
+	InsertFile(ctx context.Context, file lib.File, user_id string) error
 }
 
 type DBImpl struct {
@@ -22,7 +22,8 @@ func NewDB(db *sqlx.DB) DB {
 	return &DBImpl{db: db}
 }
 
-func (db DBImpl) InsertFile(ctx context.Context, file lib.File) error {
+func (db DBImpl) InsertFile(ctx context.Context, file lib.File, user_id string) error {
+	file.User_id = user_id
 	_, err := db.db.NamedExecContext(ctx, "INSERT INTO private.file (user_id, absolute_path, contents, timestamp) VALUES (:user_id, :absolute_path, :contents, :timestamp)", file)
 	return err
 }
diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go
index 1b283bd..f6347d7 100644
--- a/src/payload-processor/processor/mock_db.go
+++ b/src/payload-processor/processor/mock_db.go
@@ -41,15 +41,15 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder {
 }
 
 // InsertFile mocks base method.
-func (m *MockDB) InsertFile(ctx context.Context, file lib.File) error {
+func (m *MockDB) InsertFile(ctx context.Context, file lib.File, user_id string) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "InsertFile", ctx, file)
+	ret := m.ctrl.Call(m, "InsertFile", ctx, file, user_id)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InsertFile indicates an expected call of InsertFile.
-func (mr *MockDBMockRecorder) InsertFile(ctx, file any) *gomock.Call {
+func (mr *MockDBMockRecorder) InsertFile(ctx, file, user_id any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file, user_id)
 }
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index ab128cb..1203fea 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -12,7 +12,6 @@ import (
 	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/lib"
-	"github.com/google/uuid"
 	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
@@ -70,17 +69,21 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			break
 		}
 
+		user_id, err := getHeaderValue(m.Headers, "user_id")
+		if err != nil {
+			log.Fatal("failed to get user_id from headers:", err)
+		}
+
 		// TODO: Remove after testing
 		if string(m.Value) == "" {
 			m.Value = []byte(fmt.Sprintf(`{
-				"user_id": "%s",
 				"absolute_path": "/home/user/file.txt",
 				"contents": "Hello, World!",
 				"timestamp": "%s"
-			}`, uuid.New(), time.Now().Format(time.RFC3339)))
+			}`, time.Now().Format(time.RFC3339)))
 		}
 
-		err = p.handleMessage(ctx, m)
+		err = p.handleMessage(ctx, m, string(user_id))
 		if err != nil {
 			log.Println("failed to handle message:", err)
 			p.handleError(ctx, m, err)
@@ -90,8 +93,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	}
 }
 
-func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
-	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
+func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error {
+	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id)
 
 	var file lib.File
 	err := json.Unmarshal(m.Value, &file)
@@ -99,7 +102,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 		return err
 	}
 
-	err = p.db.InsertFile(ctx, file)
+	err = p.db.InsertFile(ctx, file, user_id)
 	if err != nil {
 		return err
 	}
@@ -113,3 +116,12 @@ func (p Processor) handleError(ctx context.Context, m kafka.Message, err error)
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
+
+func getHeaderValue(headers []kafka.Header, key string) ([]byte, error) {
+	for _, header := range headers {
+		if header.Key == key {
+			return header.Value, nil
+		}
+	}
+	return []byte{}, fmt.Errorf("Header %s not found", key)
+}
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
index 39e965e..4a55e48 100644
--- a/src/payload-processor/processor/processor_test.go
+++ b/src/payload-processor/processor/processor_test.go
@@ -20,7 +20,6 @@ func TestProcessMessage(t *testing.T) {
 
 	message := []byte(`
 	{
-		"user_id": "1",
 		"absolute_path": "/tmp/file.txt",
 		"contents": "hello world",
 		"timestamp": "2021-01-01T00:00:00Z"
@@ -30,13 +29,12 @@ func TestProcessMessage(t *testing.T) {
 	ctx := context.Background()
 
 	mockdb.EXPECT().InsertFile(ctx, lib.File{
-		User_id:       "1",
 		Absolute_path: "/tmp/file.txt",
 		Contents:      "hello world",
 		Timestamp:     time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
-	}).Return(nil)
+	}, "USER_ID").Return(nil)
 
-	err := processor.handleMessage(ctx, kafka.Message{Value: message})
+	err := processor.handleMessage(ctx, kafka.Message{Value: message}, "USER_ID")
 
 	require.NoError(t, err)
 }