Skip to content
RFC.hpp 54.6 KiB
Newer Older
Luker's avatar
Luker committed
                                                        Compute::NO_POOL)))) {
        error = true;
    } else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
                (Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
                                            Compute::COMPLETE |
                                            Compute::NO_POOL)))) {
        error = true;
    } else if (Compute::NONE != (flags & Compute::COMPLETE) &&
                    Compute::NONE != (flags &(Compute::PARTIAL_FROM_BEGINNING |
                                                Compute::PARTIAL_ANY |
                                                Compute::NO_POOL))) {
        error = true;
    }

    if (Compute::NONE != (flags & Compute::NO_POOL)) {
        std::unique_lock<std::mutex> lock (_mtx);
        RQ_UNUSED(lock);
        if (decoders.size() != 0) {
            // You can only say you won't use the pool *before* you start
            // decoding something!
            error = true;
        } else {
            use_pool = false;
            p.set_value ({Error::NONE, 0});
            return p.get_future();
        }
    }

    if (error) {
        p.set_value ({Error::WRONG_INPUT, 0});
        return p.get_future();
    }

    // do not add work to the pool to save up memory.
    // let "add_symbol" create the Decoders as needed.

    // spawn thread waiting for other thread exit.
    // this way we can set_value to the future when needed.
    auto future = p.get_future();
    if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
        wait_threads (this, flags, std::move(p));
    } else {
        std::unique_lock<std::mutex> pool_wait_lock (*_pool_mtx);
        RQ_UNUSED(pool_wait_lock);
        pool_wait.emplace_back (wait_threads, this, flags, std::move(p));
    }
    return future;
Luker's avatar
Luker committed
// Waits until 'obj' has something to report for 'flags' (or is exiting),
// fulfils the promise 'p' with that result, then removes the calling thread
// from obj->pool_wait and wakes up every other waiter.
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
                                    const Compute flags,
                                    std::promise<std::pair<Error, uint8_t>> p)
{
    // local copy of the notifier handle, used for both wait and notify_all
    auto notifier = obj->_pool_notify;
    for (;;) {
        std::unique_lock<std::mutex> pool_lock (*obj->_pool_mtx);
        if (obj->exiting) {
            // the decoder is shutting down: answer and leave
            p.set_value ({Error::EXITING, 0});
            break;
        }
        auto report = obj->get_report (flags);
        if (Error::WORKING != report.first) {
            // anything but WORKING is a final answer for this request
            p.set_value (report);
            break;
        }

        // nothing to report yet: sleep until somebody notifies us
        notifier->wait (pool_lock);
        pool_lock.unlock();
    }

    // remove our own entry from the vector of waiting threads.
    std::unique_lock<std::mutex> cleanup_lock (*obj->_pool_mtx);
    const auto self = std::this_thread::get_id();
    auto entry = obj->pool_wait.begin();
    while (entry != obj->pool_wait.end()) {
        if (entry->get_id() == self) {
            entry->detach();
            obj->pool_wait.erase (entry);
            break;
        }
        ++entry;
    }
    cleanup_lock.unlock();
    notifier->notify_all();
}

// Looks at the per-block decoders and tells the caller what can be reported
// for the requested 'flags'.
//
// Returns a pair {error, block id}:
//  * {WORKING, 0}    there are no decoders yet, or nothing new to report.
//  * {EXITING, 0}    a block that is not ready has a stopped decoder.
//  * COMPLETE / PARTIAL_FROM_BEGINNING: counts the consecutive ready blocks
//    that follow 'pool_last_reported' and advances 'pool_last_reported'.
//      - {NEED_DATA, 0} if the first non-ready block has end_of_input set
//        and no threads working on it.
//      - PARTIAL_FROM_BEGINNING: {NONE, last reportable block} as soon as
//        at least one new block is reportable.
//      - COMPLETE: {NONE, last block} only once
//        pool_last_reported == _blocks - 1.
//  * PARTIAL_ANY: {NONE, id} for the first ready block not yet reported
//    (which is then marked as reported); if there is none,
//    {NEED_DATA, id} for an unreported block with end_of_input set and no
//    threads working on it (also marked as reported).
template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
                                                            const Compute flags)
{
    // 'decoders' is accessed under _mtx in the rest of this file, so take the
    // lock before looking at it at all (including size() and end()).
    std::unique_lock<std::mutex> dec_lock (_mtx);
    if (decoders.size() == 0)
        return {Error::WORKING, 0};

    if (Compute::COMPLETE == (flags & Compute::COMPLETE) ||
            Compute::PARTIAL_FROM_BEGINNING ==
                                    (flags & Compute::PARTIAL_FROM_BEGINNING)) {
        uint16_t reportable = 0;
        uint16_t next_expected = static_cast<uint16_t> (pool_last_reported + 1);
        auto it = decoders.lower_bound (static_cast<uint8_t> (next_expected));

        // get last reportable block
        for (; it != decoders.end(); ++it) {
            auto id = it->first;
            if (id != next_expected)
                break; // not consecutive
            auto ptr = it->second.dec;
            if (ptr == nullptr) {
                assert(false && "RFC6330: decoder should never be nullptr.");
                break;
            }
            if (!ptr->ready()) {
                if (ptr->is_stopped())
                    return {Error::EXITING, 0};
                if (ptr->end_of_input && ptr->threads() == 0)
                    return {Error::NEED_DATA, 0};
                break; // still working
            }
            ++reportable;
            ++next_expected;
        }
        // done with 'decoders'; the bookkeeping below does not touch it
        dec_lock.unlock();
        if (reportable > 0) {
            pool_last_reported += reportable;
            if (Compute::PARTIAL_FROM_BEGINNING ==
                                    (flags & Compute::PARTIAL_FROM_BEGINNING)) {
                return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
            } else {
                // complete: only report once the last block has been reached
                if (pool_last_reported == _blocks - 1)
                    return {Error::NONE,
                                    static_cast<uint8_t>(pool_last_reported)};
            }
        }
    } else if (Compute::PARTIAL_ANY == (flags & Compute::PARTIAL_ANY)) {
        // remembers an unreported block that can not be decoded;
        // end() means "none found".
        auto undecodable = decoders.end();
        for (auto it = decoders.begin(); it != decoders.end(); ++it) {
            if (!it->second.reported) {
                auto ptr = it->second.dec;
                if (ptr == nullptr) {
                    assert(false && "RFC6330: decoder should never be nullptr");
                    break;
                }
                if (ptr->ready()) {
                    it->second.reported = true;
                    return {Error::NONE, it->first};
                }
                if (ptr->is_stopped())
                    return {Error::EXITING, 0};
                // first return all decodable blocks
                // then return the ones we can not decode.
                if (ptr->end_of_input && ptr->threads() == 0)
                    undecodable = it;
            }
        }
        if (undecodable != decoders.end()) {
            undecodable->second.reported = true;
            return {Error::NEED_DATA, undecodable->first};
        }
    }
    // can be reached if computing thread was stopped
    return {Error::WORKING, 0};
}
Luker's avatar
Luker committed
// Writes the decoded source symbol 'esi' of block 'sbn' into [start, end).
//
// Returns whatever the de-interleaver reports as written, or 0 when:
//  * the decoder is not initialized, or 'sbn' / 'esi' are out of range;
//  * no data has been received for the block yet;
//  * the block is not decoded (when the pool is not used, a decode is
//    attempted here first).
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_symbol (Fwd_It &start, const Fwd_It end,
                                                            const uint16_t esi,
                                                            const uint8_t sbn)
{
    // valid ids are [0, blocks) and [0, symbols): reject the upper bound too.
    // symbols() returns 0 for an unknown block, which is rejected as well.
    if (!operator bool() || sbn >= _blocks || esi >= symbols (sbn))
        return 0;

    std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec_ptr = nullptr;
    std::unique_lock<std::mutex> lock (_mtx);
    auto it = decoders.find (sbn);

    if (it == decoders.end())
        return 0;   // did not receive any data yet.

    dec_ptr = it->second.dec;
    lock.unlock();

    if (!dec_ptr->ready()) {
        // with the pool, decoding is somebody else's job: just report
        // "nothing yet". Without it, try to decode right here.
        if (use_pool || !dec_ptr->can_decode())
            return 0;
        RaptorQ__v1::Work_State keep_working =
                                        RaptorQ__v1::Work_State::KEEP_WORKING;
        dec_ptr->decode (&keep_working);
        if (!dec_ptr->ready())
            return 0;
    }
    // decoder has decoded the block

    Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
                                                    _sub_blocks, _alignment);
    size_t max_bytes = block_size (sbn);
    if (sbn == (blocks() - 1)) {
        // size of the data (_size) is different from the sum of the size of
        // all blocks. get the real size, so we do not write more.
        // we obviously need to consider this only for the last block.
        uint64_t all_blocks = 0;
        for (uint8_t id = 0; id < blocks(); ++id)
            all_blocks += block_size (id);  // each block's own size
        const uint64_t diff = all_blocks - _size;
        max_bytes -= static_cast<size_t>(diff);
    }
    // NOTE(review): this function used to compute a 'real_end' clamped to
    // one symbol past 'start', but never passed it on (and the computation
    // did not compile for plain forward iterators). The call below keeps
    // the behaviour that was actually in effect: 'end' is the only limit
    // given to the de-interleaver. Confirm that De_Interleaver stops after a
    // single symbol when called with an esi.
    return de_interleaving (start, end, max_bytes, 0, esi);
}

// Decodes as many blocks as possible, in order, starting from block 0, and
// writes them into [start, end), beginning 'skip' bytes into *start.
//
// Stops at the first block that has no data, is not decoded (and can not be
// decoded here), or for which nothing could be written.
// Returns the number of BYTES written.
//
// In case the last iterator is only half written, "start" is meant to
// point to the half-written iterator.
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start, const Fwd_It end,
                                                            const uint8_t skip)
{
    if (!operator bool())
        return 0;

    uint64_t written = 0;
    uint8_t new_skip = skip;
    for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
        std::unique_lock<std::mutex> block_lock (_mtx);
        auto it = decoders.find (sbn);
        if (it == decoders.end())
            return written;
        auto dec_ptr = it->second.dec;
        block_lock.unlock();

        if (!dec_ptr->ready()) {
            // only decode here when the pool is not in charge of it
            if (!use_pool && dec_ptr->can_decode()) {
                RaptorQ__v1::Work_State state =
                                        RaptorQ__v1::Work_State::KEEP_WORKING;
                auto ret = dec_ptr->decode (&state);
                if (RaptorQ__v1::Decoder_Result::DECODED != ret) {
                    return written;
                }
            } else {
                return written;
            }
        }

        Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
                                                    _sub_blocks, _alignment);

        size_t max_bytes = block_size (sbn);
        if (sbn == (blocks() - 1)) {
            // size of the data (_size) is different from the sum of the size of
            // all blocks. get the real size, so we do not write more.
            // we obviously need to consider this only for the last block.
            uint64_t all_blocks = 0;
            for (uint8_t id = 0; id < blocks(); ++id)
                all_blocks += block_size (id);  // each block's own size
            const size_t diff = static_cast<size_t> (all_blocks - _size);
            max_bytes -= diff;
        }
        auto tmp_start = start;
        uint64_t bytes_written = de_interleaving (tmp_start, end, max_bytes,
                                                                    new_skip);
        written += bytes_written;
        uint64_t bytes_and_skip = new_skip + bytes_written;
        new_skip = static_cast<uint8_t> (bytes_and_skip %
                    sizeof(typename std::iterator_traits<Fwd_It>::value_type));
        if (bytes_written == 0)
            return written;
        // if we ended decoding in the middle of a Fwd_It, do not advance
        // start too much, or we will end up having additional zeros.
        if (new_skip == 0) {
            start = tmp_start;
        } else {
            uint64_t it_written = bytes_and_skip /
                    sizeof(typename std::iterator_traits<Fwd_It>::value_type);
            // it_written is unsigned: "it_written - 1" must not be evaluated
            // when it_written is 0, or it wraps around to a huge offset.
            // For 0 and 1 there is nothing to advance.
            // NOTE(review): the header comment says 'start' should end up on
            // the half-written iterator, which looks like start + it_written
            // rather than start + it_written - 1. Left as it was; confirm
            // against De_Interleaver.
            if (it_written > 1) {
                // RaptorQ handles at most 881GB per rfc, so
                // casting uint64 to int64 is safe
                // we can not do "--start" since it's a forward iterator
                #pragma clang diagnostic push
                #pragma clang diagnostic ignored "-Wshorten-64-to-32"
                #pragma clang diagnostic ignored "-Wsign-conversion"
                start += it_written - 1;
                #pragma clang diagnostic pop
            }
        }
    }
    return written;
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Decoder<In_It, Fwd_It>::decode_block_bytes (Fwd_It &start,
Luker's avatar
Luker committed
                                                            const Fwd_It end,
                                                            const uint8_t skip,
                                                            const uint8_t sbn)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (!operator bool() || sbn >= _blocks)
Luker's avatar
Luker committed
        return 0;

    std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec_ptr = nullptr;
    std::unique_lock<std::mutex> lock (_mtx);
    auto it = decoders.find (sbn);

    if (it == decoders.end())
        return 0;   // did not receiveany data yet.

    if (use_pool) {
        dec_ptr = it->second.dec;
        lock.unlock();
        if (!dec_ptr->ready())
            return 0;   // did not receive enough data, or could not decode yet.
    } else {
        dec_ptr = it->second.dec;
        lock.unlock();
        if (!dec_ptr->ready()) {
            if (!dec_ptr->can_decode())
                return 0;
            RaptorQ__v1::Work_State keep_working =
                                        RaptorQ__v1::Work_State::KEEP_WORKING;
            dec_ptr->decode (&keep_working);
            if (!dec_ptr->ready())
                return 0;
        }
    }
    // decoder has decoded the block

    Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
                                                    _sub_blocks, _alignment);
    size_t max_bytes = block_size (sbn);
    if (sbn == (blocks() - 1)) {
        // size of the data (_size) is different from the sum of the size of
        // all blocks. get the real size, so we do not write more.
        // we obviously need to consider this only for the last block.
        uint64_t all_blocks = 0;
        for (uint8_t id = 0; id < blocks(); ++id)
            all_blocks += block_size (sbn);
        const uint64_t diff = all_blocks - _size;
        max_bytes -= static_cast<size_t>(diff);
    }
    return de_interleaving (start, end, max_bytes, skip);
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Decoder_written Decoder<In_It, Fwd_It>::decode_aligned (Fwd_It &start,
                                                            const Fwd_It end,
                                                            const uint8_t skip)
Luker's avatar
Luker committed
    const uint64_t bytes = decode_bytes (start, end, skip);
    const uint64_t skip_and_bytes = skip + bytes;
    const uint64_t iterators = skip_and_bytes /
                    sizeof(typename std::iterator_traits<Fwd_It>::value_type);
    const uint8_t new_skip = skip_and_bytes %
                    sizeof(typename std::iterator_traits<Fwd_It>::value_type);
    return {iterators, new_skip};
}

// Same as decode_block_bytes(), but the result is expressed in output
// elements: returns {number of whole Fwd_It elements covered by
// skip + written bytes, number of bytes used in the following, partially
// filled element}.
template <typename In_It, typename Fwd_It>
Decoder_written Decoder<In_It, Fwd_It>::decode_block_aligned (
                                                            Fwd_It &start,
                                                            const Fwd_It end,
                                                            const uint8_t skip,
                                                            const uint8_t sbn)
{
    using out_type = typename std::iterator_traits<Fwd_It>::value_type;

    const size_t written = decode_block_bytes (start, end, skip, sbn);
    // count from the beginning of the first element, skipped bytes included
    const size_t total = skip + written;
    const size_t whole_elements = total / sizeof(out_type);
    const uint8_t leftover = total % sizeof(out_type);
    return {whole_elements, leftover};
}

// Counts how many blocks is_block_ready() reports as ready.
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks_ready()
{
    uint8_t ready = 0;
    uint8_t sbn = 0;
    while (sbn < blocks()) {
        if (is_block_ready (sbn))
            ++ready;
        ++sbn;
    }
    return ready;
}

// True when the number of ready blocks equals the number of blocks.
template <typename In_It, typename Fwd_It>
bool Decoder<In_It, Fwd_It>::is_ready()
{
    const uint8_t ready = blocks_ready();
    const uint8_t total = blocks();
    return ready == total;
}

// True when a decoder exists for 'block' and it reports itself as ready.
template <typename In_It, typename Fwd_It>
bool Decoder<In_It, Fwd_It>::is_block_ready (const uint8_t block)
{
    std::unique_lock<std::mutex> guard (_mtx);
    auto found = decoders.find (block);
    if (found == decoders.end())
        return false;   // no decoder for this block
    // copy the handle, so the decoder can be queried without the lock
    auto decoder = found->second.dec;
    guard.unlock();

    return static_cast<bool> (decoder->ready());
}

Luker's avatar
Luker committed
// Returns _size, or 0 when the decoder is not initialized.
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
{
    if (this->operator bool())
        return _size;
    return 0;
}
Luker's avatar
Luker committed
// Returns the number of blocks: the sum of the block counts of the two
// partitions, or 0 when the decoder is not initialized.
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
{
    if (this->operator bool())
        return static_cast<uint8_t> (part.num (0) + part.num (1));
    return 0;
}

Luker's avatar
Luker committed
// Returns the size of block 'sbn': the number of symbols of the partition
// the block belongs to, multiplied by _symbol_size.
// Returns 0 when the decoder is not initialized or 'sbn' is in neither
// partition.
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
{
    if (!operator bool())
        return 0;
    // Multiply in uint32_t: operands narrower than int would be promoted to
    // (signed) int, and a large block times a large symbol size does not
    // fit in a 32-bit int.
    if (sbn < part.num (0)) {
        return static_cast<uint32_t> (part.size (0)) * _symbol_size;
    } else if (sbn - part.num (0) < part.num (1)) {
        return static_cast<uint32_t> (part.size (1)) * _symbol_size;
    }
    return 0;
}

Luker's avatar
Luker committed
// Returns _symbol_size, or 0 when the decoder is not initialized.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
{
    if (this->operator bool())
        return _symbol_size;
    return 0;
}
Luker's avatar
Luker committed
// Returns the number of symbols of block 'sbn', taken from the partition
// the block belongs to. Returns 0 when the decoder is not initialized or
// 'sbn' is in neither partition.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
{
    if (!this->operator bool())
        return 0;
    // blocks of the first partition come first
    if (sbn < part.num (0))
        return part.size (0);
    // then the blocks of the second partition
    if (sbn - part.num (0) < part.num (1))
        return part.size (1);
    return 0;
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// Returns the first entry of the RFC6330__v1::blocks table that is at least
// as big as the number of symbols of block 'sbn'.
// Returns Block_Size 0 when the block has no symbols or no entry is big
// enough.
template <typename Rnd_It, typename Fwd_It>
Block_Size Decoder<Rnd_It, Fwd_It>::extended_symbols (const uint8_t sbn) const
{
    const uint16_t needed = this->symbols (sbn);
    if (needed == 0)
        return static_cast<Block_Size> (0);
    // scan the table in order and stop at the first entry that fits
    for (uint16_t idx = 0; idx < (*RFC6330__v1::blocks).size(); ++idx) {
        if (static_cast<uint16_t> ((*RFC6330__v1::blocks)[idx]) >= needed)
            return (*RFC6330__v1::blocks)[idx];
    }
    // nothing in the table is big enough
    return static_cast<Block_Size> (0);
}

}   // namespace Impl
Luker's avatar
Luker committed
}   // namespace RFC6330__v1
Luker's avatar
Luker committed