about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-27 20:05:04 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-27 20:06:15 +0200
commit6c7baf2bd126f98ced59ebd7ebd836ea5e639e79 (patch)
tree53f250aa8634eb61e9693156b903e6677a1a9b9e
parentpayload-processor: insert to the file table (diff)
downloadfs-tracer-backend-6c7baf2bd126f98ced59ebd7ebd836ea5e639e79.tar.gz
fs-tracer-backend-6c7baf2bd126f98ced59ebd7ebd836ea5e639e79.tar.bz2
fs-tracer-backend-6c7baf2bd126f98ced59ebd7ebd836ea5e639e79.zip
payload-processor: always commit to kafka (for now)
-rw-r--r--src/payload-processor/processor/processor.go11
1 files changed, 9 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 072f0dd..2fbb6eb 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -82,9 +82,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 		err = p.handleMessage(ctx, m)
 		if err != nil {
 			log.Println("failed to handle message:", err)
-			continue
 		}
-		p.kafka_reader.CommitMessages(ctx, m)
+		p.handleResult(ctx, m, err)
 	}
 }
 
@@ -103,3 +102,11 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 	}
 	return nil
 }
+
+func (p Processor) handleResult(ctx context.Context, m kafka.Message, err error) {
+	switch err {
+	// TODO: If its a recoverable error, don't commit.
+	default:
+		p.kafka_reader.CommitMessages(ctx, m)
+	}
+}