about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <[email protected]>2024-06-11 23:35:44 +0200
committerBaitinq <[email protected]>2024-06-11 23:35:44 +0200
commite91dcce094fda9640b4515997e64059e056678b6 (patch)
tree88700abf637dc76b0a1869533e9256829c6f9fea
parentfs-tracer: cleanup (diff)
downloadfs-tracer-e91dcce094fda9640b4515997e64059e056678b6.tar.gz
fs-tracer-e91dcce094fda9640b4515997e64059e056678b6.tar.bz2
fs-tracer-e91dcce094fda9640b4515997e64059e056678b6.zip
fs-tracer: fix threads not exiting
-rw-r--r--fs-tracer/Cargo.toml1
-rw-r--r--fs-tracer/src/main.rs18
-rw-r--r--fs-tracer/src/syscall_handler.rs7
3 files changed, 16 insertions, 10 deletions
diff --git a/fs-tracer/Cargo.toml b/fs-tracer/Cargo.toml
index 68c318c..94f2a53 100644
--- a/fs-tracer/Cargo.toml
+++ b/fs-tracer/Cargo.toml
@@ -18,6 +18,7 @@ ureq = "2.9.7"
 ctrlc = "3.4.4"
 futures = "0.3.30"
 chrono = "0.4.38"
+crossbeam-channel = "0.5.13"
 
 [[bin]]
 name = "fs-tracer"
diff --git a/fs-tracer/src/main.rs b/fs-tracer/src/main.rs
index bb4b6db..d1adc19 100644
--- a/fs-tracer/src/main.rs
+++ b/fs-tracer/src/main.rs
@@ -73,21 +73,19 @@ async fn main() -> Result<(), anyhow::Error> {
     })
     .expect("could not set Ctrl+C handler");
 
-    let (resolved_files_send, resolved_files_recv) = mpsc::channel();
+    let (resolved_files_send, resolved_files_recv) = crossbeam_channel::unbounded();
 
     // Create arcmutex for the syscall handler in order to use it in threads
-    let syscall_handler = Arc::new(Mutex::new(syscall_handler::SyscallHandler::new(
-        resolved_files_send,
-    )));
 
     let mut handles = vec![];
     let mut perf_array = AsyncPerfEventArray::try_from(bpf.take_map("EVENTS").unwrap())?;
     for cpu_id in online_cpus()? {
         let mut buf = perf_array.open(cpu_id, None)?;
 
-        let thread_syscall_handler = syscall_handler.clone();
         let thread_exit = exit.clone();
-        handles.push(task::spawn(async move {
+        let thread_sender = resolved_files_send.clone();
+        handles.push(tokio::spawn(async move {
+            let mut syscall_handler = syscall_handler::SyscallHandler::new(thread_sender);
             let mut buffers = (0..10)
                 .map(|_| BytesMut::with_capacity(1024))
                 .collect::<Vec<_>>();
@@ -101,15 +99,16 @@ async fn main() -> Result<(), anyhow::Error> {
                     }
                     let ptr = buf.as_ptr() as *const SyscallInfo;
                     let data = unsafe { ptr.read_unaligned() };
-                    thread_syscall_handler.lock().unwrap().handle_syscall(data);
+                    syscall_handler.handle_syscall(data);
                 }
             }
         }));
     }
 
-    info!("Waiting for threads to stop");
+    drop(resolved_files_send);
+
     let mut batched_req = vec![];
-    for elt in resolved_files_recv {
+    for elt in &resolved_files_recv {
         batched_req.push(elt);
         // Batching. TODO: we can probably increase this value but we need to increase max message
         // in kafka or compress or smth. We should probably batch taking into account the message
@@ -118,6 +117,7 @@ async fn main() -> Result<(), anyhow::Error> {
         if batched_req.len() < 4000 {
             continue;
         }
+
         let request_body = format!("[{}]", batched_req.join(","));
         //TODO: Retries
         let resp = ureq::post(&url)
diff --git a/fs-tracer/src/syscall_handler.rs b/fs-tracer/src/syscall_handler.rs
index 12c218e..1148575 100644
--- a/fs-tracer/src/syscall_handler.rs
+++ b/fs-tracer/src/syscall_handler.rs
@@ -1,10 +1,12 @@
-use std::{collections::HashMap, ffi::CStr, sync::mpsc::Sender};
+use crossbeam_channel::Sender;
+use std::{collections::HashMap, ffi::CStr};
 
 use fs_tracer_common::SyscallInfo;
 
 pub struct SyscallHandler {
     resolved_files: Sender<String>,
     open_files: HashMap<i32, String>,
+    total: u64,
 }
 
 impl SyscallHandler {
@@ -12,6 +14,7 @@ impl SyscallHandler {
         Self {
             resolved_files,
             open_files: HashMap::new(),
+            total: 0,
         }
     }
 
@@ -43,6 +46,8 @@ impl SyscallHandler {
                     filename,
                     contents,
                 ));
+                self.total += 1;
+                println!("Total: {:?}", self.total);
                 return 0;
             }
             SyscallInfo::Open(x) => {