about summary refs log tree commit diff
path: root/src/payload-processor/processor/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r--src/payload-processor/processor/processor.go21
1 files changed, 20 insertions, 1 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 5f2cf25..072f0dd 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -2,6 +2,7 @@ package processor
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"log"
 	"os"
@@ -10,6 +11,7 @@ import (
 	"syscall"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
@@ -66,6 +68,17 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
+
+		// TODO: Remove after testing
+		if string(m.Value) == "" {
+			m.Value = []byte(fmt.Sprintf(`{
+				"user_id": "%s",
+				"absolute_path": "/home/user/file.txt",
+				"contents": "Hello, World!",
+				"timestamp": "%s"
+			}`, uuid.New(), time.Now().Format(time.RFC3339)))
+		}
+
 		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
@@ -78,7 +91,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
 
-	err := p.db.TestInsert(ctx, string(m.Value))
+	var file File
+	err := json.Unmarshal(m.Value, &file)
+	if err != nil {
+		return err
+	}
+
+	err = p.db.InsertFile(ctx, file)
 	if err != nil {
 		return err
 	}