/* * Copyright (c) 2015-2016, Luca Fulchir, All rights reserved. * * This file is part of "libRaptorQ". * * libRaptorQ is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 * of the License, or (at your option) any later version. * * libRaptorQ is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * and a copy of the GNU Lesser General Public License * along with libRaptorQ. If not, see . */ #pragma once #include "RaptorQ/v1/common.hpp" #include "RaptorQ/v1/block_sizes.hpp" #ifdef RQ_HEADER_ONLY #include "RaptorQ/v1/RaptorQ_Iterators.hpp" #endif #include "RaptorQ/v1/Encoder.hpp" #include "RaptorQ/v1/Decoder.hpp" #include "RaptorQ/v1/Parameters.hpp" #include #include #include #include #include #include #include #include #include #include namespace RaptorQ__v1 { namespace Impl { template class RAPTORQ_LOCAL Encoder; template class RAPTORQ_LOCAL Decoder; } // namespace Impl // expose classes, but only if header-only #ifdef RQ_HEADER_ONLY // does this export symbols?? template using Encoder = Impl::Encoder; template using Decoder = Impl::Decoder; #endif namespace Impl { template class RAPTORQ_LOCAL Encoder { public: ~Encoder(); // used for precomputation Encoder (const Block_Size symbols, const size_t symbol_size); explicit operator bool() const; uint16_t symbols() const; size_t symbol_size() const; //FIXME: max smbol size is same as signed size_t uint32_t max_repair() const; #ifdef RQ_HEADER_ONLY RaptorQ__v1::It::Encoder::Symbol_Iterator begin_source(); RaptorQ__v1::It::Encoder::Symbol_Iterator end_source(); RaptorQ__v1::It::Encoder::Symbol_Iterator begin_repair(); RaptorQ__v1::It::Encoder::Symbol_Iterator end_repair (const uint32_t repair); #endif bool has_data() const; size_t set_data (const Rnd_It &from, const Rnd_It &to); void clear_data(); void stop(); bool precompute_sync(); bool compute_sync(); std::future precompute(); std::future compute(); size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t id); private: enum class Enc_State : uint8_t { INIT_ERROR = 1, NEED_DATA = 2, FULL = 3 }; const size_t _symbol_size; const uint16_t _symbols; Enc_State _state; Raw_Encoder encoder; DenseMtx precomputed; std::thread waiting; Rnd_It _from, _to; static void compute_thread (Encoder *obj, bool forced_precomputation, std::promise p); }; enum class RAPTORQ_LOCAL Dec_Report : uint8_t { PARTIAL_FROM_BEGINNING = RQ_COMPUTE_PARTIAL_FROM_BEGINNING, PARTIAL_ANY = RQ_COMPUTE_PARTIAL_ANY, COMPLETE = RQ_COMPUTE_COMPLETE }; template class RAPTORQ_LOCAL Decoder { public: using Report = Dec_Report; ~Decoder(); Decoder (const Block_Size symbols, const size_t symbol_size, const Report type); explicit operator bool() const; uint16_t symbols() const; size_t symbol_size() const; #ifdef RQ_HEADER_ONLY RaptorQ__v1::It::Decoder::Symbol_Iterator begin(); RaptorQ__v1::It::Decoder::Symbol_Iterator end(); #endif Error add_symbol (In_It &from, const In_It to, const uint32_t esi); void end_of_input(); bool can_decode() const; void stop(); uint16_t needed_symbols() const; void set_max_concurrency (const uint16_t max_threads); Decoder_Result decode_once(); std::pair poll(); std::pair wait_sync(); std::future> wait(); Error decode_symbol (Fwd_It &start, const Fwd_It end,const uint16_t esi); // return number of bytes written std::pair decode_bytes (Fwd_It &start, const Fwd_It end, const size_t from_byte, const size_t skip); private: uint16_t _max_threads; const uint16_t _symbols; const size_t _symbol_size; std::atomic last_reported; const Report _type; RaptorQ__v1::Work_State work; Raw_Decoder dec; // 2* symbols. Actually tracks available and reported symbols. // each symbol gets 2 bool: 1= available, 2=reported std::deque> symbols_tracker; std::mutex _mtx; std::condition_variable _cond; std::vector waiting; static void waiting_thread (Decoder *obj, std::promise> p); }; /////////////////// //// Encoder /////////////////// template Encoder::~Encoder() { encoder.stop(); if (waiting.joinable()) waiting.join(); } template Encoder::Encoder (const Block_Size symbols, const size_t symbol_size) : _symbol_size (symbol_size), _symbols (static_cast (symbols)), encoder (symbols, _symbol_size) { IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder"); IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder"); // check for proper initialization uint16_t idx; for (idx = 0; idx < (*blocks).size(); ++idx) { if ((*blocks)[idx] == symbols) break; } // check that the user did not try some cast trickery, // and maximum size is ssize_t::max. But ssize_t is not standard, // so we search the maximum ourselves. if (idx == (*blocks).size() || symbol_size >= std::pow (2, (sizeof(size_t) == 4 ? 31 : 63))) { _state = Enc_State::INIT_ERROR; } _state = Enc_State::NEED_DATA; } template Encoder::operator bool() const { return _state != Enc_State::INIT_ERROR; } template uint16_t Encoder::symbols() const { if (_state == Enc_State::INIT_ERROR) return 0; return _symbols; } template size_t Encoder::symbol_size() const { if (_state == Enc_State::INIT_ERROR) return 0; return _symbol_size; } template uint32_t Encoder::max_repair() const { // you can have up to 56403 symbols in a block // rfc6330 limits you to 992173 repair symbols // but limits are meant to be broken! // the limit sould be up to 4294967279 repair symbols 2^32-(_param.S + H) // but people might misuse the API, and call end_repair(max_repair), // which would overflow. // We are sorry for taking away from you that 0.0014% of repair symbols. if (_state == Enc_State::INIT_ERROR) return 0; auto _param = Parameters (_symbols); return static_cast (std::numeric_limits::max() - _param.L); } #ifdef RQ_HEADER_ONLY template RaptorQ__v1::It::Encoder::Symbol_Iterator Encoder::begin_source() { return RaptorQ__v1::It::Encoder::Symbol_Iterator (this, 0); } template RaptorQ__v1::It::Encoder::Symbol_Iterator Encoder::end_source() { return RaptorQ__v1::It::Encoder::Symbol_Iterator (this, _symbols); } template RaptorQ__v1::It::Encoder::Symbol_Iterator Encoder::begin_repair() { return end_source(); } template RaptorQ__v1::It::Encoder::Symbol_Iterator Encoder::end_repair (const uint32_t repair) { return RaptorQ__v1::It::Encoder::Symbol_Iterator (nullptr, _symbols + repair); } #endif template bool Encoder::has_data() const { if (_state == Enc_State::INIT_ERROR) return false; return _state == Enc_State::FULL; } template size_t Encoder::set_data (const Rnd_It &from, const Rnd_It &to) { if (_state == Enc_State::INIT_ERROR) return 0; _from = from; _to = to; _state = Enc_State::FULL; return static_cast(_to - _from) * sizeof(typename std::iterator_traits::value_type); } template void Encoder::clear_data() { if (_state == Enc_State::INIT_ERROR) return; _state = Enc_State::NEED_DATA; encoder.clear_data(); } template void Encoder::stop() { encoder.stop(); } template bool Encoder::precompute_sync() { if (_state == Enc_State::INIT_ERROR) return false; static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING; if (precomputed.rows() == 0) { precomputed = encoder.get_precomputed (&work); if (precomputed.rows() == 0) return false; // exit was forced. } if (_state == Enc_State::FULL) encoder.generate_symbols (precomputed, &_from, &_to); return true; } template bool Encoder::compute_sync() { if (_state == Enc_State::INIT_ERROR) return false; static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING; if (encoder.ready()) return true; if (_state == Enc_State::FULL) { return encoder.generate_symbols (&work, &_from, &_to); } else { if (precomputed.rows() != 0) return true; precomputed = encoder.get_precomputed (&work); return precomputed.rows() != 0; } } template void Encoder::compute_thread ( Encoder *obj, bool force_precomputation, std::promise p) { static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING; if (force_precomputation) { if (obj->precomputed.rows() == 0) obj->precomputed = obj->encoder.get_precomputed (&work); if (obj->precomputed.rows() == 0) { // encoder always works. only possible reason: p.set_value (Error::EXITING); return; } // if we finished getting data by the time the computation // finished, update it all. if (obj->_state == Enc_State::FULL && !obj->encoder.ready()) obj->encoder.generate_symbols (obj->precomputed, &obj->_from, &obj->_to); p.set_value (Error::NONE); } else { if (obj->encoder.ready()) { p.set_value (Error::NONE); return; } if (obj->_state == Enc_State::FULL) { if (obj->encoder.generate_symbols (&work, &obj->_from, &obj->_to)) { p.set_value (Error::NONE); return; } else { // only possible reason: p.set_value (Error::EXITING); return; } } else { if (obj->precomputed.rows() == 0) { obj->precomputed = obj->encoder.get_precomputed (&work); if (obj->precomputed.rows() == 0) { // only possible reason: p.set_value (Error::EXITING); return; } } if (obj->_state == Enc_State::FULL) { // if we finished getting data by the time the computation // finished, update it all. obj->encoder.generate_symbols (obj->precomputed, &obj->_from, &obj->_to); } p.set_value (Error::NONE); return; } } } template std::future Encoder::precompute() { std::promise p; if (_state == Enc_State::INIT_ERROR) { p.set_value (Error::INITIALIZATION); return p.get_future(); } auto future = p.get_future(); // only one waiting thread for the encoder if (waiting.joinable()) { p.set_value (Error::WORKING); return p.get_future(); } waiting = std::thread (compute_thread, this, true, std::move(p)); return future; } template std::future Encoder::compute() { std::promise p; if (_state == Enc_State::INIT_ERROR) { p.set_value (Error::INITIALIZATION); return p.get_future(); } auto future = p.get_future(); // only one waiting thread for the encoder if (waiting.joinable()) { p.set_value (Error::WORKING); return p.get_future(); } waiting = std::thread (compute_thread, this, false, std::move(p)); return future; } template size_t Encoder::encode (Fwd_It &output, const Fwd_It end, const uint32_t id) { if (_state == Enc_State::INIT_ERROR) return 0; // returns number of iterators written if (_state == Enc_State::FULL) { if (!encoder.ready()) { if (precomputed.rows() == 0) return 0; encoder.generate_symbols (precomputed, &_from, &_to); } return encoder.Enc (id, output, end); } return 0; } /////////////////// //// Decoder /////////////////// template Decoder::~Decoder () { work = RaptorQ__v1::Work_State::ABORT_COMPUTATION; _cond.notify_all(); // wait threads to exit do { std::unique_lock lock (_mtx); if (waiting.size() == 0) break; _cond.wait (lock); lock.unlock(); } while (waiting.size() != 0); } template Decoder::Decoder (const Block_Size symbols, const size_t symbol_size, const Report type) :_symbols (static_cast (symbols)), _symbol_size (symbol_size), _type (type), dec (symbols, symbol_size) { IS_INPUT(In_It, "RaptorQ__v1::Decoder"); IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder"); // check for proper initialization uint16_t idx; for (idx = 0; idx < (*blocks).size(); ++idx) { if ((*blocks)[idx] == symbols) break; } // check that the user did not try some cast trickery, // and maximum size is ssize_t::max. But ssize_t is not standard, // so we search the maximum ourselves. if (idx == (*blocks).size() || symbol_size >= std::pow (2, (sizeof(size_t) == 4 ? 31 : 63))) { return; } if (type != Report::PARTIAL_FROM_BEGINNING && type != Report::PARTIAL_ANY && type != Report::COMPLETE) { return; // no cast trickey plz } last_reported.store (0); symbols_tracker = std::deque> (2 * _symbols); for (idx = 0; idx < 2 * _symbols; ++idx) symbols_tracker[idx] = false; work = RaptorQ__v1::Work_State::KEEP_WORKING; _max_threads = 2; } template Decoder::operator bool() const { return symbols_tracker.size() > 0; } template uint16_t Decoder::symbols() const { if (symbols_tracker.size() == 0) return 0; return _symbols; } template size_t Decoder::symbol_size() const { if (symbols_tracker.size() == 0) return 0; return _symbol_size; } #ifdef RQ_HEADER_ONLY template RaptorQ__v1::It::Decoder::Symbol_Iterator Decoder::begin() { return RaptorQ__v1::It::Decoder::Symbol_Iterator (this, 0); } template RaptorQ__v1::It::Decoder::Symbol_Iterator Decoder::end() { return RaptorQ__v1::It::Decoder::Symbol_Iterator (nullptr, _symbols); } #endif template uint16_t Decoder::needed_symbols() const { if (symbols_tracker.size() == 0) return 0; return dec.needed_symbols(); } template Error Decoder::add_symbol (In_It &from, const In_It to, const uint32_t esi) { if (symbols_tracker.size() == 0) return Error::INITIALIZATION; auto ret = dec.add_symbol (from, to, esi); if (ret == Error::NONE && esi < _symbols) { symbols_tracker [2 * esi].store (true); _cond.notify_all(); } return ret; } template std::pair Decoder::poll () { if (symbols_tracker.size() == 0) return {Error::INITIALIZATION, 0}; uint32_t idx; uint32_t last; bool expected = false; switch (_type) { case Report::PARTIAL_FROM_BEGINNING: // report the number of symbols that are known, starting from // the beginning. last = last_reported.load(); idx = last; for (; idx < symbols_tracker.size(); idx += 2) { if (symbols_tracker[idx].load() == true) { ++idx; if (symbols_tracker[idx].load() == false) symbols_tracker[idx].store (true); --idx; } else { break; } } idx /= 2; if (idx > last) { while (!last_reported.compare_exchange_weak (last, idx)) { // expected is now "last_reported.load()" if (last >= idx) { // other thread already reported more than us. // do not report things twice. if (dec.ready()) { last_reported.store (_symbols); return {Error::NONE, _symbols}; } if (dec.threads() > 0) return {Error::WORKING, 0}; return {Error::NEED_DATA, 0}; } // else we can report the new stuff } return {Error::NONE, idx}; } // nothing to report if (dec.ready()) { last_reported.store (_symbols); return {Error::NONE, _symbols}; } if (dec.threads() > 0) return {Error::WORKING, 0}; return {Error::NEED_DATA, 0}; case Report::PARTIAL_ANY: // report the first available, not yet reported. // or return {NONE, _symbols} if all have been reported if (dec.ready()) return {Error::NONE, _symbols}; for (idx = 0; idx < static_cast (symbols_tracker.size()); idx += 2) { if (symbols_tracker[idx].load() == true) { ++idx; if (symbols_tracker[idx].load() == false) { expected = false; if (symbols_tracker[idx]. compare_exchange_strong (expected, true)) { return {Error::NONE, idx / 2}; } // else some other thread raced us, keep trying other // symbols } } } if (dec.ready()) return {Error::NONE, _symbols}; if (dec.threads() > 0) return {Error::WORKING, 0}; return {Error::NEED_DATA, 0}; case Report::COMPLETE: auto init = last_reported.load(); idx = init * 2; for (; idx < symbols_tracker.size(); idx += 2) { if (symbols_tracker[idx].load() == false) { idx /= 2; while (!last_reported.compare_exchange_weak(init, idx)) idx = std::max(init, idx); if (dec.threads() > 0) return {Error::WORKING, 0}; return {Error::NEED_DATA, 0}; } } last_reported.store (_symbols); return {Error::NONE, 0}; } return {Error::WORKING, 0}; } template void Decoder::waiting_thread (Decoder *obj, std::promise> p) { while (obj->work == RaptorQ__v1::Work_State::KEEP_WORKING) { bool compute = obj->dec.add_concurrent (obj->_max_threads); if (compute) { obj->decode_once(); std::unique_lock lock (obj->_mtx); obj->dec.drop_concurrent(); lock.unlock(); obj->_cond.notify_all(); // notify other waiting threads } std::unique_lock lock (obj->_mtx); // poll() does not actually need to be locked, but we use the // lock-wait mechanism to signal the arrival of new symbols, // so that we retry only when we get new data. auto res = obj->poll(); if (res.first == Error::NONE || (obj->dec.end_of_input == true && !obj->dec.can_decode() && obj->dec.threads() == 0 && res.first == Error::NEED_DATA)){ p.set_value (res); break; } obj->_cond.wait (lock); lock.unlock(); } if (obj->work != RaptorQ__v1::Work_State::KEEP_WORKING) p.set_value ({Error::EXITING, 0}); std::unique_lock lock (obj->_mtx); RQ_UNUSED (lock); for (auto th = obj->waiting.begin(); th != obj->waiting.end(); ++th) { if (std::this_thread::get_id() == th->get_id()) { th->detach(); obj->waiting.erase (th); break; } } lock.unlock(); obj->_cond.notify_all(); // notify exit to destructor } template std::pair Decoder::wait_sync () { if (symbols_tracker.size() == 0) return {Error::INITIALIZATION, 0}; std::promise> p; auto fut = p.get_future(); waiting_thread (this, std::move(p)); fut.wait(); return fut.get(); } template std::future> Decoder::wait () { std::promise> p; if (symbols_tracker.size() == 0) { p.set_value ({Error::INITIALIZATION, 0}); return p.get_future(); } auto f = p.get_future(); waiting.emplace_back (waiting_thread, this, std::move(p)); return f; } template void Decoder::end_of_input() { if (symbols_tracker.size() != 0) dec.end_of_input = true; } template bool Decoder::can_decode() const { if (symbols_tracker.size() == 0) return false; return dec.can_decode(); } template void Decoder::set_max_concurrency (const uint16_t max_threads) { if (symbols_tracker.size() != 0) _max_threads = max_threads; } template Decoder_Result Decoder::decode_once() { if (symbols_tracker.size() == 0) return Decoder_Result::STOPPED; auto res = dec.decode (&work); if (res == Decoder_Result::DECODED) { std::unique_lock lock (_mtx); RQ_UNUSED (lock); if (_type != Report::COMPLETE) { uint32_t id = last_reported.load(); for (; id < symbols_tracker.size(); id += 2) symbols_tracker[id].store (true); } last_reported.store(_symbols); lock.unlock(); } return res; } template void Decoder::stop() { if (symbols_tracker.size() == 0) return; work = RaptorQ__v1::Work_State::ABORT_COMPUTATION; _cond.notify_all(); } template std::pair Decoder::decode_bytes (Fwd_It &start, const Fwd_It end, const size_t from_byte, const size_t skip) { using T = typename std::iterator_traits::value_type; if (symbols_tracker.size() == 0 || skip >= sizeof(T) || from_byte >= static_cast (_symbols * _symbol_size)) { return {0, 0}; } auto decoded = dec.get_symbols(); uint16_t esi = static_cast (from_byte / static_cast (_symbol_size)); uint16_t byte = static_cast (from_byte % static_cast (_symbol_size)); size_t offset_al = skip; T element = static_cast (0); if (skip != 0) { uint8_t *p = reinterpret_cast (&*start); for (size_t keep = 0; keep < skip; ++keep) { element += static_cast (*(p++)) << keep * 8; } } size_t written = 0; while (start != end && esi < _symbols && dec.has_symbol (esi)) { element += static_cast (static_cast ((*decoded)(esi, byte))) << offset_al * 8; ++offset_al; if (offset_al == sizeof(T)) { *start = element; ++start; written += offset_al; offset_al = 0; element = static_cast (0); } ++byte; if (byte == decoded->cols()) { byte = 0; ++esi; } } if (start != end && offset_al != 0) { // we have more stuff in "element", but not enough to fill // the iterator. Do not overwrite additional data of the iterator. uint8_t *out = reinterpret_cast (&*start); uint8_t *in = reinterpret_cast (&element); for (size_t idx = 0; idx < offset_al; ++idx, ++out, ++in) *out = *in; written += offset_al; } return {written, offset_al}; } template Error Decoder::decode_symbol (Fwd_It &start, const Fwd_It end, const uint16_t esi) { if (symbols_tracker.size() == 0) return Error::INITIALIZATION; auto start_copy = start; size_t esi_byte = esi * _symbol_size; auto pair = decode_bytes (start_copy, end, esi_byte, 0); if (pair.first == _symbol_size) { start = start_copy; return Error::NONE; } return Error::NEED_DATA; } } // namespace Impl } // namespace RaptorQ__v1