about summary refs log tree commit diff
path: root/src/payload-processor/processor/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r--src/payload-processor/processor/processor.go10
1 files changed, 5 insertions, 5 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 1203fea..37632ea 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -76,11 +76,11 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 
 		// TODO: Remove after testing
 		if string(m.Value) == "" {
-			m.Value = []byte(fmt.Sprintf(`{
+			m.Value = []byte(fmt.Sprintf(`[{
 				"absolute_path": "/home/user/file.txt",
 				"contents": "Hello, World!",
 				"timestamp": "%s"
-			}`, time.Now().Format(time.RFC3339)))
+			}]`, time.Now().Format(time.RFC3339)))
 		}
 
 		err = p.handleMessage(ctx, m, string(user_id))
@@ -96,13 +96,13 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error {
 	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id)
 
-	var file lib.File
-	err := json.Unmarshal(m.Value, &file)
+	var files []lib.File
+	err := json.Unmarshal(m.Value, &files)
 	if err != nil {
 		return err
 	}
 
-	err = p.db.InsertFile(ctx, file, user_id)
+	err = p.db.InsertFiles(ctx, files, user_id)
 	if err != nil {
 		return err
 	}