diff options
-rw-r--r-- | src/payload-processor/processor/processor.go | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 2fbb6eb..889f4c0 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -82,8 +82,10 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) + p.handleError(ctx, m, err) + return } - p.handleResult(ctx, m, err) + p.kafka_reader.CommitMessages(ctx, m) } } @@ -103,7 +105,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { return nil } -func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) { +func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) { switch err { // TODO: If its a recoverable error, don't commit. default: |