From 2432234549a8b1c0b6c3848bf7a48e412580fa9e Mon Sep 17 00:00:00 2001 From: Baitinq Date: Thu, 2 May 2024 01:04:45 +0200 Subject: Properly detach bpf program and report dummy data to backend --- fs-tracer/Cargo.toml | 5 ++++- fs-tracer/src/main.rs | 44 +++++++++++++++++++++++++++++++++----------- 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/fs-tracer/Cargo.toml b/fs-tracer/Cargo.toml index b452bc5..3a74bf7 100644 --- a/fs-tracer/Cargo.toml +++ b/fs-tracer/Cargo.toml @@ -14,7 +14,10 @@ libc = "0.2" log = "0.4" bytes = "1.5.0" tokio = { version = "1.25", features = ["macros", "rt", "rt-multi-thread", "net", "signal"] } +ureq = "2.9.7" +ctrlc = "3.4.4" +futures = "0.3.30" [[bin]] name = "fs-tracer" -path = "src/main.rs" \ No newline at end of file +path = "src/main.rs" diff --git a/fs-tracer/src/main.rs b/fs-tracer/src/main.rs index db9e15d..0f8835f 100644 --- a/fs-tracer/src/main.rs +++ b/fs-tracer/src/main.rs @@ -1,5 +1,3 @@ -use std::ffi::CStr; - use aya::maps::AsyncPerfEventArray; use aya::programs::TracePoint; use aya::util::online_cpus; @@ -8,12 +6,19 @@ use aya_log::EbpfLogger; use bytes::BytesMut; use fs_tracer_common::SyscallInfo; use log::{debug, info, warn}; -use tokio::{signal, task}; +use std::env; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use tokio::task; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { env_logger::init(); + let fs_tracer_server_host = env::var("FS_TRACER_SERVER_HOST") + .expect("FS_TRACER_SERVER_HOST must be set"); + let url = format!("http://{fs_tracer_server_host}/payload"); + // Bump the memlock rlimit. This is needed for older kernels that don't use the // new memcg based accounting, see https://lwn.net/Articles/837122/ let rlim = libc::rlimit { @@ -44,7 +49,7 @@ async fn main() -> Result<(), anyhow::Error> { let trace_enters_program: &mut TracePoint = bpf.program_mut("fs_tracer_enter").unwrap().try_into()?; trace_enters_program.load()?; - trace_enters_program.attach("syscalls", "sys_enter_openat")?; + // trace_enters_program.attach("syscalls", "sys_enter_openat")?; trace_enters_program.attach("syscalls", "sys_enter_write")?; // program.attach("syscalls", "sys_exit_write")?; //trace_enters_program.attach("syscalls", "sys_enter_lseek")?; @@ -53,16 +58,27 @@ async fn main() -> Result<(), anyhow::Error> { let trace_exits_program: &mut TracePoint = bpf.program_mut("fs_tracer_exit").unwrap().try_into()?; trace_exits_program.load()?; - trace_exits_program.attach("syscalls", "sys_exit_openat")?; + // trace_exits_program.attach("syscalls", "sys_exit_openat")?; trace_exits_program.attach("syscalls", "sys_exit_write")?; println!("Num of cpus: {}", online_cpus()?.len()); + let exit = Arc::new(AtomicBool::new(false)); + let ctrlc_exit = exit.clone(); + ctrlc::set_handler(move || { + println!("received Ctrl+C!"); + ctrlc_exit.store(true, Ordering::Relaxed); + }) + .expect("could not set Ctrl+C handler"); + + let mut handles = vec![]; let mut perf_array = AsyncPerfEventArray::try_from(bpf.take_map("EVENTS").unwrap())?; for cpu_id in online_cpus()? { let mut buf = perf_array.open(cpu_id, None)?; - task::spawn(async move { + let thread_url = url.clone(); + let thread_exit = exit.clone(); + handles.push(task::spawn(async move { let mut buffers = (0..10) .map(|_| BytesMut::with_capacity(1024)) .collect::>(); @@ -70,11 +86,17 @@ async fn main() -> Result<(), anyhow::Error> { loop { let events = buf.read_events(&mut buffers).await.unwrap(); for buf in buffers.iter_mut().take(events.read) { + if thread_exit.load(Ordering::Relaxed) { + return; + } let ptr = buf.as_ptr() as *const SyscallInfo; let data = unsafe { ptr.read_unaligned() }; match data { SyscallInfo::Write(x) => { - println!("WRITE KERNEL: DATA {:?}", x) + println!("WRITE KERNEL: DATA {:?}", x); + let _ = ureq::post(thread_url.as_str()) + .send_string("hi world!") + .expect("Failed to send request"); } SyscallInfo::Open(x) => { // if !CStr::from_bytes_until_nul(&x.filename) @@ -89,12 +111,12 @@ async fn main() -> Result<(), anyhow::Error> { } } } - }); + })); } - info!("Waiting for Ctrl-C..."); - signal::ctrl_c().await?; - info!("Exiting..."); + info!("Waiting for threads to stop"); + futures::future::join_all(handles).await; + info!("All threads stopped, exiting now..."); Ok(()) } -- cgit 1.4.1