about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <[email protected]>2024-06-12 23:47:56 +0200
committerBaitinq <[email protected]>2024-06-12 23:48:30 +0200
commited7180c62b905c816ee628fac8dce1bc2d48b7c0 (patch)
treeea86a01c289eda772b3e63650e357a8c37910886
parentfs-tracer: use hashmap with ttl for storing syscall state (diff)
downloadfs-tracer-ed7180c62b905c816ee628fac8dce1bc2d48b7c0.tar.gz
fs-tracer-ed7180c62b905c816ee628fac8dce1bc2d48b7c0.tar.bz2
fs-tracer-ed7180c62b905c816ee628fac8dce1bc2d48b7c0.zip
fs-tracer: batch messages according to kafka max message size
-rw-r--r--fs-tracer/src/main.rs28
1 files changed, 15 insertions, 13 deletions
diff --git a/fs-tracer/src/main.rs b/fs-tracer/src/main.rs
index ca1c914..ba5f77c 100644
--- a/fs-tracer/src/main.rs
+++ b/fs-tracer/src/main.rs
@@ -8,8 +8,10 @@ use bytes::BytesMut;
 use fs_tracer_common::SyscallInfo;
 use log::{debug, info, warn};
 use std::env;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    Arc,
+};
 
 #[tokio::main]
 async fn main() -> Result<(), anyhow::Error> {
@@ -103,26 +105,26 @@ async fn main() -> Result<(), anyhow::Error> {
 
     drop(resolved_files_send);
 
-    let mut batched_req = vec![];
+    let mut request_body = String::from("[");
     for elt in &resolved_files_recv {
-        batched_req.push(elt);
-        // Batching. TODO: we can probably increase this value but we need to increase max message
-        // in kafka or compress or smth. We should probably batch taking into account the message
-        // size.
-        if batched_req.len() < 2700 {
+        request_body.push_str(&elt);
+        request_body.push_str(",");
+        // 1000000 bytes = 1MB (max kafka message size)
+        if request_body.len() < 999999 {
             continue;
         }
 
-        let request_body = format!("[{}]", batched_req.join(","));
+        request_body.pop(); // remove trailing ','
+        request_body.push_str("]");
         send_request(&url, &fs_tracer_api_key, &request_body);
-        info!("SENT REQUEST! {:?}, {:?}", batched_req.len(), request_body);
-        batched_req.clear();
+        info!("SENT REQUEST! {:?}", request_body);
+        request_body = String::from("[");
     }
 
     info!("All threads stopped, exiting now...");
 
-    if !batched_req.is_empty() {
-        let request_body = format!("[{}]", batched_req.join(","));
+    if !request_body.len() > 1 {
+        request_body.push_str("]");
         send_request(&url, &fs_tracer_api_key, &request_body);
     }