diff options
Diffstat (limited to 'src/payload-processor')
-rw-r--r-- | src/payload-processor/cmd/BUILD.bazel | 6 | ||||
-rw-r--r-- | src/payload-processor/cmd/main.go | 15 | ||||
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 17 | ||||
-rw-r--r-- | src/payload-processor/processor/db.go | 28 | ||||
-rw-r--r-- | src/payload-processor/processor/mock_db.go | 54 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 14 | ||||
-rw-r--r-- | src/payload-processor/processor/processor_test.go | 12 |
7 files changed, 134 insertions, 12 deletions
diff --git a/src/payload-processor/cmd/BUILD.bazel b/src/payload-processor/cmd/BUILD.bazel index a11c89c..61b6d50 100644 --- a/src/payload-processor/cmd/BUILD.bazel +++ b/src/payload-processor/cmd/BUILD.bazel @@ -9,8 +9,10 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/payload-processor/processor", - "@com_github_segmentio_kafka_go//:kafka-go", - "@com_github_segmentio_kafka_go//sasl/plain", + "//vendor/github.com/jmoiron/sqlx", + "//vendor/github.com/lib/pq", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/github.com/segmentio/kafka-go/sasl/plain", ], ) diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 2604d8e..b86f5d1 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -1,11 +1,14 @@ package main import ( + "fmt" "log" "os" "time" "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor" + "github.com/jmoiron/sqlx" + _ "github.com/lib/pq" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl/plain" ) @@ -29,6 +32,16 @@ func main() { GroupID: "group-A", MaxBytes: 10e6, // 10MB }) - processor := processor.NewProcessor(kafka_reader, 4) + + db_password, ok := os.LookupEnv("DB_PASSWORD") + if !ok { + log.Fatal("DB_PASSWORD not set") + } + db, err := sqlx.Connect("postgres", fmt.Sprintf("postgres://postgres.slpoocycjgqsuoedhkbn:%s@aws-0-eu-central-1.pooler.supabase.com:5432/postgres", db_password)) + if err != nil { + log.Fatal("cannot initalize db client", err) + } + + processor := processor.NewProcessor(kafka_reader, db, 4) processor.ProcessMessages() } diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index f957fd3..b01383a 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -2,10 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "processor", - srcs = ["processor.go"], + srcs = [ + "db.go", + "mock_db.go", + "processor.go", + ], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], - deps = ["@com_github_segmentio_kafka_go//:kafka-go"], + deps = [ + "//vendor/github.com/jmoiron/sqlx", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/go.uber.org/mock/gomock", + ], ) go_test( @@ -13,7 +21,8 @@ go_test( srcs = ["processor_test.go"], embed = [":processor"], deps = [ - "@com_github_segmentio_kafka_go//:kafka-go", - "@com_github_stretchr_testify//require", + "//vendor/github.com/segmentio/kafka-go", + "//vendor/github.com/stretchr/testify/require", + "//vendor/go.uber.org/mock/gomock", ], ) diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go new file mode 100644 index 0000000..bad31d1 --- /dev/null +++ b/src/payload-processor/processor/db.go @@ -0,0 +1,28 @@ +package processor + +import ( + "context" + "time" + + "github.com/jmoiron/sqlx" +) + +//go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE +type DB interface { + TestInsert(ctx context.Context, message string) error +} + +type DBImpl struct { + db *sqlx.DB +} + +var _ DB = (*DBImpl)(nil) + +func NewDB(db *sqlx.DB) DB { + return &DBImpl{db: db} +} + +func (db DBImpl) TestInsert(ctx context.Context, message string) error { + _, err := db.db.ExecContext(ctx, "INSERT INTO test (created_at, test) VALUES ($1, $2)", time.Now(), message) + return err +} diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go new file mode 100644 index 0000000..235657d --- /dev/null +++ b/src/payload-processor/processor/mock_db.go @@ -0,0 +1,54 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: db.go +// +// Generated by this command: +// +// mockgen -source=db.go -package=processor -destination=mock_db.go +// + +// Package processor is a generated GoMock package. +package processor + +import ( + context "context" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockDB is a mock of DB interface. +type MockDB struct { + ctrl *gomock.Controller + recorder *MockDBMockRecorder +} + +// MockDBMockRecorder is the mock recorder for MockDB. +type MockDBMockRecorder struct { + mock *MockDB +} + +// NewMockDB creates a new mock instance. +func NewMockDB(ctrl *gomock.Controller) *MockDB { + mock := &MockDB{ctrl: ctrl} + mock.recorder = &MockDBMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDB) EXPECT() *MockDBMockRecorder { + return m.recorder +} + +// TestInsert mocks base method. +func (m *MockDB) TestInsert(ctx context.Context, message string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TestInsert", ctx, message) + ret0, _ := ret[0].(error) + return ret0 +} + +// TestInsert indicates an expected call of TestInsert. +func (mr *MockDBMockRecorder) TestInsert(ctx, message any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestInsert", reflect.TypeOf((*MockDB)(nil).TestInsert), ctx, message) +} diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 4b03bd5..5f2cf25 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -10,18 +10,21 @@ import ( "syscall" "time" + "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) type Processor struct { kafka_reader *kafka.Reader + db DB concurrency int } -func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor { +func NewProcessor(kafka_reader *kafka.Reader, db *sqlx.DB, concurrency int) Processor { log.Println("Created processor with concurrency: ", concurrency) return Processor{ kafka_reader: kafka_reader, + db: NewDB(db), concurrency: concurrency, } } @@ -63,7 +66,7 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { cancel() break } - err = p.handleMessage(m) + err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) continue @@ -72,7 +75,12 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { } } -func (p Processor) handleMessage(m kafka.Message) error { +func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) + + err := p.db.TestInsert(ctx, string(m.Value)) + if err != nil { + return err + } return nil } diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go index ee2ef5f..5e954c4 100644 --- a/src/payload-processor/processor/processor_test.go +++ b/src/payload-processor/processor/processor_test.go @@ -1,18 +1,26 @@ package processor import ( + "context" "testing" "github.com/segmentio/kafka-go" "github.com/stretchr/testify/require" + gomock "go.uber.org/mock/gomock" ) func TestProcessMessage(t *testing.T) { - processor := Processor{} + ctrl := gomock.NewController(t) + mockdb := NewMockDB(ctrl) + processor := Processor{ + db: mockdb, + } message := []byte("test") - err := processor.handleMessage(kafka.Message{Value: message}) + mockdb.EXPECT().TestInsert(gomock.Any(), string(message)).Return(nil) + + err := processor.handleMessage(context.Background(), kafka.Message{Value: message}) require.NoError(t, err) } |