about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-02 23:33:22 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-02 23:45:04 +0200
commitff6d61ac3888a023d0be424f76c7b50c8379a75f (patch)
tree9874603db6411dca1a5d2916eff57eef9f198914
parentSwitch to kafka :^) (diff)
downloadfs-tracer-backend-ff6d61ac3888a023d0be424f76c7b50c8379a75f.tar.gz
fs-tracer-backend-ff6d61ac3888a023d0be424f76c7b50c8379a75f.tar.bz2
fs-tracer-backend-ff6d61ac3888a023d0be424f76c7b50c8379a75f.zip
rest-api: Write to kafka asynchronously
-rw-r--r--src/payload-processor/cmd/main.go2
-rw-r--r--src/rest-api/cmd/main.go22
2 files changed, 20 insertions, 4 deletions
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go
index 8e158e9..226700d 100644
--- a/src/payload-processor/cmd/main.go
+++ b/src/payload-processor/cmd/main.go
@@ -26,7 +26,7 @@ func main() {
 			DualStack: true,
 		},
 		Topic:     "topic-A",
-		Partition: 0,    //TODO: What
+		Partition: 0,
 		MaxBytes:  10e6, // 10MB
 	})
 	processor := processor.NewProcessor(kafka_reader)
diff --git a/src/rest-api/cmd/main.go b/src/rest-api/cmd/main.go
index 469957f..b21115d 100644
--- a/src/rest-api/cmd/main.go
+++ b/src/rest-api/cmd/main.go
@@ -1,10 +1,12 @@
 package main
 
 import (
+	"context"
 	"fmt"
 	"log"
 	"net/http"
 	"os"
+	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/src/rest-api/handler"
 	"github.com/segmentio/kafka-go"
@@ -17,6 +19,20 @@ func main() {
 		log.Fatal("KAFKA_PASSWORD not set")
 	}
 
+	dialer := &kafka.Dialer{
+		Timeout:   10 * time.Second,
+		DualStack: true,
+		SASLMechanism: plain.Mechanism{
+			Username: "user1",
+			Password: kafka_password,
+		},
+	}
+	// Create topic
+	_, err := dialer.DialLeader(context.Background(), "tcp", "kafka.default.svc.cluster.local:9092", "topic-A", 0)
+	if err != nil {
+		log.Fatal(err)
+	}
+
 	kafka_writer := &kafka.Writer{
 		Addr: kafka.TCP("kafka.default.svc.cluster.local:9092"),
 		Transport: &kafka.Transport{
@@ -25,9 +41,9 @@ func main() {
 				Password: kafka_password,
 			},
 		},
-		Topic:    "topic-A",
-		Balancer: &kafka.LeastBytes{},
-		// Async:                  true, //TODO: Creat the topic beforehand, if not this doesnt work
+		Topic:                  "topic-A",
+		Balancer:               &kafka.LeastBytes{},
+		Async:                  true,
 		AllowAutoTopicCreation: true,
 	}