diff options
Diffstat (limited to 'src/payload-processor/cmd/main.go')
-rw-r--r-- | src/payload-processor/cmd/main.go | 49 |
1 files changed, 20 insertions, 29 deletions
diff --git a/src/payload-processor/cmd/main.go b/src/payload-processor/cmd/main.go index 1c9bfcb..8e158e9 100644 --- a/src/payload-processor/cmd/main.go +++ b/src/payload-processor/cmd/main.go @@ -1,43 +1,34 @@ package main import ( - "fmt" "log" "os" + "time" "github.com/Baitinq/fs-tracer-backend/src/payload-processor/processor" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" ) func main() { - rabbitmq_password, ok := os.LookupEnv("RABBITMQ_PASSWORD") + kafka_password, ok := os.LookupEnv("KAFKA_PASSWORD") if !ok { - log.Fatal("RABBITMQ_PASSWORD not set") + log.Fatal("KAFKA_PASSWORD not set") } - log.Println("RabbitMQ password", rabbitmq_password) - conn, err := amqp.Dial(fmt.Sprintf("amqp://user:%s@rabbitmq:5672/", rabbitmq_password)) - if err != nil { - log.Fatal(err) - } - defer conn.Close() - - ch, err := conn.Channel() - if err != nil { - log.Fatal(err) - } - defer ch.Close() - - q, err := ch.QueueDeclare( - "hello", // name - false, // durable - false, // delete when unused - false, // exclusive - false, // no-wait - nil, // arguments - ) - if err != nil { - log.Fatal(err) - } - processor := processor.NewProcessor(ch, q.Name) + kafka_reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{"kafka.default.svc.cluster.local:9092"}, + Dialer: &kafka.Dialer{ + SASLMechanism: plain.Mechanism{ + Username: "user1", + Password: kafka_password, + }, + Timeout: 10 * time.Second, + DualStack: true, + }, + Topic: "topic-A", + Partition: 0, //TODO: What + MaxBytes: 10e6, // 10MB + }) + processor := processor.NewProcessor(kafka_reader) processor.ProcessMessages() } |