From 623d6c8e6ba642c665d5a6893eb8c62410111586 Mon Sep 17 00:00:00 2001 From: Baitinq Date: Mon, 27 May 2024 20:14:02 +0200 Subject: payload-processor: fmt --- src/payload-processor/processor/processor.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 2fbb6eb..889f4c0 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -82,8 +82,10 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) + p.handleError(ctx, m, err) + return } - p.handleResult(ctx, m, err) + p.kafka_reader.CommitMessages(ctx, m) } } @@ -103,7 +105,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { return nil } -func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) { +func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) { switch err { // TODO: If its a recoverable error, don't commit. default: -- cgit 1.4.1