about summary refs log tree commit diff
path: root/src/payload-processor/processor/processor.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r--src/payload-processor/processor/processor.go11
1 files changed, 10 insertions, 1 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index f2e4e80..fc6792a 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -63,7 +63,16 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
-		fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+		err = p.handleMessage(m)
+		if err != nil {
+			log.Println("failed to handle message:", err)
+			continue
+		}
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
+
+func (p Processor) handleMessage(m kafka.Message) error {
+	fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+	return nil
+}