From fcd5b0c6b86da778f0ba17b538c8bc1a05eedd66 Mon Sep 17 00:00:00 2001 From: Baitinq Date: Sat, 4 May 2024 11:45:08 +0200 Subject: payload-processor: use consumer group and properly handle application shutdown --- src/payload-processor/cmd/main.go | 6 +++--- src/payload-processor/processor/processor.go | 20 ++++++++++++++++++-- 2 files changed, 21 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 226700d..5faa293 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -25,9 +25,9 @@ func main() { Timeout: 10 * time.Second, DualStack: true, }, - Topic: "topic-A", - Partition: 0, - MaxBytes: 10e6, // 10MB + Topic: "topic-A", + GroupID: "group-A", + MaxBytes: 10e6, // 10MB }) processor := processor.NewProcessor(kafka_reader) processor.ProcessMessages() diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 66bac0e..24d9330 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/segmentio/kafka-go" @@ -21,12 +24,25 @@ func NewProcessor(kafka_reader *kafka.Reader) Processor { } func (p Processor) ProcessMessages() { + signals := make(chan os.Signal, 1) + + signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) + + ctx, cancel := context.WithCancel(context.Background()) + + // go routine for getting signals asynchronously + go func() { + sig := <-signals + log.Println("Got signal: ", sig) + cancel() + }() for { - m, err := p.kafka_reader.ReadMessage(context.Background()) + m, err := p.kafka_reader.FetchMessage(ctx) if err != nil { - break + log.Panic("failed to fetch message:", err) } fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) + p.kafka_reader.CommitMessages(ctx, m) } if err := p.kafka_reader.Close(); err != nil { -- cgit 1.4.1