about summary refs log tree commit diff
path: root/src/payload-processor/processor/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r--src/payload-processor/processor/processor.go26
1 files changed, 19 insertions, 7 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index ab128cb..1203fea 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -12,7 +12,6 @@ import (
 	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/lib"
-	"github.com/google/uuid"
 	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
@@ -70,17 +69,21 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			break
 		}
 
+		user_id, err := getHeaderValue(m.Headers, "user_id")
+		if err != nil {
+			log.Fatal("failed to get user_id from headers:", err)
+		}
+
 		// TODO: Remove after testing
 		if string(m.Value) == "" {
 			m.Value = []byte(fmt.Sprintf(`{
-				"user_id": "%s",
 				"absolute_path": "/home/user/file.txt",
 				"contents": "Hello, World!",
 				"timestamp": "%s"
-			}`, uuid.New(), time.Now().Format(time.RFC3339)))
+			}`, time.Now().Format(time.RFC3339)))
 		}
 
-		err = p.handleMessage(ctx, m)
+		err = p.handleMessage(ctx, m, string(user_id))
 		if err != nil {
 			log.Println("failed to handle message:", err)
 			p.handleError(ctx, m, err)
@@ -90,8 +93,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	}
 }
 
-func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
-	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
+func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error {
+	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id)
 
 	var file lib.File
 	err := json.Unmarshal(m.Value, &file)
@@ -99,7 +102,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 		return err
 	}
 
-	err = p.db.InsertFile(ctx, file)
+	err = p.db.InsertFile(ctx, file, user_id)
 	if err != nil {
 		return err
 	}
@@ -113,3 +116,12 @@ func (p Processor) handleError(ctx context.Context, m kafka.Message, err error)
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
+
+func getHeaderValue(headers []kafka.Header, key string) ([]byte, error) {
+	for _, header := range headers {
+		if header.Key == key {
+			return header.Value, nil
+		}
+	}
+	return []byte{}, fmt.Errorf("Header %s not found", key)
+}