about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-28 23:39:41 +0200
committerBaitinq <manuelpalenzuelamerino@gmail.com>2024-05-29 00:26:27 +0200
commit06465c85dbb29c8ed6b8809a34ceb44fcafe2beb (patch)
tree134880edab0a0a76be329a57f4800f587bb10aee
parentsupabase: add api_keys table migration (diff)
downloadfs-tracer-backend-06465c85dbb29c8ed6b8809a34ceb44fcafe2beb.tar.gz
fs-tracer-backend-06465c85dbb29c8ed6b8809a34ceb44fcafe2beb.tar.bz2
fs-tracer-backend-06465c85dbb29c8ed6b8809a34ceb44fcafe2beb.zip
rest-api: handle payloads with an api_key
-rw-r--r--TODO2
-rwxr-xr-xrequests_examples.sh5
-rw-r--r--src/payload-processor/processor/BUILD.bazel1
-rw-r--r--src/payload-processor/processor/db.go5
-rw-r--r--src/payload-processor/processor/mock_db.go8
-rw-r--r--src/payload-processor/processor/processor.go26
-rw-r--r--src/payload-processor/processor/processor_test.go6
-rw-r--r--src/rest-api/handler/db.go29
-rw-r--r--src/rest-api/handler/handler.go30
-rw-r--r--src/rest-api/handler/handler_test.go6
-rw-r--r--src/rest-api/handler/mock_db.go23
11 files changed, 106 insertions, 35 deletions
diff --git a/TODO b/TODO
index 07f3a1d..1adf4ef 100644
--- a/TODO
+++ b/TODO
@@ -1,4 +1,4 @@
 maybe we can remove helmsman with bazel
 better managing of secrets (maybe we shouldnt have environment variables)
-add db migrations
+test kafka with interface
 nixos-k3s repo
diff --git a/requests_examples.sh b/requests_examples.sh
new file mode 100755
index 0000000..45ef59d
--- /dev/null
+++ b/requests_examples.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+curl -H "API_KEY: ${API_KEY}" -X POST -d '{"timestamp": "2017-01-02T15:04:05Z"}' http://leunam.dev:9999/file/
+
+curl -H "API_KEY: ${API_KEY}" -X GET http://leunam.dev:9999/file/
diff --git a/src/payload-processor/processor/BUILD.bazel b/src/payload-processor/processor/BUILD.bazel
index 411300d..384c88b 100644
--- a/src/payload-processor/processor/BUILD.bazel
+++ b/src/payload-processor/processor/BUILD.bazel
@@ -11,7 +11,6 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//lib",
-        "@com_github_google_uuid//:uuid",
         "@com_github_jmoiron_sqlx//:sqlx",
         "@com_github_segmentio_kafka_go//:kafka-go",
         "@org_uber_go_mock//gomock",
diff --git a/src/payload-processor/processor/db.go b/src/payload-processor/processor/db.go
index 45ec484..2200937 100644
--- a/src/payload-processor/processor/db.go
+++ b/src/payload-processor/processor/db.go
@@ -9,7 +9,7 @@ import (
 
 //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE
 type DB interface {
-	InsertFile(ctx context.Context, file lib.File) error
+	InsertFile(ctx context.Context, file lib.File, user_id string) error
 }
 
 type DBImpl struct {
@@ -22,7 +22,8 @@ func NewDB(db *sqlx.DB) DB {
 	return &DBImpl{db: db}
 }
 
-func (db DBImpl) InsertFile(ctx context.Context, file lib.File) error {
+func (db DBImpl) InsertFile(ctx context.Context, file lib.File, user_id string) error {
+	file.User_id = user_id
 	_, err := db.db.NamedExecContext(ctx, "INSERT INTO private.file (user_id, absolute_path, contents, timestamp) VALUES (:user_id, :absolute_path, :contents, :timestamp)", file)
 	return err
 }
diff --git a/src/payload-processor/processor/mock_db.go b/src/payload-processor/processor/mock_db.go
index 1b283bd..f6347d7 100644
--- a/src/payload-processor/processor/mock_db.go
+++ b/src/payload-processor/processor/mock_db.go
@@ -41,15 +41,15 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder {
 }
 
 // InsertFile mocks base method.
-func (m *MockDB) InsertFile(ctx context.Context, file lib.File) error {
+func (m *MockDB) InsertFile(ctx context.Context, file lib.File, user_id string) error {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "InsertFile", ctx, file)
+	ret := m.ctrl.Call(m, "InsertFile", ctx, file, user_id)
 	ret0, _ := ret[0].(error)
 	return ret0
 }
 
 // InsertFile indicates an expected call of InsertFile.
-func (mr *MockDBMockRecorder) InsertFile(ctx, file any) *gomock.Call {
+func (mr *MockDBMockRecorder) InsertFile(ctx, file, user_id any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InsertFile", reflect.TypeOf((*MockDB)(nil).InsertFile), ctx, file, user_id)
 }
diff --git a/src/payload-processor/processor/processor.go b/src/payload-processor/processor/processor.go
index ab128cb..1203fea 100644
--- a/src/payload-processor/processor/processor.go
+++ b/src/payload-processor/processor/processor.go
@@ -12,7 +12,6 @@ import (
 	"time"
 
 	"github.com/Baitinq/fs-tracer-backend/lib"
-	"github.com/google/uuid"
 	"github.com/jmoiron/sqlx"
 	"github.com/segmentio/kafka-go"
 )
@@ -70,17 +69,21 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 			break
 		}
 
+		user_id, err := getHeaderValue(m.Headers, "user_id")
+		if err != nil {
+			log.Fatal("failed to get user_id from headers:", err)
+		}
+
 		// TODO: Remove after testing
 		if string(m.Value) == "" {
 			m.Value = []byte(fmt.Sprintf(`{
-				"user_id": "%s",
 				"absolute_path": "/home/user/file.txt",
 				"contents": "Hello, World!",
 				"timestamp": "%s"
-			}`, uuid.New(), time.Now().Format(time.RFC3339)))
+			}`, time.Now().Format(time.RFC3339)))
 		}
 
-		err = p.handleMessage(ctx, m)
+		err = p.handleMessage(ctx, m, string(user_id))
 		if err != nil {
 			log.Println("failed to handle message:", err)
 			p.handleError(ctx, m, err)
@@ -90,8 +93,8 @@ func (p Processor) process(ctx context.Context, cancel context.CancelFunc) {
 	}
 }
 
-func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
-	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value))
+func (p Processor) handleMessage(ctx context.Context, m kafka.Message, user_id string) error {
+	fmt.Printf("(%s): message at paritition %d: offset %d: %s = %s, user_id = %s\n", time.Now().String(), m.Partition, m.Offset, string(m.Key), string(m.Value), user_id)
 
 	var file lib.File
 	err := json.Unmarshal(m.Value, &file)
@@ -99,7 +102,7 @@ func (p Processor) handleMessage(ctx context.Context, m kafka.Message) error {
 		return err
 	}
 
-	err = p.db.InsertFile(ctx, file)
+	err = p.db.InsertFile(ctx, file, user_id)
 	if err != nil {
 		return err
 	}
@@ -113,3 +116,12 @@ func (p Processor) handleError(ctx context.Context, m kafka.Message, err error)
 		p.kafka_reader.CommitMessages(ctx, m)
 	}
 }
+
+func getHeaderValue(headers []kafka.Header, key string) ([]byte, error) {
+	for _, header := range headers {
+		if header.Key == key {
+			return header.Value, nil
+		}
+	}
+	return []byte{}, fmt.Errorf("Header %s not found", key)
+}
diff --git a/src/payload-processor/processor/processor_test.go b/src/payload-processor/processor/processor_test.go
index 39e965e..4a55e48 100644
--- a/src/payload-processor/processor/processor_test.go
+++ b/src/payload-processor/processor/processor_test.go
@@ -20,7 +20,6 @@ func TestProcessMessage(t *testing.T) {
 
 	message := []byte(`
 	{
-		"user_id": "1",
 		"absolute_path": "/tmp/file.txt",
 		"contents": "hello world",
 		"timestamp": "2021-01-01T00:00:00Z"
@@ -30,13 +29,12 @@ func TestProcessMessage(t *testing.T) {
 	ctx := context.Background()
 
 	mockdb.EXPECT().InsertFile(ctx, lib.File{
-		User_id:       "1",
 		Absolute_path: "/tmp/file.txt",
 		Contents:      "hello world",
 		Timestamp:     time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC),
-	}).Return(nil)
+	}, "USER_ID").Return(nil)
 
-	err := processor.handleMessage(ctx, kafka.Message{Value: message})
+	err := processor.handleMessage(ctx, kafka.Message{Value: message}, "USER_ID")
 
 	require.NoError(t, err)
 }
diff --git a/src/rest-api/handler/db.go b/src/rest-api/handler/db.go
index 0093c41..49d8e35 100644
--- a/src/rest-api/handler/db.go
+++ b/src/rest-api/handler/db.go
@@ -9,7 +9,8 @@ import (
 
 //go:generate mockgen -source=$GOFILE -package=$GOPACKAGE -destination=mock_$GOFILE
 type DB interface {
-	GetLatestFileByPath(ctx context.Context, path string) (*lib.File, error)
+	GetLatestFileByPath(ctx context.Context, path string, user_id string) (*lib.File, error)
+	GetUserIDByAPIKey(ctx context.Context, apiKey string) (string, error)
 }
 
 type DBImpl struct {
@@ -22,16 +23,36 @@ func NewDB(db *sqlx.DB) DB {
 	return &DBImpl{db: db}
 }
 
-func (db DBImpl) GetLatestFileByPath(ctx context.Context, path string) (*lib.File, error) {
+func (db DBImpl) GetLatestFileByPath(ctx context.Context, path string, user_id string) (*lib.File, error) {
 	var file lib.File
 	err := db.db.GetContext(ctx, &file, `
 		SELECT * FROM private.file
-		WHERE absolute_path = $1
+		WHERE
+			user_id = $1
+			AND absolute_path = $2
 		ORDER BY timestamp DESC
 		LIMIT 1
-	`, path)
+	`, user_id, path)
 	if err != nil {
 		return nil, err
 	}
 	return &file, nil
 }
+
+// TODO: Add test
+func (db DBImpl) GetUserIDByAPIKey(ctx context.Context, apiKey string) (string, error) {
+	if len(apiKey) != 44 {
+		return "", nil
+	}
+
+	var userID string
+	err := db.db.GetContext(ctx, &userID, `
+		SELECT id FROM private.api_keys
+		WHERE api_key = $1
+		LIMIT 1
+	`, apiKey)
+	if err != nil {
+		return "", err
+	}
+	return userID, nil
+}
diff --git a/src/rest-api/handler/handler.go b/src/rest-api/handler/handler.go
index 558e773..4b9a426 100644
--- a/src/rest-api/handler/handler.go
+++ b/src/rest-api/handler/handler.go
@@ -27,17 +27,33 @@ func NewHandler(db *sqlx.DB, kafka_writer *kafka.Writer) Handler {
 }
 
 func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	api_key := r.Header.Get("API_KEY")
+
+	log.Println("API KEY: ", api_key)
+
+	user_id, err := h.db.GetUserIDByAPIKey(r.Context(), api_key)
+	if err != nil {
+		http.Error(w, fmt.Sprintf("Internal server error: %s", err), http.StatusInternalServerError)
+		return
+	}
+	if user_id == "" {
+		http.Error(w, "Unauthorized", http.StatusUnauthorized)
+		return
+	}
+
+	log.Println("User ID: ", user_id)
+
 	switch r.Method {
 	case http.MethodGet:
-		h.handleGet(w, r)
+		h.handleGet(w, r, user_id)
 	case http.MethodPost:
-		h.handlePost(w, r)
+		h.handlePost(w, r, user_id)
 	default:
 		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
 	}
 }
 
-func (h Handler) handleGet(w http.ResponseWriter, r *http.Request) {
+func (h Handler) handleGet(w http.ResponseWriter, r *http.Request, user_id string) {
 	_, filePath, ok := strings.Cut(r.URL.Path, "/file/")
 	if !ok {
 		http.Error(w, "Invalid file path", http.StatusBadRequest)
@@ -48,7 +64,7 @@ func (h Handler) handleGet(w http.ResponseWriter, r *http.Request) {
 
 	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
 	defer cancel()
-	file, err := h.db.GetLatestFileByPath(ctx, filePath)
+	file, err := h.db.GetLatestFileByPath(ctx, filePath, user_id)
 	if err != nil {
 		http.Error(w, fmt.Sprintf("Internal server error: %s", err), http.StatusInternalServerError)
 		return
@@ -57,7 +73,7 @@ func (h Handler) handleGet(w http.ResponseWriter, r *http.Request) {
 	fmt.Fprintln(w, "File: ", file)
 }
 
-func (h Handler) handlePost(w http.ResponseWriter, r *http.Request) {
+func (h Handler) handlePost(w http.ResponseWriter, r *http.Request, user_id string) {
 	bytes, err := io.ReadAll(io.Reader(r.Body))
 	if err != nil {
 		log.Fatal(err)
@@ -69,6 +85,10 @@ func (h Handler) handlePost(w http.ResponseWriter, r *http.Request) {
 	err = h.kafka_writer.WriteMessages(ctx, kafka.Message{
 		Key:   []byte("key"), //TODO:This routes to a partition. We should probably route by agent UUID TODO: wont this negate having multiple topics
 		Value: bytes,
+		Headers: []kafka.Header{{
+			Key:   "user_id",
+			Value: []byte(user_id),
+		}},
 	})
 	if err != nil {
 		log.Fatal(err)
diff --git a/src/rest-api/handler/handler_test.go b/src/rest-api/handler/handler_test.go
index 94b58e4..4709959 100644
--- a/src/rest-api/handler/handler_test.go
+++ b/src/rest-api/handler/handler_test.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"net/http"
 	"net/http/httptest"
-	"strings"
 	"testing"
 
 	"github.com/Baitinq/fs-tracer-backend/lib"
@@ -20,11 +19,12 @@ func TestHandleGet(t *testing.T) {
 	handler := Handler{db: db}
 
 	file := &lib.File{
+		User_id:       "USER_ID",
 		Absolute_path: "/tmp/file.txt",
 	}
-	db.EXPECT().GetLatestFileByPath(gomock.Any(), "/tmp/file.txt").Return(file, nil)
+	db.EXPECT().GetLatestFileByPath(gomock.Any(), "/tmp/file.txt", "USER_ID").Return(file, nil)
 
-	handler.handleGet(recorder, httptest.NewRequest(http.MethodGet, "/file/%2ftmp%2Ffile.txt", nil))
+	handler.handleGet(recorder, httptest.NewRequest(http.MethodGet, "/file/%2ftmp%2Ffile.txt", nil), "USER_ID")
 
 	require.Equal(t, http.StatusOK, recorder.Code)
 	require.Equal(t, fmt.Sprintln("File: ", file), recorder.Body.String())
diff --git a/src/rest-api/handler/mock_db.go b/src/rest-api/handler/mock_db.go
index 2d51a8f..542fd62 100644
--- a/src/rest-api/handler/mock_db.go
+++ b/src/rest-api/handler/mock_db.go
@@ -41,16 +41,31 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder {
 }
 
 // GetLatestFileByPath mocks base method.
-func (m *MockDB) GetLatestFileByPath(ctx context.Context, path string) (*lib.File, error) {
+func (m *MockDB) GetLatestFileByPath(ctx context.Context, path, user_id string) (*lib.File, error) {
 	m.ctrl.T.Helper()
-	ret := m.ctrl.Call(m, "GetLatestFileByPath", ctx, path)
+	ret := m.ctrl.Call(m, "GetLatestFileByPath", ctx, path, user_id)
 	ret0, _ := ret[0].(*lib.File)
 	ret1, _ := ret[1].(error)
 	return ret0, ret1
 }
 
 // GetLatestFileByPath indicates an expected call of GetLatestFileByPath.
-func (mr *MockDBMockRecorder) GetLatestFileByPath(ctx, path any) *gomock.Call {
+func (mr *MockDBMockRecorder) GetLatestFileByPath(ctx, path, user_id any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestFileByPath", reflect.TypeOf((*MockDB)(nil).GetLatestFileByPath), ctx, path)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLatestFileByPath", reflect.TypeOf((*MockDB)(nil).GetLatestFileByPath), ctx, path, user_id)
+}
+
+// GetUserIDByAPIKey mocks base method.
+func (m *MockDB) GetUserIDByAPIKey(ctx context.Context, apiKey string) (string, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetUserIDByAPIKey", ctx, apiKey)
+	ret0, _ := ret[0].(string)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// GetUserIDByAPIKey indicates an expected call of GetUserIDByAPIKey.
+func (mr *MockDBMockRecorder) GetUserIDByAPIKey(ctx, apiKey any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUserIDByAPIKey", reflect.TypeOf((*MockDB)(nil).GetUserIDByAPIKey), ctx, apiKey)
 }