diff options
| author | Baitinq <[email protected]> | 2024-05-02 22:17:36 +0200 |
|---|---|---|
| committer | Baitinq <[email protected]> | 2024-05-02 23:20:31 +0200 |
| commit | a49298ee8a3c3fd7426154c36052663392e60752 (patch) | |
| tree | 5f18bfcdef509c2c061025c416ce7d5076b6e147 /src | |
| parent | misc: add deploy.sh script (diff) | |
| download | fs-tracer-backend-a49298ee8a3c3fd7426154c36052663392e60752.tar.gz fs-tracer-backend-a49298ee8a3c3fd7426154c36052663392e60752.tar.bz2 fs-tracer-backend-a49298ee8a3c3fd7426154c36052663392e60752.zip | |
Switch to kafka :^)
Diffstat (limited to 'src')
| -rw-r--r-- | src/payload-processor/cmd/BUILD.bazel | 3 | ||||
| -rw-r--r-- | src/payload-processor/cmd/main.go | 49 | ||||
| -rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 2 | ||||
| -rw-r--r-- | src/payload-processor/processor/processor.go | 33 | ||||
| -rw-r--r-- | src/rest-api/cmd/BUILD.bazel | 3 | ||||
| -rw-r--r-- | src/rest-api/cmd/main.go | 42 | ||||
| -rw-r--r-- | src/rest-api/handler/BUILD.bazel | 2 | ||||
| -rw-r--r-- | src/rest-api/handler/handler.go | 21 |
8 files changed, 68 insertions, 87 deletions
diff --git a/src/payload-processor/cmd/BUILD.bazel b/src/payload-processor/cmd/BUILD.bazel index 515c17e..a11c89c 100644 --- a/src/payload-processor/cmd/BUILD.bazel +++ b/src/payload-processor/cmd/BUILD.bazel @@ -9,7 +9,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/payload-processor/processor", - "@com_github_rabbitmq_amqp091_go//:amqp091-go", + "@com_github_segmentio_kafka_go//:kafka-go", + "@com_github_segmentio_kafka_go//sasl/plain", ], ) diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 1c9bfcb..8e158e9 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -1,43 +1,34 @@ package main import ( - "fmt" "log" "os" + "time" "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) func main() { - rabbitmq_password, ok := os.LookupEnv("RABBITMQ_PASSWORD") + kafka_password, ok := os.LookupEnv("KAFKA_PASSWORD") if !ok { - log.Fatal("RABBITMQ_PASSWORD not set") + log.Fatal("KAFKA_PASSWORD not set") } - log.Println("RabbitMQ password", rabbitmq_password) - conn, err := amqp.Dial(fmt.Sprintf("amqp://user:%s@rabbitmq:5672/", rabbitmq_password)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) - } - defer ch.Close() - - q, err := ch.QueueDeclare( - "hello", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - processor := processor.NewProcessor(ch, q.Name) + kafka_reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"kafka.default.svc.cluster.local:9092"}, + Dialer: &kafka.Dialer{ + SASLMechanism: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + Timeout: 10 * time.Second, + DualStack: true, + }, + Topic: "topic-A", + Partition: 0, //TODO: What + MaxBytes: 10e6, // 10MB + }) + processor := processor.NewProcessor(kafka_reader) processor.ProcessMessages() } diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 64e3ac2..123059d 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -5,5 +5,5 @@ go_library( srcs = ["processor.go"], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], - deps = ["@com_github_rabbitmq_amqp091_go//:amqp091-go"], + deps = ["@com_github_segmentio_kafka_go//:kafka-go"], ) diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 3eb089b..435a954 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -1,39 +1,34 @@ package processor import ( + "context" + "fmt" "log" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" ) type Processor struct { - ch *amqp.Channel - queueName string + kafka_reader *kafka.Reader } -func NewProcessor(ch *amqp.Channel, queueName string) Processor { +func NewProcessor(kafka_reader *kafka.Reader) Processor { log.Println("Created processor") return Processor{ - ch: ch, - queueName: queueName, + kafka_reader: kafka_reader, } } func (p Processor) ProcessMessages() { - msgs, err := p.ch.Consume( - p.queueName, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - log.Fatal("Failed to register a consumer:", err) + for { + m, err := p.kafka_reader.ReadMessage(context.Background()) + if err != nil { + break + } + fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } - for msg := range msgs { - log.Printf("Received a message: %s", msg.Body) + if err := p.kafka_reader.Close(); err != nil { + log.Fatal("failed to close reader:", err) } } diff --git a/src/rest-api/cmd/BUILD.bazel b/src/rest-api/cmd/BUILD.bazel index 842f6f9..2a142c7 100644 --- a/src/rest-api/cmd/BUILD.bazel +++ b/src/rest-api/cmd/BUILD.bazel @@ -9,7 +9,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/rest-api/handler", - "@com_github_rabbitmq_amqp091_go//:amqp091-go", + "@com_github_segmentio_kafka_go//:kafka-go", + "@com_github_segmentio_kafka_go//sasl/plain", ], ) diff --git a/src/rest-api/cmd/main.go b/src/rest-api/cmd/main.go index f6d34e0..469957f 100644 --- a/src/rest-api/cmd/main.go +++ b/src/rest-api/cmd/main.go @@ -7,39 +7,31 @@ import ( "os" "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) func main() { - rabbitmq_password, ok := os.LookupEnv("RABBITMQ_PASSWORD") + kafka_password, ok := os.LookupEnv("KAFKA_PASSWORD") if !ok { - log.Fatal("RABBITMQ_PASSWORD not set") + log.Fatal("KAFKA_PASSWORD not set") } - log.Println("RabbitMQ password", rabbitmq_password) - conn, err := amqp.Dial(fmt.Sprintf("amqp://user:%s@rabbitmq:5672/", rabbitmq_password)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) + kafka_writer := &kafka.Writer{ + Addr: kafka.TCP("kafka.default.svc.cluster.local:9092"), + Transport: &kafka.Transport{ + SASL: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + }, + Topic: "topic-A", + Balancer: &kafka.LeastBytes{}, + // Async: true, //TODO: Creat the topic beforehand, if not this doesnt work + AllowAutoTopicCreation: true, } - defer ch.Close() - q, err := ch.QueueDeclare( - "hello", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - handler := handler.NewHandler(ch, q.Name) + handler := handler.NewHandler(kafka_writer) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { diff --git a/src/rest-api/handler/BUILD.bazel b/src/rest-api/handler/BUILD.bazel index a5638f4..89adc69 100644 --- a/src/rest-api/handler/BUILD.bazel +++ b/src/rest-api/handler/BUILD.bazel @@ -5,5 +5,5 @@ go_library( srcs = ["handler.go"], importpath = "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler", visibility = ["//visibility:public"], - deps = ["@com_github_rabbitmq_amqp091_go//:amqp091-go"], + deps = ["@com_github_segmentio_kafka_go//:kafka-go"], ) diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go index b9bb80e..8985b43 100644 --- a/src/rest-api/handler/handler.go +++ b/src/rest-api/handler/handler.go @@ -8,18 +8,16 @@ import ( "net/http" "time" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" ) type Handler struct { - ch *amqp.Channel - queueName string + kafka_writer *kafka.Writer } -func NewHandler(ch *amqp.Channel, queueName string) Handler { +func NewHandler(kafka_writer *kafka.Writer) Handler { return Handler{ - ch: ch, - queueName: queueName, + kafka_writer: kafka_writer, } } @@ -34,11 +32,14 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - go h.ch.PublishWithContext(ctx, "", h.queueName, false, false, amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + err = h.kafka_writer.WriteMessages(ctx, kafka.Message{ + Key: []byte("key-A"), + Value: []byte(body), }) + if err != nil { + log.Fatal(err) + } fmt.Fprint(w, "Hello, World!", string(bytes)) - log.Println("Request received", r.RemoteAddr, string(bytes)) + log.Println("Request received :)", r.RemoteAddr, string(bytes)) } |