diff options
Diffstat (limited to 'src/payload-processor/processor')
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 2 | ||||
-rw-r--r-- | src/payload-processor/processor/db.go | 7 | ||||
-rw-r--r-- | src/payload-processor/processor/mock_db.go | 16 | ||||
-rw-r--r-- | src/payload-processor/processor/model.go | 10 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 21 | ||||
-rw-r--r-- | src/payload-processor/processor/processor_test.go | 17 |
6 files changed, 58 insertions, 15 deletions
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 9761c58..d8d8afe 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -5,11 +5,13 @@ go_library( srcs = [ "db.go", "mock_db.go", + "model.go", "processor.go", ], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], deps = [ + "@com_github_google_uuid//:uuid", "@com_github_jmoiron_sqlx//:sqlx", "@com_github_segmentio_kafka_go//:kafka-go", "@org_uber_go_mock//gomock", diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go index bad31d1..ce7d5bb 100644 --- a/src/payload-processor/processor/db.go +++ b/src/payload-processor/processor/db.go @@ -2,14 +2,13 @@ package processor import ( "context" - "time" "github.com/jmoiron/sqlx" ) //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE type DB interface { - TestInsert(ctx context.Context, message string) error + InsertFile(ctx context.Context, file File) error } type DBImpl struct { @@ -22,7 +21,7 @@ func NewDB(db *sqlx.DB) DB { return &DBImpl{db: db} } -func (db DBImpl) TestInsert(ctx context.Context, message string) error { - _, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message) +func (db DBImpl) InsertFile(ctx context.Context, file File) error { + _, err := db.db.NamedExecContext(ctx, "INSERT INTO private.file (user_id, absolute_path, contents, timestamp) VALUES (:user_id, :absolute_path, :contents, :timestamp)", file) return err } diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go index 235657d..53e5491 100644 --- a/src/payload-processor/processor/mock_db.go +++ b/src/payload-processor/processor/mock_db.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: db.go +// Source: src/payload-processor/processor/db.go // // Generated by this command: // -// mockgen -source=db.go -package=processor -destination=mock_db.go +// mockgen -source src/payload-processor/processor/db.go -package processor -destination=src/payload-processor/processor/mock_db.go // // Package processor is a generated GoMock package. @@ -39,16 +39,16 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder { return m.recorder } -// TestInsert mocks base method. -func (m *MockDB) TestInsert(ctx context.Context, message string) error { +// InsertFile mocks base method. +func (m *MockDB) InsertFile(ctx context.Context, file File) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TestInsert", ctx, message) + ret := m.ctrl.Call(m, "InsertFile", ctx, file) ret0, _ := ret[0].(error) return ret0 } -// TestInsert indicates an expected call of TestInsert. -func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call { +// InsertFile indicates an expected call of InsertFile. +func (mr *MockDBMockRecorder) InsertFile(ctx, file any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file) } diff --git a/src/payload-processor/processor/model.go b/src/payload-processor/processor/model.go new file mode 100644 index 0000000..4921f3d --- /dev/null +++ b/src/payload-processor/processor/model.go @@ -0,0 +1,10 @@ +package processor + +import "time" + +type File struct { + User_id string `db:"user_id"` + Absolute_path string `db:"absolute_path"` + Contents string `db:"contents"` + Timestamp time.Time `db:"timestamp"` +} diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 5f2cf25..072f0dd 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -2,6 +2,7 @@ package processor import ( "context" + "encoding/json" "fmt" "log" "os" @@ -10,6 +11,7 @@ import ( "syscall" "time" + "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) @@ -66,6 +68,17 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { cancel() break } + + // TODO: Remove after testing + if string(m.Value) == "" { + m.Value = []byte(fmt.Sprintf(`{ + "user_id": "%s", + "absolute_path": "/home/user/file.txt", + "contents": "Hello, World!", + "timestamp": "%s" + }`, uuid.New(), time.Now().Format(time.RFC3339))) + } + err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) @@ -78,7 +91,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) - err := p.db.TestInsert(ctx, string(m.Value)) + var file File + err := json.Unmarshal(m.Value, &file) + if err != nil { + return err + } + + err = p.db.InsertFile(ctx, file) if err != nil { return err } diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go index 5fa34a8..d3778de 100644 --- a/src/payload-processor/processor/processor_test.go +++ b/src/payload-processor/processor/processor_test.go @@ -3,6 +3,7 @@ package processor import ( "context" "testing" + "time" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" @@ -16,11 +17,23 @@ func TestProcessMessage(t *testing.T) { db: mockdb, } - message := []byte("test") + message := []byte(` + { + "user_id": "1", + "absolute_path": "/tmp/file.txt", + "contents": "hello world", + "timestamp": "2021-01-01T00:00:00Z" + } + `) ctx := context.Background() - mockdb.EXPECT().TestInsert(ctx, string(message)).Return(nil) + mockdb.EXPECT().InsertFile(ctx, File{ + User_id: "1", + Absolute_path: "/tmp/file.txt", + Contents: "hello world", + Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + }).Return(nil) err := processor.handleMessage(ctx, kafka.Message{Value: message}) |