Newer
Older
/*
* Copyright (c) 2021, Luca Fulchir <luker@fenrirproject.org>
*
* This file is part of dfim.
*
* dfim is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dfim is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dfim. If not, see <https://www.gnu.org/licenses/>.
*/
use super::DeviceMsg;
pub async fn scanner(
logger: Arc<::slog::Logger>,
cfg: Arc<dfim::config::Config>,
tx: mpsc::Sender<DeviceMsg>,
cvar: Arc<Condvar>,
) -> Result<(), ::std::io::Error> {
// First setup the udev monitors, then enumerate all devices one, then just
// wait for udev events
::slog::debug!(logger, "Spawning local udev monitors");
let mut mon_vec = Vec::with_capacity(cfg.devices.len());
//let mut mon_builder = ::udev::MonitorBuilder::new().unwrap();
let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
match &dev.subsystem {
None => {}
Some(ss) => match &dev.devtype {
None => {
mon_builder = match mon_builder.match_subsystem(ss.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\"",
ss
);
return Err(e);
}
};
}
Some(dt) => {
mon_builder = match mon_builder
.match_subsystem_devtype(ss.to_str(), dt.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\" with devtype \
\"{}\"",
ss,
dt
);
return Err(e);
}
};
}
},
}
for tag in &dev.tags {
mon_builder = match mon_builder.match_tag(&tag.id) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't filter with tag \"{}\"",
&tag.id
);
return Err(e);
}
}
}
Ok(sock) => sock,
Err(e) => {
::slog::error!(logger, "Can't create socket to monitor dev");
return Err(e);
}
};
let async_mon =
::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
// let th_join = ::tokio::spawn(monitor_udev(
let th_join = ::tokio::task::spawn_local(monitor_udev(
logger.clone(),
async_mon,
tx.clone(),
cvar.clone(),
));
mon_vec.push(th_join);
let mut devices = ::udev::Enumerator::with_udev(udev_ctx.clone())?;
devices.match_is_initialized().unwrap();
let dev_list = devices.scan_devices()?;
::slog::debug!(logger, "Scanning all devices...");
match filter_devices(cfg.clone(), dev) {
Ok(dev) => {
Ok(()) => {}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"work queue receiver has been closed",
));
}
}
cvar.notify_one();
}
Err(()) => {}
}
for th in mon_vec {
th.await;
}
Ok(())
}
async fn monitor_udev(
logger: Arc<::slog::Logger>,
mut async_mon: ::tokio_udev::AsyncMonitorSocket,
tx: mpsc::Sender<DeviceMsg>,
cvar: Arc<Condvar>,
) -> Result<(), ::std::io::Error> {
loop {
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
super::WorkState::KeepWorking => {}
super::WorkState::ShutdownImmediate
| super::WorkState::ShutdownAllGraceful
| super::WorkState::ShutdownScannerGraceful => break,
}
use ::tokio_stream::StreamExt;
while let Some(res_event) = async_mon.next().await {
match res_event {
Ok(event) => {
match filter_devices(cfg.clone(), event.device()) {
Ok(dev) => {
match tx.send(DeviceMsg {
id: dev.0,
path: dev.1.syspath().into(),
}) {
Ok(()) => {}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"work queue receiver has been closed",
));
}
}
cvar.notify_one();
}
}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"udev socket reset",
));
}
}
}
}
fn filter_devices(
cfg: Arc<dfim::config::Config>,
dev: ::udev::Device,
) -> Result<(String, ::udev::Device), ()> {
'devloop: for cfg_dev in &cfg.devices {
match &cfg_dev.subsystem {
None => {}
Some(s) => {
let os_s: ::std::ffi::OsString = s.to_str().into();
if dev.subsystem() != Some(os_s.as_os_str()) {
}
}
}
match &cfg_dev.devtype {
None => {}
Some(dt) => {
let os_dt: ::std::ffi::OsString = dt.to_str().into();
if dev.devtype() != Some(os_dt.as_os_str()) {
let path = match dev.devpath().to_str() {
Some(path) => path,
}
for attr in &cfg_dev.attributes {
let os_name: ::std::ffi::OsString = attr.name.clone().into();
match dev.attribute_value(os_name) {
Some(val) => {
let os_value: ::std::ffi::OsString =
attr.value.clone().into();
if val != os_value {
}
}
}
for attr in &cfg_dev.exclude_attributes {
let os_name: ::std::ffi::OsString = attr.name.clone().into();
match dev.attribute_value(os_name) {
Some(val) => {
let os_value: ::std::ffi::OsString =
attr.value.clone().into();
if val == os_value {
}
}
None => {}
}
}
match &cfg_dev.sysname {
None => {}
Some(sn) => {
let os_sn: ::std::ffi::OsString = sn.as_str().into();
if dev.sysname() != os_sn.as_os_str() {
}
}
}
for prop in &cfg_dev.properties {
let os_name: ::std::ffi::OsString = prop.name.clone().into();
match dev.property_value(os_name) {
Some(val) => {
let os_value: ::std::ffi::OsString =
prop.value.clone().into();
if val != os_value {