Skip to content
Commits on Source (3)
...@@ -94,9 +94,9 @@ dependencies = [ ...@@ -94,9 +94,9 @@ dependencies = [
[[package]] [[package]]
name = "clap" name = "clap"
version = "3.0.0-rc.11" version = "3.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6f243c7279f09ffed852a0a564c72091331651484cdbb32b7287f16df8611a7" checksum = "7a30c3bf9ff12dfe5dae53f0a96e0febcd18420d1c0e7fad77796d9d5c4b5375"
dependencies = [ dependencies = [
"atty", "atty",
"bitflags", "bitflags",
...@@ -120,9 +120,9 @@ dependencies = [ ...@@ -120,9 +120,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.1" version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4" checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"crossbeam-utils", "crossbeam-utils",
...@@ -130,9 +130,9 @@ dependencies = [ ...@@ -130,9 +130,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.5" version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db" checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"lazy_static", "lazy_static",
...@@ -179,28 +179,16 @@ version = "0.1.0" ...@@ -179,28 +179,16 @@ version = "0.1.0"
dependencies = [ dependencies = [
"atomic", "atomic",
"clap", "clap",
"gpt", "libdfim",
"nix 0.23.1", "nix 0.23.1",
"num-derive",
"num-traits",
"regex",
"ron",
"serde",
"serde_json",
"serde_regex",
"serde_with",
"slog", "slog",
"slog-async", "slog-async",
"slog-syslog", "slog-syslog",
"slog-term", "slog-term",
"strum",
"strum_macros",
"thiserror",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-udev", "tokio-udev",
"udev", "udev",
"uuid",
] ]
[[package]] [[package]]
...@@ -238,9 +226,9 @@ checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" ...@@ -238,9 +226,9 @@ checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7"
[[package]] [[package]]
name = "getrandom" name = "getrandom"
version = "0.2.3" version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753" checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"libc", "libc",
...@@ -290,9 +278,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" ...@@ -290,9 +278,9 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc633605454125dec4b66843673f01c7df2b89479b32e0ed634e43a91cff62a5" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown", "hashbrown",
...@@ -325,6 +313,31 @@ version = "0.2.112" ...@@ -325,6 +313,31 @@ version = "0.2.112"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
[[package]]
name = "libdfim"
version = "0.1.0"
dependencies = [
"gpt",
"nix 0.23.1",
"num-derive",
"num-traits",
"regex",
"ron",
"serde",
"serde_json",
"serde_regex",
"serde_with",
"slog",
"slog-async",
"slog-syslog",
"slog-term",
"strum",
"strum_macros",
"thiserror",
"udev",
"uuid",
]
[[package]] [[package]]
name = "libudev-sys" name = "libudev-sys"
version = "0.1.4" version = "0.1.4"
...@@ -611,18 +624,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" ...@@ -611,18 +624,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.132" version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008" checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.132" version = "1.0.133"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276" checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
...@@ -631,9 +644,9 @@ dependencies = [ ...@@ -631,9 +644,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.74" version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2bb9cd061c5865d345bb02ca49fcef1391741b672b54a0bf7b679badec3142" checksum = "c059c05b48c5c0067d4b4b2b4f0732dd65feb52daf7e0ea09cd87e7dadc1af79"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
...@@ -726,9 +739,9 @@ dependencies = [ ...@@ -726,9 +739,9 @@ dependencies = [
[[package]] [[package]]
name = "smallvec" name = "smallvec"
version = "1.7.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]] [[package]]
name = "strsim" name = "strsim"
...@@ -760,9 +773,9 @@ dependencies = [ ...@@ -760,9 +773,9 @@ dependencies = [
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.84" version = "1.0.85"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecb2e6da8ee5eb9a61068762a32fa9619cc591ceb055b3687f4cd4051ec2e06b" checksum = "a684ac3dcd8913827e18cd09a68384ee66c1de24157e3c556c9ab16d85695fb7"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
......
[package] [workspace]
name = "dfim" members = ["libdfim", "dfim"]
version = "0.1.0"
edition = "2021"
authors = [ "Luca Fulchir <luker@fenrirproject.org>" ]
description = "Device&Filesystem Initialization&Monitoring"
[dependencies]
atomic = "0.5"
clap = { version = "3.0.0-rc", features = [ "cargo", "color", "suggestions", "wrap_help" ]}
#gpt = "3.0"
gpt = { version = "3.0", git = "https://github.com/LucaFulchir/gpt", branch = "custom_part_type" }
nix = "0.23"
num-traits = "0.2"
num-derive = "0.3"
regex = "1"
ron = "0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.11"
serde_regex = "1.1"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_warn"] }
slog-async= "2.6"
slog-term="2.6"
slog-syslog="0.12"
strum = { version = "0", features = ["derive"] } #used to convert enum to strings
strum_macros = { version = "0" } #used to convert enum to strings
thiserror = "1.0"
tokio = { version = "1.14", features = ["full"] }
tokio-udev = "0.7"
tokio-stream = "0.1"
udev ="0.6"
#users = { version = "0.11", git = "https://github.com/LucaFulchir/rust-users", branch = "fix_group_list" }
uuid = { version = "0.8", features = ["serde", "v4"] }
[profile.dev] [profile.dev]
opt-level = 0 opt-level = 0
......
[package]
name = "dfim"
version = "0.1.0"
edition = "2021"
authors = [ "Luca Fulchir <luker@fenrirproject.org>" ]
description = "Device&Filesystem Initialization&Monitoring"
#[[bin]]
#name="dfim"
[dependencies]
libdfim = { version="0.1.0", path="../libdfim/" }
atomic = "0.5"
clap = { version = "3.0.0-rc", features = [ "cargo", "color", "suggestions", "wrap_help" ]}
#gpt = "3.0"
nix = "0.23"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_warn"] }
slog-async= "2.6"
slog-term="2.6"
slog-syslog="0.12"
#thiserror = "1.0"
tokio = { version = "1.14", features = ["full"] }
tokio-udev = "0.7"
tokio-stream = "0.1"
#users = { version = "0.11", git = "https://github.com/LucaFulchir/rust-users", branch = "fix_group_list" }
udev = "0.6"
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
*/ */
extern crate clap; extern crate clap;
extern crate dfim; extern crate libdfim;
mod worker; mod worker;
...@@ -42,7 +42,7 @@ fn main() -> Result<(), ::std::io::Error> { ...@@ -42,7 +42,7 @@ fn main() -> Result<(), ::std::io::Error> {
cmdline.value_of("config").unwrap_or("/etc/d4fim.ron"), cmdline.value_of("config").unwrap_or("/etc/d4fim.ron"),
); );
let logger: ::std::sync::Arc<::slog::Logger> = let logger: ::std::sync::Arc<::slog::Logger> =
::std::sync::Arc::new(dfim::log::create_default_logger()); ::std::sync::Arc::new(::libdfim::log::create_default_logger());
let is_metadata = ::std::fs::metadata(cfg_path); let is_metadata = ::std::fs::metadata(cfg_path);
...@@ -56,42 +56,43 @@ fn main() -> Result<(), ::std::io::Error> { ...@@ -56,42 +56,43 @@ fn main() -> Result<(), ::std::io::Error> {
logger, logger,
"Configuration file is world accessible" "Configuration file is world accessible"
); );
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
} }
Err(meta_err) => { Err(meta_err) => {
::slog::error!(logger, "Error on configuration file: {}", meta_err); ::slog::error!(logger, "Error on configuration file: {}", meta_err);
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
} }
let raw_cfg = match dfim::config::FullConfig::parse(&logger, cfg_path) { let raw_cfg = match ::libdfim::config::FullConfig::parse(&logger, cfg_path)
{
Ok(cfg) => cfg, Ok(cfg) => cfg,
Err(_e) => { Err(_e) => {
::slog::error!(logger, "Error parsing conf file"); ::slog::error!(logger, "Error parsing conf file");
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
}; };
let cfg = match dfim::state::cfg::Cfg::new(&logger, raw_cfg) { let cfg = match ::libdfim::state::cfg::Cfg::new(&logger, raw_cfg) {
Ok(cfg) => cfg, Ok(cfg) => cfg,
Err(_e) => { Err(_e) => {
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
}; };
let logger: ::std::sync::Arc<::slog::Logger> = ::std::sync::Arc::new( let logger: ::std::sync::Arc<::slog::Logger> = ::std::sync::Arc::new(
match dfim::log::create_logger(&cfg.main.logging) { match ::libdfim::log::create_logger(&cfg.main.logging) {
Err(e) => { Err(e) => {
::slog::error!(logger, "Error: can't setup logger: {}", e); ::slog::error!(logger, "Error: can't setup logger: {}", e);
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
Ok(new_logger) => { Ok(new_logger) => {
dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
new_logger new_logger
} }
}, },
...@@ -105,13 +106,13 @@ fn main() -> Result<(), ::std::io::Error> { ...@@ -105,13 +106,13 @@ fn main() -> Result<(), ::std::io::Error> {
Ok(user_str) => user_str, Ok(user_str) => user_str,
Err(_e) => { Err(_e) => {
::slog::error!(logger, "current user is not utf-8?"); ::slog::error!(logger, "current user is not utf-8?");
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
}, },
None => { None => {
::slog::error!(logger, "Error getting current user"); ::slog::error!(logger, "Error getting current user");
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(1); ::std::process::exit(1);
} }
}; };
...@@ -121,7 +122,7 @@ fn main() -> Result<(), ::std::io::Error> { ...@@ -121,7 +122,7 @@ fn main() -> Result<(), ::std::io::Error> {
Ok(group_list) => group_list, Ok(group_list) => group_list,
Err(_e) => { Err(_e) => {
::slog::error!(logger, "Can't get list of current groups"); ::slog::error!(logger, "Can't get list of current groups");
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(6); ::std::process::exit(6);
} }
}; };
...@@ -161,7 +162,7 @@ fn main() -> Result<(), ::std::io::Error> { ...@@ -161,7 +162,7 @@ fn main() -> Result<(), ::std::io::Error> {
} }
Err(e) => { Err(e) => {
::slog::error!(logger, "Can't drop privileges: {}", e); ::slog::error!(logger, "Can't drop privileges: {}", e);
::dfim::log::drop_logger(logger); ::libdfim::log::drop_logger(logger);
::std::process::exit(7); ::std::process::exit(7);
} }
}; };
......
...@@ -19,54 +19,44 @@ ...@@ -19,54 +19,44 @@
use super::DeviceMsg; use super::DeviceMsg;
use super::WorkState; use super::WorkState;
use ::std::sync::{mpsc, Arc, Condvar, Mutex}; use ::std::sync::Arc;
use ::tokio::sync::{mpsc, Mutex};
pub async fn drone( pub async fn drone(
logger: Arc<::slog::Logger>, logger: Arc<::slog::Logger>,
cfg: Arc<dfim::state::cfg::Cfg>, cfg: Arc<::libdfim::state::cfg::Cfg>,
mtx_rx: Arc<Mutex<mpsc::Receiver<DeviceMsg>>>, mtx_rx: Arc<Mutex<mpsc::UnboundedReceiver<DeviceMsg>>>,
cvar: Arc<Condvar>,
) -> Result<(), ::std::io::Error> { ) -> Result<(), ::std::io::Error> {
::slog::debug!(logger, "Running drone..."); ::slog::debug!(logger, "Running drone...");
'unlocked: loop { 'unlocked: loop {
let work: DeviceMsg; // we use the same conditional variable to notify both new work and
{ // a change in WORK_STATE
// this anonymous block is used to drop the mutex asap
let mut rx = mtx_rx.lock().unwrap(); let state =
work = 'locked: loop { super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed);
// try to get work, but try to be efficient about it, blocking match state {
// until there actually is some or until we are WorkState::KeepWorking
// told to shut down | WorkState::ShutdownScannerGraceful
let curr_work_state = super::WORK_STATE | WorkState::ShutdownAllGraceful => {}
.load(::std::sync::atomic::Ordering::Relaxed); WorkState::ShutdownImmediate => {
if curr_work_state == WorkState::ShutdownImmediate { break;
break 'unlocked; }
}
match rx.try_recv() {
Ok(msg) => break 'locked msg,
Err(_) => {
match curr_work_state {
WorkState::KeepWorking
| WorkState::ShutdownScannerGraceful => {
// empty queue, block until we find work
rx = cvar.wait(rx).unwrap();
continue 'locked;
}
WorkState::ShutdownAllGraceful => {
// queue empty, no more work to do
break 'unlocked;
}
WorkState::ShutdownImmediate => break 'unlocked,
}
}
}
};
} }
let mut rx = mtx_rx.lock().await;
let work = match rx.recv().await {
Some(work) => {
drop(rx);
work
}
None => {
::slog::trace!(logger, "Drone exiting: no more senders");
break;
}
};
::slog::debug!(logger, "Found device: {}- {:?}", work.id, work.path); ::slog::debug!(logger, "Found device: {}- {:?}", work.id, work.path);
let dev = ::udev::Device::from_syspath(&work.path).unwrap(); let dev = ::udev::Device::from_syspath(&work.path).unwrap();
let cfg_dev = cfg.devices.iter().find(|x| x.id == work.id).unwrap(); let cfg_dev = cfg.devices.iter().find(|x| x.id == work.id).unwrap();
use crate::dfim::config::trgt::TargetCommon; use ::libdfim::config::trgt::TargetCommon;
let cfg_target_id = cfg let cfg_target_id = cfg
.targets .targets
.iter() .iter()
...@@ -75,7 +65,7 @@ pub async fn drone( ...@@ -75,7 +65,7 @@ pub async fn drone(
.id(); .id();
if cfg_dev.only_if_empty { if cfg_dev.only_if_empty {
match dfim::state::is_empty(&dev) { match ::libdfim::state::is_empty(&dev) {
Ok(is_empty) => { Ok(is_empty) => {
if !is_empty { if !is_empty {
::slog::debug!( ::slog::debug!(
...@@ -97,17 +87,21 @@ pub async fn drone( ...@@ -97,17 +87,21 @@ pub async fn drone(
} }
} }
} }
let check_state = let check_state = match ::libdfim::state::get(
match dfim::state::get(&logger, &cfg, cfg_dev, cfg_target_id, &dev) &logger,
{ &cfg,
Ok(check_state) => check_state, cfg_dev,
Err(_) => { cfg_target_id,
continue 'unlocked; &dev,
} ) {
}; Ok(check_state) => check_state,
Err(_) => {
continue 'unlocked;
}
};
match check_state { match check_state {
dfim::state::s_trgt::CheckStatus::DoNotApply ::libdfim::state::s_trgt::CheckStatus::DoNotApply
| dfim::state::s_trgt::CheckStatus::AlreadyApplied => { | ::libdfim::state::s_trgt::CheckStatus::AlreadyApplied => {
// stop working on this device // stop working on this device
// either something needed to be empty and wasn't // either something needed to be empty and wasn't
// or everything has already been applied // or everything has already been applied
...@@ -119,9 +113,10 @@ pub async fn drone( ...@@ -119,9 +113,10 @@ pub async fn drone(
); );
continue 'unlocked; continue 'unlocked;
} }
dfim::state::s_trgt::CheckStatus::CanBeApplied => {} ::libdfim::state::s_trgt::CheckStatus::CanBeApplied => {}
} }
match dfim::state::set(&logger, &cfg, cfg_dev, cfg_target_id, &dev) { match ::libdfim::state::set(&logger, &cfg, cfg_dev, cfg_target_id, &dev)
{
Ok(()) => { Ok(()) => {
// all done and set // all done and set
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
mod drone; mod drone;
mod scanner; mod scanner;
use ::std::sync::{mpsc, Arc, Condvar, Mutex}; use ::std::sync::Arc;
pub struct DeviceMsg { pub struct DeviceMsg {
id: String, id: String,
...@@ -33,87 +33,71 @@ pub enum WorkState { ...@@ -33,87 +33,71 @@ pub enum WorkState {
ShutdownAllGraceful, ShutdownAllGraceful,
ShutdownImmediate, ShutdownImmediate,
} }
/* Note:
* Yes, we have a race condition on WORK_STATE
* because we don't put it behind a mutex and it can be changed
* by the signal handler and the scanner
* It is as intended.
* Pro:
* We don't lock mutexes every time we check WORK_STATE
* (which is a lot)
* Con:
* you have to send a new ctrl-c signal because
* the shutdown is actually working and it overwrote your second ctrl-c
*/
pub static WORK_STATE: ::atomic::Atomic<WorkState> = pub static WORK_STATE: ::atomic::Atomic<WorkState> =
::atomic::Atomic::new(WorkState::KeepWorking); ::atomic::Atomic::new(WorkState::KeepWorking);
extern "C" fn handle_sigint(_: i32) {
/* NOTE:
* we have a race condition here,
* we load the value but then some other thread might set it again
* ... but we don't want to put this behind a mutex either
* so we just accept the race.
* Instead of going through all shutdown stages we go
* working -> shutdownscanner
* and if the user presses ctr-c again we set
* shutdownImmediate
* this way either:
* * the shutdown is continuing, give it another chance
* * the shutdown was not quick enough, the user presses ctr-c again
*/
match WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking => WORK_STATE.store(
WorkState::ShutdownScannerGraceful,
::std::sync::atomic::Ordering::Relaxed,
),
_ => WORK_STATE.store(
WorkState::ShutdownImmediate,
::std::sync::atomic::Ordering::Relaxed,
),
};
}
pub async fn entry( pub async fn entry(
logger: Arc<::slog::Logger>, logger: Arc<::slog::Logger>,
arc_cfg: Arc<dfim::state::cfg::Cfg>, arc_cfg: Arc<::libdfim::state::cfg::Cfg>,
) -> Result<(), ::std::io::Error> { ) -> Result<(), ::std::io::Error> {
let sig_action = ::nix::sys::signal::SigAction::new( let (tx, rx) = ::tokio::sync::mpsc::unbounded_channel::<DeviceMsg>();
::nix::sys::signal::SigHandler::Handler(handle_sigint), let arc_rx = Arc::new(::tokio::sync::Mutex::new(rx));
::nix::sys::signal::SaFlags::empty(),
::nix::sys::signal::SigSet::empty(), let scanner_notify = Arc::new(::tokio::sync::Notify::new());
);
unsafe {
::nix::sys::signal::sigaction(::nix::sys::signal::SIGINT, &sig_action);
}
let (tx, rx) = mpsc::channel::<DeviceMsg>(); let scan_sig = scanner_notify.clone();
let cvar = Arc::new(Condvar::new()); let sig_handler = ::tokio::spawn(async move {
let arc_rx = Arc::new(Mutex::new(rx)); loop {
match ::tokio::signal::ctrl_c().await {
Err(_) => return,
Ok(_) => {}
}
println!("got signal!");
match WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking => WORK_STATE.store(
WorkState::ShutdownScannerGraceful,
::std::sync::atomic::Ordering::Relaxed,
),
_ => WORK_STATE.store(
WorkState::ShutdownImmediate,
::std::sync::atomic::Ordering::Relaxed,
),
};
scan_sig.notify_waiters();
}
});
let tk_local = ::tokio::task::LocalSet::new(); let scanner_h = scanner::scanner(
let scan_h = tk_local.spawn_local(scanner::scanner(
logger.clone(), logger.clone(),
scanner_notify.clone(),
arc_cfg.clone(), arc_cfg.clone(),
tx, tx,
cvar.clone(), );
)); let drone_h =
let drone_h = tokio::spawn(drone::drone( ::tokio::spawn(drone::drone(logger.clone(), arc_cfg.clone(), arc_rx));
logger.clone(),
arc_cfg.clone(),
arc_rx,
cvar.clone(),
));
tk_local.await; let (scan_exit, drone_exit) = ::tokio::join!(scanner_h, drone_h);
let scan_exit = scan_h.await; sig_handler.abort();
match WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::ShutdownImmediate => { /* don't downgrade the shutdown */ }
_ => WORK_STATE.store(
WorkState::ShutdownAllGraceful,
::std::sync::atomic::Ordering::Relaxed,
),
};
cvar.notify_all();
let drone_exit = drone_h.await;
match scan_exit { match scan_exit {
Ok(scan_res) => match scan_res { Ok(()) => match drone_exit {
Ok(()) => match drone_exit { Ok(drone_res) => match drone_res {
Ok(drone_res) => match drone_res { Ok(()) => Ok(()),
Ok(()) => Ok(()), Err(e) => Err(e),
Err(drone_err) => Err(drone_err),
},
Err(drone_join) => Err(drone_join.into()),
}, },
Err(scan_err) => Err(scan_err), Err(drone_join) => Err(drone_join.into()),
}, },
Err(scan_join) => Err(scan_join.into()), Err(scan_join) => Err(scan_join.into()),
} }
......
...@@ -18,98 +18,150 @@ ...@@ -18,98 +18,150 @@
*/ */
use super::DeviceMsg; use super::DeviceMsg;
use ::std::sync::{mpsc, Arc, Condvar}; use super::WorkState;
use ::std::sync::Arc;
use ::tokio::sync::mpsc;
pub async fn scanner( pub async fn scanner(
logger: Arc<::slog::Logger>, logger: Arc<::slog::Logger>,
cfg: Arc<dfim::state::cfg::Cfg>, work_notify: Arc<::tokio::sync::Notify>,
tx: mpsc::Sender<DeviceMsg>, cfg: Arc<::libdfim::state::cfg::Cfg>,
cvar: Arc<Condvar>, tx: mpsc::UnboundedSender<DeviceMsg>,
) -> Result<(), ::std::io::Error> { ) -> Result<(), ::std::io::Error> {
// First setup the udev monitors, then enumerate all devices one, then just let count_initialized = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
// wait for udev events
let (mon_tx, mon_rx) = mpsc::unbounded_channel::<::std::path::PathBuf>();
// First setup the udev monitors, then enumerate all devices once,
// then just wait for udev events
::slog::debug!(logger, "Spawning local udev monitors"); ::slog::debug!(logger, "Spawning local udev monitors");
let tk_local = ::tokio::task::LocalSet::new();
let mut mon_vec = Vec::with_capacity(cfg.devices.len()); let mut mon_vec = Vec::with_capacity(cfg.devices.len());
for dev in &cfg.devices { for dev_idx in 0..cfg.devices.len() {
//let mut mon_builder = ::udev::MonitorBuilder::new().unwrap(); let th_join = tk_local.spawn_local(monitor_udev(
let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
match &dev.subsystem {
None => {}
Some(ss) => match &dev.devtype {
None => {
mon_builder = match mon_builder.match_subsystem(ss.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\"",
ss
);
return Err(e);
}
};
}
Some(dt) => {
mon_builder = match mon_builder
.match_subsystem_devtype(ss.to_str(), dt.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\" with devtype \
\"{}\"",
ss,
dt
);
return Err(e);
}
};
}
},
}
for tag in &dev.tags {
mon_builder = match mon_builder.match_tag(&tag) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't filter with tag \"{:?}\"",
&tag
);
return Err(e);
}
}
}
let mon_sock = match mon_builder.listen() {
Ok(sock) => sock,
Err(e) => {
::slog::error!(logger, "Can't create socket to monitor dev");
return Err(e);
}
};
let async_mon =
::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
// let th_join = ::tokio::spawn(monitor_udev(
let th_join = ::tokio::task::spawn_local(monitor_udev(
logger.clone(), logger.clone(),
cfg.clone(), cfg.clone(),
async_mon, dev_idx,
tx.clone(), count_initialized.clone(),
cvar.clone(), work_notify.clone(),
mon_tx.clone(),
)); ));
mon_vec.push(th_join); mon_vec.push(th_join);
} }
let _enumerate_th = tk_local.spawn_local(enumerate(
logger.clone(),
cfg.clone(),
count_initialized.clone(),
tx.clone(),
));
let _get_filter_join = tk_local.spawn_local(get_and_filter(
logger.clone(),
cfg.clone(),
count_initialized.clone(),
work_notify,
mon_rx,
tx,
mon_vec,
));
tk_local.await;
::slog::debug!(logger, "Scanner exiting");
Ok(())
}
async fn get_and_filter(
logger: Arc<::slog::Logger>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
work_notify: Arc<::tokio::sync::Notify>,
mut mon_rx: mpsc::UnboundedReceiver<::std::path::PathBuf>,
tx: mpsc::UnboundedSender<DeviceMsg>,
mon_vec: Vec<::tokio::task::JoinHandle<Result<(), ::std::io::Error>>>,
) -> Result<(), ::std::io::Error> {
// don't process anything unless the dev monitor have started
while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
{
::std::thread::sleep(::std::time::Duration::from_millis(50));
}
loop {
// This looks complicated, but basically:
// * we have two mpsc queues:
// * one from the udev socket listeners,
// * one to give work to the drone(s)
// * we use a conditional variable/mutex to wake up when the udev
// sockets have a new device
// * we use the same cond.var/mutex to be notified on the WORK_STATE
// changes
if super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed)
!= WorkState::KeepWorking
{
break;
}
let work = match mon_rx.try_recv() {
Ok(path) => ::udev::Device::from_syspath(&path).unwrap(),
Err(::tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break;
}
Err(::tokio::sync::mpsc::error::TryRecvError::Empty) => {
::slog::trace!(logger, "waiting for dev to filter");
work_notify.notified().await;
//let lock = work_notify.0.lock().unwrap();
//let _ = work_notify.1.wait(lock).unwrap();
continue;
}
};
::slog::trace!(logger, "Got device to filter: {:?}", work.syspath());
match filter_devices(&logger, cfg.clone(), work) {
Ok(dev) => {
match tx.send(DeviceMsg {
id: dev.0,
path: dev.1.syspath().into(),
}) {
Ok(()) => {}
Err(_) => {
::slog::error!(
logger,
"drones died before the scanner"
);
break;
}
}
}
Err(_) => {} // device did not match the filter
}
}
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking | WorkState::ShutdownScannerGraceful => {
super::WORK_STATE.store(
WorkState::ShutdownAllGraceful,
atomic::Ordering::Relaxed,
);
}
_ => {}
}
::slog::trace!(logger, "Dropping all udev monitors");
for th in mon_vec {
th.abort();
}
Ok(())
}
async fn enumerate(
logger: Arc<::slog::Logger>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
tx: mpsc::UnboundedSender<DeviceMsg>,
) -> Result<(), ::std::io::Error> {
// before scanning, make sure the monitoring have already started
while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
{
::std::thread::sleep(::std::time::Duration::from_millis(50));
}
// enumerate all devices // enumerate all devices
let udev_ctx = ::udev::Udev::new().unwrap(); let udev_ctx = ::udev::Udev::new().unwrap();
let mut devices = ::udev::Enumerator::with_udev(udev_ctx.clone())?; let mut devices = ::udev::Enumerator::with_udev(udev_ctx.clone())?;
devices.match_is_initialized().unwrap(); devices.match_is_initialized().unwrap();
let dev_list = devices.scan_devices()?; let dev_list = devices.scan_devices()?;
::slog::debug!(logger, "Scanning all devices...");
for dev in dev_list { for dev in dev_list {
match filter_devices(&logger, cfg.clone(), dev) { match filter_devices(&logger, cfg.clone(), dev) {
Ok(dev) => { Ok(dev) => {
...@@ -127,81 +179,134 @@ pub async fn scanner( ...@@ -127,81 +179,134 @@ pub async fn scanner(
)); ));
} }
} }
cvar.notify_one();
} }
Err(()) => {} Err(()) => {}
} }
} }
for th in mon_vec { ::slog::trace!(logger, "All devicies enumerated");
th.await;
}
Ok(()) Ok(())
} }
async fn monitor_udev( async fn monitor_udev(
logger: Arc<::slog::Logger>, logger: Arc<::slog::Logger>,
cfg: Arc<dfim::state::cfg::Cfg>, cfg: Arc<::libdfim::state::cfg::Cfg>,
mut async_mon: ::tokio_udev::AsyncMonitorSocket, dev_idx: usize,
tx: mpsc::Sender<DeviceMsg>, count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
cvar: Arc<Condvar>, work_notify: Arc<::tokio::sync::Notify>,
mon_tx: mpsc::UnboundedSender<::std::path::PathBuf>,
) -> Result<(), ::std::io::Error> { ) -> Result<(), ::std::io::Error> {
let dev = &cfg.devices[dev_idx];
let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
match &dev.subsystem {
None => {}
Some(ss) => match &dev.devtype {
None => {
mon_builder = match mon_builder.match_subsystem(ss.to_str()) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\"",
ss
);
return Err(e);
}
};
}
Some(dt) => {
mon_builder = match mon_builder
.match_subsystem_devtype(ss.to_str(), dt.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\" with devtype \
\"{}\"",
ss,
dt
);
return Err(e);
}
};
}
},
}
for tag in &dev.tags {
mon_builder = match mon_builder.match_tag(&tag) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(logger, "Can't filter with tag \"{:?}\"", &tag);
return Err(e);
}
}
}
let mon_sock = match mon_builder.listen() {
Ok(sock) => sock,
Err(e) => {
::slog::error!(logger, "Can't create socket to monitor dev");
return Err(e);
}
};
let mut async_mon =
::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
count_initialized.fetch_add(1, atomic::Ordering::Relaxed);
loop { loop {
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) { match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
super::WorkState::KeepWorking => {} WorkState::KeepWorking => {}
super::WorkState::ShutdownImmediate WorkState::ShutdownImmediate
| super::WorkState::ShutdownAllGraceful | WorkState::ShutdownAllGraceful
| super::WorkState::ShutdownScannerGraceful => break, | WorkState::ShutdownScannerGraceful => break,
} }
use ::tokio_stream::StreamExt; use ::tokio_stream::StreamExt;
while let Some(res_event) = async_mon.next().await { let res_event = match async_mon.next().await {
match res_event { Some(res_event) => res_event,
Ok(event) => { None => continue,
match filter_devices(&logger, cfg.clone(), event.device()) { };
Ok(dev) => { match res_event {
match tx.send(DeviceMsg { Ok(event) => {
id: dev.0, match mon_tx.send(event.device().syspath().to_owned()) {
path: dev.1.syspath().into(), Ok(()) => {}
}) { Err(_) => {
Ok(()) => {} ::slog::error!(
Err(_) => { logger,
// the receiver has been closed "monitor queue reader dropped before the writer"
// it's pointless to do any more work );
return Err(::std::io::Error::new( // the receiver has been closed
::std::io::ErrorKind::ConnectionReset, // it's pointless to do any more work
"work queue receiver has been closed", return Err(::std::io::Error::new(
)); ::std::io::ErrorKind::ConnectionReset,
} "work queue receiver has been closed",
} ));
cvar.notify_one();
}
Err(_) => {}
} }
} }
Err(_) => { work_notify.notify_one();
// the receiver has been closed }
// it's pointless to do any more work Err(_) => {
return Err(::std::io::Error::new( // the receiver has been closed
::std::io::ErrorKind::ConnectionReset, // it's pointless to do any more work
"udev socket reset", ::slog::error!(logger, "monitor udev socket reset");
)); return Err(::std::io::Error::new(
} ::std::io::ErrorKind::ConnectionReset,
"udev socket reset",
));
} }
} }
} }
::slog::trace!(logger, "udev monitor exited");
Ok(()) Ok(())
} }
fn filter_devices( fn filter_devices(
logger: &::slog::Logger, logger: &::slog::Logger,
cfg: Arc<dfim::state::cfg::Cfg>, cfg: Arc<::libdfim::state::cfg::Cfg>,
dev: ::udev::Device, dev: ::udev::Device,
) -> Result<(String, ::udev::Device), ()> { ) -> Result<(String, ::udev::Device), ()> {
if !dev.is_initialized() { if !dev.is_initialized() {
return Err(()); return Err(());
} }
use dfim::config::device::Device; use ::libdfim::config::device::Device;
let mut last_matched: Option<&Device> = None; let mut last_matched: Option<&Device> = None;
'devloop: for cfg_dev in &cfg.devices { 'devloop: for cfg_dev in &cfg.devices {
...@@ -238,7 +343,7 @@ fn filter_devices( ...@@ -238,7 +343,7 @@ fn filter_devices(
Some(str_value) => str_value, Some(str_value) => str_value,
None => continue 'devloop, None => continue 'devloop,
}; };
use dfim::config::device::AttributeCheck; use ::libdfim::config::device::AttributeCheck;
match attr { match attr {
AttributeCheck::Ok(a) => { AttributeCheck::Ok(a) => {
if !a.value.is_match(str_value) { if !a.value.is_match(str_value) {
...@@ -274,7 +379,7 @@ fn filter_devices( ...@@ -274,7 +379,7 @@ fn filter_devices(
Some(str_value) => str_value, Some(str_value) => str_value,
None => continue 'devloop, None => continue 'devloop,
}; };
use dfim::config::device::PropertyCheck; use ::libdfim::config::device::PropertyCheck;
match prop { match prop {
PropertyCheck::Ok(p) => { PropertyCheck::Ok(p) => {
if !p.value.is_match(str_value) { if !p.value.is_match(str_value) {
...@@ -302,7 +407,7 @@ fn filter_devices( ...@@ -302,7 +407,7 @@ fn filter_devices(
None => continue 'devloop, None => continue 'devloop,
}; };
for val in list_str.split(':') { for val in list_str.split(':') {
use dfim::config::device::TagCheck; use ::libdfim::config::device::TagCheck;
match tag { match tag {
TagCheck::Ok(raw_tag) => { TagCheck::Ok(raw_tag) => {
if val == raw_tag { if val == raw_tag {
...@@ -329,7 +434,7 @@ fn filter_devices( ...@@ -329,7 +434,7 @@ fn filter_devices(
None => continue 'devloop, None => continue 'devloop,
}; };
for val in list_str.split(':') { for val in list_str.split(':') {
use dfim::config::device::TagCheck; use ::libdfim::config::device::TagCheck;
match ctag { match ctag {
TagCheck::Ok(raw_tag) => { TagCheck::Ok(raw_tag) => {
if val == raw_tag { if val == raw_tag {
......
[package]
name = "libdfim"
version = "0.1.0"
edition = "2021"
authors = [ "Luca Fulchir <luker@fenrirproject.org>" ]
description = "Device&Filesystem Initialization&Monitoring"
[lib]
crate-type = ["rlib","dylib"]
[dependencies]
#gpt = "3.0"
gpt = { version = "3.0", git = "https://github.com/LucaFulchir/gpt", branch = "custom_part_type" }
nix = "0.23"
num-traits = "0.2"
num-derive = "0.3"
regex = "1"
ron = "0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"
serde_with = "1.11"
serde_regex = "1.1"
slog = { version = "2.7", features = ["max_level_trace", "release_max_level_warn"] }
slog-async= "2.6"
slog-term="2.6"
slog-syslog="0.12"
strum = { version = "0", features = ["derive"] } #used to convert enum to strings
strum_macros = { version = "0" } #used to convert enum to strings
thiserror = "1.0"
udev ="0.6"
uuid = { version = "0.8", features = ["serde", "v4"] }