From a49298ee8a3c3fd7426154c36052663392e60752 Mon Sep 17 00:00:00 2001 From: Baitinq Date: Thu, 2 May 2024 22:17:36 +0200 Subject: Switch to kafka :^) --- src/payload-processor/cmd/BUILD.bazel | 3 +- src/payload-processor/cmd/main.go | 49 ++++++++++++---------------- src/payload-processor/processor/BUILD.bazel | 2 +- src/payload-processor/processor/processor.go | 33 ++++++++----------- 4 files changed, 37 insertions(+), 50 deletions(-) (limited to 'src/payload-processor') diff --git a/src/payload-processor/cmd/BUILD.bazel b/src/payload-processor/cmd/BUILD.bazel index 515c17e..a11c89c 100644 --- a/src/payload-processor/cmd/BUILD.bazel +++ b/src/payload-processor/cmd/BUILD.bazel @@ -9,7 +9,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/payload-processor/processor", - "@com_github_rabbitmq_amqp091_go//:amqp091-go", + "@com_github_segmentio_kafka_go//:kafka-go", + "@com_github_segmentio_kafka_go//sasl/plain", ], ) diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 1c9bfcb..8e158e9 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -1,43 +1,34 @@ package main import ( - "fmt" "log" "os" + "time" "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) func main() { - rabbitmq_password, ok := os.LookupEnv("RABBITMQ_PASSWORD") + kafka_password, ok := os.LookupEnv("KAFKA_PASSWORD") if !ok { - log.Fatal("RABBITMQ_PASSWORD not set") + log.Fatal("KAFKA_PASSWORD not set") } - log.Println("RabbitMQ password", rabbitmq_password) - conn, err := amqp.Dial(fmt.Sprintf("amqp://user:%s@rabbitmq:5672/", rabbitmq_password)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) - } - defer ch.Close() - - q, err := ch.QueueDeclare( - "hello", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - processor := processor.NewProcessor(ch, q.Name) + kafka_reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"kafka.default.svc.cluster.local:9092"}, + Dialer: &kafka.Dialer{ + SASLMechanism: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + Timeout: 10 * time.Second, + DualStack: true, + }, + Topic: "topic-A", + Partition: 0, //TODO: What + MaxBytes: 10e6, // 10MB + }) + processor := processor.NewProcessor(kafka_reader) processor.ProcessMessages() } diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 64e3ac2..123059d 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -5,5 +5,5 @@ go_library( srcs = ["processor.go"], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], - deps = ["@com_github_rabbitmq_amqp091_go//:amqp091-go"], + deps = ["@com_github_segmentio_kafka_go//:kafka-go"], ) diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 3eb089b..435a954 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -1,39 +1,34 @@ package processor import ( + "context" + "fmt" "log" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" ) type Processor struct { - ch *amqp.Channel - queueName string + kafka_reader *kafka.Reader } -func NewProcessor(ch *amqp.Channel, queueName string) Processor { +func NewProcessor(kafka_reader *kafka.Reader) Processor { log.Println("Created processor") return Processor{ - ch: ch, - queueName: queueName, + kafka_reader: kafka_reader, } } func (p Processor) ProcessMessages() { - msgs, err := p.ch.Consume( - p.queueName, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - log.Fatal("Failed to register a consumer:", err) + for { + m, err := p.kafka_reader.ReadMessage(context.Background()) + if err != nil { + break + } + fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } - for msg := range msgs { - log.Printf("Received a message: %s", msg.Body) + if err := p.kafka_reader.Close(); err != nil { + log.Fatal("failed to close reader:", err) } } -- cgit 1.4.1