diff options
-rw-r--r-- | k8s/helmsman.yml | 3 | ||||
-rw-r--r-- | src/payload-processor/cmd/main.go | 6 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 20 |
3 files changed, 24 insertions, 5 deletions
diff --git a/k8s/helmsman.yml b/k8s/helmsman.yml index 5579d82..47bee36 100644 --- a/k8s/helmsman.yml +++ b/k8s/helmsman.yml @@ -25,6 +25,9 @@ apps: set: controller.replicaCount: 1 controller.livenessProbe.initialDelaySeconds: 120 + extraConfig: | + offsets.topic.replication.factor=1 + transaction.state.log.replication.factor=1 payload-processor: namespace: default diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 226700d..5faa293 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -25,9 +25,9 @@ func main() { Timeout: 10 * time.Second, DualStack: true, }, - Topic: "topic-A", - Partition: 0, - MaxBytes: 10e6, // 10MB + Topic: "topic-A", + GroupID: "group-A", + MaxBytes: 10e6, // 10MB }) processor := processor.NewProcessor(kafka_reader) processor.ProcessMessages() diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 66bac0e..24d9330 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/segmentio/kafka-go" @@ -21,12 +24,25 @@ func NewProcessor(kafka_reader *kafka.Reader) Processor { } func (p Processor) ProcessMessages() { + signals := make(chan os.Signal, 1) + + signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) + + ctx, cancel := context.WithCancel(context.Background()) + + // go routine for getting signals asynchronously + go func() { + sig := <-signals + log.Println("Got signal: ", sig) + cancel() + }() for { - m, err := p.kafka_reader.ReadMessage(context.Background()) + m, err := p.kafka_reader.FetchMessage(ctx) if err != nil { - break + log.Panic("failed to fetch message:", err) } fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) + p.kafka_reader.CommitMessages(ctx, m) } if err := p.kafka_reader.Close(); err != nil { |