diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-04 11:45:08 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-04 18:31:51 +0200 |
commit | fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66 (patch) | |
tree | 432750b54b08ac49fd87c25986c9eb3f119e3d16 | |
parent | payload-processor: fmt (diff) | |
download | fs-tracer-backend-fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66.tar.gz fs-tracer-backend-fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66.tar.bz2 fs-tracer-backend-fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66.zip |
payload-processor: use consumer group and properly handle application shutdown
-rw-r--r-- | k8s/helmsman.yml | 3 | ||||
-rw-r--r-- | src/payload-processor/cmd/main.go | 6 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 20 |
3 files changed, 24 insertions, 5 deletions
diff --git a/k8s/helmsman.yml b/k8s/helmsman.yml index 5579d82..47bee36 100644 --- a/k8s/helmsman.yml +++ b/k8s/helmsman.yml @@ -25,6 +25,9 @@ apps: set: controller.replicaCount: 1 controller.livenessProbe.initialDelaySeconds: 120 + extraConfig: | + offsets.topic.replication.factor=1 + transaction.state.log.replication.factor=1 payload-processor: namespace: default diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 226700d..5faa293 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -25,9 +25,9 @@ func main() { Timeout: 10 * time.Second, DualStack: true, }, - Topic: "topic-A", - Partition: 0, - MaxBytes: 10e6, // 10MB + Topic: "topic-A", + GroupID: "group-A", + MaxBytes: 10e6, // 10MB }) processor := processor.NewProcessor(kafka_reader) processor.ProcessMessages() diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 66bac0e..24d9330 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/segmentio/kafka-go" @@ -21,12 +24,25 @@ func NewProcessor(kafka_reader *kafka.Reader) Processor { } func (p Processor) ProcessMessages() { + signals := make(chan os.Signal, 1) + + signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) + + ctx, cancel := context.WithCancel(context.Background()) + + // go routine for getting signals asynchronously + go func() { + sig := <-signals + log.Println("Got signal: ", sig) + cancel() + }() for { - m, err := p.kafka_reader.ReadMessage(context.Background()) + m, err := p.kafka_reader.FetchMessage(ctx) if err != nil { - break + log.Panic("failed to fetch message:", err) } fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) + p.kafka_reader.CommitMessages(ctx, m) } if err := p.kafka_reader.Close(); err != nil { |