/*
 * Copyright (c) 2015-2018, Luca Fulchir<luca@fulchir.it>, All rights reserved.
 *
 * This file is part of "libRaptorQ".
 *
 * libRaptorQ is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 3
 * of the License, or (at your option) any later version.
 *
 * libRaptorQ is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * and a copy of the GNU Lesser General Public License
 * along with libRaptorQ.  If not, see <http://www.gnu.org/licenses/>.
 */

Luker's avatar
Luker committed
#pragma once
Luker's avatar
Luker committed

/////////////////////
//
//  These templates are just a wrapper around the
//  functionality offered by the RaptorQ__v1::Impl namespace.
//  If you want to see what the algorithm looks like,
//  you are in the wrong place.
//
/////////////////////

#include "RaptorQ/v1/block_sizes.hpp"
#include "RaptorQ/v1/Interleaver.hpp"
#include "RaptorQ/v1/De_Interleaver.hpp"
#include "RaptorQ/v1/Decoder.hpp"
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/RFC_Iterators.hpp"
#include "RaptorQ/v1/Shared_Computation/Decaying_LF.hpp"
#include "RaptorQ/v1/Thread_Pool.hpp"
#include "RaptorQ/v1/util/endianess.hpp"
#include <algorithm>
#include <cassert>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <future>
#include <iterator>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>

namespace RFC6330__v1 {
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// Largest amount of data (in bytes) accepted by the encoder/decoder below.
constexpr uint64_t max_data = RFC6330_max_data;  // ~881 GB

// Forward declarations: the classes are defined further down in this file.
namespace Impl {
template <typename Rnd_It, typename Fwd_It>
class RAPTORQ_LOCAL Encoder;
template <typename In_It, typename Fwd_It>
class RAPTORQ_LOCAL Decoder;
} // namespace Impl

// When RQ_HEADER_ONLY is defined the implementation classes are exposed
// directly under RFC6330__v1.
#ifdef RQ_HEADER_ONLY
    template <typename Rnd_It, typename Fwd_It>
    using Encoder = Impl::Encoder<Rnd_It, Fwd_It>;
    template <typename In_It, typename Fwd_It>
    using Decoder = Impl::Decoder<In_It, Fwd_It>;
#endif

namespace Impl {

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
class RAPTORQ_LOCAL Encoder
Luker's avatar
Luker committed
{
public:

Luker's avatar
Luker committed
    Encoder() = delete;
    Encoder (const Encoder&) = delete;
    Encoder& operator= (const Encoder&) = delete;
Luker's avatar
Luker committed
    Encoder (Encoder&&) = delete;
    Encoder& operator= (Encoder&&) = delete;
Luker's avatar
Luker committed
    ~Encoder();
    Encoder (const Rnd_It data_from, const Rnd_It data_to,
                                            const uint16_t min_subsymbol_size,
                                            const uint16_t symbol_size,
                                            const size_t max_sub_block)
        : _max_sub_blk (max_sub_block), _data_from (data_from),
                                            _data_to (data_to),
Luker's avatar
Luker committed
                                            _symbol_size (symbol_size),
                                            _min_subsymbol (min_subsymbol_size),
                                            interleave (_data_from,
                                                        _data_to,
                                                        _min_subsymbol,
                                                        _max_sub_blk,
Luker's avatar
Luker committed
                                                        _symbol_size)
    {
        IS_RANDOM(Rnd_It, "RFC6330__v1::Encoder");
        IS_FORWARD(Fwd_It, "RFC6330__v1::Encoder");
        auto _alignment = sizeof(typename
                                    std::iterator_traits<Rnd_It>::value_type);
        RQ_UNUSED(_alignment);  // used only for asserts
        assert(_symbol_size >= _alignment &&
                        "RaptorQ: symbol_size must be >= alignment");
        assert((_symbol_size % _alignment) == 0 &&
                        "RaptorQ: symbol_size must be multiple of alignment");
        assert(min_subsymbol_size >= _alignment &&
                        "RaptorQ: minimum subsymbol must be at least aligment");
        assert(min_subsymbol_size <= _symbol_size &&
                    "RaptorQ: minimum subsymbol must be at most symbol_size");
        assert((min_subsymbol_size % _alignment) == 0 &&
                    "RaptorQ: minimum subsymbol must be multiple of alignment");
        assert((_symbol_size % min_subsymbol_size == 0) &&
                    "RaptorQ: symbol size must be multiple of subsymbol size");
        // max size: ~881 GB
        if (static_cast<uint64_t> (data_to - data_from) *
                    sizeof(typename std::iterator_traits<Rnd_It>::value_type)
                                                                > max_data) {
            return;
        }
Luker's avatar
Luker committed
        _pool_notify = std::make_shared<std::condition_variable>();
        _pool_mtx = std::make_shared<std::mutex>();
Luker's avatar
Luker committed
        pool_last_reported = -1;
        use_pool = true;
        exiting = false;
    }

    It::Encoder::Block_Iterator<Rnd_It, Fwd_It> begin ()
        { return It::Encoder::Block_Iterator<Rnd_It, Fwd_It> (this, 0); }
    const It::Encoder::Block_Iterator<Rnd_It, Fwd_It> end ()
        { return It::Encoder::Block_Iterator<Rnd_It, Fwd_It> (this, blocks()); }
Luker's avatar
Luker committed

    operator bool() const { return interleave; }
Luker's avatar
Luker committed
    RFC6330_OTI_Common_Data OTI_Common() const;
    RFC6330_OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
Luker's avatar
Luker committed

    std::future<std::pair<Error, uint8_t>> compute (const Compute flags);

    size_t precompute_max_memory ();
    size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t esi,
                                                            const uint8_t sbn);
    // id: 8-bit sbn + 24 bit esi
    size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id);
Luker's avatar
Luker committed
    size_t encode_packet (Fwd_It &output, const Fwd_It end, const uint32_t &id);

Luker's avatar
Luker committed
    void free (const uint8_t sbn);
    uint8_t blocks() const;
Luker's avatar
Luker committed
    uint32_t block_size (const uint8_t sbn) const;
Luker's avatar
Luker committed
    uint16_t symbol_size() const;
    uint16_t symbols (const uint8_t sbn) const;
Luker's avatar
Luker committed
    Block_Size extended_symbols (const uint8_t sbn) const;
Luker's avatar
Luker committed
    uint32_t max_repair (const uint8_t sbn) const;
Luker's avatar
Luker committed
private:
Luker's avatar
Luker committed
    static void wait_threads (Encoder<Rnd_It, Fwd_It> *obj, const Compute flags,
                                    std::promise<std::pair<Error, uint8_t>> p);
Luker's avatar
Luker committed
    class Block_Work final : public Impl::Pool_Work {
    public:
        std::weak_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It,
                                  RaptorQ__v1::Impl::with_interleaver>> work;
Luker's avatar
Luker committed
        std::weak_ptr<std::condition_variable> notify;
Luker's avatar
Luker committed
        std::weak_ptr<std::mutex> lock;
Luker's avatar
Luker committed
        Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
        ~Block_Work() override;
    };
Luker's avatar
Luker committed
    class Enc {
    public:
        Enc (Impl::Interleaver<Rnd_It> *interleaver, const uint8_t sbn)
        {
            enc = std::make_shared<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It,
                                Fwd_It, RaptorQ__v1::Impl::with_interleaver>> (
                                                            interleaver, sbn);
Luker's avatar
Luker committed
            reported = false;
        }
        std::shared_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It,
                                    RaptorQ__v1::Impl::with_interleaver>> enc;
Luker's avatar
Luker committed
        bool reported;
    };
Luker's avatar
Luker committed
    std::pair<Error, uint8_t> get_report (const Compute flags);
Luker's avatar
Luker committed
    std::shared_ptr<std::condition_variable> _pool_notify;
    std::shared_ptr<std::mutex> _pool_mtx;
Luker's avatar
Luker committed
    std::deque<std::thread> pool_wait;
Luker's avatar
Luker committed
    std::map<uint8_t, Enc> encoders;
    std::mutex _mtx;
    const size_t _max_sub_blk;
Luker's avatar
Luker committed
    const Rnd_It _data_from, _data_to;
    const uint16_t _symbol_size;
    const uint16_t _min_subsymbol;
    Impl::Interleaver<Rnd_It> interleave;
    bool use_pool, exiting;
    int16_t pool_last_reported;
Luker's avatar
Luker committed

};

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
class RAPTORQ_LOCAL Decoder
Luker's avatar
Luker committed
{
public:
Luker's avatar
Luker committed
    // rfc 6330, pg 6
    // easy explanation for OTI_* comes next.
    // we do NOT use bitfields as compilators are not actually forced to put
    // them in any particular order. meaning tey're useless.
    //
    //union OTI_Common_Data {
    //  uint64_t raw;
    //  struct {
    //      uint64_t size:40;
    //      uint8_t reserved:8;
    //      uint16_t symbol_size:16;
    //  };
    //};

    //union OTI_Scheme_Specific_Data {
    //  uint32_t raw;
    //  struct {
    //      uint8_t source_blocks;
    //      uint16_t sub_blocks;
    //      uint8_t alignment;
    //  };
    //};
Luker's avatar
Luker committed
    Decoder (const Decoder&) = delete;
    Decoder& operator= (const Decoder&) = delete;
Luker's avatar
Luker committed
    Decoder (Decoder&&) = delete;
    Decoder& operator= (Decoder&&) = delete;
Luker's avatar
Luker committed
    ~Decoder();
Luker's avatar
Luker committed
    Decoder (const RFC6330_OTI_Common_Data common,
Luker's avatar
Luker committed
                                const RFC6330_OTI_Scheme_Specific_Data scheme)
Luker's avatar
Luker committed
    {
Luker's avatar
Luker committed
        // _size > max_data means improper initialization.
Luker's avatar
Luker committed
        IS_INPUT(In_It, "RaptorQ__v1::Decoder");
        IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");

Luker's avatar
Luker committed
        RFC6330_OTI_Common_Data _common =
                    RaptorQ__v1::Impl::Endian::b_to_h<RFC6330_OTI_Common_Data>
                                                                    (common);
        RFC6330_OTI_Common_Data _scheme =
            RaptorQ__v1::Impl::Endian::b_to_h<RFC6330_OTI_Scheme_Specific_Data>
                                                                    (scheme);
Luker's avatar
Luker committed
        // see the above commented bitfields for quick reference
Luker's avatar
Luker committed
        _symbol_size = static_cast<uint16_t> (_common);
        _size = _common >> 24;
        uint16_t tot_sub_blocks = static_cast<uint16_t> (_scheme >> 8);
        _alignment = static_cast<uint8_t> (_scheme);
        _blocks = static_cast<uint8_t> (_scheme >> 24);
Luker's avatar
Luker committed
        if (_size > max_data || _size % _alignment != 0 ||
                                            _symbol_size % _alignment != 0) {
            _size = std::numeric_limits<uint64_t>::max();
            return;
        }
Luker's avatar
Luker committed
        _sub_blocks = Impl::Partition (_symbol_size / _alignment,
Luker's avatar
Luker committed
                                                                tot_sub_blocks);

        const uint64_t total_symbols = static_cast<uint64_t> (ceil (
                                _size / static_cast<double> (_symbol_size)));

        part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
        pool_last_reported = -1;
Luker's avatar
Luker committed
        _pool_notify = std::make_shared<std::condition_variable>();
        _pool_mtx = std::make_shared<std::mutex>();
Luker's avatar
Luker committed
        use_pool = true;
        exiting = false;
    }

    Decoder (const uint64_t size, const uint16_t symbol_size,
                                                    const uint16_t sub_blocks,
                                                    const uint8_t blocks,
                                                    const uint8_t alignment)
        :_size (size), _symbol_size (symbol_size), _blocks (blocks),
                                                        _alignment(alignment)
    {
Luker's avatar
Luker committed
        // _size > max_data means improper initialization.
        if (_size > max_data || _size % _alignment != 0 ||
                                            _symbol_size % _alignment != 0) {
            // really, not all the possible tests are here.
            // but the RFC sucks really bad... input validation is a pain...
            // please use the RAW API...
            _size = std::numeric_limits<uint64_t>::max();
Luker's avatar
Luker committed
            return;
Luker's avatar
Luker committed

        const uint64_t total_symbols = static_cast<uint64_t> (ceil (
                                _size / static_cast<double> (_symbol_size)));
        _sub_blocks = Impl::Partition (_symbol_size / _alignment, sub_blocks);

        part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
Luker's avatar
Luker committed
        _pool_notify = std::make_shared<std::condition_variable>();
        _pool_mtx = std::make_shared<std::mutex>();
Luker's avatar
Luker committed
        pool_last_reported = -1;
        use_pool = true;
        exiting = false;
    }
    It::Decoder::Block_Iterator<In_It, Fwd_It> begin ()
        { return It::Decoder::Block_Iterator<In_It, Fwd_It> (this, 0); }
    const It::Decoder::Block_Iterator<In_It, Fwd_It> end ()
        { return It::Decoder::Block_Iterator<In_It, Fwd_It> (this, blocks()); }
Luker's avatar
Luker committed
    operator bool() const
Luker's avatar
Luker committed
        { return _size <= max_data; }
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    // result type tracked by C_RFC_API.h/RFC6330_Result
Luker's avatar
Luker committed
    std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
    // if you can tell there is no more input, we can avoid locking
    // forever and return an error, or if you wish we can fill
    // everythin with zero, and return you the bitmask of which bytes
    // we have and which we do not
    std::vector<bool> end_of_input (const Fill_With_Zeros fill,
                                                        const uint8_t block);
    std::vector<bool> end_of_input (const Fill_With_Zeros fill);

Luker's avatar
Luker committed
    // result in BYTES
Luker's avatar
Luker committed
    uint64_t decode_symbol (Fwd_It &start, const Fwd_It end, const uint16_t esi,
                                                            const uint8_t sbn);
Luker's avatar
Luker committed
    uint64_t decode_bytes (Fwd_It &start, const Fwd_It end, const uint8_t skip);
    size_t decode_block_bytes (Fwd_It &start, const Fwd_It end,
                                                            const uint8_t skip,
                                                            const uint8_t sbn);
    // result in ITERATORS
    // last *might* be half written depending on data alignments
Luker's avatar
Luker committed
    // NOTE: skip = uint8_t to avoid problems with _alignment
    Decoder_written decode_aligned (Fwd_It &start, const Fwd_It end,
Luker's avatar
Luker committed
                                                            const uint8_t skip);
    Decoder_written decode_block_aligned (Fwd_It &start,
Luker's avatar
Luker committed
                                                            const Fwd_It end,
                                                            const uint8_t skip,
                                                            const uint8_t sbn);
    // id: 8-bit sbn + 24 bit esi
    Error add_symbol (In_It &start, const In_It end, const uint32_t id);
    Error add_symbol (In_It &start, const In_It end, const uint32_t esi,
                                                            const uint8_t sbn);
Luker's avatar
Luker committed
    Error add_packet (In_It &start, const In_It end);

    uint8_t blocks_ready();
    bool is_ready();
    bool is_block_ready (const uint8_t block);
Luker's avatar
Luker committed
    void free (const uint8_t sbn);
    uint64_t bytes() const;
    uint8_t blocks() const;
    uint32_t block_size (const uint8_t sbn) const;
    uint16_t symbol_size() const;
    uint16_t symbols (const uint8_t sbn) const;
Luker's avatar
Luker committed
    Block_Size extended_symbols (const uint8_t sbn) const;
Luker's avatar
Luker committed
private:
Luker's avatar
Luker committed
    // using shared pointers to avoid locking too much or
    // worrying about deleting used stuff.
    class RAPTORQ_LOCAL Block_Work final : public Impl::Pool_Work {
    public:
        std::weak_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> work;
        std::weak_ptr<std::condition_variable> notify;
Luker's avatar
Luker committed
        std::weak_ptr<std::mutex> lock;
Luker's avatar
Luker committed
        Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
        ~Block_Work() override;
    };
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    class RAPTORQ_LOCAL Dec {
    public:
Luker's avatar
Luker committed
        Dec (const RaptorQ__v1::Block_Size symbols, const uint16_t symbol_size,
                                                const uint16_t padding_symbols)
Luker's avatar
Luker committed
        {
            dec = std::make_shared<RaptorQ__v1::Impl::Raw_Decoder<In_It>> (
Luker's avatar
Luker committed
                                        symbols, symbol_size, padding_symbols);
Luker's avatar
Luker committed
            reported = false;
        }
        std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec;
        bool reported;
    };

    static void wait_threads (Decoder<In_It, Fwd_It> *obj, const Compute flags,
                                    std::promise<std::pair<Error, uint8_t>> p);
    std::pair<Error, uint8_t> get_report (const Compute flags);
    std::shared_ptr<std::condition_variable> _pool_notify;
Luker's avatar
Luker committed
    std::shared_ptr<std::mutex> _pool_mtx;
Luker's avatar
Luker committed
    std::deque<std::thread> pool_wait;
Luker's avatar
Luker committed
    uint64_t _size;
    Impl::Partition part, _sub_blocks;
    std::map<uint8_t, Dec> decoders;
    std::mutex _mtx;
    uint16_t _symbol_size;
    int16_t pool_last_reported;
    uint8_t _blocks, _alignment;
    bool use_pool, exiting;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    std::vector<bool> decoded_sbn;
Luker's avatar
Luker committed

Luker's avatar
Luker committed


/////////////////
//
// Encoder
//
/////////////////
Luker's avatar
Luker committed
/// Stops every running computation and waits until all the threads spawned
/// by compute() have removed themselves from `pool_wait`.
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::~Encoder()
{
    // The constructor can return before the pool synchronisation objects
    // are created: never dereference them unchecked.
    const bool has_pool_sync = (_pool_notify != nullptr &&
                                                        _pool_mtx != nullptr);

    {
        // wait_threads() reads `exiting` while holding the pool mutex.
        // Writing it under the same mutex avoids a data race and makes sure
        // a waiter cannot test the flag and then miss our notification.
        std::unique_lock<std::mutex> flag_lock;
        if (has_pool_sync)
            flag_lock = std::unique_lock<std::mutex> (*_pool_mtx);
        exiting = true; // stop notifying thread
    }

    std::unique_lock<std::mutex> enc_lock (_mtx);
    for (auto &it : encoders) { // stop existing computations
        auto ptr = it.second.enc;
        if (ptr != nullptr)
            ptr->stop();
    }
    enc_lock.unlock();

    if (!has_pool_sync)
        return; // no waiting thread can exist without the pool state

    _pool_notify->notify_all();

    // `pool_wait` is modified by the waiting threads under the pool mutex,
    // so it is only inspected while holding that mutex. wait() releases the
    // mutex while blocked, letting the waiters erase themselves.
    std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
    while (!pool_wait.empty())
        _pool_notify->wait (pool_lock);
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
RFC6330_OTI_Common_Data Encoder<Rnd_It, Fwd_It>::OTI_Common() const
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (!interleave)
        return 0;
Luker's avatar
Luker committed
    RFC6330_OTI_Common_Data ret;
Luker's avatar
Luker committed
    // first 40 bits: data length.
    ret = (static_cast<uint64_t> (_data_to - _data_from) *
            sizeof(typename std::iterator_traits<Rnd_It>::value_type)) << 24;
    // 8 bits: reserved
    // last 16 bits: symbol size
    ret += _symbol_size;

Luker's avatar
Luker committed
    return RaptorQ__v1::Impl::Endian::h_to_b<RFC6330_OTI_Common_Data> (ret);
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
RFC6330_OTI_Scheme_Specific_Data Encoder<Rnd_It, Fwd_It>::OTI_Scheme_Specific()
                                                                        const
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (!interleave)
        return 0;
Luker's avatar
Luker committed
    RFC6330_OTI_Scheme_Specific_Data ret;
Luker's avatar
Luker committed
    // 8 bit: source blocks
    ret = static_cast<uint32_t> (interleave.blocks()) << 24;
    // 16 bit: sub-blocks number (N)
    ret += static_cast<uint32_t> (interleave.sub_blocks()) << 8;
    // 8 bit: alignment
    ret += sizeof(typename std::iterator_traits<Rnd_It>::value_type);

Luker's avatar
Luker committed
    return RaptorQ__v1::Impl::Endian::h_to_b<RFC6330_OTI_Scheme_Specific_Data> (
                                                                        ret);
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
/// Gives a rough estimate of the amount of memory needed for the
/// precomputation of one block; this helps deciding how many concurrent
/// precomputations to run.
/// @return the estimate, saturated to the maximum size_t value, or 0 when
///         the interleaver is not usable or the number of symbols of
///         block 0 is not smaller than any entry of the K_padded table.
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::precompute_max_memory ()
{
    if (!interleave)
        return 0;

    const uint16_t symbols = interleave.source_symbols (0);

    // first table entry strictly greater than the number of symbols
    uint16_t K_idx;
    for (K_idx = 0; K_idx < RaptorQ__v1::Impl::K_padded.size(); ++K_idx) {
        if (symbols < RaptorQ__v1::Impl::K_padded[K_idx])
            break;
    }
    if (K_idx == RaptorQ__v1::Impl::K_padded.size())
        return 0;

    auto S_H_W = RaptorQ__v1::Impl::S_H_W[K_idx];
    enum Tup { S = 0, H = 1, W = 2 };

    // All the arithmetic is done on 64 bits: the operands are 16 bit wide
    // and would otherwise be promoted to `int`, whose range is too small for
    // the square of the larger matrix sizes (signed overflow is undefined).
    const uint64_t matrix_cols =
                    static_cast<uint64_t> (RaptorQ__v1::Impl::K_padded[K_idx]) +
                    static_cast<uint64_t> (std::get<Tup::S> (S_H_W)) +
                    static_cast<uint64_t> (std::get<Tup::H> (S_H_W));

    // Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D.
    const uint64_t estimate = matrix_cols * matrix_cols * 2 +
                            static_cast<uint64_t> (_symbol_size) * matrix_cols;

    // size_t can be narrower than 64 bits: saturate instead of wrapping.
    if (estimate > std::numeric_limits<size_t>::max())
        return std::numeric_limits<size_t>::max();
    return static_cast<size_t> (estimate);
}
Luker's avatar
Luker committed
/// Cleanup of a work item: if the encoder and the pool synchronisation
/// objects it refers to are all still alive, the encoder is stopped and the
/// waiting threads are woken up.
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Block_Work::~Block_Work()
{
    // have we been called before the computation finished?
    const auto encoder = work.lock();
    const auto cond = notify.lock();
    const auto mtx = lock.lock();

    // nothing to do as soon as one of the three is gone
    if (encoder == nullptr || cond == nullptr || mtx == nullptr)
        return;

    encoder->stop();

    // notify while holding the pool mutex
    std::unique_lock<std::mutex> guard (*mtx);
    RQ_UNUSED(guard);
    cond->notify_all();
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
/// Runs the symbol generation for the block this work item refers to.
/// @param state forwarded to Raw_Encoder::generate_symbols()
/// @return STOPPED if generate_symbols() reports failure, DONE otherwise
///         (also when the encoder or the pool objects no longer exist).
template <typename Rnd_It, typename Fwd_It>
Work_Exit_Status Encoder<Rnd_It, Fwd_It>::Block_Work::do_work (
                                                RaptorQ__v1::Work_State *state)
{
    const auto encoder = work.lock();
    const auto cond = notify.lock();
    const auto mtx = lock.lock();

    // the owner went away in the meantime: nothing left to compute
    if (encoder == nullptr || cond == nullptr || mtx == nullptr)
        return Work_Exit_Status::DONE;

    // encoding always works. It's one of the few constants of the universe.
    if (!encoder->generate_symbols (state))
        return Work_Exit_Status::STOPPED;   // or maybe not so constant

    // drop our weak reference before waking up the waiting threads
    work.reset();

    std::unique_lock<std::mutex> guard (*mtx);
    RQ_UNUSED(guard);
    cond->notify_all();

    return Work_Exit_Status::DONE;
}

/// Queues the precomputation of every source block that has no encoder yet
/// and returns a future with the outcome.
///
/// @param flags what to wait for. PARTIAL_FROM_BEGINNING, PARTIAL_ANY and
///              COMPLETE are mutually exclusive; the PARTIAL_* flags can not
///              be combined with NO_POOL. NO_POOL is only accepted before
///              any encoder has been created. With NO_BACKGROUND the wait
///              happens in the calling thread instead of a new thread.
/// @return future holding {error, block number}; {WRONG_INPUT, 0} is set
///         immediately for an unusable encoder or invalid flags.
template <typename Rnd_It, typename Fwd_It>
std::future<std::pair<Error, uint8_t>> Encoder<Rnd_It, Fwd_It>::compute (
                                                            const Compute flags)
{
    using ret_t = std::pair<Error, uint8_t>;
    std::promise<ret_t> p;

    bool error = !interleave;
    // need some flags
    if (flags == Compute::NONE)
        error = true;

    // flag incompatibilities
    if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
                    (Compute::NONE != (flags & (Compute::PARTIAL_ANY |
                                                Compute::COMPLETE |
                                                Compute::NO_POOL)))) {
            error = true;
    } else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
                (Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
                                            Compute::COMPLETE |
                                            Compute::NO_POOL)))) {
            error = true;
    } else if (Compute::NONE != (flags & Compute::COMPLETE) &&
                    Compute::NONE != (flags &(Compute::PARTIAL_FROM_BEGINNING |
                                                Compute::PARTIAL_ANY))) {
            error = true;
    }

    if (Compute::NONE != (flags & Compute::NO_POOL)) {
        std::unique_lock<std::mutex> lock (_mtx);
        RQ_UNUSED(lock);
        if (encoders.size() != 0) {
            // You can only say you won't use the pool *before* you start
            // encoding something!
            error = true;
        } else {
            use_pool = false;
            p.set_value ({Error::NONE, 0});
            return p.get_future();
        }
    }

    if (error) {
        p.set_value ({Error::WRONG_INPUT, 0});
        return p.get_future();
    }

    // flags are fine, add work to pool
    std::unique_lock<std::mutex> lock (_mtx);
    for (uint8_t block = 0; block < blocks(); ++block) {
        auto enc = encoders.find (block);
        if (enc == encoders.end()) {
            bool success;
            std::tie (enc, success) = encoders.emplace (
                                    std::piecewise_construct,
                                    std::forward_as_tuple (block),
                                    std::forward_as_tuple (&interleave, block));
            assert (success == true);
            // the work item only gets weak references to our state
            std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
                                                            new Block_Work());
            work->work = enc->second.enc;
            work->notify = _pool_notify;
            work->lock = _pool_mtx;
            Thread_Pool::get().add_work (std::move(work));
        }
    }
    lock.unlock();

    // spawn thread waiting for other thread exit.
    // this way we can set_value to the future when needed.
    auto future = p.get_future();
    if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
        wait_threads (this, flags, std::move(p));
    } else {
        // `pool_wait` is shared with the waiting threads: modify it only
        // while holding the pool mutex.
        std::unique_lock<std::mutex> pool_wait_lock (*_pool_mtx);
        RQ_UNUSED(pool_wait_lock);
        pool_wait.emplace_back (wait_threads, this, flags, std::move(p));
    }
    return future;
}

Luker's avatar
Luker committed
/// Blocks until get_report() returns something other than Error::WORKING or
/// the encoder is exiting, then fulfils the promise. Before returning, the
/// calling thread detaches and removes itself from `obj->pool_wait` (if it
/// is listed there) and wakes up whoever waits on the pool condition.
/// @param obj   encoder to watch
/// @param flags forwarded to get_report()
/// @param p     promise that receives {error, block number}
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::wait_threads (Encoder<Rnd_It, Fwd_It> *obj,
                                    const Compute flags,
                                    std::promise<std::pair<Error, uint8_t>> p)
{
    // own reference to the condition variable, used up to the last notify
    auto _notify = obj->_pool_notify;

    for (;;) {
        std::unique_lock<std::mutex> lock (*obj->_pool_mtx);

        if (obj->exiting) {
            p.set_value ({Error::EXITING, 0});
            break;
        }

        const auto status = obj->get_report (flags);
        if (Error::WORKING != status.first) {
            p.set_value (status);
            break;
        }

        // nothing to report yet: sleep until somebody notifies us.
        // The lock is released when it goes out of scope.
        _notify->wait (lock);
    }

    // delete ourselves from the waiting thread vector.
    std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
    const auto self = std::this_thread::get_id();
    auto it = obj->pool_wait.begin();
    while (it != obj->pool_wait.end() && it->get_id() != self)
        ++it;
    if (it != obj->pool_wait.end()) {
        it->detach();
        obj->pool_wait.erase (it);
    }
    lock.unlock();

    _notify->notify_all();
}

Luker's avatar
Luker committed
/// Checks the state of the per-block encoders against the requested flags.
/// @param flags COMPLETE, PARTIAL_FROM_BEGINNING or PARTIAL_ANY
/// @return {Error::NONE, block} when there is something to report,
///         {Error::EXITING, 0} when a not-ready encoder has been stopped,
///         {Error::WORKING, 0} otherwise.
template <typename Rnd_It, typename Fwd_It>
std::pair<Error, uint8_t> Encoder<Rnd_It, Fwd_It>::get_report (
                                                            const Compute flags)
{
    if (encoders.size() == 0)
        return {Error::WORKING, 0};

    const bool wants_complete =
                        Compute::NONE != (flags & Compute::COMPLETE);
    const bool wants_prefix =
                Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING);

    if (wants_complete || wants_prefix) {
        // look for the first block that is not ready yet
        auto pending = encoders.begin();
        while (pending != encoders.end()) {
            const auto ptr = pending->second.enc;
            if (ptr != nullptr && !ptr->ready()) {
                if (ptr->is_stopped())
                    return{Error::EXITING, 0};
                break;
            }
            ++pending;
        }

        // every block is ready: report the last one
        if (pending == encoders.end()) {
            pool_last_reported = static_cast<int16_t> (encoders.size() - 1);
            return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
        }

        // some more blocks became ready since the last report
        if (wants_prefix && (pool_last_reported < (pending->first - 1))) {
            pool_last_reported = pending->first - 1;
            return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
        }
        return {Error::WORKING, 0};
    }

    if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
        // NOTE(review): `reported` is tested here but never set in this
        // function, so the same ready block can be reported again on the
        // next call. Confirm whether that is intended.
        for (auto &entry : encoders) {
            if (entry.second.reported)
                continue;
            const auto ptr = entry.second.enc;
            if (ptr == nullptr)
                continue;
            if (ptr->ready())
                return {Error::NONE, entry.first};
            if (ptr->is_stopped())
                return{Error::EXITING, 0};
        }
    }
    return {Error::WORKING, 0}; // should never be reached
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
Luker's avatar
Luker committed
                                                            const uint32_t &id)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t> (id);
    constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    return encode (output, end, host_id & mask,
                                        static_cast<uint8_t> (host_id >> 24));
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
Luker's avatar
Luker committed
                                                            const uint32_t esi,
                                                            const uint8_t sbn)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (sbn >= interleave.blocks())
        return 0;

Luker's avatar
Luker committed
    const uint32_t syms = this->symbols (sbn);
    const uint32_t padding = static_cast<uint16_t>(this->extended_symbols (sbn))
                                                                        - syms;
    const uint32_t real_esi = esi < syms ? esi : esi + padding;

Luker's avatar
Luker committed
    std::unique_lock<std::mutex> lock (_mtx);
    auto it = encoders.find (sbn);
    if (use_pool) {
        if (it == encoders.end())
            return 0;
        auto shared_enc = it->second.enc;
        if (!shared_enc->ready())
            return 0;
        lock.unlock();
Luker's avatar
Luker committed
        return shared_enc->Enc (real_esi, output, end);
Luker's avatar
Luker committed
    } else {
        if (it == encoders.end()) {
            bool success;
            std::tie (it, success) = encoders.emplace (std::make_pair (sbn,
                                                    Enc (&interleave, sbn)));
            auto shared_enc = it->second.enc;
            lock.unlock();
            RaptorQ__v1::Work_State state =
                                        RaptorQ__v1::Work_State::KEEP_WORKING;
            shared_enc->generate_symbols (&state);
Luker's avatar
Luker committed
            return shared_enc->Enc (real_esi, output, end);
Luker's avatar
Luker committed
        } else {
            auto shared_enc = it->second.enc;
            lock.unlock();
            if (!shared_enc->ready())
                return 0;
Luker's avatar
Luker committed
            return shared_enc->Enc (real_esi, output, end);
Luker's avatar
Luker committed
        }
    }
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::encode_packet (Fwd_It &output, const Fwd_It end,
                                                            const uint32_t &id)
{
    // RFC packet, section 4.4.2 page 11

    // return the size of the packet in BYTES
    using T = typename std::iterator_traits<Fwd_It>::value_type;

    // each packet has an header of 32 bits. We can not start writing the
    // encoded symbols in the middle of an iterator yet.
    if (sizeof(T) > sizeof(uint32_t) || sizeof(T) == 3) {
        assert (false && "libRaptorQ: sorry, encde_packets can only be used "
                                    "with types of at most 32 bits for now\n");
        return 0;
    }

    // first of all, check if we have enough space
    size_t max_pkt_len;
    if (std::is_same<typename std::iterator_traits<Fwd_It>::iterator_category,
                                    std::random_access_iterator_tag>::value) {
        // we were lucky with a random iterator.
        max_pkt_len = sizeof(T) * (end - output);
    } else {
        max_pkt_len = 0;
        auto out_copy = output;
        while (out_copy != end) {
            max_pkt_len += sizeof(T);
            ++out_copy;
        }
    }

    if (max_pkt_len <= sizeof(uint32_t))
        return 0; // we can only write the header, or not even that.

    max_pkt_len -= sizeof(uint32_t);

    constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
    const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t> (id);
    const uint8_t sbn = host_id >> 24;
    uint32_t symbol = host_id & mask;

    const uint32_t source_symbols = symbols (sbn);
    const bool only_source = (symbol >= source_symbols);

    size_t symbols_to_write = 0;
    while (max_pkt_len >= _symbol_size) {
        if (only_source && symbol >= source_symbols)
            break;
        uint32_t real_symbol_size;
        if (!only_source) {
            real_symbol_size = _symbol_size;
        } else if (sbn == (blocks() - 1) && symbol == source_symbols - 1) {
            // the last symbol the last block can have less bytes than
            // a full symbol, and we do not need to send the padding bytes.
            real_symbol_size = block_size (sbn) % _symbol_size;
            if (real_symbol_size == 0)
                real_symbol_size = _symbol_size;
        }
        if (max_pkt_len <= real_symbol_size)
            break;
        max_pkt_len -= real_symbol_size;
        ++symbol;
        ++symbols_to_write;
    }
    if (symbols_to_write == 0)
        return 0;

    // ok, now we can finally start writing something.
    // write the header
    const uint8_t *p = reinterpret_cast<const uint8_t*> (&id);
    uint8_t *p_out = reinterpret_cast<uint8_t*> (&*output);
    // manual loop unrolling ftw
    *(p_out++) = *(p++);
    if (sizeof(T) == 1) {
        ++output;
        p_out = reinterpret_cast<uint8_t*> (&*output);
    }
    *(p_out++) = *(p++);
    if (sizeof(T) == 2 || sizeof(T) == 1) {
        ++output;
        p_out = reinterpret_cast<uint8_t*> (&*output);
    }
    *(p_out++) = *(p++);
    if (sizeof(T) == 1) {
        ++output;
        p_out = reinterpret_cast<uint8_t*> (&*output);
    }
    *(p_out++) = *(p++);
    // sizeof(T) can only be 1,2,4, so we can safely just increment output here
    ++output;

    // FINALLY write the symbols
    size_t written = sizeof(uint32_t);
    symbol = static_cast<uint8_t> (id >> 24);
    while (symbols_to_write > 0) {
        size_t tmp_written = encode (output, end, symbol, sbn);
        if (tmp_written == 0)
            return written == sizeof(uint32_t) ? 0 : written;
        ++symbol;
    }
    return written;
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// Drop the encoder of block "sbn", if there is one.
// The encoders are only touched while holding _mtx.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::free (const uint8_t sbn)
{
    const std::lock_guard<std::mutex> guard (_mtx);
    // erasing by key does nothing when the block is not there
    encoders.erase (sbn);
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
// Number of blocks as reported by the interleaver,
// 0 if the interleaver is not usable.
template <typename Rnd_It, typename Fwd_It>
uint8_t Encoder<Rnd_It, Fwd_It>::blocks() const
{
    return !interleave ? static_cast<uint8_t> (0) : interleave.blocks();
}

Luker's avatar
Luker committed
// Size of block "sbn": its number of source symbols times the symbol size.
// Returns 0 if the interleaver is not usable.
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::block_size (const uint8_t sbn) const
{
    if (!interleave)
        return 0;
    // symbols() and symbol_size() return the same two values as uint16_t.
    // Two 16 bit operands would be promoted to (signed) int and the product
    // could overflow it, so multiply as uint32_t.
    return static_cast<uint32_t> (interleave.source_symbols (sbn)) *
                                                    interleave.symbol_size();
}

Luker's avatar
Luker committed
// Symbol size as reported by the interleaver,
// 0 if the interleaver is not usable.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
{
    return !interleave ? static_cast<uint16_t> (0) : interleave.symbol_size();
}

Luker's avatar
Luker committed
// Number of source symbols of block "sbn" as reported by the interleaver,
// 0 if the interleaver is not usable.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols (const uint8_t sbn) const
{
    return !interleave ? static_cast<uint16_t> (0) :
                                            interleave.source_symbols (sbn);
}

Luker's avatar
Luker committed
// Extended number of symbols of block "sbn", as reported by the
// interleaver. encode() uses the difference between this and symbols (sbn)
// as the number of padding symbols.
// Returns a Block_Size with value 0 if the interleaver is not usable.
template <typename Rnd_It, typename Fwd_It>
Block_Size Encoder<Rnd_It, Fwd_It>::extended_symbols (const uint8_t sbn) const
{
    if (!interleave)
        // outside of the enum, but you should have checked the
        // initialization anyway. not really nice either way
        return static_cast<Block_Size> (0);
    return interleave.extended_symbols (sbn);
}
Luker's avatar
Luker committed
// Maximum number of repair symbols for block "sbn": 2^20 minus the number
// of source symbols of the block.
// Returns 0 if the interleaver is not usable.
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair (const uint8_t sbn) const
{
    if (!interleave)
        return 0;
    // 2^20 as an integer constant: no floating point std::pow needed.
    constexpr uint32_t symbol_limit = static_cast<uint32_t> (1) << 20;
    return symbol_limit - interleave.source_symbols (sbn);
}
Luker's avatar
Luker committed
/////////////////
//
// Decoder
//
/////////////////

Luker's avatar
Luker committed
// Stop every block decoder, wake up whoever is waiting on the pool
// condition variable, then wait until pool_wait is empty.
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::~Decoder()
{
    exiting = true; // stop notifying thread

    {
        // stop existing computations while holding the decoders' mutex
        const std::lock_guard<std::mutex> guard (_mtx);
        for (auto &block : decoders) {
            auto raw_dec = block.second.dec;
            if (raw_dec != nullptr)
                raw_dec->stop();
        }
    }

    _pool_notify->notify_all();

    while (pool_wait.size() != 0) {
        std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
        // check again now that we hold the pool mutex, and only then sleep
        if (pool_wait.size() != 0)
            _pool_notify->wait (pool_lock);
    }
}

Luker's avatar
Luker committed
// Drop the decoder of block "sbn", if there is one, then wake up everybody
// waiting on the pool condition variable.
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::free (const uint8_t sbn)
{
    {
        // scoped lock, like in Encoder::free: _mtx is released even if
        // something throws, and always before we notify.
        const std::lock_guard<std::mutex> guard (_mtx);
        auto it = decoders.find (sbn);
        if (it != decoders.end())
            decoders.erase (it);
    }
    _pool_notify->notify_all();
}

Luker's avatar
Luker committed
// Add a symbol identified by the 32 bit id found in the packets.
//
//  start: beginning of the symbol data, handed to the underlying call.
//  end:   end of the symbol data.
//  id:    converted to host order with Endian::b_to_h. The highest 8 bits
//         are then the block number, the lower 24 bits the symbol id.
//
// Returns whatever add_symbol (start, end, esi, sbn) returns.
template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
                                                            const uint32_t id)
{
    constexpr uint32_t esi_bits = 24;
    constexpr uint32_t esi_mask = (static_cast<uint32_t> (1) << esi_bits) - 1;

    const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h (id);

    return add_symbol (start, end, host_id & esi_mask,
                                static_cast<uint8_t> (host_id >> esi_bits));
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
Luker's avatar
Luker committed
                                        const uint32_t esi, const uint8_t sbn)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
    if (!operator bool())
        return Error::INITIALIZATION;
Luker's avatar
Luker committed
    if (sbn >= _blocks)
        return Error::WRONG_INPUT;
Luker's avatar
Luker committed

    const uint16_t syms = this->symbols (sbn);
    const Block_Size b_size = this->extended_symbols (sbn);
    // we might have padding symbols. add these to the esi.
    const uint16_t padding = static_cast<uint16_t> (b_size) - syms;
    const uint32_t real_esi = esi < syms ? esi : esi + padding;

    bool added_decoder = false;

Luker's avatar
Luker committed
    std::unique_lock<std::mutex> lock (_mtx);
    auto it = decoders.find (sbn);
    if (it == decoders.end()) {
        bool success;
        std::tie (it, success) = decoders.emplace (std::make_pair(sbn,
Luker's avatar
Luker committed
                                        Dec (b_size, _symbol_size, padding)));
Luker's avatar
Luker committed
        assert (success);
Luker's avatar
Luker committed
        added_decoder = true;
Luker's avatar
Luker committed
    }
    auto dec = it->second.dec;
    lock.unlock();

Luker's avatar
Luker committed
    // the last symbol in a block can have less size than the symbol size,
    // in which case we should add padding
    const bool add_padding = (esi == syms);

    auto err = dec->add_symbol (start, end, real_esi, add_padding);
Luker's avatar
Luker committed
    if (err != Error::NONE)
        return err;
    // automatically add work to pool if we use it and have enough data
Luker's avatar
Luker committed
    std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
    RQ_UNUSED(pool_lock);
Luker's avatar
Luker committed
    if (use_pool && dec->can_decode()) {
        bool add_work = dec->add_concurrent (max_block_decoder_concurrency);
        if (add_work) {
            std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
                                                            new Block_Work());
            work->work = dec;
            work->notify = _pool_notify;
Luker's avatar
Luker committed
            work->lock = _pool_mtx;