about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-27 18:45:50 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-27 20:06:12 +0200
commit951675d62f631df7ccaca7c22de004d9576d3e91 (patch)
tree7fe41a57991009c4f5dbe2d5aa62f5667ae2255f
parentmisc: Create first Supabase migration (diff)
downloadfs-tracer-backend-951675d62f631df7ccaca7c22de004d9576d3e91.tar.gz
fs-tracer-backend-951675d62f631df7ccaca7c22de004d9576d3e91.tar.bz2
fs-tracer-backend-951675d62f631df7ccaca7c22de004d9576d3e91.zip
payload-processor: insert to the file table
-rw-r--r--MODULE.bazel1
-rw-r--r--go.mod1
-rw-r--r--go.sum2
-rw-r--r--k8s/payload-processor/BUILD.bazel2
-rw-r--r--src/payload-processor/processor/BUILD.bazel2
-rw-r--r--src/payload-processor/processor/db.go7
-rw-r--r--src/payload-processor/processor/mock_db.go16
-rw-r--r--src/payload-processor/processor/model.go10
-rw-r--r--src/payload-processor/processor/processor.go21
-rw-r--r--src/payload-processor/processor/processor_test.go17
-rw-r--r--src/rest-api/handler/handler.go4
11 files changed, 64 insertions, 19 deletions
diff --git a/MODULE.bazel b/MODULE.bazel
index b4180be..9cc9247 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -29,6 +29,7 @@ go_deps = use_extension("@bazel_gazelle//:extensions.bzl", "go_deps")
 go_deps.from_file(go_mod = "//:go.mod")
 use_repo(
     go_deps,
+    "com_github_google_uuid",
     "com_github_jmoiron_sqlx",
     "com_github_lib_pq",
     "com_github_segmentio_kafka_go",
diff --git a/go.mod b/go.mod
index 15d66fa..9448c17 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/Baitinq/fs-tracer-backend
 go 1.22.2
 
 require (
+	github.com/google/uuid v1.6.0
 	github.com/jmoiron/sqlx v1.4.0
 	github.com/lib/pq v1.10.9
 	github.com/segmentio/kafka-go v0.4.47
diff --git a/go.sum b/go.sum
index dd78085..7f85511 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
 github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
+github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
+github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
 github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
 github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
diff --git a/k8s/payload-processor/BUILD.bazel b/k8s/payload-processor/BUILD.bazel
index a2f1fbb..6aa422b 100644
--- a/k8s/payload-processor/BUILD.bazel
+++ b/k8s/payload-processor/BUILD.bazel
@@ -8,7 +8,7 @@ helm_chart(
     stamp = 1,
     substitutions = {
         "IMAGE_TAG": "payload-processor-{STABLE_GIT_SHA}",
-        "DB_PASSWORD": "{DB_PASSWORD}"
+        "DB_PASSWORD": "{DB_PASSWORD}",
     },
     values = "values.yaml",
 )
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index 9761c58..d8d8afe 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -5,11 +5,13 @@ go_library(
     srcs = [
         "db.go",
         "mock_db.go",
+        "model.go",
         "processor.go",
     ],
     importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor",
     visibility = ["//visibility:public"],
     deps = [
+        "@com_github_google_uuid//:uuid",
         "@com_github_jmoiron_sqlx//:sqlx",
         "@com_github_segmentio_kafka_go//:kafka-go",
         "@org_uber_go_mock//gomock",
diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go
index bad31d1..ce7d5bb 100644
--- a/src/payload-processor/processor/db.go
+++ b/src/payload-processor/processor/db.go
@@ -2,14 +2,13 @@ package processor
 
 import (
 	"context"
-	"time"
 
 	"github.com/jmoiron/sqlx"
 )
 
 //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE
 type DB interface {
-	TestInsert(ctx context.Context, message string) error
+	InsertFile(ctx context.Context, file File) error
 }
 
 type DBImpl struct {
@@ -22,7 +21,7 @@ func NewDB(db *sqlx.DB) DB {
 	return &DBImpl{db: db}
 }
 
-func (db DBImpl) TestInsert(ctx context.Context, message string) error {
-	_, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message)
+func (db DBImpl) InsertFile(ctx context.Context, file File) error {
+	_, err := db.db.NamedExecContext(ctx, "INSERT INTO private.file (user_id, absolute_path, contents, timestamp) VALUES (:user_id, :absolute_path, :contents, :timestamp)", file)
 	return err
 }
diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go
index 235657d..53e5491 100644
--- a/src/payload-processor/processor/mock_db.go
+++ b/src/payload-processor/processor/mock_db.go
@@ -1,9 +1,9 @@
 // Code generated by MockGen. DO NOT EDIT.
-// Source: db.go
+// Source: src/payload-processor/processor/db.go
 //
 // Generated by this command:
 //
-//	mockgen -source=db.go -package=processor -destination=mock_db.go
+//	mockgen -source src/payload-processor/processor/db.go -package processor -destination=src/payload-processor/processor/mock_db.go
 //
 
 // Package processor is a generated GoMock package.
@@ -39,16 +39,16 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder {
 	return m.recorder
 }
 
-// TestInsert mocks base method.
-func (m *MockDB) TestInsert(ctx context.Context, message string) error {
+// InsertFile mocks base method.
+func (m *MockDB) InsertFile(ctx context.Context, file File) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "TestInsert", ctx, message)
+	ret := m.ctrl.Call(m, "InsertFile", ctx, file)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
-// TestInsert indicates an expected call of TestInsert.
-func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call {
+// InsertFile indicates an expected call of InsertFile.
+func (mr *MockDBMockRecorder) InsertFile(ctx, file any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file)
 }
diff --git a/src/payload-processor/processor/model.go b/src/payload-processor/processor/model.go
new file mode 100644
index 0000000..4921f3d
--- /dev/null
+++ b/src/payload-processor/processor/model.go
@@ -0,0 +1,10 @@
+package processor
+
+import "time"
+
+type File struct {
+	User_id       string    `db:"user_id"`
+	Absolute_path string    `db:"absolute_path"`
+	Contents      string    `db:"contents"`
+	Timestamp     time.Time `db:"timestamp"`
+}
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 5f2cf25..072f0dd 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -2,6 +2,7 @@ package processor
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"log"
 	"os"
@@ -10,6 +11,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
@@ -66,6 +68,17 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
+
+		// TODO: Remove after testing
+		if string(m.Value) == "" {
+			m.Value = []byte(fmt.Sprintf(`{
+				"user_id": "%s",
+				"absolute_path": "/home/user/file.txt",
+				"contents": "Hello, World!",
+				"timestamp": "%s"
+			}`, uuid.New(), time.Now().Format(time.RFC3339)))
+		}
+
 		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
@@ -78,7 +91,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
 
-	err := p.db.TestInsert(ctx, string(m.Value))
+	var file File
+	err := json.Unmarshal(m.Value, &file)
+	if err != nil {
+		return err
+	}
+
+	err = p.db.InsertFile(ctx, file)
 	if err != nil {
 		return err
 	}
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
index 5fa34a8..d3778de 100644
--- a/src/payload-processor/processor/processor_test.go
+++ b/src/payload-processor/processor/processor_test.go
@@ -3,6 +3,7 @@ package processor
 import (
 	"context"
 	"testing"
+	"time"
 
 	"github.com/segmentio/kafka-go"
 	"github.com/stretchr/testify/require"
@@ -16,11 +17,23 @@ func TestProcessMessage(t *testing.T) {
 		db: mockdb,
 	}
 
-	message := []byte("test")
+	message := []byte(`
+	{
+		"user_id": "1",
+		"absolute_path": "/tmp/file.txt",
+		"contents": "hello world",
+		"timestamp": "2021-01-01T00:00:00Z"
+	}
+	`)
 
 	ctx := context.Background()
 
-	mockdb.EXPECT().TestInsert(ctx, string(message)).Return(nil)
+	mockdb.EXPECT().InsertFile(ctx, File{
+		User_id:       "1",
+		Absolute_path: "/tmp/file.txt",
+		Contents:      "hello world",
+		Timestamp:     time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
+	}).Return(nil)
 
 	err := processor.handleMessage(ctx, kafka.Message{Value: message})
 
diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go
index 3454486..9e44612 100644
--- a/src/rest-api/handler/handler.go
+++ b/src/rest-api/handler/handler.go
@@ -27,14 +27,12 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		log.Fatal(err)
 	}
 
-	body := fmt.Sprint("Hello World!", r.RemoteAddr, string(bytes))
-
 	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
 	defer cancel()
 
 	err = h.kafka_writer.WriteMessages(ctx, kafka.Message{
 		Key:   []byte("key"), //TODO:This routes to a partition. We should probably route by agent UUID TODO: wont this negate having multiple topics
-		Value: []byte(body),
+		Value: bytes,
 	})
 	if err != nil {
 		log.Fatal(err)