diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-09 00:15:13 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-11 01:32:43 +0200 |
commit | 0a7be3e1ca6e651759fdb853e83e639f476e0c47 (patch) | |
tree | 0a53188b22edf971380290fefc3b6075a6a4e629 | |
parent | k8s: set default number of kafka topic partitions to 2 (diff) | |
download | fs-tracer-backend-0a7be3e1ca6e651759fdb853e83e639f476e0c47.tar.gz fs-tracer-backend-0a7be3e1ca6e651759fdb853e83e639f476e0c47.tar.bz2 fs-tracer-backend-0a7be3e1ca6e651759fdb853e83e639f476e0c47.zip |
payloads-processor: connect to DB and insert dummy data
-rw-r--r-- | .gitignore | 2 | ||||
-rw-r--r-- | MODULE.bazel | 3 | ||||
-rw-r--r-- | TODO | 4 | ||||
-rwxr-xr-x | deploy.sh | 3 | ||||
-rw-r--r-- | flake.nix | 1 | ||||
-rw-r--r-- | go.mod | 3 | ||||
-rw-r--r-- | go.sum | 12 | ||||
-rw-r--r-- | k8s/payload-processor/templates/deployment.yaml | 2 | ||||
-rw-r--r-- | k8s/payload-processor/values.yaml | 3 | ||||
-rw-r--r-- | src/payload-processor/cmd/BUILD.bazel | 6 | ||||
-rw-r--r-- | src/payload-processor/cmd/main.go | 15 | ||||
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 17 | ||||
-rw-r--r-- | src/payload-processor/processor/db.go | 28 | ||||
-rw-r--r-- | src/payload-processor/processor/mock_db.go | 54 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 14 | ||||
-rw-r--r-- | src/payload-processor/processor/processor_test.go | 12 | ||||
-rw-r--r-- | src/rest-api/cmd/BUILD.bazel | 4 | ||||
-rw-r--r-- | src/rest-api/handler/BUILD.bazel | 2 |
18 files changed, 168 insertions, 17 deletions
diff --git a/.gitignore b/.gitignore index 4fad39f..6930a80 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ /bazel-* .helmsman-tmp/ + +vendor/ diff --git a/MODULE.bazel b/MODULE.bazel index c4a151b..da57d72 100644 --- a/MODULE.bazel +++ b/MODULE.bazel @@ -27,6 +27,9 @@ go_deps = use_extension("@bazel_gazelle//:extensions.bzl", "go_deps") go_deps.from_file(go_mod = "//:go.mod") use_repo( go_deps, + "com_github_jmoiron_sqlx", + "com_github_lib_pq", "com_github_segmentio_kafka_go", "com_github_stretchr_testify", + "org_uber_go_mock", ) diff --git a/TODO b/TODO index 2589e41..b130e70 100644 --- a/TODO +++ b/TODO @@ -1,2 +1,4 @@ -cassandra +better managing of secrets (maybe we shouldnt have environment variables) +add db migrations +add db tests nixos-k3s repo diff --git a/deploy.sh b/deploy.sh index 93323a7..d66667e 100755 --- a/deploy.sh +++ b/deploy.sh @@ -4,4 +4,5 @@ bazel run //src/rest-api/cmd:push -- --tag "rest-api-$(git rev-parse --short HEA helm upgrade rest-api --set image.tag="rest-api-$(git rev-parse --short HEAD)" k8s/rest-api bazel run //src/payload-processor/cmd:push -- --tag "payload-processor-$(git rev-parse --short HEAD)" -helm upgrade payload-processor --set image.tag="payload-processor-$(git rev-parse --short HEAD)" k8s/payload-processor + +helm upgrade payload-processor --set image.tag="payload-processor-$(git rev-parse --short HEAD)" --set db.password=$DB_PASSWORD k8s/payload-processor diff --git a/flake.nix b/flake.nix index b1cdba6..0a52768 100644 --- a/flake.nix +++ b/flake.nix @@ -27,6 +27,7 @@ buildozer go gopls + mockgen ]; }; } diff --git a/go.mod b/go.mod index 855f601..15d66fa 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,11 @@ module github.com/Baitinq/fs-tracer-backend go 1.22.2 require ( + github.com/jmoiron/sqlx v1.4.0 + github.com/lib/pq v1.10.9 github.com/segmentio/kafka-go v0.4.47 github.com/stretchr/testify v1.9.0 + go.uber.org/mock v0.4.0 ) require ( diff --git a/go.sum b/go.sum index ed6fcfd..dd78085 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,18 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -22,6 +32,8 @@ github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3k github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= +go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= diff --git a/k8s/payload-processor/templates/deployment.yaml b/k8s/payload-processor/templates/deployment.yaml index b9ab76e..6ab4c90 100644 --- a/k8s/payload-processor/templates/deployment.yaml +++ b/k8s/payload-processor/templates/deployment.yaml @@ -39,6 +39,8 @@ spec: secretKeyRef: name: kafka-user-passwords key: client-passwords + - name: DB_PASSWORD + value: {{ .Values.db.password }} {{- with .Values.nodeSelector }} nodeSelector: {{- toYaml . | nindent 8 }} diff --git a/k8s/payload-processor/values.yaml b/k8s/payload-processor/values.yaml index b6c065b..a245901 100644 --- a/k8s/payload-processor/values.yaml +++ b/k8s/payload-processor/values.yaml @@ -4,6 +4,9 @@ replicaCount: 1 +db: + password: "" + image: repository: docker.io/baitinq/fs-tracer pullPolicy: IfNotPresent diff --git a/src/payload-processor/cmd/BUILD.bazel b/src/payload-processor/cmd/BUILD.bazel index a11c89c..61b6d50 100644 --- a/src/payload-processor/cmd/BUILD.bazel +++ b/src/payload-processor/cmd/BUILD.bazel @@ -9,8 +9,10 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/payload-processor/processor", - "@com_github_segmentio_kafka_go//:kafka-go", - "@com_github_segmentio_kafka_go//sasl/plain", + "//vendor/github.com/jmoiron/sqlx", + "//vendor/github.com/lib/pq", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/github.com/segmentio/kafka-go/sasl/plain", ], ) diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 2604d8e..b86f5d1 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -1,11 +1,14 @@ package main import ( + "fmt" "log" "os" "time" "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/plain" ) @@ -29,6 +32,16 @@ func main() { GroupID: "group-A", MaxBytes: 10e6, // 10MB }) - processor := processor.NewProcessor(kafka_reader, 4) + + db_password, ok := os.LookupEnv("DB_PASSWORD") + if !ok { + log.Fatal("DB_PASSWORD not set") + } + db, err := sqlx.Connect("postgres", fmt.Sprintf("postgres://postgres.slpoocycjgqsuoedhkbn:%s@aws-0-eu-central-1.pooler.supabase.com:5432/postgres", db_password)) + if err != nil { + log.Fatal("cannot initalize db client", err) + } + + processor := processor.NewProcessor(kafka_reader, db, 4) processor.ProcessMessages() } diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index f957fd3..b01383a 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -2,10 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "processor", - srcs = ["processor.go"], + srcs = [ + "db.go", + "mock_db.go", + "processor.go", + ], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], - deps = ["@com_github_segmentio_kafka_go//:kafka-go"], + deps = [ + "//vendor/github.com/jmoiron/sqlx", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/go.uber.org/mock/gomock", + ], ) go_test( @@ -13,7 +21,8 @@ go_test( srcs = ["processor_test.go"], embed = [":processor"], deps = [ - "@com_github_segmentio_kafka_go//:kafka-go", - "@com_github_stretchr_testify//require", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/github.com/stretchr/testify/require", + "//vendor/go.uber.org/mock/gomock", ], ) diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go new file mode 100644 index 0000000..bad31d1 --- /dev/null +++ b/src/payload-processor/processor/db.go @@ -0,0 +1,28 @@ +package processor + +import ( + "context" + "time" + + "github.com/jmoiron/sqlx" +) + +//go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE +type DB interface { + TestInsert(ctx context.Context, message string) error +} + +type DBImpl struct { + db *sqlx.DB +} + +var _ DB = (*DBImpl)(nil) + +func NewDB(db *sqlx.DB) DB { + return &DBImpl{db: db} +} + +func (db DBImpl) TestInsert(ctx context.Context, message string) error { + _, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message) + return err +} diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go new file mode 100644 index 0000000..235657d --- /dev/null +++ b/src/payload-processor/processor/mock_db.go @@ -0,0 +1,54 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: db.go +// +// Generated by this command: +// +// mockgen -source=db.go -package=processor -destination=mock_db.go +// + +// Package processor is a generated GoMock package. +package processor + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockDB is a mock of DB interface. +type MockDB struct { + ctrl *gomock.Controller + recorder *MockDBMockRecorder +} + +// MockDBMockRecorder is the mock recorder for MockDB. +type MockDBMockRecorder struct { + mock *MockDB +} + +// NewMockDB creates a new mock instance. +func NewMockDB(ctrl *gomock.Controller) *MockDB { + mock := &MockDB{ctrl: ctrl} + mock.recorder = &MockDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDB) EXPECT() *MockDBMockRecorder { + return m.recorder +} + +// TestInsert mocks base method. +func (m *MockDB) TestInsert(ctx context.Context, message string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TestInsert", ctx, message) + ret0, _ := ret[0].(error) + return ret0 +} + +// TestInsert indicates an expected call of TestInsert. +func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message) +} diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 4b03bd5..5f2cf25 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -10,18 +10,21 @@ import ( "syscall" "time" + "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) type Processor struct { kafka_reader *kafka.Reader + db DB concurrency int } -func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor { +func NewProcessor(kafka_reader *kafka.Reader, db *sqlx.DB, concurrency int) Processor { log.Println("Created processor with concurrency: ", concurrency) return Processor{ kafka_reader: kafka_reader, + db: NewDB(db), concurrency: concurrency, } } @@ -63,7 +66,7 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { cancel() break } - err = p.handleMessage(m) + err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) continue @@ -72,7 +75,12 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { } } -func (p Processor) handleMessage(m kafka.Message) error { +func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) + + err := p.db.TestInsert(ctx, string(m.Value)) + if err != nil { + return err + } return nil } diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go index ee2ef5f..5e954c4 100644 --- a/src/payload-processor/processor/processor_test.go +++ b/src/payload-processor/processor/processor_test.go @@ -1,18 +1,26 @@ package processor import ( + "context" "testing" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" ) func TestProcessMessage(t *testing.T) { - processor := Processor{} + ctrl := gomock.NewController(t) + mockdb := NewMockDB(ctrl) + processor := Processor{ + db: mockdb, + } message := []byte("test") - err := processor.handleMessage(kafka.Message{Value: message}) + mockdb.EXPECT().TestInsert(gomock.Any(), string(message)).Return(nil) + + err := processor.handleMessage(context.Background(), kafka.Message{Value: message}) require.NoError(t, err) } diff --git a/src/rest-api/cmd/BUILD.bazel b/src/rest-api/cmd/BUILD.bazel index 2a142c7..10554e8 100644 --- a/src/rest-api/cmd/BUILD.bazel +++ b/src/rest-api/cmd/BUILD.bazel @@ -9,8 +9,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/rest-api/handler", - "@com_github_segmentio_kafka_go//:kafka-go", - "@com_github_segmentio_kafka_go//sasl/plain", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/github.com/segmentio/kafka-go/sasl/plain", ], ) diff --git a/src/rest-api/handler/BUILD.bazel b/src/rest-api/handler/BUILD.bazel index 89adc69..13e9a5d 100644 --- a/src/rest-api/handler/BUILD.bazel +++ b/src/rest-api/handler/BUILD.bazel @@ -5,5 +5,5 @@ go_library( srcs = ["handler.go"], importpath = "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler", visibility = ["//visibility:public"], - deps = ["@com_github_segmentio_kafka_go//:kafka-go"], + deps = ["//vendor/github.com/segmentio/kafka-go"], ) |