diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-27 18:45:50 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-27 20:06:12 +0200 |
commit | 951675d62f631df7ccaca7c22de004d9576d3e91 (patch) | |
tree | 7fe41a57991009c4f5dbe2d5aa62f5667ae2255f | |
parent | misc: Create first Supabase migration (diff) | |
download | fs-tracer-backend-951675d62f631df7ccaca7c22de004d9576d3e91.tar.gz fs-tracer-backend-951675d62f631df7ccaca7c22de004d9576d3e91.tar.bz2 fs-tracer-backend-951675d62f631df7ccaca7c22de004d9576d3e91.zip |
payload-processor: insert to the file table
-rw-r--r-- | MODULE.bazel | 1 | ||||
-rw-r--r-- | go.mod | 1 | ||||
-rw-r--r-- | go.sum | 2 | ||||
-rw-r--r-- | k8s/payload-processor/BUILD.bazel | 2 | ||||
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 2 | ||||
-rw-r--r-- | src/payload-processor/processor/db.go | 7 | ||||
-rw-r--r-- | src/payload-processor/processor/mock_db.go | 16 | ||||
-rw-r--r-- | src/payload-processor/processor/model.go | 10 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 21 | ||||
-rw-r--r-- | src/payload-processor/processor/processor_test.go | 17 | ||||
-rw-r--r-- | src/rest-api/handler/handler.go | 4 |
11 files changed, 64 insertions, 19 deletions
diff --git a/MODULE.bazel b/MODULE.bazel index b4180be..9cc9247 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -29,6 +29,7 @@ go_deps = use_extension("@bazel_gazelle//:extensions.bzl", "go_deps") go_deps.from_file(go_mod = "//:go.mod") use_repo( go_deps, + "com_github_google_uuid", "com_github_jmoiron_sqlx", "com_github_lib_pq", "com_github_segmentio_kafka_go", diff --git a/go.mod b/go.mod index 15d66fa..9448c17 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/Baitinq/fs-tracer-backend go 1.22.2 require ( + github.com/google/uuid v1.6.0 github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 github.com/segmentio/kafka-go v0.4.47 diff --git a/go.sum b/go.sum index dd78085..7f85511 100644 --- a/go.sum +++ b/go.sum @@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= diff --git a/k8s/payload-processor/BUILD.bazel b/k8s/payload-processor/BUILD.bazel index a2f1fbb..6aa422b 100644 --- a/k8s/payload-processor/BUILD.bazel +++ b/k8s/payload-processor/BUILD.bazel @@ -8,7 +8,7 @@ helm_chart( stamp = 1, substitutions = { "IMAGE_TAG": "payload-processor-{STABLE_GIT_SHA}", - "DB_PASSWORD": "{DB_PASSWORD}" + "DB_PASSWORD": "{DB_PASSWORD}", }, values = "values.yaml", ) diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 9761c58..d8d8afe 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -5,11 +5,13 @@ go_library( srcs = [ "db.go", "mock_db.go", + "model.go", "processor.go", ], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], deps = [ + "@com_github_google_uuid//:uuid", "@com_github_jmoiron_sqlx//:sqlx", "@com_github_segmentio_kafka_go//:kafka-go", "@org_uber_go_mock//gomock", diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go index bad31d1..ce7d5bb 100644 --- a/src/payload-processor/processor/db.go +++ b/src/payload-processor/processor/db.go @@ -2,14 +2,13 @@ package processor import ( "context" - "time" "github.com/jmoiron/sqlx" ) //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE type DB interface { - TestInsert(ctx context.Context, message string) error + InsertFile(ctx context.Context, file File) error } type DBImpl struct { @@ -22,7 +21,7 @@ func NewDB(db *sqlx.DB) DB { return &DBImpl{db: db} } -func (db DBImpl) TestInsert(ctx context.Context, message string) error { - _, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message) +func (db DBImpl) InsertFile(ctx context.Context, file File) error { + _, err := db.db.NamedExecContext(ctx, "INSERT INTO private.file (user_id, absolute_path, contents, timestamp) VALUES (:user_id, :absolute_path, :contents, :timestamp)", file) return err } diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go index 235657d..53e5491 100644 --- a/src/payload-processor/processor/mock_db.go +++ b/src/payload-processor/processor/mock_db.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: db.go +// Source: src/payload-processor/processor/db.go // // Generated by this command: // -// mockgen -source=db.go -package=processor -destination=mock_db.go +// mockgen -source src/payload-processor/processor/db.go -package processor -destination=src/payload-processor/processor/mock_db.go // // Package processor is a generated GoMock package. @@ -39,16 +39,16 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder { return m.recorder } -// TestInsert mocks base method. -func (m *MockDB) TestInsert(ctx context.Context, message string) error { +// InsertFile mocks base method. +func (m *MockDB) InsertFile(ctx context.Context, file File) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TestInsert", ctx, message) + ret := m.ctrl.Call(m, "InsertFile", ctx, file) ret0, _ := ret[0].(error) return ret0 } -// TestInsert indicates an expected call of TestInsert. -func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call { +// InsertFile indicates an expected call of InsertFile. +func (mr *MockDBMockRecorder) InsertFile(ctx, file any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file) } diff --git a/src/payload-processor/processor/model.go b/src/payload-processor/processor/model.go new file mode 100644 index 0000000..4921f3d --- /dev/null +++ b/src/payload-processor/processor/model.go @@ -0,0 +1,10 @@ +package processor + +import "time" + +type File struct { + User_id string `db:"user_id"` + Absolute_path string `db:"absolute_path"` + Contents string `db:"contents"` + Timestamp time.Time `db:"timestamp"` +} diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 5f2cf25..072f0dd 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -2,6 +2,7 @@ package processor import ( "context" + "encoding/json" "fmt" "log" "os" @@ -10,6 +11,7 @@ import ( "syscall" "time" + "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) @@ -66,6 +68,17 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { cancel() break } + + // TODO: Remove after testing + if string(m.Value) == "" { + m.Value = []byte(fmt.Sprintf(`{ + "user_id": "%s", + "absolute_path": "/home/user/file.txt", + "contents": "Hello, World!", + "timestamp": "%s" + }`, uuid.New(), time.Now().Format(time.RFC3339))) + } + err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) @@ -78,7 +91,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) - err := p.db.TestInsert(ctx, string(m.Value)) + var file File + err := json.Unmarshal(m.Value, &file) + if err != nil { + return err + } + + err = p.db.InsertFile(ctx, file) if err != nil { return err } diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go index 5fa34a8..d3778de 100644 --- a/src/payload-processor/processor/processor_test.go +++ b/src/payload-processor/processor/processor_test.go @@ -3,6 +3,7 @@ package processor import ( "context" "testing" + "time" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" @@ -16,11 +17,23 @@ func TestProcessMessage(t *testing.T) { db: mockdb, } - message := []byte("test") + message := []byte(` + { + "user_id": "1", + "absolute_path": "/tmp/file.txt", + "contents": "hello world", + "timestamp": "2021-01-01T00:00:00Z" + } + `) ctx := context.Background() - mockdb.EXPECT().TestInsert(ctx, string(message)).Return(nil) + mockdb.EXPECT().InsertFile(ctx, File{ + User_id: "1", + Absolute_path: "/tmp/file.txt", + Contents: "hello world", + Timestamp: time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC), + }).Return(nil) err := processor.handleMessage(ctx, kafka.Message{Value: message}) diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go index 3454486..9e44612 100644 --- a/src/rest-api/handler/handler.go +++ b/src/rest-api/handler/handler.go @@ -27,14 +27,12 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Fatal(err) } - body := fmt.Sprint("Hello World!", r.RemoteAddr, string(bytes)) - ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() err = h.kafka_writer.WriteMessages(ctx, kafka.Message{ Key: []byte("key"), //TODO:This routes to a partition. We should probably route by agent UUID TODO: wont this negate having multiple topics - Value: []byte(body), + Value: bytes, }) if err != nil { log.Fatal(err) |