diff options
author | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-02 23:33:22 +0200 |
---|---|---|
committer | Baitinq <manuelpalenzuelamerino@gmail.com> | 2024-05-02 23:45:04 +0200 |
commit | ff6d61ac3888a023d0be424f76c7b50c8379a75f (patch) | |
tree | 9874603db6411dca1a5d2916eff57eef9f198914 | |
parent | Switch to kafka :^) (diff) | |
download | fs-tracer-backend-ff6d61ac3888a023d0be424f76c7b50c8379a75f.tar.gz fs-tracer-backend-ff6d61ac3888a023d0be424f76c7b50c8379a75f.tar.bz2 fs-tracer-backend-ff6d61ac3888a023d0be424f76c7b50c8379a75f.zip |
rest-api: Write to kafka asynchronously
-rw-r--r-- | src/payload-processor/cmd/main.go | 2 | ||||
-rw-r--r-- | src/rest-api/cmd/main.go | 22 |
2 files changed, 20 insertions, 4 deletions
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 8e158e9..226700d 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -26,7 +26,7 @@ func main() { DualStack: true, }, Topic: "topic-A", - Partition: 0, //TODO: What + Partition: 0, MaxBytes: 10e6, // 10MB }) processor := processor.NewProcessor(kafka_reader) diff --git a/src/rest-api/cmd/main.go b/src/rest-api/cmd/main.go index 469957f..b21115d 100644 --- a/src/rest-api/cmd/main.go +++ b/src/rest-api/cmd/main.go @@ -1,10 +1,12 @@ package main import ( + "context" "fmt" "log" "net/http" "os" + "time" "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler" "github.com/segmentio/kafka-go" @@ -17,6 +19,20 @@ func main() { log.Fatal("KAFKA_PASSWORD not set") } + dialer := &kafka.Dialer{ + Timeout: 10 * time.Second, + DualStack: true, + SASLMechanism: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + } + // Create topic + _, err := dialer.DialLeader(context.Background(), "tcp", "kafka.default.svc.cluster.local:9092", "topic-A", 0) + if err != nil { + log.Fatal(err) + } + kafka_writer := &kafka.Writer{ Addr: kafka.TCP("kafka.default.svc.cluster.local:9092"), Transport: &kafka.Transport{ @@ -25,9 +41,9 @@ func main() { Password: kafka_password, }, }, - Topic: "topic-A", - Balancer: &kafka.LeastBytes{}, - // Async: true, //TODO: Creat the topic beforehand, if not this doesnt work + Topic: "topic-A", + Balancer: &kafka.LeastBytes{}, + Async: true, AllowAutoTopicCreation: true, } |