about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/payload-processor/processor/BUILD.bazel12
-rw-r--r--src/payload-processor/processor/processor.go11
-rw-r--r--src/payload-processor/processor/processor_test.go18
3 files changed, 39 insertions, 2 deletions
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index 123059d..f957fd3 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "processor",
@@ -7,3 +7,13 @@ go_library(
     visibility = ["//visibility:public"],
     deps = ["@com_github_segmentio_kafka_go//:kafka-go"],
 )
+
+go_test(
+    name = "processor_test",
+    srcs = ["processor_test.go"],
+    embed = [":processor"],
+    deps = [
+        "@com_github_segmentio_kafka_go//:kafka-go",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index f2e4e80..fc6792a 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -63,7 +63,16 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
-		fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+		err = p.handleMessage(m)
+		if err != nil {
+			log.Println("failed to handle message:", err)
+			continue
+		}
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
+
+func (p Processor) handleMessage(m kafka.Message) error {
+	fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+	return nil
+}
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
new file mode 100644
index 0000000..ee2ef5f
--- /dev/null
+++ b/src/payload-processor/processor/processor_test.go
@@ -0,0 +1,18 @@
+package processor
+
+import (
+	"testing"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/stretchr/testify/require"
+)
+
+func TestProcessMessage(t *testing.T) {
+	processor := Processor{}
+
+	message := []byte("test")
+
+	err := processor.handleMessage(kafka.Message{Value: message})
+
+	require.NoError(t, err)
+}