diff options
| author | Baitinq <[email protected]> | 2024-06-11 23:35:44 +0200 |
|---|---|---|
| committer | Baitinq <[email protected]> | 2024-06-11 23:35:44 +0200 |
| commit | e91dcce094fda9640b4515997e64059e056678b6 (patch) | |
| tree | 88700abf637dc76b0a1869533e9256829c6f9fea | |
| parent | fs-tracer: cleanup (diff) | |
| download | fs-tracer-e91dcce094fda9640b4515997e64059e056678b6.tar.gz fs-tracer-e91dcce094fda9640b4515997e64059e056678b6.tar.bz2 fs-tracer-e91dcce094fda9640b4515997e64059e056678b6.zip | |
fs-tracer: fix threads not exiting
| -rw-r--r-- | fs-tracer/Cargo.toml | 1 | ||||
| -rw-r--r-- | fs-tracer/src/main.rs | 18 | ||||
| -rw-r--r-- | fs-tracer/src/syscall_handler.rs | 7 |
3 files changed, 16 insertions, 10 deletions
diff --git a/fs-tracer/Cargo.toml b/fs-tracer/Cargo.toml index 68c318c..94f2a53 100644 --- a/fs-tracer/Cargo.toml +++ b/fs-tracer/Cargo.toml @@ -18,6 +18,7 @@ ureq = "2.9.7" ctrlc = "3.4.4" futures = "0.3.30" chrono = "0.4.38" +crossbeam-channel = "0.5.13" [[bin]] name = "fs-tracer" diff --git a/fs-tracer/src/main.rs b/fs-tracer/src/main.rs index bb4b6db..d1adc19 100644 --- a/fs-tracer/src/main.rs +++ b/fs-tracer/src/main.rs @@ -73,21 +73,19 @@ async fn main() -> Result<(), anyhow::Error> { }) .expect("could not set Ctrl+C handler"); - let (resolved_files_send, resolved_files_recv) = mpsc::channel(); + let (resolved_files_send, resolved_files_recv) = crossbeam_channel::unbounded(); // Create arcmutex for the syscall handler in order to use it in threads - let syscall_handler = Arc::new(Mutex::new(syscall_handler::SyscallHandler::new( - resolved_files_send, - ))); let mut handles = vec![]; let mut perf_array = AsyncPerfEventArray::try_from(bpf.take_map("EVENTS").unwrap())?; for cpu_id in online_cpus()? { let mut buf = perf_array.open(cpu_id, None)?; - let thread_syscall_handler = syscall_handler.clone(); let thread_exit = exit.clone(); - handles.push(task::spawn(async move { + let thread_sender = resolved_files_send.clone(); + handles.push(tokio::spawn(async move { + let mut syscall_handler = syscall_handler::SyscallHandler::new(thread_sender); let mut buffers = (0..10) .map(|_| BytesMut::with_capacity(1024)) .collect::<Vec<_>>(); @@ -101,15 +99,16 @@ async fn main() -> Result<(), anyhow::Error> { } let ptr = buf.as_ptr() as *const SyscallInfo; let data = unsafe { ptr.read_unaligned() }; - thread_syscall_handler.lock().unwrap().handle_syscall(data); + syscall_handler.handle_syscall(data); } } })); } - info!("Waiting for threads to stop"); + drop(resolved_files_send); + let mut batched_req = vec![]; - for elt in resolved_files_recv { + for elt in &resolved_files_recv { batched_req.push(elt); // Batching. TODO: we can probably increase this value but we need to increase max message // in kafka or compress or smth. We should probably batch taking into account the message @@ -118,6 +117,7 @@ async fn main() -> Result<(), anyhow::Error> { if batched_req.len() < 4000 { continue; } + let request_body = format!("[{}]", batched_req.join(",")); //TODO: Retries let resp = ureq::post(&url) diff --git a/fs-tracer/src/syscall_handler.rs b/fs-tracer/src/syscall_handler.rs index 12c218e..1148575 100644 --- a/fs-tracer/src/syscall_handler.rs +++ b/fs-tracer/src/syscall_handler.rs @@ -1,10 +1,12 @@ -use std::{collections::HashMap, ffi::CStr, sync::mpsc::Sender}; +use crossbeam_channel::Sender; +use std::{collections::HashMap, ffi::CStr}; use fs_tracer_common::SyscallInfo; pub struct SyscallHandler { resolved_files: Sender<String>, open_files: HashMap<i32, String>, + total: u64, } impl SyscallHandler { @@ -12,6 +14,7 @@ impl SyscallHandler { Self { resolved_files, open_files: HashMap::new(), + total: 0, } } @@ -43,6 +46,8 @@ impl SyscallHandler { filename, contents, )); + self.total += 1; + println!("Total: {:?}", self.total); return 0; } SyscallInfo::Open(x) => { |