Skip to content
RaptorQ.hpp 30.4 KiB
Newer Older
Luker's avatar
Luker committed
/*
Luker's avatar
Luker committed
 * Copyright (c) 2015-2018, Luca Fulchir<luca@fulchir.it>, All rights reserved.
Luker's avatar
Luker committed
 *
 * This file is part of "libRaptorQ".
 *
 * libRaptorQ is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 3
 * of the License, or (at your option) any later version.
 *
 * libRaptorQ is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * and a copy of the GNU Lesser General Public License
 * along with libRaptorQ.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "RaptorQ/v1/common.hpp"
#include "RaptorQ/v1/block_sizes.hpp"
#ifdef RQ_HEADER_ONLY
    #include "RaptorQ/v1/RaptorQ_Iterators.hpp"
#endif
Luker's avatar
Luker committed
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/Decoder.hpp"
Luker's avatar
Luker committed
#include "RaptorQ/v1/Parameters.hpp"
Luker's avatar
Luker committed
#include <algorithm>
Luker's avatar
Luker committed
#include <atomic>
Luker's avatar
Luker committed
#include <cmath>
Luker's avatar
Luker committed
#include <deque>
Luker's avatar
Luker committed
#include <future>
Luker's avatar
Luker committed
#include <limits>
Luker's avatar
Luker committed
#include <memory>
Luker's avatar
Luker committed
#include <mutex>
Luker's avatar
Luker committed
#include <vector>
#include <utility>

Luker's avatar
Luker committed


namespace RaptorQ__v1 {
Luker's avatar
Luker committed
// Forward declarations of the implementation classes, so that they can be
// named before their full definitions appear further down in this file.
// In both templates the second (output) iterator type defaults to the
// first (input) iterator type.
namespace Impl {
template <typename Rnd_It, typename Fwd_It = Rnd_It>
class RAPTORQ_LOCAL Encoder;
template <typename In_It, typename Fwd_It = In_It>
class RAPTORQ_LOCAL Decoder;
} // namespace Impl

// expose classes, but only if header-only
#ifdef RQ_HEADER_ONLY
    // Public aliases for the implementation classes.
    template <typename Rnd_It, typename Fwd_It>
    using Encoder = Impl::Encoder<Rnd_It, Fwd_It>;
    template <typename Rnd_It, typename Fwd_It>
    using Decoder = Impl::Decoder<Rnd_It, Fwd_It>;
#endif
Luker's avatar
Luker committed

namespace Impl {
Luker's avatar
Luker committed

template <typename Rnd_It, typename Fwd_It>
class RAPTORQ_LOCAL Encoder
{
public:
Luker's avatar
Luker committed
    ~Encoder();
Luker's avatar
Luker committed
    // used for precomputation
    Encoder (const Block_Size symbols, const size_t symbol_size);
Luker's avatar
Luker committed
    Encoder() = delete;
    Encoder (const Encoder&) = delete;
    Encoder& operator= (const Encoder&) = delete;
Luker's avatar
Luker committed
    Encoder (Encoder &&) = delete;
    Encoder& operator= (Encoder &&) = delete;
Luker's avatar
Luker committed
    explicit operator bool() const;
Luker's avatar
Luker committed
    uint16_t symbols() const;
Luker's avatar
Luker committed
    size_t symbol_size() const;
Luker's avatar
Luker committed
    uint32_t max_repair() const;
Luker's avatar
Luker committed

#ifdef RQ_HEADER_ONLY
Luker's avatar
Luker committed
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> begin_source();
Luker's avatar
Luker committed
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> end_source();
Luker's avatar
Luker committed
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> begin_repair();
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> end_repair
Luker's avatar
Luker committed
                                                        (const uint32_t repair);
Luker's avatar
Luker committed

    bool has_data() const;
Luker's avatar
Luker committed
    size_t set_data (const Rnd_It &from, const Rnd_It &to);
    void clear_data();
    bool ready() const;
    void stop();
Luker's avatar
Luker committed

    bool precompute_sync();
Luker's avatar
Luker committed
    bool compute_sync();
Luker's avatar
Luker committed
    std::shared_future<Error> precompute();
    std::shared_future<Error> compute();
Luker's avatar
Luker committed
    size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t id);
Luker's avatar
Luker committed

private:
Luker's avatar
Luker committed
    enum class Enc_State : uint8_t {
        INIT_ERROR = 1,
        NEED_DATA = 2,
        FULL = 3
    };
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    const size_t _symbol_size;
Luker's avatar
Luker committed
    const uint16_t _symbols;
Luker's avatar
Luker committed
    Enc_State _state;
Luker's avatar
Luker committed
    Raw_Encoder<Rnd_It, Fwd_It, without_interleaver> encoder;
    DenseMtx precomputed;
    Rnd_It _from, _to;
Luker's avatar
Luker committed
    // avoid launching multiple computations for the encoder.
    // it is guaranteed to succeed anyway.
    std::mutex _mtx;
    std::shared_future<Error> _single_wait;
    std::thread _waiting;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    static void compute_thread (Encoder<Rnd_It, Fwd_It> *obj,
                                                    bool forced_precomputation,
Luker's avatar
Luker committed
                                                    std::promise<Error> p);
Luker's avatar
Luker committed
};

template <typename In_It, typename Fwd_It>
class RAPTORQ_LOCAL Decoder
{
public:
    using Report = Dec_Report;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    ~Decoder();
    Decoder (const Block_Size symbols, const size_t symbol_size,
Luker's avatar
Luker committed
                                                        const Dec_Report type);
Luker's avatar
Luker committed
    Decoder (const Decoder&) = delete;
    Decoder& operator= (const Decoder&) = delete;
Luker's avatar
Luker committed
    Decoder (Decoder &&) = delete;
    Decoder& operator= (Decoder &&) = delete;
Luker's avatar
Luker committed
    explicit operator bool() const;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    uint16_t symbols() const;
    size_t symbol_size() const;
Luker's avatar
Luker committed

#ifdef RQ_HEADER_ONLY
Luker's avatar
Luker committed
    RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> begin();
Luker's avatar
Luker committed
    RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> end();
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    Error add_symbol (In_It &from, const In_It to, const uint32_t esi);
    std::vector<bool> end_of_input (const Fill_With_Zeros fill);
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    bool can_decode() const;
    bool ready() const;
Luker's avatar
Luker committed
    void stop();
Luker's avatar
Luker committed
    void clear_data();
Luker's avatar
Luker committed
    uint16_t needed_symbols() const;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    void set_max_concurrency (const uint16_t max_threads);
    Decoder_Result decode_once();
Luker's avatar
Luker committed

    struct Decoder_wait_res poll();
    struct Decoder_wait_res wait_sync();
    std::future<struct Decoder_wait_res> wait();
Luker's avatar
Luker committed

Luker's avatar
Luker committed

Luker's avatar
Luker committed
    Error decode_symbol (Fwd_It &start, const Fwd_It end, const uint16_t esi);
Luker's avatar
Luker committed
    // return number of bytes written
Luker's avatar
Luker committed
    struct Decoder_written decode_bytes (Fwd_It &start, const Fwd_It end,
Luker's avatar
Luker committed
                                    const size_t from_byte, const size_t skip);
Luker's avatar
Luker committed
private:
Luker's avatar
Luker committed
    uint16_t _max_threads;
Luker's avatar
Luker committed
    const uint16_t _symbols;
Luker's avatar
Luker committed
    const size_t _symbol_size;
Luker's avatar
Luker committed
    std::atomic<uint32_t> last_reported;
Luker's avatar
Luker committed
    const Dec_Report _type;
Luker's avatar
Luker committed
    RaptorQ__v1::Work_State work;
    Raw_Decoder<In_It> dec;
    // 2* symbols. Actually tracks available and reported symbols.
    // each symbol gets 2 bool: 1= available, 2=reported
    std::deque<std::atomic<bool>> symbols_tracker;
    std::mutex _mtx;
    std::condition_variable _cond;
    std::vector<std::thread> waiting;

    static void waiting_thread (Decoder<In_It, Fwd_It> *obj,
                                    std::promise<struct Decoder_wait_res> p);
Luker's avatar
Luker committed
};


///////////////////
//// Encoder
///////////////////

// Stop the raw encoder first, then wait for the background thread (if any)
// before the members it points to are destroyed.
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::~Encoder()
{
    encoder.stop();
    if (_waiting.joinable())
        _waiting.join();
}

// Build an encoder for a block of "symbols" symbols of "symbol_size" each.
// If the parameters are not acceptable the object is left in the
// INIT_ERROR state, which callers can detect through operator bool().
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Encoder (const Block_Size symbols,
                                                    const size_t symbol_size)
    : _symbol_size (symbol_size), _symbols (static_cast<uint16_t> (symbols)),
                                                encoder (symbols, _symbol_size)
{
    IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder");
    IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder");
    // check for proper initialization:
    // "symbols" must be one of the known block sizes
    bool known_block_size = false;
    for (size_t idx = 0; idx < (*blocks).size(); ++idx) {
        if ((*blocks)[idx] == symbols) {
            known_block_size = true;
            break;
        }
    }
    // check that the user did not try some cast trickery,
    // and maximum size is ssize_t::max. But ssize_t is not standard,
    // so we derive the limit from size_t: max/2 is 2^31-1 or 2^63-1.
    const size_t max_symbol_size = std::numeric_limits<size_t>::max() / 2;
    if (!known_block_size || symbol_size > max_symbol_size) {
        _state = Enc_State::INIT_ERROR;
        // do not fall through: the error state must not be overwritten
        return;
    }
    _state = Enc_State::NEED_DATA;
}

Luker's avatar
Luker committed
// An encoder is usable unless its constructor flagged an error.
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::operator bool() const
{
    const bool init_failed = (_state == Enc_State::INIT_ERROR);
    return !init_failed;
}

Luker's avatar
Luker committed
// Number of source symbols in the block, or 0 if initialization failed.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols() const
{
    if (_state == Enc_State::INIT_ERROR)
        return 0;
    return _symbols;
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (_state == Enc_State::INIT_ERROR)
        return 0;
Luker's avatar
Luker committed
    return _symbol_size;
Luker's avatar
Luker committed
}

// Maximum number of repair symbols that can be requested for this block,
// or 0 if initialization failed.
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair() const
{
    // you can have up to 56403 symbols in a block
    // rfc6330 limits you to 992173 repair symbols
    // but limits are meant to be broken!
    // the limit should be up to 4294967279 repair symbols 2^32-(_param.S + H)
    // but people might misuse the API, and call end_repair(max_repair),
    // which would overflow.
    // We are sorry for taking away from you that 0.0014% of repair symbols.
    if (_state == Enc_State::INIT_ERROR)
        return 0;
    auto _param = Parameters (_symbols);
    return static_cast<uint32_t> (std::numeric_limits<uint32_t>::max() -
                                                                    _param.L);
}
#ifdef RQ_HEADER_ONLY
// Iterator to the first source symbol (id 0).
template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
                                        Encoder<Rnd_It, Fwd_It>::begin_source()
{
    return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (this, 0);
}

// Iterator one past the last source symbol (id == _symbols).
template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
                                        Encoder<Rnd_It, Fwd_It>::end_source()
{
    return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (this,
                                                                    _symbols);
}

// Repair symbols start right after the source symbols.
template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
                                        Encoder<Rnd_It, Fwd_It>::begin_repair()
    { return end_source(); }

// End iterator for "repair" repair symbols. It is built without an encoder
// pointer.
template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
                    Encoder<Rnd_It, Fwd_It>::end_repair (const uint32_t repair)
{
    return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (nullptr,
                                                            _symbols + repair);
}
#endif
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// True only while input data is set (state FULL); false on
// initialization error or when no data has been given yet.
template <typename Rnd_It, typename Fwd_It>
bool Encoder<Rnd_It, Fwd_It>::has_data() const
{
    return _state == Enc_State::FULL;
}
Luker's avatar
Luker committed

// Remember the input range [from, to) and hand it to the raw encoder.
// Returns the size of the range in bytes, or 0 on initialization error.
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::set_data (const Rnd_It &from, const Rnd_It &to)
{
    if (_state == Enc_State::INIT_ERROR)
        return 0;
    // keep our own copies: the raw encoder is given pointers to them
    _from = from;
    _to = to;
    encoder.set_data (&_from, &_to);
    _state = Enc_State::FULL;
    return static_cast<size_t>(_to - _from) *
                    sizeof(typename std::iterator_traits<Rnd_It>::value_type);
}
Luker's avatar
Luker committed
// Drop the input data and any pending computation result, going back to
// the NEED_DATA state. Does nothing on initialization error.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::clear_data()
{
    if (_state == Enc_State::INIT_ERROR)
        return;
    std::lock_guard<std::mutex> lock (_mtx);
    RQ_UNUSED (lock);
    // a background computation must finish before its data is cleared
    if (_waiting.joinable()) {
        encoder.stop();
        _waiting.join();
    }
    _single_wait = std::shared_future<Error>();
    _state = Enc_State::NEED_DATA;
    encoder.clear_data();
}
// Forwards to the raw encoder, unless initialization failed.
template <typename Rnd_It, typename Fwd_It>
bool Encoder<Rnd_It, Fwd_It>::ready() const
{
    return _state != Enc_State::INIT_ERROR && encoder.ready();
}

// Forwards the stop request to the raw encoder.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::stop()
{
    encoder.stop();
}

// Run the precomputation in the calling thread.
// Returns false on initialization error, true otherwise.
template <typename Rnd_It, typename Fwd_It>
bool Encoder<Rnd_It, Fwd_It>::precompute_sync()
{
    if (_state == Enc_State::INIT_ERROR)
        return false;
    // a computation was already requested: just wait for it
    if (_single_wait.valid()) {
        _single_wait.wait();
        return true;
    }
    std::unique_lock<std::mutex> lock (_mtx);
    // somebody else might have started one while we were taking the lock
    if (_single_wait.valid() &&  _single_wait.get() == Error::NONE)
        return true;
    stop();
    if (_waiting.joinable())
        _waiting.join();
    std::promise<Error> p;
    _single_wait = p.get_future().share();
    lock.unlock();
    // synchronous: compute right here instead of spawning a thread
    compute_thread (this, true, std::move(p));
    return true;
}

Luker's avatar
Luker committed
// Run the computation in the calling thread.
// Returns false on initialization error, true otherwise.
template <typename Rnd_It, typename Fwd_It>
bool Encoder<Rnd_It, Fwd_It>::compute_sync()
{
    if (_state == Enc_State::INIT_ERROR)
        return false;
    // a computation was already requested: wait for it to finish
    if (_single_wait.valid())
        _single_wait.wait();
    std::unique_lock<std::mutex> lock (_mtx);
    // nothing left to do if the previous computation succeeded
    if (_single_wait.valid() && _single_wait.get() == Error::NONE)
        return true;
    stop();
    if (_waiting.joinable())
        _waiting.join();
    std::promise<Error> p;
    _single_wait = p.get_future().share();
    lock.unlock();
    // synchronous: compute right here instead of spawning a thread
    compute_thread (this, false, std::move(p));
    return true;
}
Luker's avatar
Luker committed
// Body of the (pre)computation, shared by the sync and async entry points.
// obj: the encoder to work on.
// force_precomputation: if true, always go through the precomputed matrix;
//      otherwise generate the symbols directly when data is already set.
// p: promise fulfilled on every path with Error::NONE or Error::EXITING.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::compute_thread (
                        Encoder<Rnd_It, Fwd_It> *obj, bool force_precomputation,
                                                        std::promise<Error> p)
{
    static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING;

    if (force_precomputation) {
        if (obj->precomputed.rows() == 0)
            obj->precomputed = obj->encoder.get_precomputed (&work);
        if (obj->precomputed.rows() == 0) {
            // encoder always works. only possible reason:
            p.set_value (Error::EXITING);
            return;
        }
        // if we finished getting data by the time the computation
        // finished, update it all.
        if (obj->_state == Enc_State::FULL && !obj->encoder.ready())
            obj->encoder.generate_symbols (obj->precomputed,
                                                    &obj->_from, &obj->_to);
        p.set_value (Error::NONE);
        return;
    }

    // symbols already generated: nothing to do
    if (obj->encoder.ready()) {
        p.set_value (Error::NONE);
        return;
    }

    if (obj->_state == Enc_State::FULL) {
        // data is available: generate the symbols directly.
        // the only possible reason for failure is that we are exiting
        const bool generated = obj->encoder.generate_symbols (&work,
                                                    &obj->_from, &obj->_to);
        p.set_value (generated ? Error::NONE : Error::EXITING);
        return;
    }

    // no data yet: precompute
    if (obj->precomputed.rows() == 0) {
        obj->precomputed = obj->encoder.get_precomputed (&work);
        if (obj->precomputed.rows() == 0) {
            // only possible reason:
            p.set_value (Error::EXITING);
            return;
        }
    }
    if (obj->_state == Enc_State::FULL) {
        // if we finished getting data by the time the computation
        // finished, update it all.
        obj->encoder.generate_symbols (obj->precomputed,
                                                    &obj->_from, &obj->_to);
    }
    p.set_value (Error::NONE);
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
std::shared_future<Error> Encoder<Rnd_It, Fwd_It>::precompute()
Luker's avatar
Luker committed
    if (_state == Enc_State::INIT_ERROR) {
Luker's avatar
Luker committed
        std::promise<Error> p;
Luker's avatar
Luker committed
        p.set_value (Error::INITIALIZATION);
Luker's avatar
Luker committed
        return p.get_future().share();
Luker's avatar
Luker committed
    }
Luker's avatar
Luker committed
    std::unique_lock<std::mutex> lock (_mtx);
    if (_single_wait.valid())
        return _single_wait;
    stop();
    if (_waiting.joinable())
        _waiting.join();
    std::promise<Error> p;
    _single_wait = p.get_future().share();
    _waiting = std::thread (compute_thread, this, true, std::move(p));
    lock.unlock();
    return _single_wait;
Luker's avatar
Luker committed
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
std::shared_future<Error> Encoder<Rnd_It, Fwd_It>::compute()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    std::promise<Error> p;
Luker's avatar
Luker committed
    if (_state == Enc_State::INIT_ERROR) {
        p.set_value (Error::INITIALIZATION);
Luker's avatar
Luker committed
        return p.get_future().share();
Luker's avatar
Luker committed
    }
Luker's avatar
Luker committed
    std::unique_lock<std::mutex> lock (_mtx);
    if (_single_wait.valid())
        return _single_wait;
    stop();
    if (_waiting.joinable())
        _waiting.join();
    _single_wait = p.get_future().share();
    _waiting = std::thread (compute_thread, this, false, std::move(p));
    lock.unlock();
    return _single_wait;
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
Luker's avatar
Luker committed
                                                            const uint32_t id)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (_state == Enc_State::INIT_ERROR)
        return 0;
Luker's avatar
Luker committed
    // returns number of iterators written
Luker's avatar
Luker committed
    if (_state == Enc_State::FULL) {
Luker's avatar
Luker committed
        if (id >= _symbols) { // repair symbol
            if (!encoder.ready()) {
                if (!_single_wait.valid())
                    _single_wait = compute();
                _single_wait.wait();
            }
Luker's avatar
Luker committed
        return encoder.Enc (id, output, end);
    }
    return 0;
Luker's avatar
Luker committed
}

///////////////////
//// Decoder
///////////////////

// Ask all work to abort, wake up the waiting threads and block until the
// "waiting" list is empty.
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::~Decoder ()
{
    work = RaptorQ__v1::Work_State::ABORT_COMPUTATION;
    std::unique_lock<std::mutex> lock (_mtx);
    _cond.notify_all();
    // wait threads to exit.
    // "waiting" is only inspected while holding the lock; _cond.wait()
    // releases it while we sleep.
    while (waiting.size() != 0)
        _cond.wait (lock);
}
Luker committed
// Build a decoder for a block of "symbols" symbols of "symbol_size" each;
// "type" selects what poll() reports.
// If any parameter is not acceptable, symbols_tracker is left empty, which
// is how the other methods (and operator bool()) detect the failure.
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::Decoder (const Block_Size symbols,
                                const size_t symbol_size, const Dec_Report type)
    // every scalar member is initialized here, so that nothing is left
    // indeterminate when we return early on bad parameters
    :_max_threads (2), _symbols (static_cast<uint16_t> (symbols)),
                    _symbol_size (symbol_size), last_reported (0),
                    _type (type), work (RaptorQ__v1::Work_State::KEEP_WORKING),
                    dec (symbols, symbol_size)
{
    IS_INPUT(In_It, "RaptorQ__v1::Decoder");
    IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");
    // check for proper initialization:
    // "symbols" must be one of the known block sizes
    bool known_block_size = false;
    for (size_t idx = 0; idx < (*blocks).size(); ++idx) {
        if ((*blocks)[idx] == symbols) {
            known_block_size = true;
            break;
        }
    }
    // check that the user did not try some cast trickery,
    // and maximum size is ssize_t::max. ssize_t is not standard, so the
    // same value is derived from size_t.
    if (!known_block_size ||
                symbol_size >= std::numeric_limits<size_t>::max() / 2) {
        return;
    }
    if (type != Dec_Report::PARTIAL_FROM_BEGINNING &&
            type != Dec_Report::PARTIAL_ANY &&
            type != Dec_Report::COMPLETE) {
        return; // no cast trickery plz
    }

    // two flags per symbol. Do not index with a 16 bit counter here:
    // 2 * _symbols can be bigger than 65535.
    symbols_tracker = std::deque<std::atomic<bool>> (2 *
                                                static_cast<size_t> (_symbols));
    for (auto &flag : symbols_tracker)
        flag.store (false);
}

Luker's avatar
Luker committed
// A decoder is usable only if its constructor filled symbols_tracker.
template <typename Rnd_It, typename Fwd_It>
Decoder<Rnd_It, Fwd_It>::operator bool() const
{
    return !symbols_tracker.empty();
}

Luker's avatar
Luker committed
// Number of source symbols in the block, or 0 if initialization failed.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols() const
{
    if (symbols_tracker.size() == 0)
        return 0;
    return _symbols;
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Decoder<In_It, Fwd_It>::symbol_size() const
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0)
        return 0;
Luker's avatar
Luker committed
    return _symbol_size;
Luker's avatar
Luker committed
}

#ifdef RQ_HEADER_ONLY
// Iterator to the first symbol (id 0).
template <typename In_It, typename Fwd_It>
RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It>
                                                Decoder<In_It, Fwd_It>::begin()
{
    return RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> (this, 0);
}

// End iterator (id == _symbols). It is built without a decoder pointer.
template <typename In_It, typename Fwd_It>
RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It>
                                                Decoder<In_It, Fwd_It>::end()
{
    return RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> (nullptr,
                                                                _symbols);
}
#endif
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// Forwards to the raw decoder; 0 if initialization failed.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::needed_symbols() const
{
    if (symbols_tracker.size() == 0)
        return 0;
    return dec.needed_symbols();
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &from, const In_It to,
Luker's avatar
Luker committed
                                                            const uint32_t esi)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0)
        return Error::INITIALIZATION;
Luker's avatar
Luker committed
    auto ret = dec.add_symbol (from, to, esi, false);
Luker's avatar
Luker committed
    if (ret == Error::NONE) {
        if (esi < _symbols)
            symbols_tracker [2 * esi].store (true);
        std::unique_lock<std::mutex> lock (_mtx);
Luker's avatar
Luker committed
        _cond.notify_all();
    }
    return ret;
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Decoder_wait_res Decoder<In_It, Fwd_It>::poll ()
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0)
        return {Error::INITIALIZATION, 0};
Luker's avatar
Luker committed
    uint32_t idx;
    uint32_t last;
    bool expected = false;
    switch (_type) {
Luker's avatar
Luker committed
    case Dec_Report::PARTIAL_FROM_BEGINNING:
Luker's avatar
Luker committed
        // report the number of symbols that are known, starting from
        // the beginning.
        last = last_reported.load();
        idx = last;
        for (; idx < symbols_tracker.size(); idx += 2) {
            if (symbols_tracker[idx].load() == true) {
                ++idx;
                if (symbols_tracker[idx].load() == false)
                    symbols_tracker[idx].store (true);
                --idx;
            } else {
                break;
            }
        }
        idx /= 2;
        if (idx > last) {
            while (!last_reported.compare_exchange_weak (last, idx)) {
                // expected is now "last_reported.load()"
                if (last >= idx) {
                    // other thread already reported more than us.
                    // do not report things twice.
                    if (dec.ready()) {
                        last_reported.store (_symbols);
                        return {Error::NONE, _symbols};
                    }
Luker's avatar
Luker committed
                    if (dec.threads() > 0)
Luker's avatar
Luker committed
                        return {Error::WORKING, 0};
                    return {Error::NEED_DATA, 0};
                }
                // else we can report the new stuff
            }
Luker's avatar
Luker committed
            return {Error::NONE, static_cast<uint16_t>(idx)};
Luker's avatar
Luker committed
        }
        // nothing to report
        if (dec.ready()) {
            last_reported.store (_symbols);
            return {Error::NONE, _symbols};
        }
Luker's avatar
Luker committed
        if (dec.threads() > 0)
Luker's avatar
Luker committed
            return {Error::WORKING, 0};
        return {Error::NEED_DATA, 0};
Luker's avatar
Luker committed
    case Dec_Report::PARTIAL_ANY:
Luker's avatar
Luker committed
        // report the first available, not yet reported.
        // or return {NONE, _symbols} if all have been reported
        if (dec.ready())
            return {Error::NONE, _symbols};
        for (idx = 0; idx < static_cast<uint32_t> (symbols_tracker.size());
                                                                    idx += 2) {
            if (symbols_tracker[idx].load() == true) {
                ++idx;
                if (symbols_tracker[idx].load() == false) {
                    expected = false;
                    if (symbols_tracker[idx].
                                    compare_exchange_strong (expected, true)) {
Luker's avatar
Luker committed
                        return {Error::NONE, static_cast<uint16_t> (idx / 2)};
Luker's avatar
Luker committed
                    }   // else some other thread raced us, keep trying other
                        // symbols
                }
            }
        }
        if (dec.ready())
            return {Error::NONE, _symbols};
Luker's avatar
Luker committed
        if (dec.threads() > 0)
Luker's avatar
Luker committed
            return {Error::WORKING, 0};
        return {Error::NEED_DATA, 0};
Luker's avatar
Luker committed
    case Dec_Report::COMPLETE:
Luker's avatar
Luker committed
        auto init = last_reported.load();
Luker's avatar
Luker committed
        idx = init * 2;
        for (; idx < symbols_tracker.size(); idx += 2) {
            if (symbols_tracker[idx].load() == false) {
Luker's avatar
Luker committed
                idx /= 2;
                while (!last_reported.compare_exchange_weak(init, idx))
                    idx = std::max(init, idx);
Luker's avatar
Luker committed
                if (dec.threads() > 0)
                    return {Error::WORKING, 0};
                return {Error::NEED_DATA, 0};
            }
        }
        last_reported.store (_symbols);
        return {Error::NONE, 0};
    }
    return {Error::WORKING, 0};
}

// Worker body shared by wait() (run on a dedicated std::thread) and
// wait_sync() (run on the caller's thread).
//
// While obj->work is KEEP_WORKING the loop:
//   1. asks the decoder for a computation slot (add_concurrent, bounded by
//      obj->_max_threads); if granted, runs one decode_once() and then
//      releases the slot under the mutex, waking the other waiters;
//   2. polls the decoder while holding the mutex; the promise is fulfilled
//      with the poll result when it reports Error::NONE, or when it reports
//      NEED_DATA while end of input was flagged, nothing can be decoded and
//      no thread is working;
//   3. otherwise sleeps on obj->_cond until somebody notifies.
//
// If the loop ends without a result and the work state is no longer
// KEEP_WORKING, the promise is fulfilled with {Error::EXITING, 0}.
// Before returning, the function detaches and removes the std::thread in
// obj->waiting that matches the current thread id (none matches when called
// from wait_sync()), then notifies obj->_cond.
//
// obj: decoder to work on.
// p:   promise that receives the result; ownership is taken by value.
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::waiting_thread (Decoder<In_It, Fwd_It> *obj,
                                        std::promise<struct Decoder_wait_res> p)
{
    bool promise_set = false;
    while (obj->work == RaptorQ__v1::Work_State::KEEP_WORKING) {
        bool compute = obj->dec.add_concurrent (obj->_max_threads);
        if (compute) {
            obj->decode_once();
            std::unique_lock<std::mutex> lock (obj->_mtx);
            obj->dec.drop_concurrent();
            obj->_cond.notify_all(); // notify other waiting threads
        }
        std::unique_lock<std::mutex> lock (obj->_mtx);
        // poll() does not actually need to be locked, but we use the
        // lock-wait mechanism to signal the arrival of new symbols,
        // so that we retry only when we get new data.
        auto res = obj->poll();
        if (res.error == Error::NONE || (obj->dec.end_of_input == true &&
                                            !obj->dec.can_decode()  &&
                                            obj->dec.threads() == 0 &&
                                                res.error == Error::NEED_DATA)){
            p.set_value (res);
            promise_set = true;
            break;
        }
        obj->_cond.wait (lock);
        lock.unlock();
    }

    // the loop was left because of a state change, not because of a result
    if (obj->work != RaptorQ__v1::Work_State::KEEP_WORKING && !promise_set)
        p.set_value ({Error::EXITING, 0});

    std::unique_lock<std::mutex> lock (obj->_mtx);
    RQ_UNUSED (lock);
    // drop our own entry from the list of waiting threads, if we have one.
    for (auto th = obj->waiting.begin(); th != obj->waiting.end(); ++th) {
        if (std::this_thread::get_id() == th->get_id()) {
            th->detach();
            obj->waiting.erase (th);
            break;  // "th" is invalid after erase()
        }
    }
    obj->_cond.notify_all(); // notify exit to destructor
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Decoder_wait_res Decoder<In_It, Fwd_It>::wait_sync ()
Luker's avatar
Luker committed
    // FIXME: if used, then poll() can not return ERROR::WAITING
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0)
        return {Error::INITIALIZATION, 0};
Luker's avatar
Luker committed
    std::promise<struct Decoder_wait_res> p;
Luker's avatar
Luker committed
    auto fut = p.get_future();
    waiting_thread (this, std::move(p));
    fut.wait();
    return fut.get();
Luker's avatar
Luker committed
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::future<struct Decoder_wait_res> Decoder<In_It, Fwd_It>::wait ()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    std::promise<struct Decoder_wait_res> p;
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0) {
        p.set_value ({Error::INITIALIZATION, 0});
        return p.get_future();
    }
Luker's avatar
Luker committed
    auto f = p.get_future();
Luker's avatar
Luker committed
    std::unique_lock<std::mutex> lock (_mtx);
    RQ_UNUSED (lock);
Luker's avatar
Luker committed
    waiting.emplace_back (waiting_thread, this, std::move(p));
    return f;
Luker's avatar
Luker committed
// Signal that no more symbols will be added.
//
// fill: with Fill_With_Zeros::YES the call is forwarded to
//       dec.fill_with_zeros() and its result is returned; otherwise the
//       decoder's end_of_input flag is raised.
// Returns an empty vector when not filling, or when the decoder was not
// initialized (empty symbols_tracker).
template <typename In_It, typename Fwd_It>
std::vector<bool> Decoder<In_It, Fwd_It>::end_of_input (
                                                    const Fill_With_Zeros fill)
{
    const bool initialized = (symbols_tracker.size() != 0);
    if (initialized && fill == Fill_With_Zeros::YES)
        return dec.fill_with_zeros();
    if (initialized)
        dec.end_of_input = true;
    return std::vector<bool>();
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// Forwards to dec.can_decode(); always false for a decoder that was not
// initialized (empty symbols_tracker).
template <typename In_It, typename Fwd_It>
bool Decoder<In_It, Fwd_It>::can_decode() const
{
    const bool initialized = (symbols_tracker.size() != 0);
    return initialized && dec.can_decode();
}
Luker's avatar
Luker committed

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
void Decoder<In_It, Fwd_It>::set_max_concurrency (const uint16_t max_threads)
Luker's avatar
Luker committed
{
    if (symbols_tracker.size() != 0)
        _max_threads = max_threads;
}
Luker's avatar
Luker committed

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Decoder_Result Decoder<In_It, Fwd_It>::decode_once()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0)
Luker's avatar
Luker committed
        return Decoder_Result::STOPPED;
Luker's avatar
Luker committed
    auto res = dec.decode (&work);
    if (res == Decoder_Result::DECODED) {
        std::unique_lock<std::mutex> lock (_mtx);
        RQ_UNUSED (lock);
Luker's avatar
Luker committed
        if (_type != Dec_Report::COMPLETE) {
Luker's avatar
Luker committed
            uint32_t id = last_reported.load();
            for (; id < symbols_tracker.size(); id += 2)
                symbols_tracker[id].store (true);
        }
        last_reported.store(_symbols);
        lock.unlock();
    }
    return res;
// Forwards to dec.ready(); always false for a decoder that was not
// initialized (empty symbols_tracker).
template <typename In_It, typename Fwd_It>
bool Decoder<In_It, Fwd_It>::ready() const
{
    const bool initialized = (symbols_tracker.size() != 0);
    return initialized && dec.ready();
}

Luker's avatar
Luker committed
// Ask every computation and waiting thread to stop: sets the work state to
// ABORT_COMPUTATION, then wakes all threads sleeping on _cond so that they
// re-check it. Does nothing when the decoder was not initialized
// (empty symbols_tracker).
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::stop()
{
    if (symbols_tracker.size() == 0)
        return;
    work = RaptorQ__v1::Work_State::ABORT_COMPUTATION;
    // notify while holding the mutex the waiters use with _cond.
    std::unique_lock<std::mutex> lock (_mtx);
    RQ_UNUSED (lock);
    _cond.notify_all();
}

// Reset the decoder: clears the data held by "dec", sets last_reported back
// to 0 and lowers every flag in symbols_tracker, all under the mutex.
// Does nothing when the decoder was not initialized (empty symbols_tracker).
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::clear_data()
{
    if (symbols_tracker.size() == 0)
        return;
    std::unique_lock<std::mutex> lock (_mtx);
    RQ_UNUSED (lock);
    dec.clear_data();
    last_reported.store(0);
    for (auto &flag : symbols_tracker)
        flag = false;
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Decoder_written Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start,
                                                        const Fwd_It end,
                                                        const size_t from_byte,
                                                        const size_t skip)
Luker's avatar
Luker committed
{
    using T = typename std::iterator_traits<Fwd_It>::value_type;

Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0 || skip >= sizeof(T) || from_byte >=
Luker's avatar
Luker committed
                            static_cast<size_t> (_symbols * _symbol_size)) {
        return {0, 0};
    }

    auto decoded = dec.get_symbols();

    uint16_t esi = static_cast<uint16_t> (from_byte /
                                            static_cast<size_t> (_symbol_size));
    uint16_t byte = static_cast<uint16_t> (from_byte %
                                            static_cast<size_t> (_symbol_size));

    size_t offset_al = skip;
    T element = static_cast<T> (0);
    if (skip != 0) {
        uint8_t *p = reinterpret_cast<uint8_t *> (&*start);
        for (size_t keep = 0; keep < skip; ++keep) {
            element += static_cast<T> (*(p++)) << keep * 8;
        }
    }
    size_t written = 0;
    while (start != end && esi < _symbols && dec.has_symbol (esi)) {
        element += static_cast<T> (static_cast<uint8_t> ((*decoded)(esi, byte)))
                                                            << offset_al * 8;
        ++offset_al;
        if (offset_al == sizeof(T)) {
            *start = element;
            ++start;
            written += offset_al;
            offset_al = 0;
            element = static_cast<T> (0);
        }
        ++byte;
        if (byte == decoded->cols()) {
            byte = 0;
            ++esi;
        }
    }
    if (start != end && offset_al != 0) {
        // we have more stuff in "element", but not enough to fill
        // the iterator. Do not overwrite additional data of the iterator.
        uint8_t *out = reinterpret_cast<uint8_t *> (&*start);
        uint8_t *in = reinterpret_cast<uint8_t *> (&element);
        for (size_t idx = 0; idx < offset_al; ++idx, ++out, ++in)
            *out = *in;
        written += offset_al;
    }
    return {written, offset_al};
Luker's avatar
Luker committed
}
Luker's avatar
Luker committed

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Error Decoder<In_It, Fwd_It>::decode_symbol (Fwd_It &start, const Fwd_It end,
Luker's avatar
Luker committed
                                                            const uint16_t esi)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (symbols_tracker.size() == 0)
        return Error::INITIALIZATION;
Luker's avatar
Luker committed
    auto start_copy = start;
    size_t esi_byte = esi * _symbol_size;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    auto out = decode_bytes (start_copy, end, esi_byte, 0);
    if (out.written == _symbol_size) {
Luker's avatar
Luker committed
        start = start_copy;
        return Error::NONE;
    }
    return Error::NEED_DATA;
Luker's avatar
Luker committed
}

}   // namespace Impl
}   // namespace RaptorQ__v1