diff options
Diffstat (limited to 'src/rest-api/cmd/main.go')
-rw-r--r-- | src/rest-api/cmd/main.go | 42 |
1 files changed, 17 insertions, 25 deletions
diff --git a/src/rest-api/cmd/main.go b/src/rest-api/cmd/main.go index f6d34e0..469957f 100644 --- a/src/rest-api/cmd/main.go +++ b/src/rest-api/cmd/main.go @@ -7,39 +7,31 @@ import ( "os" "github.com/Baitinq/fs-tracer-backend/src/rest-api/handler" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) func main() { - rabbitmq_password, ok := os.LookupEnv("RABBITMQ_PASSWORD") + kafka_password, ok := os.LookupEnv("KAFKA_PASSWORD") if !ok { - log.Fatal("RABBITMQ_PASSWORD not set") + log.Fatal("KAFKA_PASSWORD not set") } - log.Println("RabbitMQ password", rabbitmq_password) - conn, err := amqp.Dial(fmt.Sprintf("amqp://user:%s@rabbitmq:5672/", rabbitmq_password)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) + kafka_writer := &kafka.Writer{ + Addr: kafka.TCP("kafka.default.svc.cluster.local:9092"), + Transport: &kafka.Transport{ + SASL: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + }, + Topic: "topic-A", + Balancer: &kafka.LeastBytes{}, + // Async: true, //TODO: Creat the topic beforehand, if not this doesnt work + AllowAutoTopicCreation: true, } - defer ch.Close() - q, err := ch.QueueDeclare( - "hello", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - handler := handler.NewHandler(ch, q.Name) + handler := handler.NewHandler(kafka_writer) mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { |