diff options
Diffstat (limited to 'src/payload-processor/processor/processor.go')
-rw-r--r-- | src/payload-processor/processor/processor.go | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 072f0dd..2fbb6eb 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -82,9 +82,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) - continue } - p.kafka_reader.CommitMessages(ctx, m) + p.handleResult(ctx, m, err) } } @@ -103,3 +102,11 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { } return nil } + +func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) { + switch err { + // TODO: If its a recoverable error, don't commit. + default: + p.kafka_reader.CommitMessages(ctx, m) + } +} |