diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-27 20:05:04 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-27 20:06:15 +0200 |
commit | 6c7baf2bd126f98ced59ebd7ebd836ea5e639e79 (patch) | |
tree | 53f250aa8634eb61e9693156b903e6677a1a9b9e | |
parent | payload-processor: insert to the file table (diff) | |
download | fs-tracer-backend-6c7baf2bd126f98ced59ebd7ebd836ea5e639e79.tar.gz fs-tracer-backend-6c7baf2bd126f98ced59ebd7ebd836ea5e639e79.tar.bz2 fs-tracer-backend-6c7baf2bd126f98ced59ebd7ebd836ea5e639e79.zip |
payload-processor: always commit to kafka (for now)
-rw-r--r-- | src/payload-processor/processor/processor.go | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 072f0dd..2fbb6eb 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -82,9 +82,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { err = p.handleMessage(ctx, m) if err != nil { log.Println("failed to handle message:", err) - continue } - p.kafka_reader.CommitMessages(ctx, m) + p.handleResult(ctx, m, err) } } @@ -103,3 +102,11 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error { } return nil } + +func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) { + switch err { + // TODO: If its a recoverable error, don't commit. + default: + p.kafka_reader.CommitMessages(ctx, m) + } +} |