about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--src/payload-processor/processor/processor.go11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 072f0dd..2fbb6eb 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -82,9 +82,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
-			continue
 		}
-		p.kafka_reader.CommitMessages(ctx, m)
+		p.handleResult(ctx, m, err)
 	}
 }
 
@@ -103,3 +102,11 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	}
 	return nil
 }
+
+func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) {
+	switch err {
+	// TODO: If its a recoverable error, don't commit.
+	default:
+		p.kafka_reader.CommitMessages(ctx, m)
+	}
+}