about summary refs log tree commit diff
path: root/src/payload-processor/cmd/main.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/cmd/main.go')
-rw-r--r--src/payload-processor/cmd/main.go15
1 files changed, 14 insertions, 1 deletions
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go
index 2604d8e..b86f5d1 100644
--- a/src/payload-processor/cmd/main.go
+++ b/src/payload-processor/cmd/main.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"fmt"
 	"log"
 	"os"
 	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor"
+	"github.com/jmoiron/sqlx"
+	_ "github.com/lib/pq"
 	"github.com/segmentio/kafka-go"
 	"github.com/segmentio/kafka-go/sasl/plain"
 )
@@ -29,6 +32,16 @@ func main() {
 		GroupID:  "group-A",
 		MaxBytes: 10e6, // 10MB
 	})
-	processor := processor.NewProcessor(kafka_reader, 4)
+
+	db_password, ok := os.LookupEnv("DB_PASSWORD")
+	if !ok {
+		log.Fatal("DB_PASSWORD not set")
+	}
+	db, err := sqlx.Connect("postgres", fmt.Sprintf("postgres://postgres.slpoocycjgqsuoedhkbn:%s@aws-0-eu-central-1.pooler.supabase.com:5432/postgres", db_password))
+	if err != nil {
+		log.Fatal("cannot initalize db client", err)
+	}
+
+	processor := processor.NewProcessor(kafka_reader, db, 4)
 	processor.ProcessMessages()
 }