Skip to content
Commits on Source (3)
......@@ -94,9 +94,9 @@ dependencies = [
[[package]]
name = "clap"
version = "3.0.0-rc.11"
version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6f243c7279f09ffed852a0a564c72091331651484cdbb32b7287f16df8611a7"
checksum = "7a30c3bf9ff12dfe5dae53f0a96e0febcd18420d1c0e7fad77796d9d5c4b5375"
dependencies = [
"atty",
"bitflags",
......@@ -120,9 +120,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
dependencies = [
"cfg-if 1.0.0",
"crossbeam-utils",
......@@ -130,9 +130,9 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
dependencies = [
"cfg-if 1.0.0",
"lazy_static",
......@@ -179,28 +179,16 @@ version = "0.1.0"
dependencies = [
"atomic",
"clap",
"gpt",
"libdfim",
"nix 0.23.1",
"num-derive",
"num-traits",
"regex",
"ron",
"serde",
"serde_json",
"serde_regex",
"serde_with",
"slog",
"slog-async",
"slog-syslog",
"slog-term",
"strum",
"strum_macros",
"thiserror",
"tokio",
"tokio-stream",
"tokio-udev",
"udev",
"uuid",
]
[[package]]
......@@ -238,9 +226,9 @@ checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
[[package]]
name = "getrandom"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
dependencies = [
"cfg-if 1.0.0",
"libc",
......@@ -290,9 +278,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "indexmap"
version = "1.7.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [
"autocfg",
"hashbrown",
......@@ -325,6 +313,31 @@ version = "0.2.112"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]]
name = "libdfim"
version = "0.1.0"
dependencies = [
"gpt",
"nix 0.23.1",
"num-derive",
"num-traits",
"regex",
"ron",
"serde",
"serde_json",
"serde_regex",
"serde_with",
"slog",
"slog-async",
"slog-syslog",
"slog-term",
"strum",
"strum_macros",
"thiserror",
"udev",
"uuid",
]
[[package]]
name = "libudev-sys"
version = "0.1.4"
......@@ -611,18 +624,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.132"
version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008"
checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
version = "1.0.132"
version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276"
checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537"
dependencies = [
"proc-macro2",
"quote",
......@@ -631,9 +644,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.74"
version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2bb9cd061c5865d345bb02ca49fcef1391741b672b54a0bf7b679badec3142"
checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79"
dependencies = [
"itoa",
"ryu",
......@@ -726,9 +739,9 @@ dependencies = [
[[package]]
name = "smallvec"
version = "1.7.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "strsim"
......@@ -760,9 +773,9 @@ dependencies = [
[[package]]
name = "syn"
version = "1.0.84"
version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecb2e6da8ee5eb9a61068762a32fa9619cc591ceb055b3687f4cd4051ec2e06b"
checksum = "a684ac3dcd8913827e18cd09a68384ee66c1de24157e3c556c9ab16d85695fb7"
dependencies = [
"proc-macro2",
"quote",
......
[package]
name = "dfim"
version = "0.1.0"
edition = "2021"
authors = [ "Luca Fulchir <luker@fenrirproject.org>" ]
description = "Device&Filesystem Initialization&Monitoring"
[dependencies]
atomic = "0.5"
clap = { version = "3.0.0-rc", features = [ "cargo", "color", "suggestions", "wrap_help" ]}
#gpt = "3.0"
gpt = { version = "3.0", git = "https://github.com/LucaFulchir/gpt", branch = "custom_part_type" }
nix = "0.23"
num-traits = "0.2"
num-derive = "0.3"
regex = "1"
ron = "0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.11"
serde_regex = "1.1"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_warn"] }
slog-async= "2.6"
slog-term="2.6"
slog-syslog="0.12"
strum = { version = "0", features = ["derive"] } #used to convert enum to strings
strum_macros = { version = "0" } #used to convert enum to strings
thiserror = "1.0"
tokio = { version = "1.14", features = ["full"] }
tokio-udev = "0.7"
tokio-stream = "0.1"
udev ="0.6"
#users = { version = "0.11", git = "https://github.com/LucaFulchir/rust-users", branch = "fix_group_list" }
uuid = { version = "0.8", features = ["serde", "v4"] }
[workspace]
members = ["libdfim", "dfim"]
[profile.dev]
opt-level = 0
......
[package]
name = "dfim"
version = "0.1.0"
edition = "2021"
authors = [ "Luca Fulchir <luker@fenrirproject.org>" ]
description = "Device&Filesystem Initialization&Monitoring"
#[[bin]]
#name="dfim"
[dependencies]
libdfim = { version="0.1.0", path="../libdfim/" }
atomic = "0.5"
clap = { version = "3.0.0-rc", features = [ "cargo", "color", "suggestions", "wrap_help" ]}
#gpt = "3.0"
nix = "0.23"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_warn"] }
slog-async= "2.6"
slog-term="2.6"
slog-syslog="0.12"
#thiserror = "1.0"
tokio = { version = "1.14", features = ["full"] }
tokio-udev = "0.7"
tokio-stream = "0.1"
#users = { version = "0.11", git = "https://github.com/LucaFulchir/rust-users", branch = "fix_group_list" }
udev = "0.6"
......@@ -18,7 +18,7 @@
*/
extern crate clap;
extern crate dfim;
extern crate libdfim;
mod worker;
......@@ -42,7 +42,7 @@ fn main() -> Result<(), ::std::io::Error> {
cmdline.value_of("config").unwrap_or("/etc/d4fim.ron"),
);
let logger: ::std::sync::Arc<::slog::Logger> =
::std::sync::Arc::new(dfim::log::create_default_logger());
::std::sync::Arc::new(::libdfim::log::create_default_logger());
let is_metadata = ::std::fs::metadata(cfg_path);
......@@ -56,42 +56,43 @@ fn main() -> Result<(), ::std::io::Error> {
logger,
"Configuration file is world accessible"
);
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
}
Err(meta_err) => {
::slog::error!(logger, "Error on configuration file: {}", meta_err);
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
}
let raw_cfg = match dfim::config::FullConfig::parse(&logger, cfg_path) {
let raw_cfg = match ::libdfim::config::FullConfig::parse(&logger, cfg_path)
{
Ok(cfg) => cfg,
Err(_e) => {
::slog::error!(logger, "Error parsing conf file");
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
};
let cfg = match dfim::state::cfg::Cfg::new(&logger, raw_cfg) {
let cfg = match ::libdfim::state::cfg::Cfg::new(&logger, raw_cfg) {
Ok(cfg) => cfg,
Err(_e) => {
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
};
let logger: ::std::sync::Arc<::slog::Logger> = ::std::sync::Arc::new(
match dfim::log::create_logger(&cfg.main.logging) {
match ::libdfim::log::create_logger(&cfg.main.logging) {
Err(e) => {
::slog::error!(logger, "Error: can't setup logger: {}", e);
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
Ok(new_logger) => {
dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
new_logger
}
},
......@@ -105,13 +106,13 @@ fn main() -> Result<(), ::std::io::Error> {
Ok(user_str) => user_str,
Err(_e) => {
::slog::error!(logger, "current user is not utf-8?");
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
},
None => {
::slog::error!(logger, "Error getting current user");
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(1);
}
};
......@@ -121,7 +122,7 @@ fn main() -> Result<(), ::std::io::Error> {
Ok(group_list) => group_list,
Err(_e) => {
::slog::error!(logger, "Can't get list of current groups");
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(6);
}
};
......@@ -161,7 +162,7 @@ fn main() -> Result<(), ::std::io::Error> {
}
Err(e) => {
::slog::error!(logger, "Can't drop privileges: {}", e);
::dfim::log::drop_logger(logger);
::libdfim::log::drop_logger(logger);
::std::process::exit(7);
}
};
......
......@@ -19,54 +19,44 @@
use super::DeviceMsg;
use super::WorkState;
use ::std::sync::{mpsc, Arc, Condvar, Mutex};
use ::std::sync::Arc;
use ::tokio::sync::{mpsc, Mutex};
pub async fn drone(
logger: Arc<::slog::Logger>,
cfg: Arc<dfim::state::cfg::Cfg>,
mtx_rx: Arc<Mutex<mpsc::Receiver<DeviceMsg>>>,
cvar: Arc<Condvar>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
mtx_rx: Arc<Mutex<mpsc::UnboundedReceiver<DeviceMsg>>>,
) -> Result<(), ::std::io::Error> {
::slog::debug!(logger, "Running drone...");
'unlocked: loop {
let work: DeviceMsg;
{
// this anonymous block is used to drop the mutex asap
// we use the same conditional variable to notify both new work and
// a change in WORK_STATE
let mut rx = mtx_rx.lock().unwrap();
work = 'locked: loop {
// try to get work, but try to be efficient about it, blocking
// until there actually is some or until we are
// told to shut down
let curr_work_state = super::WORK_STATE
.load(::std::sync::atomic::Ordering::Relaxed);
if curr_work_state == WorkState::ShutdownImmediate {
break 'unlocked;
}
match rx.try_recv() {
Ok(msg) => break 'locked msg,
Err(_) => {
match curr_work_state {
WorkState::KeepWorking
| WorkState::ShutdownScannerGraceful => {
// empty queue, block until we find work
rx = cvar.wait(rx).unwrap();
continue 'locked;
}
WorkState::ShutdownAllGraceful => {
// queue empty, no more work to do
break 'unlocked;
}
WorkState::ShutdownImmediate => break 'unlocked,
}
}
}
};
let state =
super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed);
match state {
WorkState::KeepWorking
| WorkState::ShutdownScannerGraceful
| WorkState::ShutdownAllGraceful => {}
WorkState::ShutdownImmediate => {
break;
}
}
let mut rx = mtx_rx.lock().await;
let work = match rx.recv().await {
Some(work) => {
drop(rx);
work
}
None => {
::slog::trace!(logger, "Drone exiting: no more senders");
break;
}
};
::slog::debug!(logger, "Found device: {}- {:?}", work.id, work.path);
let dev = ::udev::Device::from_syspath(&work.path).unwrap();
let cfg_dev = cfg.devices.iter().find(|x| x.id == work.id).unwrap();
use crate::dfim::config::trgt::TargetCommon;
use ::libdfim::config::trgt::TargetCommon;
let cfg_target_id = cfg
.targets
.iter()
......@@ -75,7 +65,7 @@ pub async fn drone(
.id();
if cfg_dev.only_if_empty {
match dfim::state::is_empty(&dev) {
match ::libdfim::state::is_empty(&dev) {
Ok(is_empty) => {
if !is_empty {
::slog::debug!(
......@@ -97,17 +87,21 @@ pub async fn drone(
}
}
}
let check_state =
match dfim::state::get(&logger, &cfg, cfg_dev, cfg_target_id, &dev)
{
Ok(check_state) => check_state,
Err(_) => {
continue 'unlocked;
}
};
let check_state = match ::libdfim::state::get(
&logger,
&cfg,
cfg_dev,
cfg_target_id,
&dev,
) {
Ok(check_state) => check_state,
Err(_) => {
continue 'unlocked;
}
};
match check_state {
dfim::state::s_trgt::CheckStatus::DoNotApply
| dfim::state::s_trgt::CheckStatus::AlreadyApplied => {
::libdfim::state::s_trgt::CheckStatus::DoNotApply
| ::libdfim::state::s_trgt::CheckStatus::AlreadyApplied => {
// stop working on this device
// either something needed to be empty and wasn't
// or everything has already been applied
......@@ -119,9 +113,10 @@ pub async fn drone(
);
continue 'unlocked;
}
dfim::state::s_trgt::CheckStatus::CanBeApplied => {}
::libdfim::state::s_trgt::CheckStatus::CanBeApplied => {}
}
match dfim::state::set(&logger, &cfg, cfg_dev, cfg_target_id, &dev) {
match ::libdfim::state::set(&logger, &cfg, cfg_dev, cfg_target_id, &dev)
{
Ok(()) => {
// all done and set
}
......
......@@ -20,7 +20,7 @@
mod drone;
mod scanner;
use ::std::sync::{mpsc, Arc, Condvar, Mutex};
use ::std::sync::Arc;
pub struct DeviceMsg {
id: String,
......@@ -33,87 +33,71 @@ pub enum WorkState {
ShutdownAllGraceful,
ShutdownImmediate,
}
/* Note:
* Yes, we have a race condition on WORK_STATE
* because we don't put it behind a mutex and it can be changed
* by the signal handler and the scanner
* It is as intended.
* Pro:
* We don't lock mutexes every time we check WORK_STATE
* (which is a lot)
* Con:
* you have to send a new ctrl-c signal because
* the shutdown is actually working and it overwrote your second ctrl-c
*/
pub static WORK_STATE: ::atomic::Atomic<WorkState> =
::atomic::Atomic::new(WorkState::KeepWorking);
extern "C" fn handle_sigint(_: i32) {
/* NOTE:
* we have a race condition here,
* we load the value but then some other thread might set it again
* ... but we don't want to put this behind a mutex either
* so we just accept the race.
* Instead of going through all shutdown stages we go
* working -> shutdownscanner
* and if the user presses ctr-c again we set
* shutdownImmediate
* this way either:
* * the shutdown is continuing, give it another chance
* * the shutdown was not quick enough, the user presses ctr-c again
*/
match WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking => WORK_STATE.store(
WorkState::ShutdownScannerGraceful,
::std::sync::atomic::Ordering::Relaxed,
),
_ => WORK_STATE.store(
WorkState::ShutdownImmediate,
::std::sync::atomic::Ordering::Relaxed,
),
};
}
pub async fn entry(
logger: Arc<::slog::Logger>,
arc_cfg: Arc<dfim::state::cfg::Cfg>,
arc_cfg: Arc<::libdfim::state::cfg::Cfg>,
) -> Result<(), ::std::io::Error> {
let sig_action = ::nix::sys::signal::SigAction::new(
::nix::sys::signal::SigHandler::Handler(handle_sigint),
::nix::sys::signal::SaFlags::empty(),
::nix::sys::signal::SigSet::empty(),
);
unsafe {
::nix::sys::signal::sigaction(::nix::sys::signal::SIGINT, &sig_action);
}
let (tx, rx) = ::tokio::sync::mpsc::unbounded_channel::<DeviceMsg>();
let arc_rx = Arc::new(::tokio::sync::Mutex::new(rx));
let scanner_notify = Arc::new(::tokio::sync::Notify::new());
let (tx, rx) = mpsc::channel::<DeviceMsg>();
let cvar = Arc::new(Condvar::new());
let arc_rx = Arc::new(Mutex::new(rx));
let scan_sig = scanner_notify.clone();
let sig_handler = ::tokio::spawn(async move {
loop {
match ::tokio::signal::ctrl_c().await {
Err(_) => return,
Ok(_) => {}
}
println!("got signal!");
match WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking => WORK_STATE.store(
WorkState::ShutdownScannerGraceful,
::std::sync::atomic::Ordering::Relaxed,
),
_ => WORK_STATE.store(
WorkState::ShutdownImmediate,
::std::sync::atomic::Ordering::Relaxed,
),
};
scan_sig.notify_waiters();
}
});
let tk_local = ::tokio::task::LocalSet::new();
let scan_h = tk_local.spawn_local(scanner::scanner(
let scanner_h = scanner::scanner(
logger.clone(),
scanner_notify.clone(),
arc_cfg.clone(),
tx,
cvar.clone(),
));
let drone_h = tokio::spawn(drone::drone(
logger.clone(),
arc_cfg.clone(),
arc_rx,
cvar.clone(),
));
);
let drone_h =
::tokio::spawn(drone::drone(logger.clone(), arc_cfg.clone(), arc_rx));
tk_local.await;
let scan_exit = scan_h.await;
match WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::ShutdownImmediate => { /* don't downgrade the shutdown */ }
_ => WORK_STATE.store(
WorkState::ShutdownAllGraceful,
::std::sync::atomic::Ordering::Relaxed,
),
};
cvar.notify_all();
let drone_exit = drone_h.await;
let (scan_exit, drone_exit) = ::tokio::join!(scanner_h, drone_h);
sig_handler.abort();
match scan_exit {
Ok(scan_res) => match scan_res {
Ok(()) => match drone_exit {
Ok(drone_res) => match drone_res {
Ok(()) => Ok(()),
Err(drone_err) => Err(drone_err),
},
Err(drone_join) => Err(drone_join.into()),
Ok(()) => match drone_exit {
Ok(drone_res) => match drone_res {
Ok(()) => Ok(()),
Err(e) => Err(e),
},
Err(scan_err) => Err(scan_err),
Err(drone_join) => Err(drone_join.into()),
},
Err(scan_join) => Err(scan_join.into()),
}
......
......@@ -18,98 +18,150 @@
*/
use super::DeviceMsg;
use ::std::sync::{mpsc, Arc, Condvar};
use super::WorkState;
use ::std::sync::Arc;
use ::tokio::sync::mpsc;
pub async fn scanner(
logger: Arc<::slog::Logger>,
cfg: Arc<dfim::state::cfg::Cfg>,
tx: mpsc::Sender<DeviceMsg>,
cvar: Arc<Condvar>,
work_notify: Arc<::tokio::sync::Notify>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
tx: mpsc::UnboundedSender<DeviceMsg>,
) -> Result<(), ::std::io::Error> {
// First setup the udev monitors, then enumerate all devices one, then just
// wait for udev events
let count_initialized = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
let (mon_tx, mon_rx) = mpsc::unbounded_channel::<::std::path::PathBuf>();
// First setup the udev monitors, then enumerate all devices once,
// then just wait for udev events
::slog::debug!(logger, "Spawning local udev monitors");
let tk_local = ::tokio::task::LocalSet::new();
let mut mon_vec = Vec::with_capacity(cfg.devices.len());
for dev in &cfg.devices {
//let mut mon_builder = ::udev::MonitorBuilder::new().unwrap();
let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
match &dev.subsystem {
None => {}
Some(ss) => match &dev.devtype {
None => {
mon_builder = match mon_builder.match_subsystem(ss.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\"",
ss
);
return Err(e);
}
};
}
Some(dt) => {
mon_builder = match mon_builder
.match_subsystem_devtype(ss.to_str(), dt.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\" with devtype \
\"{}\"",
ss,
dt
);
return Err(e);
}
};
}
},
}
for tag in &dev.tags {
mon_builder = match mon_builder.match_tag(&tag) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't filter with tag \"{:?}\"",
&tag
);
return Err(e);
}
}
}
let mon_sock = match mon_builder.listen() {
Ok(sock) => sock,
Err(e) => {
::slog::error!(logger, "Can't create socket to monitor dev");
return Err(e);
}
};
let async_mon =
::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
// let th_join = ::tokio::spawn(monitor_udev(
let th_join = ::tokio::task::spawn_local(monitor_udev(
for dev_idx in 0..cfg.devices.len() {
let th_join = tk_local.spawn_local(monitor_udev(
logger.clone(),
cfg.clone(),
async_mon,
tx.clone(),
cvar.clone(),
dev_idx,
count_initialized.clone(),
work_notify.clone(),
mon_tx.clone(),
));
mon_vec.push(th_join);
}
let _enumerate_th = tk_local.spawn_local(enumerate(
logger.clone(),
cfg.clone(),
count_initialized.clone(),
tx.clone(),
));
let _get_filter_join = tk_local.spawn_local(get_and_filter(
logger.clone(),
cfg.clone(),
count_initialized.clone(),
work_notify,
mon_rx,
tx,
mon_vec,
));
tk_local.await;
::slog::debug!(logger, "Scanner exiting");
Ok(())
}
async fn get_and_filter(
logger: Arc<::slog::Logger>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
work_notify: Arc<::tokio::sync::Notify>,
mut mon_rx: mpsc::UnboundedReceiver<::std::path::PathBuf>,
tx: mpsc::UnboundedSender<DeviceMsg>,
mon_vec: Vec<::tokio::task::JoinHandle<Result<(), ::std::io::Error>>>,
) -> Result<(), ::std::io::Error> {
// don't process anything unless the dev monitor have started
while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
{
::std::thread::sleep(::std::time::Duration::from_millis(50));
}
loop {
// This looks complicated, but basically:
// * we have two mpsc queues:
// * one from the udev socket listeners,
// * one to give work to the drone(s)
// * we use a conditional variable/mutex to wake up when the udev
// sockets have a new device
// * we use the same cond.var/mutex to be notified on the WORK_STATE
// changes
if super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed)
!= WorkState::KeepWorking
{
break;
}
let work = match mon_rx.try_recv() {
Ok(path) => ::udev::Device::from_syspath(&path).unwrap(),
Err(::tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break;
}
Err(::tokio::sync::mpsc::error::TryRecvError::Empty) => {
::slog::trace!(logger, "waiting for dev to filter");
work_notify.notified().await;
//let lock = work_notify.0.lock().unwrap();
//let _ = work_notify.1.wait(lock).unwrap();
continue;
}
};
::slog::trace!(logger, "Got device to filter: {:?}", work.syspath());
match filter_devices(&logger, cfg.clone(), work) {
Ok(dev) => {
match tx.send(DeviceMsg {
id: dev.0,
path: dev.1.syspath().into(),
}) {
Ok(()) => {}
Err(_) => {
::slog::error!(
logger,
"drones died before the scanner"
);
break;
}
}
}
Err(_) => {} // device did not match the filter
}
}
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking | WorkState::ShutdownScannerGraceful => {
super::WORK_STATE.store(
WorkState::ShutdownAllGraceful,
atomic::Ordering::Relaxed,
);
}
_ => {}
}
::slog::trace!(logger, "Dropping all udev monitors");
for th in mon_vec {
th.abort();
}
Ok(())
}
async fn enumerate(
logger: Arc<::slog::Logger>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
tx: mpsc::UnboundedSender<DeviceMsg>,
) -> Result<(), ::std::io::Error> {
// before scanning, make sure the monitoring have already started
while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
{
::std::thread::sleep(::std::time::Duration::from_millis(50));
}
// enumerate all devices
let udev_ctx = ::udev::Udev::new().unwrap();
let mut devices = ::udev::Enumerator::with_udev(udev_ctx.clone())?;
devices.match_is_initialized().unwrap();
let dev_list = devices.scan_devices()?;
::slog::debug!(logger, "Scanning all devices...");
for dev in dev_list {
match filter_devices(&logger, cfg.clone(), dev) {
Ok(dev) => {
......@@ -127,81 +179,134 @@ pub async fn scanner(
));
}
}
cvar.notify_one();
}
Err(()) => {}
}
}
for th in mon_vec {
th.await;
}
::slog::trace!(logger, "All devicies enumerated");
Ok(())
}
async fn monitor_udev(
logger: Arc<::slog::Logger>,
cfg: Arc<dfim::state::cfg::Cfg>,
mut async_mon: ::tokio_udev::AsyncMonitorSocket,
tx: mpsc::Sender<DeviceMsg>,
cvar: Arc<Condvar>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
dev_idx: usize,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
work_notify: Arc<::tokio::sync::Notify>,
mon_tx: mpsc::UnboundedSender<::std::path::PathBuf>,
) -> Result<(), ::std::io::Error> {
let dev = &cfg.devices[dev_idx];
let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
match &dev.subsystem {
None => {}
Some(ss) => match &dev.devtype {
None => {
mon_builder = match mon_builder.match_subsystem(ss.to_str()) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\"",
ss
);
return Err(e);
}
};
}
Some(dt) => {
mon_builder = match mon_builder
.match_subsystem_devtype(ss.to_str(), dt.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\" with devtype \
\"{}\"",
ss,
dt
);
return Err(e);
}
};
}
},
}
for tag in &dev.tags {
mon_builder = match mon_builder.match_tag(&tag) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(logger, "Can't filter with tag \"{:?}\"", &tag);
return Err(e);
}
}
}
let mon_sock = match mon_builder.listen() {
Ok(sock) => sock,
Err(e) => {
::slog::error!(logger, "Can't create socket to monitor dev");
return Err(e);
}
};
let mut async_mon =
::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
count_initialized.fetch_add(1, atomic::Ordering::Relaxed);
loop {
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
super::WorkState::KeepWorking => {}
super::WorkState::ShutdownImmediate
| super::WorkState::ShutdownAllGraceful
| super::WorkState::ShutdownScannerGraceful => break,
WorkState::KeepWorking => {}
WorkState::ShutdownImmediate
| WorkState::ShutdownAllGraceful
| WorkState::ShutdownScannerGraceful => break,
}
use ::tokio_stream::StreamExt;
while let Some(res_event) = async_mon.next().await {
match res_event {
Ok(event) => {
match filter_devices(&logger, cfg.clone(), event.device()) {
Ok(dev) => {
match tx.send(DeviceMsg {
id: dev.0,
path: dev.1.syspath().into(),
}) {
Ok(()) => {}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"work queue receiver has been closed",
));
}
}
cvar.notify_one();
}
Err(_) => {}
let res_event = match async_mon.next().await {
Some(res_event) => res_event,
None => continue,
};
match res_event {
Ok(event) => {
match mon_tx.send(event.device().syspath().to_owned()) {
Ok(()) => {}
Err(_) => {
::slog::error!(
logger,
"monitor queue reader dropped before the writer"
);
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"work queue receiver has been closed",
));
}
}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"udev socket reset",
));
}
work_notify.notify_one();
}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
::slog::error!(logger, "monitor udev socket reset");
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"udev socket reset",
));
}
}
}
::slog::trace!(logger, "udev monitor exited");
Ok(())
}
fn filter_devices(
logger: &::slog::Logger,
cfg: Arc<dfim::state::cfg::Cfg>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
dev: ::udev::Device,
) -> Result<(String, ::udev::Device), ()> {
if !dev.is_initialized() {
return Err(());
}
use dfim::config::device::Device;
use ::libdfim::config::device::Device;
let mut last_matched: Option<&Device> = None;
'devloop: for cfg_dev in &cfg.devices {
......@@ -238,7 +343,7 @@ fn filter_devices(
Some(str_value) => str_value,
None => continue 'devloop,
};
use dfim::config::device::AttributeCheck;
use ::libdfim::config::device::AttributeCheck;
match attr {
AttributeCheck::Ok(a) => {
if !a.value.is_match(str_value) {
......@@ -274,7 +379,7 @@ fn filter_devices(
Some(str_value) => str_value,
None => continue 'devloop,
};
use dfim::config::device::PropertyCheck;
use ::libdfim::config::device::PropertyCheck;
match prop {
PropertyCheck::Ok(p) => {
if !p.value.is_match(str_value) {
......@@ -302,7 +407,7 @@ fn filter_devices(
None => continue 'devloop,
};
for val in list_str.split(':') {
use dfim::config::device::TagCheck;
use ::libdfim::config::device::TagCheck;
match tag {
TagCheck::Ok(raw_tag) => {
if val == raw_tag {
......@@ -329,7 +434,7 @@ fn filter_devices(
None => continue 'devloop,
};
for val in list_str.split(':') {
use dfim::config::device::TagCheck;
use ::libdfim::config::device::TagCheck;
match ctag {
TagCheck::Ok(raw_tag) => {
if val == raw_tag {
......
[package]
name = "libdfim"
version = "0.1.0"
edition = "2021"
authors = [ "Luca Fulchir <luker@fenrirproject.org>" ]
description = "Device&Filesystem Initialization&Monitoring"
[lib]
crate-type = ["rlib","dylib"]
[dependencies]
#gpt = "3.0"
gpt = { version = "3.0", git = "https://github.com/LucaFulchir/gpt", branch = "custom_part_type" }
nix = "0.23"
num-traits = "0.2"
num-derive = "0.3"
regex = "1"
ron = "0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.11"
serde_regex = "1.1"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_warn"] }
slog-async= "2.6"
slog-term="2.6"
slog-syslog="0.12"
strum = { version = "0", features = ["derive"] } #used to convert enum to strings
strum_macros = { version = "0" } #used to convert enum to strings
thiserror = "1.0"
udev ="0.6"
uuid = { version = "0.8", features = ["serde", "v4"] }