diff options
Diffstat (limited to 'src/rest-api/handler/handler.go')
-rw-r--r-- | src/rest-api/handler/handler.go | 21 |
1 files changed, 11 insertions, 10 deletions
diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go index b9bb80e..8985b43 100644 --- a/src/rest-api/handler/handler.go +++ b/src/rest-api/handler/handler.go @@ -8,18 +8,16 @@ import ( "net/http" "time" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" ) type Handler struct { - ch *amqp.Channel - queueName string + kafka_writer *kafka.Writer } -func NewHandler(ch *amqp.Channel, queueName string) Handler { +func NewHandler(kafka_writer *kafka.Writer) Handler { return Handler{ - ch: ch, - queueName: queueName, + kafka_writer: kafka_writer, } } @@ -34,11 +32,14 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) defer cancel() - go h.ch.PublishWithContext(ctx, "", h.queueName, false, false, amqp.Publishing{ - ContentType: "text/plain", - Body: []byte(body), + err = h.kafka_writer.WriteMessages(ctx, kafka.Message{ + Key: []byte("key-A"), + Value: []byte(body), }) + if err != nil { + log.Fatal(err) + } fmt.Fprint(w, "Hello, World!", string(bytes)) - log.Println("Request received", r.RemoteAddr, string(bytes)) + log.Println("Request received :)", r.RemoteAddr, string(bytes)) } |