about summary refs log tree commit diff
path: root/src/payload-processor
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor')
-rw-r--r--src/payload-processor/cmd/BUILD.bazel6
-rw-r--r--src/payload-processor/cmd/main.go15
-rw-r--r--src/payload-processor/processor/BUILD.bazel17
-rw-r--r--src/payload-processor/processor/db.go28
-rw-r--r--src/payload-processor/processor/mock_db.go54
-rw-r--r--src/payload-processor/processor/processor.go14
-rw-r--r--src/payload-processor/processor/processor_test.go12
7 files changed, 134 insertions, 12 deletions
diff --git a/src/payload-processor/cmd/BUILD.bazel b/src/payload-processor/cmd/BUILD.bazel
index a11c89c..61b6d50 100644
--- a/src/payload-processor/cmd/BUILD.bazel
+++ b/src/payload-processor/cmd/BUILD.bazel
@@ -9,8 +9,10 @@ go_library(
     visibility = ["//visibility:private"],
     deps = [
         "//src/payload-processor/processor",
-        "@com_github_segmentio_kafka_go//:kafka-go",
-        "@com_github_segmentio_kafka_go//sasl/plain",
+        "//vendor/github.com/jmoiron/sqlx",
+        "//vendor/github.com/lib/pq",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/github.com/segmentio/kafka-go/sasl/plain",
     ],
 )
 
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go
index 2604d8e..b86f5d1 100644
--- a/src/payload-processor/cmd/main.go
+++ b/src/payload-processor/cmd/main.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"fmt"
 	"log"
 	"os"
 	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor"
+	"github.com/jmoiron/sqlx"
+	_ "github.com/lib/pq"
 	"github.com/segmentio/kafka-go"
 	"github.com/segmentio/kafka-go/sasl/plain"
 )
@@ -29,6 +32,16 @@ func main() {
 		GroupID:  "group-A",
 		MaxBytes: 10e6, // 10MB
 	})
-	processor := processor.NewProcessor(kafka_reader, 4)
+
+	db_password, ok := os.LookupEnv("DB_PASSWORD")
+	if !ok {
+		log.Fatal("DB_PASSWORD not set")
+	}
+	db, err := sqlx.Connect("postgres", fmt.Sprintf("postgres://postgres.slpoocycjgqsuoedhkbn:%s@aws-0-eu-central-1.pooler.supabase.com:5432/postgres", db_password))
+	if err != nil {
+		log.Fatal("cannot initalize db client", err)
+	}
+
+	processor := processor.NewProcessor(kafka_reader, db, 4)
 	processor.ProcessMessages()
 }
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index f957fd3..b01383a 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -2,10 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "processor",
-    srcs = ["processor.go"],
+    srcs = [
+        "db.go",
+        "mock_db.go",
+        "processor.go",
+    ],
     importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor",
     visibility = ["//visibility:public"],
-    deps = ["@com_github_segmentio_kafka_go//:kafka-go"],
+    deps = [
+        "//vendor/github.com/jmoiron/sqlx",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/go.uber.org/mock/gomock",
+    ],
 )
 
 go_test(
@@ -13,7 +21,8 @@ go_test(
     srcs = ["processor_test.go"],
     embed = [":processor"],
     deps = [
-        "@com_github_segmentio_kafka_go//:kafka-go",
-        "@com_github_stretchr_testify//require",
+        "//vendor/github.com/segmentio/kafka-go",
+        "//vendor/github.com/stretchr/testify/require",
+        "//vendor/go.uber.org/mock/gomock",
     ],
 )
diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go
new file mode 100644
index 0000000..bad31d1
--- /dev/null
+++ b/src/payload-processor/processor/db.go
@@ -0,0 +1,28 @@
+package processor
+
+import (
+	"context"
+	"time"
+
+	"github.com/jmoiron/sqlx"
+)
+
+//go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE
+type DB interface {
+	TestInsert(ctx context.Context, message string) error
+}
+
+type DBImpl struct {
+	db *sqlx.DB
+}
+
+var _ DB = (*DBImpl)(nil)
+
+func NewDB(db *sqlx.DB) DB {
+	return &DBImpl{db: db}
+}
+
+func (db DBImpl) TestInsert(ctx context.Context, message string) error {
+	_, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message)
+	return err
+}
diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go
new file mode 100644
index 0000000..235657d
--- /dev/null
+++ b/src/payload-processor/processor/mock_db.go
@@ -0,0 +1,54 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: db.go
+//
+// Generated by this command:
+//
+//	mockgen -source=db.go -package=processor -destination=mock_db.go
+//
+
+// Package processor is a generated GoMock package.
+package processor
+
+import (
+	context "context"
+	reflect "reflect"
+
+	gomock "go.uber.org/mock/gomock"
+)
+
+// MockDB is a mock of DB interface.
+type MockDB struct {
+	ctrl     *gomock.Controller
+	recorder *MockDBMockRecorder
+}
+
+// MockDBMockRecorder is the mock recorder for MockDB.
+type MockDBMockRecorder struct {
+	mock *MockDB
+}
+
+// NewMockDB creates a new mock instance.
+func NewMockDB(ctrl *gomock.Controller) *MockDB {
+	mock := &MockDB{ctrl: ctrl}
+	mock.recorder = &MockDBMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockDB) EXPECT() *MockDBMockRecorder {
+	return m.recorder
+}
+
+// TestInsert mocks base method.
+func (m *MockDB) TestInsert(ctx context.Context, message string) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "TestInsert", ctx, message)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// TestInsert indicates an expected call of TestInsert.
+func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message)
+}
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 4b03bd5..5f2cf25 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -10,18 +10,21 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
 
 type Processor struct {
 	kafka_reader *kafka.Reader
+	db           DB
 	concurrency  int
 }
 
-func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor {
+func NewProcessor(kafka_reader *kafka.Reader, db *sqlx.DB, concurrency int) Processor {
 	log.Println("Created processor with concurrency: ", concurrency)
 	return Processor{
 		kafka_reader: kafka_reader,
+		db:           NewDB(db),
 		concurrency:  concurrency,
 	}
 }
@@ -63,7 +66,7 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
-		err = p.handleMessage(m)
+		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
 			continue
@@ -72,7 +75,12 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	}
 }
 
-func (p Processor) handleMessage(m kafka.Message) error {
+func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
+
+	err := p.db.TestInsert(ctx, string(m.Value))
+	if err != nil {
+		return err
+	}
 	return nil
 }
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
index ee2ef5f..5e954c4 100644
--- a/src/payload-processor/processor/processor_test.go
+++ b/src/payload-processor/processor/processor_test.go
@@ -1,18 +1,26 @@
 package processor
 
 import (
+	"context"
 	"testing"
 
 	"github.com/segmentio/kafka-go"
 	"github.com/stretchr/testify/require"
+	gomock "go.uber.org/mock/gomock"
 )
 
 func TestProcessMessage(t *testing.T) {
-	processor := Processor{}
+	ctrl := gomock.NewController(t)
+	mockdb := NewMockDB(ctrl)
+	processor := Processor{
+		db: mockdb,
+	}
 
 	message := []byte("test")
 
-	err := processor.handleMessage(kafka.Message{Value: message})
+	mockdb.EXPECT().TestInsert(gomock.Any(), string(message)).Return(nil)
+
+	err := processor.handleMessage(context.Background(), kafka.Message{Value: message})
 
 	require.NoError(t, err)
 }