diff options
Diffstat (limited to 'src/payload-processor/processor')
-rw-r--r-- | src/payload-processor/processor/BUILD.bazel | 2 | ||||
-rw-r--r-- | src/payload-processor/processor/processor.go | 33 |
2 files changed, 15 insertions, 20 deletions
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel index 64e3ac2..123059d 100644 --- a/src/payload-processor/processor/BUILD.bazel +++ b/src/payload-processor/processor/BUILD.bazel @@ -5,5 +5,5 @@ go_library( srcs = ["processor.go"], importpath = "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor", visibility = ["//visibility:public"], - deps = ["@com_github_rabbitmq_amqp091_go//:amqp091-go"], + deps = ["@com_github_segmentio_kafka_go//:kafka-go"], ) diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go index 3eb089b..435a954 100644 --- a/src/payload-processor/processor/processor.go +++ b/src/payload-processor/processor/processor.go @@ -1,39 +1,34 @@ package processor import ( + "context" + "fmt" "log" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" ) type Processor struct { - ch *amqp.Channel - queueName string + kafka_reader *kafka.Reader } -func NewProcessor(ch *amqp.Channel, queueName string) Processor { +func NewProcessor(kafka_reader *kafka.Reader) Processor { log.Println("Created processor") return Processor{ - ch: ch, - queueName: queueName, + kafka_reader: kafka_reader, } } func (p Processor) ProcessMessages() { - msgs, err := p.ch.Consume( - p.queueName, // queue - "", // consumer - true, // auto-ack - false, // exclusive - false, // no-local - false, // no-wait - nil, // args - ) - if err != nil { - log.Fatal("Failed to register a consumer:", err) + for { + m, err := p.kafka_reader.ReadMessage(context.Background()) + if err != nil { + break + } + fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) } - for msg := range msgs { - log.Printf("Received a message: %s", msg.Body) + if err := p.kafka_reader.Close(); err != nil { + log.Fatal("failed to close reader:", err) } } |