diff options
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r-- | src/payload-processor/processor/processor.go | 21 |
1 files changed, 20 insertions, 1 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 5f2cf25..072f0dd 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -2,6 +2,7 @@ package processor import ( "context" + "encoding/json" "fmt" "log" "os" @@ -10,6 +11,7 @@ import ( "syscall" "time" + "github.com/google/uuid" "github.com/jmoiron/sqlx" "github.com/segmentio/kafka-go" ) @@ -66,6 +68,17 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { cancel() break } + + // TODO: Remove after testing + if string(m.Value) == "" { + m.Value = []byte(fmt.Sprintf(`{ + "user_id": "%s", + "absolute_path": "/home/user/file.txt", + "contents": "Hello, World!", + "timestamp": "%s" + }`, uuid.New(), time.Now().Format(time.RFC3339))) + } + err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) @@ -78,7 +91,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value)) - err := p.db.TestInsert(ctx, string(m.Value)) + var file File + err := json.Unmarshal(m.Value, &file) + if err != nil { + return err + } + + err = p.db.InsertFile(ctx, file) if err != nil { return err } |