about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-04 21:02:04 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-04 21:02:04 +0200
commit7347e1d37464bbe9477d10bac70f102b43585dc1 (patch)
tree3fbb5b4b346173b74e01c61e32400007d299699d
parentgo mod tidy (diff)
downloadfs-tracer-backend-7347e1d37464bbe9477d10bac70f102b43585dc1.tar.gz
fs-tracer-backend-7347e1d37464bbe9477d10bac70f102b43585dc1.tar.bz2
fs-tracer-backend-7347e1d37464bbe9477d10bac70f102b43585dc1.zip
payload-processor: add concurrency
-rw-r--r--src/payload-processor/cmd/main.go2
-rw-r--r--src/payload-processor/processor/processor.go34
2 files changed, 27 insertions, 9 deletions
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go
index 5faa293..2604d8e 100644
--- a/src/payload-processor/cmd/main.go
+++ b/src/payload-processor/cmd/main.go
@@ -29,6 +29,6 @@ func main() {
 		GroupID:  "group-A",
 		MaxBytes: 10e6, // 10MB
 	})
-	processor := processor.NewProcessor(kafka_reader)
+	processor := processor.NewProcessor(kafka_reader, 4)
 	processor.ProcessMessages()
 }
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index 24d9330..f2e4e80 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -6,6 +6,7 @@ import (
 	"log"
 	"os"
 	"os/signal"
+	"sync"
 	"syscall"
 	"time"
 
@@ -14,18 +15,19 @@ import (
 
 type Processor struct {
 	kafka_reader *kafka.Reader
+	concurrency  int
 }
 
-func NewProcessor(kafka_reader *kafka.Reader) Processor {
-	log.Println("Created processor")
+func NewProcessor(kafka_reader *kafka.Reader, concurrency int) Processor {
+	log.Println("Created processor with concurrency: ", concurrency)
 	return Processor{
 		kafka_reader: kafka_reader,
+		concurrency:  concurrency,
 	}
 }
 
 func (p Processor) ProcessMessages() {
 	signals := make(chan os.Signal, 1)
-
 	signal.Notify(signals, syscall.SIGINT, syscall.SIGKILL)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -36,16 +38,32 @@ func (p Processor) ProcessMessages() {
 		log.Println("Got signal: ", sig)
 		cancel()
 	}()
+
+	wg := sync.WaitGroup{}
+	wg.Add(p.concurrency)
+	for i := 0; i < p.concurrency; i++ {
+		go func() {
+			defer wg.Done()
+			p.process(ctx, cancel)
+		}()
+	}
+
+	wg.Wait()
+
+	if err := p.kafka_reader.Close(); err != nil {
+		log.Fatal("failed to close reader:", err)
+	}
+}
+
+func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	for {
 		m, err := p.kafka_reader.FetchMessage(ctx)
 		if err != nil {
-			log.Panic("failed to fetch message:", err)
+			log.Println("failed to fetch message:", err)
+			cancel()
+			break
 		}
 		fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
-
-	if err := p.kafka_reader.Close(); err != nil {
-		log.Fatal("failed to close reader:", err)
-	}
 }