about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBaitinq <[email protected]>2024-05-02 01:04:45 +0200
committerBaitinq <[email protected]>2024-05-02 01:04:45 +0200
commit2432234549a8b1c0b6c3848bf7a48e412580fa9e (patch)
treef2319131bb19af953837df2fbea33985142969c1
parentAdd tests (diff)
downloadfs-tracer-2432234549a8b1c0b6c3848bf7a48e412580fa9e.tar.gz
fs-tracer-2432234549a8b1c0b6c3848bf7a48e412580fa9e.tar.bz2
fs-tracer-2432234549a8b1c0b6c3848bf7a48e412580fa9e.zip
Properly detach bpf program and report dummy data to backend
-rw-r--r--fs-tracer/Cargo.toml5
-rw-r--r--fs-tracer/src/main.rs44
2 files changed, 37 insertions, 12 deletions
diff --git a/fs-tracer/Cargo.toml b/fs-tracer/Cargo.toml
index b452bc5..3a74bf7 100644
--- a/fs-tracer/Cargo.toml
+++ b/fs-tracer/Cargo.toml
@@ -14,7 +14,10 @@ libc = "0.2"
 log = "0.4"
 bytes = "1.5.0"
 tokio = { version = "1.25", features = ["macros", "rt", "rt-multi-thread", "net", "signal"] }
+ureq = "2.9.7"
+ctrlc = "3.4.4"
+futures = "0.3.30"
 
 [[bin]]
 name = "fs-tracer"
-path = "src/main.rs"
\ No newline at end of file
+path = "src/main.rs"
diff --git a/fs-tracer/src/main.rs b/fs-tracer/src/main.rs
index db9e15d..0f8835f 100644
--- a/fs-tracer/src/main.rs
+++ b/fs-tracer/src/main.rs
@@ -1,5 +1,3 @@
-use std::ffi::CStr;
-
 use aya::maps::AsyncPerfEventArray;
 use aya::programs::TracePoint;
 use aya::util::online_cpus;
@@ -8,12 +6,19 @@ use aya_log::EbpfLogger;
 use bytes::BytesMut;
 use fs_tracer_common::SyscallInfo;
 use log::{debug, info, warn};
-use tokio::{signal, task};
+use std::env;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use tokio::task;
 
 #[tokio::main]
 async fn main() -> Result<(), anyhow::Error> {
     env_logger::init();
 
+    let fs_tracer_server_host = env::var("FS_TRACER_SERVER_HOST")
+        .expect("FS_TRACER_SERVER_HOST must be set");
+    let url = format!("http://{fs_tracer_server_host}/payload");
+
     // Bump the memlock rlimit. This is needed for older kernels that don't use the
     // new memcg based accounting, see https://lwn.net/Articles/837122/
     let rlim = libc::rlimit {
@@ -44,7 +49,7 @@ async fn main() -> Result<(), anyhow::Error> {
     let trace_enters_program: &mut TracePoint =
         bpf.program_mut("fs_tracer_enter").unwrap().try_into()?;
     trace_enters_program.load()?;
-    trace_enters_program.attach("syscalls", "sys_enter_openat")?;
+    // trace_enters_program.attach("syscalls", "sys_enter_openat")?;
     trace_enters_program.attach("syscalls", "sys_enter_write")?;
     // program.attach("syscalls", "sys_exit_write")?;
     //trace_enters_program.attach("syscalls", "sys_enter_lseek")?;
@@ -53,16 +58,27 @@ async fn main() -> Result<(), anyhow::Error> {
     let trace_exits_program: &mut TracePoint =
         bpf.program_mut("fs_tracer_exit").unwrap().try_into()?;
     trace_exits_program.load()?;
-    trace_exits_program.attach("syscalls", "sys_exit_openat")?;
+    // trace_exits_program.attach("syscalls", "sys_exit_openat")?;
     trace_exits_program.attach("syscalls", "sys_exit_write")?;
 
     println!("Num of cpus: {}", online_cpus()?.len());
 
+    let exit = Arc::new(AtomicBool::new(false));
+    let ctrlc_exit = exit.clone();
+    ctrlc::set_handler(move || {
+        println!("received Ctrl+C!");
+        ctrlc_exit.store(true, Ordering::Relaxed);
+    })
+    .expect("could not set Ctrl+C handler");
+
+    let mut handles = vec![];
     let mut perf_array = AsyncPerfEventArray::try_from(bpf.take_map("EVENTS").unwrap())?;
     for cpu_id in online_cpus()? {
         let mut buf = perf_array.open(cpu_id, None)?;
 
-        task::spawn(async move {
+        let thread_url = url.clone();
+        let thread_exit = exit.clone();
+        handles.push(task::spawn(async move {
             let mut buffers = (0..10)
                 .map(|_| BytesMut::with_capacity(1024))
                 .collect::<Vec<_>>();
@@ -70,11 +86,17 @@ async fn main() -> Result<(), anyhow::Error> {
             loop {
                 let events = buf.read_events(&mut buffers).await.unwrap();
                 for buf in buffers.iter_mut().take(events.read) {
+                    if thread_exit.load(Ordering::Relaxed) {
+                        return;
+                    }
                     let ptr = buf.as_ptr() as *const SyscallInfo;
                     let data = unsafe { ptr.read_unaligned() };
                     match data {
                         SyscallInfo::Write(x) => {
-                            println!("WRITE KERNEL: DATA {:?}", x)
+                            println!("WRITE KERNEL: DATA {:?}", x);
+                            let _ = ureq::post(thread_url.as_str())
+                                .send_string("hi world!")
+                                .expect("Failed to send request");
                         }
                         SyscallInfo::Open(x) => {
                             // if !CStr::from_bytes_until_nul(&x.filename)
@@ -89,12 +111,12 @@ async fn main() -> Result<(), anyhow::Error> {
                     }
                 }
             }
-        });
+        }));
     }
 
-    info!("Waiting for Ctrl-C...");
-    signal::ctrl_c().await?;
-    info!("Exiting...");
+    info!("Waiting for threads to stop");
+    futures::future::join_all(handles).await;
+    info!("All threads stopped, exiting now...");
 
     Ok(())
 }