about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-27 20:14:02 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-27 20:14:02 +0200
commit623d6c8e6ba642c665d5a6893eb8c62410111586 (patch)
treebb63f8b2683cbcd92e951569be54007e2828c0d8
parentpayload-processor: always commit to kafka (for now) (diff)
downloadfs-tracer-backend-623d6c8e6ba642c665d5a6893eb8c62410111586.tar.gz
fs-tracer-backend-623d6c8e6ba642c665d5a6893eb8c62410111586.tar.bz2
fs-tracer-backend-623d6c8e6ba642c665d5a6893eb8c62410111586.zip
payload-processor: fmt
-rw-r--r--src/payload-processor/processor/processor.go6
1 files changed, 4 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 2fbb6eb..889f4c0 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -82,8 +82,10 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
+			p.handleError(ctx, m, err)
+			return
 		}
-		p.handleResult(ctx, m, err)
+		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
 
@@ -103,7 +105,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	return nil
 }
 
-func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) {
+func (p Processor) handleError(ctx context.Context, m kafka.Message, err error) {
 	switch err {
 	// TODO: If its a recoverable error, don't commit.
 	default: