about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-04 11:45:08 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-04 18:31:51 +0200
commitfcd5b0c6b86da778f0ba17b538c8bc1a05eedd66 (patch)
tree432750b54b08ac49fd87c25986c9eb3f119e3d16
parentpayload-processor: fmt (diff)
downloadfs-tracer-backend-fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66.tar.gz
fs-tracer-backend-fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66.tar.bz2
fs-tracer-backend-fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66.zip
payload-processor: use consumer group and properly handle application shutdown
-rw-r--r--k8s/helmsman.yml3
-rw-r--r--src/payload-processor/cmd/main.go6
-rw-r--r--src/payload-processor/processor/processor.go20
3 files changed, 24 insertions, 5 deletions
diff --git a/k8s/helmsman.yml b/k8s/helmsman.yml
index 5579d82..47bee36 100644
--- a/k8s/helmsman.yml
+++ b/k8s/helmsman.yml
@@ -25,6 +25,9 @@ apps:
     set:
       controller.replicaCount: 1
       controller.livenessProbe.initialDelaySeconds: 120
+      extraConfig: |
+        offsets.topic.replication.factor=1
+        transaction.state.log.replication.factor=1
 
   payload-processor:
     namespace: default
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go
index 226700d..5faa293 100644
--- a/src/payload-processor/cmd/main.go
+++ b/src/payload-processor/cmd/main.go
@@ -25,9 +25,9 @@ func main() {
 			Timeout:   10 * time.Second,
 			DualStack: true,
 		},
-		Topic:     "topic-A",
-		Partition: 0,
-		MaxBytes:  10e6, // 10MB
+		Topic:    "topic-A",
+		GroupID:  "group-A",
+		MaxBytes: 10e6, // 10MB
 	})
 	processor := processor.NewProcessor(kafka_reader)
 	processor.ProcessMessages()
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 66bac0e..24d9330 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -4,6 +4,9 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"os"
+	"os/signal"
+	"syscall"
 	"time"
 
 	"github.com/segmentio/kafka-go"
@@ -21,12 +24,25 @@ func NewProcessor(kafka_reader *kafka.Reader) Processor {
 }
 
 func (p Processor) ProcessMessages() {
+	signals := make(chan os.Signal, 1)
+
+	signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// go routine for getting signals asynchronously
+	go func() {
+		sig := <-signals
+		log.Println("Got signal: ", sig)
+		cancel()
+	}()
 	for {
-		m, err := p.kafka_reader.ReadMessage(context.Background())
+		m, err := p.kafka_reader.FetchMessage(ctx)
 		if err != nil {
-			break
+			log.Panic("failed to fetch message:", err)
 		}
 		fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+		p.kafka_reader.CommitMessages(ctx, m)
 	}
 
 	if err := p.kafka_reader.Close(); err != nil {