about summary refs log tree commit diff
path: root/src/payload-processor/processor/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r--src/payload-processor/processor/processor.go14
1 files changed, 11 insertions, 3 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 4b03bd5..5f2cf25 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -10,18 +10,21 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
 
 type Processor struct {
 	kafka_reader *kafka.Reader
+	db           DB
 	concurrency  int
 }
 
-func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor {
+func NewProcessor(kafka_reader *kafka.Reader, db *sqlx.DB, concurrency int) Processor {
 	log.Println("Created processor with concurrency: ", concurrency)
 	return Processor{
 		kafka_reader: kafka_reader,
+		db:           NewDB(db),
 		concurrency:  concurrency,
 	}
 }
@@ -63,7 +66,7 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
-		err = p.handleMessage(m)
+		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
 			continue
@@ -72,7 +75,12 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	}
 }
 
-func (p Processor) handleMessage(m kafka.Message) error {
+func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
+
+	err := p.db.TestInsert(ctx, string(m.Value))
+	if err != nil {
+		return err
+	}
 	return nil
 }