/* * Copyright (c) 2015-2016, Luca Fulchir, All rights reserved. * * This file is part of "libRaptorQ". * * libRaptorQ is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 * of the License, or (at your option) any later version. * * libRaptorQ is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * and a copy of the GNU Lesser General Public License * along with libRaptorQ. If not, see . */ #pragma once #include "RaptorQ/v1/common.hpp" #include "RaptorQ/v1/RaptorQ_Iterators.hpp" #include "RaptorQ/v1/Encoder.hpp" #include "RaptorQ/v1/Decoder.hpp" #include namespace RaptorQ__v1 { namespace Impl { template class RAPTORQ_LOCAL Encoder { public: ~Encoder(); // used for precomputation Encoder (const uint16_t symbols, const uint16_t symbol_size); // with data at the beginning. Less work. Encoder (const Rnd_It data_from, const Rnd_It data_to, const uint16_t symbol_size); RaptorQ__v1::Encoder::Symbol_Iterator begin (); RaptorQ__v1::Encoder::Symbol_Iterator end (const uint32_t repair); uint64_t add_data (Rnd_It from, const Rnd_It to); bool compute_sync(); std::future compute(); uint64_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id); private: enum class Data_State : uint8_t { NEED_DATA = 1, // first constructor used. no interleaver until FULL FULL = 2, INIT = 3 // second constructor used: we already have the interleaver }; std::unique_ptr> interleaver; Raw_Encoder encoder; DenseMtx precomputed; std::vector::value_type> data; const uint16_t _symbols, _symbol_size; const RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING; Data_State state; }; template class RAPTORQ_LOCAL Decoder { public: enum class RAPTORQ_LOCAL Report : uint8_t { PARTIAL_FROM_BEGINNING = 1, PARTIAL_ANY = 2, COMPLETE = 3 }; ~Decoder(); Decoder (const uint64_t bytes, const uint16_t symbol_size, const Report type); RaptorQ__v1::Decoder::Symbol_Iterator begin (); RaptorQ__v1::Decoder::Symbol_Iterator end (); Error add_symbol (In_It from, const In_It to, const uint32_t esi); using Decoder_Result = typename Raw_Decoder::Decoder_Result; bool can_decode() const; Decoder_Result decode(); void stop(); std::pair poll() const; std::future> wait (bool blocking) const; // return number of symbols. // simbol_size % sizeof(FWD) == 0 else assert! // returns number of iterators written uint64_t decode_symbol (Fwd_It &start, const Fwd_It end,const uint16_t esi); std::pair decode_bytes (Fwd_It &start, const Fwd_It end, const size_t from_byte, const size_t skip); private: static uint16_t get_symbols (const uint64_t bytes, const uint16_t symbol_size); static void waiting_thread (Decoder *obj, std::promise> p); Raw_Decoder dec; // 2* symbols. actually tracks available and reported symbols. std::vector symbols_tracker; std::mutex _mtx; std::condition_variable _cond; std::vector waiting; const uint16_t _symbols, _symbol_size; const Report _type; RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING; }; /////////////////// //// Encoder /////////////////// template Encoder::~Encoder() { encoder->stop(); } template Encoder::Encoder (const uint16_t symbols, const uint16_t symbol_size) : interleaver (nullptr), encoder (symbols), _symbols (symbols), _symbol_size (symbol_size) { IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder"); IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder"); state = Data_State::INIT; } template Encoder::Encoder (const Rnd_It data_from, const Rnd_It data_to, const uint16_t symbol_size) : interleaver (new RFC6330__v1::Impl::Interleaver (data_from, data_to, _symbol_size, SIZE_MAX, symbol_size)), encoder (interleaver.get(), 0), _symbols (0), _symbol_size (0) // these last 2 constants are unused { IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder"); IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder"); state = Data_State::NEED_DATA; } template RaptorQ__v1::Encoder::Symbol_Iterator Encoder::begin() { return RaptorQ__v1::Encoder::Symbol_Iterator (this, 0); } template RaptorQ__v1::Encoder::Symbol_Iterator Encoder::end (const uint32_t repair) { return RaptorQ__v1::Encoder::Symbol_Iterator (nullptr, _symbols + repair); } template uint64_t Encoder::add_data (Rnd_It from, const Rnd_It to) { uint64_t written = 0; using T = typename std::iterator_traits::value_type; if (state != Data_State::NEED_DATA) return written; while (from != to) { if ((data.size() * sizeof (T) >= _symbols * _symbol_size)) { state = Data_State::FULL; break; } data.push_back (from); ++from; ++written; } return written; } template bool Encoder::compute_sync() { if (state == Data_State::INIT) { return encoder.generate_symbols (&work); } else { precomputed = encoder.get_precomputed (&work); return precomputed.rows() != 0; } return false; } template uint64_t Encoder::encode (Fwd_It &output, const Fwd_It end, const uint32_t &id) { switch (state) { case Data_State::INIT: if (!encoder->ready()) return 0; return encoder.Enc (id, output, end); case Data_State::NEED_DATA: return 0; case Data_State::FULL: if (!encoder->ready()) { if (precomputed.rows() == 0) { return 0; } else { interleaver = std::unique_ptr< RFC6330__v1::Impl::Interleaver> ( new RFC6330__v1::Impl::Interleaver ( data.begin(), data.end(), _symbol_size, SIZE_MAX, _symbol_size)); encoder.generate_symbols (precomputed, interleaver); precomputed = DenseMtx(); // free mem } } return encoder.Enc (id, output, end); } } /////////////////// //// Decoder /////////////////// template uint16_t Decoder::get_symbols (const uint64_t bytes, const uint16_t symbol_size) { uint16_t symbols = static_cast (bytes / symbol_size); if (bytes % symbol_size != 0) ++symbols; return symbols; } template Decoder::~Decoder () { work = RaptorQ__v1::Work_State::ABORT_COMPUTATION; _cond.notify_all(); // wait threads to exit do { std::unique_lock lock (_mtx); if (waiting.size() == 0) break; _cond.wait (lock); lock.unlock(); } while (waiting.size() != 0); } template Decoder::Decoder (const uint64_t bytes, const uint16_t symbol_size, const Report type) :_symbols (get_symbols (bytes, symbol_size)), _symbol_size (symbol_size), _type (type) { IS_INPUT(In_It, "RaptorQ__v1::Decoder"); IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder"); dec = Raw_Decoder (_symbols, symbol_size); symbols_tracker = std::vector (2 * _symbols, false); } template RaptorQ__v1::Decoder::Symbol_Iterator Decoder::begin() { return RaptorQ__v1::Decoder::Symbol_Iterator (this, 0); } template RaptorQ__v1::Decoder::Symbol_Iterator Decoder::end() { return RaptorQ__v1::Decoder::Symbol_Iterator (nullptr, _symbols); } template Error Decoder::add_symbol (In_It from, const In_It to, const uint32_t esi) { auto ret = dec.add_symbol (from, to, esi); if (ret == Error::NONE && esi < _symbols) { symbols_tracker [2 * esi] = true; std::unique_lock lock (_mtx); RQ_UNUSED (lock); _cond.notify_all(); } return ret; } template std::pair Decoder::poll () const { switch (_type) { case Report::PARTIAL_FROM_BEGINNING: for (uint32_t id = 0; id < symbols_tracker.size(); id += 2) { if (symbols_tracker[id] == true) { ++id; if (symbols_tracker[id] == false) return {Error::NONE, id / 2}; } else { break; } } if (dec->ready()) return {Error::NONE, 0}; if (dec.can_decode()) return {Error::NEED_DATA, 0}; return {Error::WORKING, 0}; case Report::PARTIAL_ANY: for (uint32_t id = 0; id < symbols_tracker.size(); id += 2) { if (symbols_tracker[id] == true) { ++id; if (symbols_tracker[id] == false) return {Error::NONE, id / 2}; } } if (dec->ready()) return {Error::NONE, 0}; if (dec.can_decode()) return {Error::NEED_DATA, 0}; return {Error::WORKING, 0}; case Report::COMPLETE: for (uint32_t id = 0; id < symbols_tracker.size(); id += 2) { if (symbols_tracker[id] == false) { if (dec.can_decode()) return {Error::WORKING, 0}; return {Error::NEED_DATA, 0}; } } return {Error::NONE, 0}; } return {Error::WORKING, 0}; } template void Decoder::waiting_thread (Decoder *obj, std::promise> p) { while (obj->work == RaptorQ__v1::Work_State::KEEP_WORKING) { std::unique_lock lock (obj->_mtx); auto res = obj->poll(); if (obj->poll.first == Error::NONE) { p.set_value (res); break; } obj->_cond.wait (lock); res = obj->poll(); lock.unlock(); if (obj->poll.first == Error::NONE) { p.set_value (res); break; } } if (obj->work != RaptorQ__v1::Work_State::KEEP_WORKING) p.set_value ({Error::EXITING, 0}); std::unique_lock lock (obj->_mtx); RQ_UNUSED (lock); for (auto th = obj->waiting.begin(); th != obj->waiting.end(); ++th) { if (std::this_thread::get_id() == th.id()) { th.detach(); obj->waiting.erase (th); break; } } obj->_cond.notify_all(); // notify exit to destructor } template std::future> Decoder::wait ( const bool blocking) const { std::promise> p; if (blocking) { waiting_thread (this, std::move(p)); } else { waiting.emplace_back (waiting_thread, this, std::move(p)); } } template bool Decoder::can_decode() const { return dec.can_decode(); } template typename Decoder::Decoder_Result Decoder::decode() { auto res = dec.decode (&work); if (res == Decoder_Result::DECODED) { std::unique_lock lock (_mtx); _cond.notify_all(); } return res; } template void Decoder::stop() { work = RaptorQ__v1::Work_State::ABORT_COMPUTATION; std::unique_lock lock (_mtx); _cond.notify_all(); } template std::pair Decoder::decode_bytes (Fwd_It &start, const Fwd_It end, const uint64_t from_byte, const size_t skip) { if (!dec.ready()) return {0, 0}; auto decoded = dec->get_symbols(); uint16_t esi = from_byte / decoded.cols(); uint16_t byte = from_byte % decoded.cols(); using T = typename std::iterator_traits::value_type; size_t offset_al = skip; T element = *start; uint64_t written = 0; while (start != end && esi < decoded.rows()) { element += static_cast (static_cast((*decoded)(esi, byte))) << offset_al * 8; ++offset_al; if (offset_al == sizeof(T)) { *start = element; written += offset_al; offset_al = 0; element = static_cast (0); } ++byte; if (byte == decoded->cols()) { byte = 0; ++esi; } } if (start != end && offset_al != 0) { // we have more stuff in "element", but not enough to fill // the iterator. *start = element; written += offset_al; } return {written, offset_al}; } template uint64_t Decoder::decode_symbol (Fwd_It &start, const Fwd_It end, const uint16_t esi) { using T = typename std::iterator_traits::value_type; assert ((_symbol_size % sizeof(T)) == 0); if (!dec.ready()) return 0; size_t esi_byte = esi * dec->cols(); auto pair = decode_bytes (start, end, esi_byte, 0); assert (pair.second == 0 ); return pair.first / _symbol_size; } } // namespace Impl } // namespace RaptorQ__v1