Skip to content
scanner.rs 17.5 KiB
Newer Older
Luker's avatar
Luker committed
/*
 * Copyright (c) 2021, Luca Fulchir <luker@fenrirproject.org>
 *
 * This file is part of dfim.
 *
 * dfim is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * dfim is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with dfim.  If not, see <https://www.gnu.org/licenses/>.
 */

use super::DeviceMsg;
Luker's avatar
Luker committed
use super::WorkState;
use ::std::sync::Arc;
use ::tokio::sync::mpsc;
Luker's avatar
Luker committed

pub async fn scanner(
    logger: Arc<::slog::Logger>,
Luker's avatar
Luker committed
    work_notify: Arc<::tokio::sync::Notify>,
Luker's avatar
Luker committed
    cfg: Arc<::libdfim::state::cfg::Cfg>,
Luker's avatar
Luker committed
    tx: mpsc::UnboundedSender<DeviceMsg>,
Luker's avatar
Luker committed
) -> Result<(), ::std::io::Error> {
Luker's avatar
Luker committed
    let count_initialized = Arc::new(::std::sync::atomic::AtomicUsize::new(0));

    let (mon_tx, mon_rx) = mpsc::unbounded_channel::<::std::path::PathBuf>();
    // First setup the udev monitors, then enumerate all devices once,
    // then just wait for udev events
Luker's avatar
Luker committed
    ::slog::debug!(logger, "Spawning local udev monitors");
Luker's avatar
Luker committed
    let tk_local = ::tokio::task::LocalSet::new();
Luker's avatar
Luker committed
    let mut mon_vec = Vec::with_capacity(cfg.devices.len());
Luker's avatar
Luker committed
    for dev_idx in 0..cfg.devices.len() {
        let th_join = tk_local.spawn_local(monitor_udev(
Luker's avatar
Luker committed
            logger.clone(),
Luker's avatar
Luker committed
            cfg.clone(),
Luker's avatar
Luker committed
            dev_idx,
            count_initialized.clone(),
            work_notify.clone(),
            mon_tx.clone(),
Luker's avatar
Luker committed
        ));
        mon_vec.push(th_join);
Luker's avatar
Luker committed
    }
Luker's avatar
Luker committed
    let _enumerate_th = tk_local.spawn_local(enumerate(
        logger.clone(),
        cfg.clone(),
        count_initialized.clone(),
        tx.clone(),
    ));
    let _get_filter_join = tk_local.spawn_local(get_and_filter(
        logger.clone(),
        cfg.clone(),
        count_initialized.clone(),
        work_notify,
        mon_rx,
        tx,
        mon_vec,
    ));
    tk_local.await;

    ::slog::debug!(logger, "Scanner exiting");
    Ok(())
}

async fn get_and_filter(
    logger: Arc<::slog::Logger>,
    cfg: Arc<::libdfim::state::cfg::Cfg>,
    count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
    work_notify: Arc<::tokio::sync::Notify>,
    mut mon_rx: mpsc::UnboundedReceiver<::std::path::PathBuf>,
    tx: mpsc::UnboundedSender<DeviceMsg>,
    mon_vec: Vec<::tokio::task::JoinHandle<Result<(), ::std::io::Error>>>,
) -> Result<(), ::std::io::Error> {
    // don't process anything unless the dev monitor have started
    while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
    {
        ::std::thread::sleep(::std::time::Duration::from_millis(50));
    }
    loop {
        // This looks complicated, but basically:
        // * we have two mpsc queues:
        //   * one from the udev socket listeners,
        //   * one to give work to the drone(s)
        // * we use a conditional variable/mutex to wake up when the udev
        //   sockets have a new device
        // * we use the same cond.var/mutex to be notified on the WORK_STATE
        //   changes
        if super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed)
            != WorkState::KeepWorking
        {
            break;
        }
        let work = match mon_rx.try_recv() {
            Ok(path) => ::udev::Device::from_syspath(&path).unwrap(),
            Err(::tokio::sync::mpsc::error::TryRecvError::Disconnected) => {
                break;
            }
            Err(::tokio::sync::mpsc::error::TryRecvError::Empty) => {
                ::slog::trace!(logger, "waiting for dev to filter");
                work_notify.notified().await;
                //let lock = work_notify.0.lock().unwrap();
                //let _ = work_notify.1.wait(lock).unwrap();
                continue;
            }
        };
        ::slog::trace!(logger, "Got device to filter: {:?}", work.syspath());
        match filter_devices(&logger, cfg.clone(), work) {
            Ok(dev) => {
                match tx.send(DeviceMsg {
                    id: dev.0,
                    path: dev.1.syspath().into(),
                }) {
                    Ok(()) => {}
                    Err(_) => {
                        ::slog::error!(
                            logger,
                            "drones died before the scanner"
                        );
                        break;
                    }
                }
            }
            Err(_) => {} // device did not match the filter
        }
    }
    match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
        WorkState::KeepWorking | WorkState::ShutdownScannerGraceful => {
            super::WORK_STATE.store(
                WorkState::ShutdownAllGraceful,
                atomic::Ordering::Relaxed,
            );
        }
        _ => {}
    }
    ::slog::trace!(logger, "Dropping all udev monitors");
    for th in mon_vec {
        th.abort();
    }
    Ok(())
}
Luker's avatar
Luker committed
async fn enumerate(
    logger: Arc<::slog::Logger>,
    cfg: Arc<::libdfim::state::cfg::Cfg>,
    count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
    tx: mpsc::UnboundedSender<DeviceMsg>,
) -> Result<(), ::std::io::Error> {
    // before scanning, make sure the monitoring have already started
    while count_initialized.load(atomic::Ordering::Relaxed) != cfg.devices.len()
    {
        ::std::thread::sleep(::std::time::Duration::from_millis(50));
    }
Luker's avatar
Luker committed
    // enumerate all devices
Luker's avatar
Luker committed
    let udev_ctx = ::udev::Udev::new().unwrap();
Luker's avatar
Luker committed
    let mut devices = ::udev::Enumerator::with_udev(udev_ctx.clone())?;
    devices.match_is_initialized().unwrap();

    let dev_list = devices.scan_devices()?;
    for dev in dev_list {
        match filter_devices(&logger, cfg.clone(), dev) {
Luker's avatar
Luker committed
            Ok(dev) => {
Luker's avatar
Luker committed
                match tx.send(DeviceMsg {
Luker's avatar
Luker committed
                    id: dev.0,
                    path: dev.1.syspath().into(),
Luker's avatar
Luker committed
                }) {
Luker's avatar
Luker committed
                    Ok(()) => {}
                    Err(_) => {
                        // the receiver has been closed
                        // it's pointless to do any more work
                        return Err(::std::io::Error::new(
                            ::std::io::ErrorKind::ConnectionReset,
                            "work queue receiver has been closed",
                        ));
                    }
                }
            }
            Err(()) => {}
        }
Luker's avatar
Luker committed
    }
Luker's avatar
Luker committed
    ::slog::trace!(logger, "All devicies enumerated");
Luker's avatar
Luker committed
    Ok(())
}
Luker's avatar
Luker committed
async fn monitor_udev(
    logger: Arc<::slog::Logger>,
Luker's avatar
Luker committed
    cfg: Arc<::libdfim::state::cfg::Cfg>,
Luker's avatar
Luker committed
    dev_idx: usize,
    count_initialized: Arc<::std::sync::atomic::AtomicUsize>,
    work_notify: Arc<::tokio::sync::Notify>,
    mon_tx: mpsc::UnboundedSender<::std::path::PathBuf>,
Luker's avatar
Luker committed
) -> Result<(), ::std::io::Error> {
Luker's avatar
Luker committed
    let dev = &cfg.devices[dev_idx];
    let mut mon_builder = ::tokio_udev::MonitorBuilder::new().unwrap();
    match &dev.subsystem {
        None => {}
        Some(ss) => match &dev.devtype {
            None => {
                mon_builder = match mon_builder.match_subsystem(ss.to_str()) {
                    Ok(new_mon_builder) => new_mon_builder,
                    Err(e) => {
                        ::slog::error!(
                            logger,
                            "Can't monitor subsystem \"{}\"",
                            ss
                        );
                        return Err(e);
                    }
                };
            }
            Some(dt) => {
                mon_builder = match mon_builder
                    .match_subsystem_devtype(ss.to_str(), dt.to_str())
                {
                    Ok(new_mon_builder) => new_mon_builder,
                    Err(e) => {
                        ::slog::error!(
                            logger,
                            "Can't monitor subsystem \"{}\" with devtype \
                             \"{}\"",
                            ss,
                            dt
                        );
                        return Err(e);
                    }
                };
            }
        },
    }
    for tag in &dev.tags {
        mon_builder = match mon_builder.match_tag(&tag) {
            Ok(new_mon_builder) => new_mon_builder,
            Err(e) => {
                ::slog::error!(logger, "Can't filter with tag \"{:?}\"", &tag);
                return Err(e);
            }
        }
    }
    let mon_sock = match mon_builder.listen() {
        Ok(sock) => sock,
        Err(e) => {
            ::slog::error!(logger, "Can't create socket to monitor dev");
            return Err(e);
        }
    };
    let mut async_mon =
        ::tokio_udev::AsyncMonitorSocket::new(mon_sock).unwrap();
    count_initialized.fetch_add(1, atomic::Ordering::Relaxed);
Luker's avatar
Luker committed
    loop {
        match super::WORK_STATE.load(::std::sync::atomic::Ordering::Relaxed) {
Luker's avatar
Luker committed
            WorkState::KeepWorking => {}
            WorkState::ShutdownImmediate
            | WorkState::ShutdownAllGraceful
            | WorkState::ShutdownScannerGraceful => break,
Luker's avatar
Luker committed
        }
        use ::tokio_stream::StreamExt;
Luker's avatar
Luker committed
        let res_event = match async_mon.next().await {
            Some(res_event) => res_event,
            None => continue,
        };
        match res_event {
            Ok(event) => {
                match mon_tx.send(event.device().syspath().to_owned()) {
                    Ok(()) => {}
                    Err(_) => {
                        ::slog::error!(
                            logger,
                            "monitor queue reader dropped before the writer"
                        );
                        // the receiver has been closed
                        // it's pointless to do any more work
                        return Err(::std::io::Error::new(
                            ::std::io::ErrorKind::ConnectionReset,
                            "work queue receiver has been closed",
                        ));
Luker's avatar
Luker committed
                work_notify.notify_one();
            }
            Err(_) => {
                // the receiver has been closed
                // it's pointless to do any more work
                ::slog::error!(logger, "monitor udev socket reset");
                return Err(::std::io::Error::new(
                    ::std::io::ErrorKind::ConnectionReset,
                    "udev socket reset",
                ));
Luker's avatar
Luker committed
    ::slog::trace!(logger, "udev monitor exited");
Luker's avatar
Luker committed
    Ok(())
}
Luker's avatar
Luker committed

fn filter_devices(
    logger: &::slog::Logger,
Luker's avatar
Luker committed
    cfg: Arc<::libdfim::state::cfg::Cfg>,
Luker's avatar
Luker committed
    dev: ::udev::Device,
Luker's avatar
Luker committed
) -> Result<(String, ::udev::Device), ()> {
    if !dev.is_initialized() {
        return Err(());
    }

Luker's avatar
Luker committed
    use ::libdfim::config::device::Device;

    let mut last_matched: Option<&Device> = None;
Luker's avatar
Luker committed
    'devloop: for cfg_dev in &cfg.devices {
Luker's avatar
Luker committed
        match &cfg_dev.subsystem {
            None => {}
            Some(s) => {
                let os_s: ::std::ffi::OsString = s.to_str().into();
                if dev.subsystem() != Some(os_s.as_os_str()) {
Luker's avatar
Luker committed
                    continue 'devloop;
Luker's avatar
Luker committed
                }
            }
        }
        match &cfg_dev.devtype {
            None => {}
            Some(dt) => {
                let os_dt: ::std::ffi::OsString = dt.to_str().into();
                if dev.devtype() != Some(os_dt.as_os_str()) {
Luker's avatar
Luker committed
                    continue 'devloop;
Luker's avatar
Luker committed
        let path = match dev.devpath().to_str() {
            Some(path) => path,
Luker's avatar
Luker committed
            None => continue 'devloop,
Luker's avatar
Luker committed
        };
        if !cfg_dev.path_regex.is_match(path) {
Luker's avatar
Luker committed
            continue 'devloop;
Luker's avatar
Luker committed
        }
        for attr in &cfg_dev.attributes {
            let os_name: ::std::ffi::OsString = attr.name().into();
Luker's avatar
Luker committed
            match dev.attribute_value(os_name) {
                Some(val) => {
                    let str_value = match val.to_str() {
                        Some(str_value) => str_value,
                        None => continue 'devloop,
                    };
Luker's avatar
Luker committed
                    use ::libdfim::config::device::AttributeCheck;
                    match attr {
                        AttributeCheck::Ok(a) => {
                            if !a.value.is_match(str_value) {
                                continue 'devloop;
                            }
                        }
                        AttributeCheck::Exclude(a) => {
                            if a.value.is_match(str_value) {
                                continue 'devloop;
                            }
                        }
Luker's avatar
Luker committed
                    }
                }
                None => {
Luker's avatar
Luker committed
                    continue 'devloop;
Luker's avatar
Luker committed
                }
            }
        }
        match &cfg_dev.sysname {
            None => {}
            Some(sn) => {
                let os_sn: ::std::ffi::OsString = sn.as_str().into();
                if dev.sysname() != os_sn.as_os_str() {
Luker's avatar
Luker committed
                    continue 'devloop;
Luker's avatar
Luker committed
                }
            }
        }
        for prop in &cfg_dev.properties {
            let os_name: ::std::ffi::OsString = prop.name().into();
Luker's avatar
Luker committed
            match dev.property_value(os_name) {
                Some(val) => {
                    let str_value = match val.to_str() {
                        Some(str_value) => str_value,
                        None => continue 'devloop,
                    };
Luker's avatar
Luker committed
                    use ::libdfim::config::device::PropertyCheck;
                    match prop {
                        PropertyCheck::Ok(p) => {
                            if !p.value.is_match(str_value) {
                                continue 'devloop;
                            }
                        }
                        PropertyCheck::Exclude(p) => {
                            if p.value.is_match(str_value) {
                                continue 'devloop;
                            }
                        }
Luker's avatar
Luker committed
                    }
                }
                None => {
Luker's avatar
Luker committed
                    continue 'devloop;
        'tagloop: for tag in &cfg_dev.tags {
            match dev.property_value("TAGS") {
                None => continue 'devloop,
                Some(list_os_str) => {
                    let list_str = match list_os_str.to_str() {
                        Some(list_str) => list_str,
                        None => continue 'devloop,
                    };
                    for val in list_str.split(':') {
Luker's avatar
Luker committed
                        use ::libdfim::config::device::TagCheck;
Luker's avatar
Luker committed
                            TagCheck::Ok(raw_tag) => {
                                if val == raw_tag {
Luker's avatar
Luker committed
                            TagCheck::Exclude(raw_tag) => {
                                if val == raw_tag {
                        }
                    }
                    continue 'devloop;
                }
            }
        }
        'ctagloop: for ctag in &cfg_dev.current_tags {
            match dev.property_value("CURRENT_TAGS") {
                None => continue 'devloop,
                Some(list_os_str) => {
                    let list_str = match list_os_str.to_str() {
                        Some(list_str) => list_str,
                        None => continue 'devloop,
                    };
                    for val in list_str.split(':') {
Luker's avatar
Luker committed
                        use ::libdfim::config::device::TagCheck;
Luker's avatar
Luker committed
                            TagCheck::Ok(raw_tag) => {
                                if val == raw_tag {
                                    continue 'ctagloop;
                                }
                            }
Luker's avatar
Luker committed
                            TagCheck::Exclude(raw_tag) => {
                                if val == raw_tag {
                        }
                    }
                    continue 'devloop;
                }
            }
        }
        if let Some(old_match) = last_matched {
            ::slog::error!(
                logger,
                "device \"{:?}\" has multiple matches: \"{}\" - \"{}\"",
                dev.devpath(),
                old_match.id,
                cfg_dev.id
            );
            return Err(());
        }
        last_matched = Some(&cfg_dev);
    }
    match last_matched {
        None => Err(()),
        Some(cfg_dev) => {
            ::slog::trace!(
                "FOUND device: path:{:?} - initialized {:?} - devnum: {:?} - \
                 syspath: {:?} - devnode: {:?} - subsystem: {:?} - sysname: \
                 {:?} - sysnum: {:?} - devtype: {:?} - driver: {:?}",
                dev.devpath(),
                dev.is_initialized(),
                dev.devnum(),
                dev.syspath(),
                dev.devnode(),
                dev.subsystem(),
                dev.sysname(),
                dev.sysnum(),
                dev.devtype(),
                dev.driver()
            );
            for a in dev.attributes() {
                ::slog::trace!(logger, "A: {:?} - {:?}", a.name(), a.value());
            }
            for p in dev.properties() {
                ::slog::trace!(logger, "P: {:?} - {:?}", p.name(), p.value());
            }
            Ok((cfg_dev.id.to_owned(), dev))
        }