diff options
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r-- | src/payload-processor/processor/processor.go | 26 |
1 files changed, 19 insertions, 7 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index ab128cb..1203fea 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -12,7 +12,6 @@ import ( "time" "github.com/Baitinq/fs-tracer-backend/lib" - "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) @@ -70,17 +69,21 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { break } + user_id, err := getHeaderValue(m.Headers, "user_id") + if err != nil { + log.Fatal("failed to get user_id from headers:", err) + } + // TODO: Remove after testing if string(m.Value) == "" { m.Value = []byte(fmt.Sprintf(`{ - "user_id": "%s", "absolute_path": "/home/user/file.txt", "contents": "Hello, World!", "timestamp": "%s" - }`, uuid.New(), time.Now().Format(time.RFC3339))) + }`, time.Now().Format(time.RFC3339))) } - err = p.handleMessage(ctx, m) + err = p.handleMessage(ctx, m, string(user_id)) if err != nil { log.Println("failed to handle message:", err) p.handleError(ctx, m, err) @@ -90,8 +93,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { } } -func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { - fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) +func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error { + fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id) var file lib.File err := json.Unmarshal(m.Value, &file) @@ -99,7 +102,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { return err } - err = p.db.InsertFile(ctx, file) + err = p.db.InsertFile(ctx, file, user_id) if err != nil { return err } @@ -113,3 +116,12 @@ func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) p.kafka_reader.CommitMessages(ctx, m) } } + +func getHeaderValue(headers []kafka.Header, key string) ([]byte, error) { + for _, header := range headers { + if header.Key == key { + return header.Value, nil + } + } + return []byte{}, fmt.Errorf("Header %s not found", key) +} |