Newer
Older
/*
* Copyright (c) 2021, Luca Fulchir <luker@fenrirproject.org>
*
* This file is part of dfim.
*
* dfim is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* dfim is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with dfim. If not, see <https://www.gnu.org/licenses/>.
*/
use super::DeviceMsg;
use super::WorkState;
use ::std::sync::Arc;
use ::tokio::sync::mpsc;
pub async fn scanner(
logger: Arc<::slog::Logger>,
let count_initialized = Arc::new(::std::sync::atomic::AtomicUsize::new(0));
let (mon_tx, mon_rx) = mpsc::unbounded_channel::<::std::path::PathBuf>();
// First setup the udev monitors, then enumerate all devices once,
// then just wait for udev events
::slog::debug!(logger, "Spawning local udev monitors");
let mut mon_vec = Vec::with_capacity(cfg.devices.len());
for dev_idx in 0..cfg.devices.len() {
let th_join = tk_local.spawn_local(monitor_udev(
dev_idx,
count_initialized.clone(),
work_notify.clone(),
mon_tx.clone(),
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
let _enumerate_th = tk_local.spawn_local(enumerate(
logger.clone(),
cfg.clone(),
count_initialized.clone(),
tx.clone(),
));
let _get_filter_join = tk_local.spawn_local(get_and_filter(
logger.clone(),
cfg.clone(),
count_initialized.clone(),
work_notify,
mon_rx,
tx,
mon_vec,
));
tk_local.await;
::slog::debug!(logger, "Scanner exiting");
Ok(())
}
async fn get_and_filter(
logger: Arc<::slog::Logger>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
work_notify: Arc<::tokio::sync::Notify>,
mut mon_rx: mpsc::UnboundedReceiver<::std::path::PathBuf>,
tx: mpsc::UnboundedSender<DeviceMsg>,
mon_vec: Vec<::tokio::task::JoinHandle<Result<(), ::std::io::Error>>>,
) -> Result<(), ::std::io::Error> {
// don't process anything unless the dev monitor have started
while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
{
::std::thread::sleep(::std::time::Duration::from_millis(50));
}
loop {
// This looks complicated, but basically:
// * we have two mpsc queues:
// * one from the udev socket listeners,
// * one to give work to the drone(s)
// * we use a conditional variable/mutex to wake up when the udev
// sockets have a new device
// * we use the same cond.var/mutex to be notified on the WORK_STATE
// changes
if super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed)
!= WorkState::KeepWorking
{
break;
}
let work = match mon_rx.try_recv() {
Ok(path) => ::udev::Device::from_syspath(&path).unwrap(),
Err(::tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
break;
}
Err(::tokio::sync::mpsc::error::TryRecvError::Empty) => {
::slog::trace!(logger, "waiting for dev to filter");
work_notify.notified().await;
//let lock = work_notify.0.lock().unwrap();
//let _ = work_notify.1.wait(lock).unwrap();
continue;
}
};
::slog::trace!(logger, "Got device to filter: {:?}", work.syspath());
match filter_devices(&logger, cfg.clone(), work) {
Ok(dev) => {
match tx.send(DeviceMsg {
id: dev.0,
path: dev.1.syspath().into(),
}) {
Ok(()) => {}
Err(_) => {
::slog::error!(
logger,
"drones died before the scanner"
);
break;
}
}
}
Err(_) => {} // device did not match the filter
}
}
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking | WorkState::ShutdownScannerGraceful => {
super::WORK_STATE.store(
WorkState::ShutdownAllGraceful,
atomic::Ordering::Relaxed,
);
}
_ => {}
}
::slog::trace!(logger, "Dropping all udev monitors");
for th in mon_vec {
th.abort();
}
Ok(())
}
/// Scans the devices udev already knows about and passes each of them to
/// `filter_devices`.
///
/// The scan starts only once `count_initialized` equals `cfg.devices.len()`.
///
/// NOTE(review): this definition looks truncated in the reviewed copy:
/// `udev_ctx` is not bound anywhere visible, `tx` is never used, and the
/// trailing `Err(()) => {}` arm has no enclosing `match`. Confirm every
/// comment below against the complete file.
async fn enumerate(
logger: Arc<::slog::Logger>,
cfg: Arc<::libdfim::state::cfg::Cfg>,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
tx: mpsc::UnboundedSender<DeviceMsg>,
) -> Result<(), ::std::io::Error> {
// before scanning, make sure the monitors have already started
// NOTE(review): `::std::thread::sleep` blocks the calling thread instead of
// yielding to the async executor -- confirm this is intended.
while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
{
::std::thread::sleep(::std::time::Duration::from_millis(50));
}
// Restrict the enumeration to initialized devices, then scan.
let mut devices = ::udev::Enumerator::with_udev(udev_ctx.clone())?;
devices.match_is_initialized().unwrap();
let dev_list = devices.scan_devices()?;
for dev in dev_list {
match filter_devices(&logger, cfg.clone(), dev) {
Ok(()) => {}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
// NOTE(review): presumably this arm originally handled the result of
// a send on `tx`; as shown it is attached to `filter_devices`.
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"work queue receiver has been closed",
));
}
}
}
Err(()) => {}
}
async fn monitor_udev(
logger: Arc<::slog::Logger>,
dev_idx: usize,
count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
work_notify: Arc<::tokio::sync::Notify>,
mon_tx: mpsc::UnboundedSender<::std::path::PathBuf>,
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
let dev = &cfg.devices[dev_idx];
let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
match &dev.subsystem {
None => {}
Some(ss) => match &dev.devtype {
None => {
mon_builder = match mon_builder.match_subsystem(ss.to_str()) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\"",
ss
);
return Err(e);
}
};
}
Some(dt) => {
mon_builder = match mon_builder
.match_subsystem_devtype(ss.to_str(), dt.to_str())
{
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(
logger,
"Can't monitor subsystem \"{}\" with devtype \
\"{}\"",
ss,
dt
);
return Err(e);
}
};
}
},
}
for tag in &dev.tags {
mon_builder = match mon_builder.match_tag(&tag) {
Ok(new_mon_builder) => new_mon_builder,
Err(e) => {
::slog::error!(logger, "Can't filter with tag \"{:?}\"", &tag);
return Err(e);
}
}
}
let mon_sock = match mon_builder.listen() {
Ok(sock) => sock,
Err(e) => {
::slog::error!(logger, "Can't create socket to monitor dev");
return Err(e);
}
};
let mut async_mon =
::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
count_initialized.fetch_add(1, atomic::Ordering::Relaxed);
loop {
match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
WorkState::KeepWorking => {}
WorkState::ShutdownImmediate
| WorkState::ShutdownAllGraceful
| WorkState::ShutdownScannerGraceful => break,
let res_event = match async_mon.next().await {
Some(res_event) => res_event,
None => continue,
};
match res_event {
Ok(event) => {
match mon_tx.send(event.device().syspath().to_owned()) {
Ok(()) => {}
Err(_) => {
::slog::error!(
logger,
"monitor queue reader dropped before the writer"
);
// the receiver has been closed
// it's pointless to do any more work
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"work queue receiver has been closed",
));
work_notify.notify_one();
}
Err(_) => {
// the receiver has been closed
// it's pointless to do any more work
::slog::error!(logger, "monitor udev socket reset");
return Err(::std::io::Error::new(
::std::io::ErrorKind::ConnectionReset,
"udev socket reset",
));
if !dev.is_initialized() {
return Err(());
}
let mut last_matched: Option<&Device> = None;
match &cfg_dev.subsystem {
None => {}
Some(s) => {
let os_s: ::std::ffi::OsString = s.to_str().into();
if dev.subsystem() != Some(os_s.as_os_str()) {
}
}
}
match &cfg_dev.devtype {
None => {}
Some(dt) => {
let os_dt: ::std::ffi::OsString = dt.to_str().into();
if dev.devtype() != Some(os_dt.as_os_str()) {
let path = match dev.devpath().to_str() {
Some(path) => path,
let os_name: ::std::ffi::OsString = attr.name().into();
match dev.attribute_value(os_name) {
Some(val) => {
let str_value = match val.to_str() {
Some(str_value) => str_value,
None => continue 'devloop,
};
match attr {
AttributeCheck::Ok(a) => {
if !a.value.is_match(str_value) {
continue 'devloop;
}
}
AttributeCheck::Exclude(a) => {
if a.value.is_match(str_value) {
continue 'devloop;
}
}
}
}
}
match &cfg_dev.sysname {
None => {}
Some(sn) => {
let os_sn: ::std::ffi::OsString = sn.as_str().into();
if dev.sysname() != os_sn.as_os_str() {
}
}
}
for prop in &cfg_dev.properties {
let os_name: ::std::ffi::OsString = prop.name().into();
match dev.property_value(os_name) {
Some(val) => {
let str_value = match val.to_str() {
Some(str_value) => str_value,
None => continue 'devloop,
};
match prop {
PropertyCheck::Ok(p) => {
if !p.value.is_match(str_value) {
continue 'devloop;
}
}
PropertyCheck::Exclude(p) => {
if p.value.is_match(str_value) {
continue 'devloop;
}
}
'tagloop: for tag in &cfg_dev.tags {
match dev.property_value("TAGS") {
None => continue 'devloop,
Some(list_os_str) => {
let list_str = match list_os_str.to_str() {
Some(list_str) => list_str,
None => continue 'devloop,
};
for val in list_str.split(':') {
continue 'tagloop;
}
}
TagCheck::Exclude(raw_tag) => {
if val == raw_tag {
continue 'devloop;
}
}
}
}
continue 'devloop;
}
}
}
'ctagloop: for ctag in &cfg_dev.current_tags {
match dev.property_value("CURRENT_TAGS") {
None => continue 'devloop,
Some(list_os_str) => {
let list_str = match list_os_str.to_str() {
Some(list_str) => list_str,
None => continue 'devloop,
};
for val in list_str.split(':') {
continue 'ctagloop;
}
}
TagCheck::Exclude(raw_tag) => {
if val == raw_tag {
continue 'devloop;
}
}
}
}
continue 'devloop;
}
}
}
if let Some(old_match) = last_matched {
::slog::error!(
logger,
"device \"{:?}\" has multiple matches: \"{}\" - \"{}\"",
dev.devpath(),
old_match.id,
cfg_dev.id
);
return Err(());
}
last_matched = Some(&cfg_dev);
}
match last_matched {
None => Err(()),
Some(cfg_dev) => {
"FOUND device: path:{:?} - initialized {:?} - devnum: {:?} - \
syspath: {:?} - devnode: {:?} - subsystem: {:?} - sysname: \
{:?} - sysnum: {:?} - devtype: {:?} - driver: {:?}",
dev.devpath(),
dev.is_initialized(),
dev.devnum(),
dev.syspath(),
dev.devnode(),
dev.subsystem(),
dev.sysname(),
dev.sysnum(),
dev.devtype(),
dev.driver()
);
for a in dev.attributes() {
::slog::trace!(logger, "A: {:?} - {:?}", a.name(), a.value());
}
for p in dev.properties() {
::slog::trace!(logger, "P: {:?} - {:?}", p.name(), p.value());
}
Ok((cfg_dev.id.to_owned(), dev))
}