diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-27 20:14:02 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-27 20:14:02 +0200 |
commit | 623d6c8e6ba642c665d5a6893eb8c62410111586 (patch) | |
tree | bb63f8b2683cbcd92e951569be54007e2828c0d8 | |
parent | payload-processor: always commit to kafka (for now) (diff) | |
download | fs-tracer-backend-623d6c8e6ba642c665d5a6893eb8c62410111586.tar.gz fs-tracer-backend-623d6c8e6ba642c665d5a6893eb8c62410111586.tar.bz2 fs-tracer-backend-623d6c8e6ba642c665d5a6893eb8c62410111586.zip |
payload-processor: fmt
-rw-r--r-- | src/payload-processor/processor/processor.go | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 2fbb6eb..889f4c0 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -82,8 +82,10 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) + p.handleError(ctx, m, err) + return } - p.handleResult(ctx, m, err) + p.kafka_reader.CommitMessages(ctx, m) } } @@ -103,7 +105,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { return nil } -func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) { +func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) { switch err { // TODO: If its a recoverable error, don't commit. default: |