about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-04 21:22:54 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-04 21:22:54 +0200
commit642195d99e0e8fa862ec476573a7064d91ffce36 (patch)
treeb178f045fbf6730161b9f4335604df1825f14c23
parentpayload-processor: add concurrency (diff)
downloadfs-tracer-backend-642195d99e0e8fa862ec476573a7064d91ffce36.tar.gz
fs-tracer-backend-642195d99e0e8fa862ec476573a7064d91ffce36.tar.bz2
fs-tracer-backend-642195d99e0e8fa862ec476573a7064d91ffce36.zip
payload-processor: set up testing
-rw-r--r--MODULE.bazel1
-rw-r--r--go.mod9
-rw-r--r--go.sum4
-rw-r--r--src/payload-processor/processor/BUILD.bazel12
-rw-r--r--src/payload-processor/processor/processor.go11
-rw-r--r--src/payload-processor/processor/processor_test.go18
6 files changed, 51 insertions, 4 deletions
diff --git a/MODULE.bazel b/MODULE.bazel
index caa1d1b..c4a151b 100644
--- a/MODULE.bazel
+++ b/MODULE.bazel
@@ -28,4 +28,5 @@ go_deps.from_file(go_mod = "//:go.mod")
 use_repo(
     go_deps,
     "com_github_segmentio_kafka_go",
+    "com_github_stretchr_testify",
 )
diff --git a/go.mod b/go.mod
index f42c9cf..855f601 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,14 @@ module github.com/Baitinq/fs-tracer-backend
 go 1.22.2
 
 require (
+	github.com/segmentio/kafka-go v0.4.47
+	github.com/stretchr/testify v1.9.0
+)
+
+require (
+	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/klauspost/compress v1.15.9 // indirect
 	github.com/pierrec/lz4/v4 v4.1.15 // indirect
-	github.com/segmentio/kafka-go v0.4.47
+	github.com/pmezard/go-difflib v1.0.0 // indirect
+	gopkg.in/yaml.v3 v3.0.1 // indirect
 )
diff --git a/go.sum b/go.sum
index 3c1b808..ed6fcfd 100644
--- a/go.sum
+++ b/go.sum
@@ -12,8 +12,9 @@ github.com/segmentio/kafka-go v0.4.47/go.mod h1:HjF6XbOKh0Pjlkr5GVZxt6CsjjwnmhVO
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
+github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
 github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
 github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
@@ -62,6 +63,7 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
 golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
 gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index 123059d..f957fd3 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "processor",
@@ -7,3 +7,13 @@ go_library(
     visibility = ["//visibility:public"],
     deps = ["@com_github_segmentio_kafka_go//:kafka-go"],
 )
+
+go_test(
+    name = "processor_test",
+    srcs = ["processor_test.go"],
+    embed = [":processor"],
+    deps = [
+        "@com_github_segmentio_kafka_go//:kafka-go",
+        "@com_github_stretchr_testify//require",
+    ],
+)
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index f2e4e80..fc6792a 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -63,7 +63,16 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			cancel()
 			break
 		}
-		fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+		err = p.handleMessage(m)
+		if err != nil {
+			log.Println("failed to handle message:", err)
+			continue
+		}
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
+
+func (p Processor) handleMessage(m kafka.Message) error {
+	fmt.Printf("(%s): message at offset %d: %s = %s\n", time.Now().String(), m.Offset, string(m.Key), string(m.Value))
+	return nil
+}
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
new file mode 100644
index 0000000..ee2ef5f
--- /dev/null
+++ b/src/payload-processor/processor/processor_test.go
@@ -0,0 +1,18 @@
+package processor
+
+import (
+	"testing"
+
+	"github.com/segmentio/kafka-go"
+	"github.com/stretchr/testify/require"
+)
+
+func TestProcessMessage(t *testing.T) {
+	processor := Processor{}
+
+	message := []byte("test")
+
+	err := processor.handleMessage(kafka.Message{Value: message})
+
+	require.NoError(t, err)
+}