diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-04 21:02:04 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-04 21:02:04 +0200 |
commit | 7347e1d37464bbe9477d10bac70f102b43585dc1 (patch) | |
tree | 3fbb5b4b346173b74e01c61e32400007d299699d | |
parent | go mod tidy (diff) | |
download | fs-tracer-backend-7347e1d37464bbe9477d10bac70f102b43585dc1.tar.gz fs-tracer-backend-7347e1d37464bbe9477d10bac70f102b43585dc1.tar.bz2 fs-tracer-backend-7347e1d37464bbe9477d10bac70f102b43585dc1.zip |
payload-processor: add concurrency
-rw-r--r-- | src/payload-processor/cmd/main.go | 2 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 34 |
2 files changed, 27 insertions, 9 deletions
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 5faa293..2604d8e 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -29,6 +29,6 @@ func main() { GroupID: "group-A", MaxBytes: 10e6, // 10MB }) - processor := processor.NewProcessor(kafka_reader) + processor := processor.NewProcessor(kafka_reader, 4) processor.ProcessMessages() } diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 24d9330..f2e4e80 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "sync" "syscall" "time" @@ -14,18 +15,19 @@ import ( type Processor struct { kafka_reader *kafka.Reader + concurrency int } -func NewProcessor(kafka_reader *kafka.Reader) Processor { - log.Println("Created processor") +func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor { + log.Println("Created processor with concurrency: ", concurrency) return Processor{ kafka_reader: kafka_reader, + concurrency: concurrency, } } func (p Processor) ProcessMessages() { signals := make(chan os.Signal, 1) - signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL) ctx, cancel := context.WithCancel(context.Background()) @@ -36,16 +38,32 @@ func (p Processor) ProcessMessages() { log.Println("Got signal: ", sig) cancel() }() + + wg := sync.WaitGroup{} + wg.Add(p.concurrency) + for i := 0; i < p.concurrency; i++ { + go func() { + defer wg.Done() + p.process(ctx, cancel) + }() + } + + wg.Wait() + + if err := p.kafka_reader.Close(); err != nil { + log.Fatal("failed to close reader:", err) + } +} + +func (p Processor) process(ctx context.Context, cancel context.CancelFunc) { for { m, err := p.kafka_reader.FetchMessage(ctx) if err != nil { - log.Panic("failed to fetch message:", err) + log.Println("failed to fetch message:", err) + cancel() + break } fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value)) p.kafka_reader.CommitMessages(ctx, m) } - - if err := p.kafka_reader.Close(); err != nil { - log.Fatal("failed to close reader:", err) - } } |