about summary refs log tree commit diff
path: root/src/rest-api/handler/handler.go
diff options
context:
space:
mode:
Diffstat (limited to 'src/rest-api/handler/handler.go')
-rw-r--r--src/rest-api/handler/handler.go21
1 files changed, 11 insertions, 10 deletions
diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go
index b9bb80e..8985b43 100644
--- a/src/rest-api/handler/handler.go
+++ b/src/rest-api/handler/handler.go
@@ -8,18 +8,16 @@ import (
 	"net/http"
 	"time"
 
-	amqp "github.com/rabbitmq/amqp091-go"
+	"github.com/segmentio/kafka-go"
 )
 
 type Handler struct {
-	ch        *amqp.Channel
-	queueName string
+	kafka_writer *kafka.Writer
 }
 
-func NewHandler(ch *amqp.Channel, queueName string) Handler {
+func NewHandler(kafka_writer *kafka.Writer) Handler {
 	return Handler{
-		ch:        ch,
-		queueName: queueName,
+		kafka_writer: kafka_writer,
 	}
 }
 
@@ -34,11 +32,14 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
 	defer cancel()
 
-	go h.ch.PublishWithContext(ctx, "", h.queueName, false, false, amqp.Publishing{
-		ContentType: "text/plain",
-		Body:        []byte(body),
+	err = h.kafka_writer.WriteMessages(ctx, kafka.Message{
+		Key:   []byte("key-A"),
+		Value: []byte(body),
 	})
+	if err != nil {
+		log.Fatal(err)
+	}
 
 	fmt.Fprint(w, "Hello, World!", string(bytes))
-	log.Println("Request received", r.RemoteAddr, string(bytes))
+	log.Println("Request received :)", r.RemoteAddr, string(bytes))
 }