diff options
Diffstat (limited to 'src/payload-processor/processor/processor.go')
| -rw-r--r-- | src/payload-processor/processor/processor.go | 20 |
1 files changed, 18 insertions, 2 deletions
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 66bac0e..24d9330 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -4,6 +4,9 @@ import ( "context" "fmt" "log" + "os" + "os/signal" + "syscall" "time" "github.com/segmentio/kafka-go" @@ -21,12 +24,25 @@ func NewProcessor(kafka_reader *kafka.Reader) Processor { } func (p Processor) ProcessMessages() { + signals := make(chan os.Signal, 1) + + signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) + + ctx, cancel := context.WithCancel(context.Background()) + + // go routine for getting signals asynchronously + go func() { + sig := <-signals + log.Println("Got signal: ", sig) + cancel() + }() for { - m, err := p.kafka_reader.ReadMessage(context.Background()) + m, err := p.kafka_reader.FetchMessage(ctx) if err != nil { - break + log.Panic("failed to fetch message:", err) } fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) + p.kafka_reader.CommitMessages(ctx, m) } if err := p.kafka_reader.Close(); err != nil { |