From ed7180c62b905c816ee628fac8dce1bc2d48b7c0 Mon Sep 17 00:00:00 2001
From: Baitinq
Date: Wed, 12 Jun 2024 23:47:56 +0200
Subject: fs-tracer: batch messages according to kafka max message size

---
 fs-tracer/src/main.rs | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/fs-tracer/src/main.rs b/fs-tracer/src/main.rs
index ca1c914..ba5f77c 100644
--- a/fs-tracer/src/main.rs
+++ b/fs-tracer/src/main.rs
@@ -8,8 +8,10 @@ use bytes::BytesMut;
 use fs_tracer_common::SyscallInfo;
 use log::{debug, info, warn};
 use std::env;
-use std::sync::atomic::{AtomicBool, Ordering};
-use std::sync::Arc;
+use std::sync::{
+    atomic::{AtomicBool, Ordering},
+    Arc,
+};
 
 #[tokio::main]
 async fn main() -> Result<(), anyhow::Error> {
@@ -103,26 +105,26 @@ async fn main() -> Result<(), anyhow::Error> {
 
     drop(resolved_files_send);
 
-    let mut batched_req = vec![];
+    let mut request_body = String::from("[");
     for elt in &resolved_files_recv {
-        batched_req.push(elt);
-        // Batching. TODO: we can probably increase this value but we need to increase max message
-        // in kafka or compress or smth. We should probably batch taking into account the message
-        // size.
-        if batched_req.len() < 2700 {
+        request_body.push_str(&elt);
+        request_body.push_str(",");
+        // 1000000 bytes = 1MB (max kafka message size)
+        if request_body.len() < 999999 {
             continue;
         }
-        let request_body = format!("[{}]", batched_req.join(","));
+        request_body.pop(); // remove trailing ','
+        request_body.push_str("]");
         send_request(&url, &fs_tracer_api_key, &request_body);
-        info!("SENT REQUEST! {:?}, {:?}", batched_req.len(), request_body);
-        batched_req.clear();
+        info!("SENT REQUEST! {:?}", request_body);
+        request_body = String::from("[");
     }
 
     info!("All threads stopped, exiting now...");
 
-    if !batched_req.is_empty() {
-        let request_body = format!("[{}]", batched_req.join(","));
+    if request_body.len() > 1 {
+        request_body.replace_range(request_body.len() - 1.., "]"); // swap trailing ',' for ']'
         send_request(&url, &fs_tracer_api_key, &request_body);
     }
-- 
cgit 1.4.1
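
The size check in the patch fires only after an element has already been appended, so a single large record can still carry a batch past the broker's limit. Below is a minimal sketch, not part of the commit, of batching that measures before appending and flushes first when an element would overflow the cap. MAX_BODY_BYTES, batch_and_send, flush and the endpoint in main are illustrative assumptions; send_request, url and fs_tracer_api_key mirror names from the patch, with send_request stubbed so the sketch compiles on its own.

// Assumed cap: Kafka's default message.max.bytes is roughly 1 MB.
const MAX_BODY_BYTES: usize = 1_000_000;

// Stub standing in for the send_request defined in fs-tracer/src/main.rs.
fn send_request(_url: &str, _api_key: &str, body: &str) {
    println!("would POST {} bytes", body.len());
}

// Close the JSON array, send it, and reset the buffer for the next batch.
fn flush(request_body: &mut String, url: &str, api_key: &str) {
    request_body.push(']');
    send_request(url, api_key, request_body);
    request_body.clear();
    request_body.push('[');
}

fn batch_and_send(records: impl IntoIterator<Item = String>, url: &str, api_key: &str) {
    let mut request_body = String::from("[");
    for elt in records {
        // Flush first if appending elt (plus a ',' separator and the closing
        // ']') would push this batch past the cap. An element bigger than the
        // cap itself still goes out alone and would need compression or a
        // larger message.max.bytes on the broker.
        if request_body.len() > 1 && request_body.len() + elt.len() + 2 > MAX_BODY_BYTES {
            flush(&mut request_body, url, api_key);
        }
        if request_body.len() > 1 {
            request_body.push(',');
        }
        request_body.push_str(&elt);
    }
    // Send whatever is left once the channel closes.
    if request_body.len() > 1 {
        flush(&mut request_body, url, api_key);
    }
}

fn main() {
    // Hypothetical endpoint and key, for illustration only.
    let records = (0..5).map(|i| format!("{{\"id\":{i}}}"));
    batch_and_send(records, "http://localhost:8080/batch", "example-key");
}

Writing the ',' before each element rather than after it also removes the need to pop a trailing comma at flush time.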