about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-09 00:15:13 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-11 01:32:43 +0200
commit0a7be3e1ca6e651759fdb853e83e639f476e0c47 (patch)
tree0a53188b22edf971380290fefc3b6075a6a4e629
parentk8s: set default number of kafka topic partitions to 2 (diff)
downloadfs-tracer-backend-0a7be3e1ca6e651759fdb853e83e639f476e0c47.tar.gz
fs-tracer-backend-0a7be3e1ca6e651759fdb853e83e639f476e0c47.tar.bz2
fs-tracer-backend-0a7be3e1ca6e651759fdb853e83e639f476e0c47.zip
payloads-processor: connect to DB and insert dummy data
-rw-r--r--.gitignore2
-rw-r--r--MODULE.bazel3
-rw-r--r--TODO4
-rwxr-xr-xdeploy.sh3
-rw-r--r--flake.nix1
-rw-r--r--go.mod3
-rw-r--r--go.sum12
-rw-r--r--k8s/payload-processor/templates/deployment.yaml2
-rw-r--r--k8s/payload-processor/values.yaml3
-rw-r--r--src/payload-processor/cmd/BUILD.bazel6
-rw-r--r--src/payload-processor/cmd/main.go15
-rw-r--r--src/payload-processor/processor/BUILD.bazel17
-rw-r--r--src/payload-processor/processor/db.go28
-rw-r--r--src/payload-processor/processor/mock_db.go54
-rw-r--r--src/payload-processor/processor/processor.go14
-rw-r--r--src/payload-processor/processor/processor_test.go12
-rw-r--r--src/rest-api/cmd/BUILD.bazel4
-rw-r--r--src/rest-api/handler/BUILD.bazel2
18 files changed, 168 insertions, 17 deletions
diff --git a/.gitignore b/.gitignore
index 4fad39f..6930a80 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,3 +3,5 @@
 /bazel-*
 
 .helmsman-tmp/
+
+vendor/
diff --git a/MODULE.bazel b/MODULE.bazel
index c4a151b..da57d72 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -27,6 +27,9 @@ go_deps = use_extension("@bazel_gazelle//:extensions.bzl", "go_deps")
 go_deps.from_file(go_mod = "//:go.mod")
 use_repo(
     go_deps,
+    "com_github_jmoiron_sqlx",
+    "com_github_lib_pq",
     "com_github_segmentio_kafka_go",
     "com_github_stretchr_testify",
+    "org_uber_go_mock",
 )
diff --git a/TODO b/TODO
index 2589e41..b130e70 100644
--- a/TODO
+++ b/TODO
@@ -1,2 +1,4 @@
-cassandra
+better managing of secrets (maybe we shouldnt have environment variables)
+add db migrations
+add db tests
 nixos-k3s repo
diff --git a/deploy.sh b/deploy.sh
index 93323a7..d66667e 100755
--- a/deploy.sh
+++ b/deploy.sh
@@ -4,4 +4,5 @@ bazel run //src/rest-api/cmd:push -- --tag "rest-api-$(git rev-parse --short HEA
 helm upgrade rest-api --set image.tag="rest-api-$(git rev-parse --short HEAD)" k8s/rest-api
 
 bazel run //src/payload-processor/cmd:push -- --tag "payload-processor-$(git rev-parse --short HEAD)"
-helm upgrade payload-processor --set image.tag="payload-processor-$(git rev-parse --short HEAD)" k8s/payload-processor
+
+helm upgrade payload-processor --set image.tag="payload-processor-$(git rev-parse --short HEAD)" --set db.password=$DB_PASSWORD k8s/payload-processor
diff --git a/flake.nix b/flake.nix
index b1cdba6..0a52768 100644
--- a/flake.nix
+++ b/flake.nix
@@ -27,6 +27,7 @@
             buildozer
             go
             gopls
+            mockgen
           ];
         };
       }
diff --git a/go.mod b/go.mod
index 855f601..15d66fa 100644
--- a/go.mod
+++ b/go.mod
@@ -3,8 +3,11 @@ module github.com/Baitinq/fs-tracer-backend
 go 1.22.2
 
 require (
+	github.com/jmoiron/sqlx v1.4.0
+	github.com/lib/pq v1.10.9
 	github.com/segmentio/kafka-go v0.4.47
 	github.com/stretchr/testify v1.9.0
+	go.uber.org/mock v0.4.0
 )
 
 require (
diff --git a/go.sum b/go.sum
index ed6fcfd..dd78085 100644
--- a/go.sum
+++ b/go.sum
@@ -1,8 +1,18 @@
+filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
+filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
+github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
+github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o=
+github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY=
 github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
 github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
+github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
+github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
+github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
+github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
 github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
 github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -22,6 +32,8 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k
 github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
 github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
+go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
+go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
diff --git a/k8s/payload-processor/templates/deployment.yaml b/k8s/payload-processor/templates/deployment.yaml
index b9ab76e..6ab4c90 100644
--- a/k8s/payload-processor/templates/deployment.yaml
+++ b/k8s/payload-processor/templates/deployment.yaml
@@ -39,6 +39,8 @@ spec:
               secretKeyRef:
                 name: kafka-user-passwords
                 key: client-passwords
+          - name: DB_PASSWORD
+            value: {{ .Values.db.password }}
       {{- with .Values.nodeSelector }}
       nodeSelector:
         {{- toYaml . | nindent 8 }}
diff --git a/k8s/payload-processor/values.yaml b/k8s/payload-processor/values.yaml
index b6c065b..a245901 100644
--- a/k8s/payload-processor/values.yaml
+++ b/k8s/payload-processor/values.yaml
@@ -4,6 +4,9 @@
 
 replicaCount: 1
 
+db:
+  password: ""
+
 image:
   repository: docker.io/baitinq/fs-tracer
   pullPolicy: IfNotPresent
diff --git a/src/payload-processor/cmd/BUILD.bazel b/src/payload-processor/cmd/BUILD.bazel
index a11c89c..61b6d50 100644
--- a/src/payload-processor/cmd/BUILD.bazel
+++ b/src/payload-processor/cmd/BUILD.bazel
@@ -9,8 +9,10 @@ go_library(
     visibility = ["//visibility:private"],
     deps = [
         "//src/payload-processor/processor",
-        "@com_github_segmentio_kafka_go//:kafka-go",
-        "@com_github_segmentio_kafka_go//sasl/plain",
+        "//vendor/github.com/jmoiron/sqlx",
+        "//vendor/github.com/lib/pq",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/github.com/segmentio/kafka-go/sasl/plain",
     ],
 )
 
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go
index 2604d8e..b86f5d1 100644
--- a/src/payload-processor/cmd/main.go
+++ b/src/payload-processor/cmd/main.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"fmt"
 	"log"
 	"os"
 	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor"
+	"github.com/jmoiron/sqlx"
+	_ "github.com/lib/pq"
 	"github.com/segmentio/kafka-go"
 	"github.com/segmentio/kafka-go/sasl/plain"
 )
@@ -29,6 +32,16 @@ func main() {
 		GroupID:  "group-A",
 		MaxBytes: 10e6, // 10MB
 	})
-	processor := processor.NewProcessor(kafka_reader, 4)
+
+	db_password, ok := os.LookupEnv("DB_PASSWORD")
+	if !ok {
+		log.Fatal("DB_PASSWORD not set")
+	}
+	db, err := sqlx.Connect("postgres", fmt.Sprintf("postgres://postgres.slpoocycjgqsuoedhkbn:%s@aws-0-eu-central-1.pooler.supabase.com:5432/postgres", db_password))
+	if err != nil {
+		log.Fatal("cannot initalize db client", err)
+	}
+
+	processor := processor.NewProcessor(kafka_reader, db, 4)
 	processor.ProcessMessages()
 }
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index f957fd3..b01383a 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -2,10 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "processor",
-    srcs = ["processor.go"],
+    srcs = [
+        "db.go",
+        "mock_db.go",
+        "processor.go",
+    ],
     importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor",
     visibility = ["//visibility:public"],
-    deps = ["@com_github_segmentio_kafka_go//:kafka-go"],
+    deps = [
+        "//vendor/github.com/jmoiron/sqlx",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/go.uber.org/mock/gomock",
+    ],
 )
 
 go_test(
@@ -13,7 +21,8 @@ go_test(
     srcs = ["processor_test.go"],
     embed = [":processor"],
     deps = [
-        "@com_github_segmentio_kafka_go//:kafka-go",
-        "@com_github_stretchr_testify//require",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/github.com/stretchr/testify/require",
+        "//vendor/go.uber.org/mock/gomock",
     ],
 )
diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go
new file mode 100644
index 0000000..bad31d1
--- /dev/null
+++ b/src/payload-processor/processor/db.go
@@ -0,0 +1,28 @@
+package processor
+
+import (
+	"context"
+	"time"
+
+	"github.com/jmoiron/sqlx"
+)
+
+//go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE
+type DB interface {
+	TestInsert(ctx context.Context, message string) error
+}
+
+type DBImpl struct {
+	db *sqlx.DB
+}
+
+var _ DB = (*DBImpl)(nil)
+
+func NewDB(db *sqlx.DB) DB {
+	return &DBImpl{db: db}
+}
+
+func (db DBImpl) TestInsert(ctx context.Context, message string) error {
+	_, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message)
+	return err
+}
diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go
new file mode 100644
index 0000000..235657d
--- /dev/null
+++ b/src/payload-processor/processor/mock_db.go
@@ -0,0 +1,54 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: db.go
+//
+// Generated by this command:
+//
+//	mockgen -source=db.go -package=processor -destination=mock_db.go
+//
+
+// Package processor is a generated GoMock package.
+package processor
+
+import (
+	context "context"
+	reflect "reflect"
+
+	gomock "go.uber.org/mock/gomock"
+)
+
+// MockDB is a mock of DB interface.
+type MockDB struct {
+	ctrl     *gomock.Controller
+	recorder *MockDBMockRecorder
+}
+
+// MockDBMockRecorder is the mock recorder for MockDB.
+type MockDBMockRecorder struct {
+	mock *MockDB
+}
+
+// NewMockDB creates a new mock instance.
+func NewMockDB(ctrl *gomock.Controller) *MockDB {
+	mock := &MockDB{ctrl: ctrl}
+	mock.recorder = &MockDBMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockDB) EXPECT() *MockDBMockRecorder {
+	return m.recorder
+}
+
+// TestInsert mocks base method.
+func (m *MockDB) TestInsert(ctx context.Context, message string) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "TestInsert", ctx, message)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// TestInsert indicates an expected call of TestInsert.
+func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message)
+}
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 4b03bd5..5f2cf25 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -10,18 +10,21 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
 
 type Processor struct {
 	kafka_reader *kafka.Reader
+	db           DB
 	concurrency  int
 }
 
-func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor {
+func NewProcessor(kafka_reader *kafka.Reader, db *sqlx.DB, concurrency int) Processor {
 	log.Println("Created processor with concurrency: ", concurrency)
 	return Processor{
 		kafka_reader: kafka_reader,
+		db:           NewDB(db),
 		concurrency:  concurrency,
 	}
 }
@@ -63,7 +66,7 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
-		err = p.handleMessage(m)
+		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
 			continue
@@ -72,7 +75,12 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	}
 }
 
-func (p Processor) handleMessage(m kafka.Message) error {
+func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
+
+	err := p.db.TestInsert(ctx, string(m.Value))
+	if err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
index ee2ef5f..5e954c4 100644
--- a/src/payload-processor/processor/processor_test.go
+++ b/src/payload-processor/processor/processor_test.go
@@ -1,18 +1,26 @@
 package processor
 
 import (
+	"context"
 	"testing"
 
 	"github.com/segmentio/kafka-go"
 	"github.com/stretchr/testify/require"
+	gomock "go.uber.org/mock/gomock"
 )
 
 func TestProcessMessage(t *testing.T) {
-	processor := Processor{}
+	ctrl := gomock.NewController(t)
+	mockdb := NewMockDB(ctrl)
+	processor := Processor{
+		db: mockdb,
+	}
 
 	message := []byte("test")
 
-	err := processor.handleMessage(kafka.Message{Value: message})
+	mockdb.EXPECT().TestInsert(gomock.Any(), string(message)).Return(nil)
+
+	err := processor.handleMessage(context.Background(), kafka.Message{Value: message})
 
 	require.NoError(t, err)
 }
diff --git a/src/rest-api/cmd/BUILD.bazel b/src/rest-api/cmd/BUILD.bazel
index 2a142c7..10554e8 100644
--- a/src/rest-api/cmd/BUILD.bazel
+++ b/src/rest-api/cmd/BUILD.bazel
@@ -9,8 +9,8 @@ go_library(
     visibility = ["//visibility:private"],
     deps = [
         "//src/rest-api/handler",
-        "@com_github_segmentio_kafka_go//:kafka-go",
-        "@com_github_segmentio_kafka_go//sasl/plain",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/github.com/segmentio/kafka-go/sasl/plain",
     ],
 )
 
diff --git a/src/rest-api/handler/BUILD.bazel b/src/rest-api/handler/BUILD.bazel
index 89adc69..13e9a5d 100644
--- a/src/rest-api/handler/BUILD.bazel
+++ b/src/rest-api/handler/BUILD.bazel
@@ -5,5 +5,5 @@ go_library(
     srcs = ["handler.go"],
     importpath = "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler",
     visibility = ["//visibility:public"],
-    deps = ["@com_github_segmentio_kafka_go//:kafka-go"],
+    deps = ["//vendor/github.com/segmentio/kafka-go"],
 )