diff options
| author | Baitinq <[email protected]> | 2024-05-02 22:17:36 +0200 |
|---|---|---|
| committer | Baitinq <[email protected]> | 2024-05-02 23:20:31 +0200 |
| commit | a49298ee8a3c3fd7426154c36052663392e60752 (patch) | |
| tree | 5f18bfcdef509c2c061025c416ce7d5076b6e147 /src/rest-api | |
| parent | misc: add deploy.sh script (diff) | |
| download | fs-tracer-backend-a49298ee8a3c3fd7426154c36052663392e60752.tar.gz fs-tracer-backend-a49298ee8a3c3fd7426154c36052663392e60752.tar.bz2 fs-tracer-backend-a49298ee8a3c3fd7426154c36052663392e60752.zip | |
Switch to kafka :^)
Diffstat (limited to 'src/rest-api')
| -rw-r--r-- | src/rest-api/cmd/BUILD.bazel | 3 | ||||
| -rw-r--r-- | src/rest-api/cmd/main.go | 42 | ||||
| -rw-r--r-- | src/rest-api/handler/BUILD.bazel | 2 | ||||
| -rw-r--r-- | src/rest-api/handler/handler.go | 21 |
4 files changed, 31 insertions, 37 deletions
diff --git a/src/rest-api/cmd/BUILD.bazel b/src/rest-api/cmd/BUILD.bazel index 842f6f9..2a142c7 100644 --- a/src/rest-api/cmd/BUILD.bazel +++ b/src/rest-api/cmd/BUILD.bazel @@ -9,7 +9,8 @@ go_library( visibility = ["//visibility:private"], deps = [ "//src/rest-api/handler", - "@com_github_rabbitmq_amqp091_go//:amqp091-go", + "@com_github_segmentio_kafka_go//:kafka-go", + "@com_github_segmentio_kafka_go//sasl/plain", ], ) diff --git a/src/rest-api/cmd/main.go b/src/rest-api/cmd/main.go index f6d34e0..469957f 100644 --- a/src/rest-api/cmd/main.go +++ b/src/rest-api/cmd/main.go @@ -7,39 +7,31 @@ import ( "os" "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) func main() { - rabbitmq_password, ok := os.LookupEnv("RABBITMQ_PASSWORD") + kafka_password, ok := os.LookupEnv("KAFKA_PASSWORD") if !ok { - log.Fatal("RABBITMQ_PASSWORD not set") + log.Fatal("KAFKA_PASSWORD not set") } - log.Println("RabbitMQ password", rabbitmq_password) - conn, err := amqp.Dial(fmt.Sprintf("amqp://user:%s@rabbitmq:5672/", rabbitmq_password)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) + kafka_writer := &kafka.Writer{ + Addr: kafka.TCP("kafka.default.svc.cluster.local:9092"), + Transport: &kafka.Transport{ + SASL: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + }, + Topic: "topic-A", + Balancer: &kafka.LeastBytes{}, + // Async: true, //TODO: Creat the topic beforehand, if not this doesnt work + AllowAutoTopicCreation: true, } - defer ch.Close() - q, err := ch.QueueDeclare( - "hello", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - handler := handler.NewHandler(ch, q.Name) + handler := handler.NewHandler(kafka_writer) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { diff --git a/src/rest-api/handler/BUILD.bazel b/src/rest-api/handler/BUILD.bazel index a5638f4..89adc69 100644 --- a/src/rest-api/handler/BUILD.bazel +++ b/src/rest-api/handler/BUILD.bazel @@ -5,5 +5,5 @@ go_library( srcs = ["handler.go"], importpath = "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler", visibility = ["//visibility:public"], - deps = ["@com_github_rabbitmq_amqp091_go//:amqp091-go"], + deps = ["@com_github_segmentio_kafka_go//:kafka-go"], ) diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go index b9bb80e..8985b43 100644 --- a/src/rest-api/handler/handler.go +++ b/src/rest-api/handler/handler.go @@ -8,18 +8,16 @@ import ( "net/http" "time" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" ) type Handler struct { - ch *amqp.Channel - queueName string + kafka_writer *kafka.Writer } -func NewHandler(ch *amqp.Channel, queueName string) Handler { +func NewHandler(kafka_writer *kafka.Writer) Handler { return Handler{ - ch: ch, - queueName: queueName, + kafka_writer: kafka_writer, } } @@ -34,11 +32,14 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - go h.ch.PublishWithContext(ctx, "", h.queueName, false, false, amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + err = h.kafka_writer.WriteMessages(ctx, kafka.Message{ + Key: []byte("key-A"), + Value: []byte(body), }) + if err != nil { + log.Fatal(err) + } fmt.Fprint(w, "Hello, World!", string(bytes)) - log.Println("Request received", r.RemoteAddr, string(bytes)) + log.Println("Request received :)", r.RemoteAddr, string(bytes)) } |