diff options
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r-- | src/payload-processor/processor/processor.go | 10 |
1 files changed, 5 insertions, 5 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 1203fea..37632ea 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -76,11 +76,11 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { // TODO: Remove after testing if string(m.Value) == "" { - m.Value = []byte(fmt.Sprintf(`{ + m.Value = []byte(fmt.Sprintf(`[{ "absolute_path": "/home/user/file.txt", "contents": "Hello, World!", "timestamp": "%s" - }`, time.Now().Format(time.RFC3339))) + }]`, time.Now().Format(time.RFC3339))) } err = p.handleMessage(ctx, m, string(user_id)) @@ -96,13 +96,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error { fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id) - var file lib.File - err := json.Unmarshal(m.Value, &file) + var files []lib.File + err := json.Unmarshal(m.Value, &files) if err != nil { return err } - err = p.db.InsertFile(ctx, file, user_id) + err = p.db.InsertFiles(ctx, files, user_id) if err != nil { return err } |