/* * Copyright (c) 2015, Luca Fulchir, All rights reserved. * * This file is part of "libRaptorQ". * * libRaptorQ is free software: you can redistribute it and/or modify * it under the terms of the GNU Lesser General Public License as * published by the Free Software Foundation, either version 3 * of the License, or (at your option) any later version. * * libRaptorQ is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * and a copy of the GNU Lesser General Public License * along with libRaptorQ. If not, see . */ #ifndef RAPTORQ_HPP #define RAPTORQ_HPP ///////////////////// // // These templates are just a wrapper around the // functionalities offered by the RaptorQ::Impl namespace // So if you want to see what the algorithm looks like, // you are in the wrong place // ///////////////////// #include "Interleaver.hpp" #include "De_Interleaver.hpp" #include "Encoder.hpp" #include "Decoder.hpp" #include #include #include #include #include #include #include #include #include #include #include #include namespace RaptorQ { template class RAPTORQ_API Encoder; template class RAPTORQ_API Symbol { public: Symbol (Encoder *enc, const uint32_t esi, const uint8_t sbn) : _enc (enc), _esi (esi), _sbn (sbn) {} std::vector operator()() { std::vector ret; (*this) (ret, 0); return ret; } bool operator() (std::vector &output, const size_t offset = 0) { return _enc->encode (output, _esi, _sbn); } bool operator() (T *output); private: Encoder *_enc; const uint32_t _esi; const uint8_t _sbn; }; template class RAPTORQ_API Symbol_Iterator : public std::iterator> { public: Symbol_Iterator (Encoder *enc, const uint32_t esi, const uint8_t sbn) : _enc (enc), _esi (esi), _sbn (sbn) {} Symbol operator*() { return Symbol (_enc, _esi, _sbn); } Symbol_Iterator& operator++() { ++_esi; return *this; } Symbol_Iterator operator++ (const int i) const { Symbol_Iterator ret (_esi + i, _sbn); return ret; } bool operator== (const Symbol_Iterator it) const { return it._esi == _esi && it._sbn == _sbn; } bool operator!= (const Symbol_Iterator it) const { return it._esi != _esi || it._sbn != _sbn; } private: Encoder *_enc; uint32_t _esi; const uint8_t _sbn; }; template class RAPTORQ_API Block { public: Block (Encoder *enc, const uint16_t symbols, const uint8_t sbn) : _enc (enc), _symbols (symbols), _sbn (sbn) {} Symbol_Iterator begin_source() const { return Symbol_Iterator (_enc, 0, _sbn); } Symbol_Iterator end_source() const { return Symbol_Iterator (_enc, _symbols, _sbn); } Symbol_Iterator begin_repair() const { return Symbol_Iterator (_enc, _symbols, _sbn); } Symbol_Iterator end_repair (const uint32_t max_repair) const { uint32_t max_r = max_repair; if (max_repair >= std::pow (2, 20) - _symbols) max_r = std::pow (2, 20) - _symbols; return Symbol_Iterator (_enc, _symbols + max_r, _sbn); } private: Encoder * _enc; const uint16_t _symbols; const uint8_t _sbn; }; template class RAPTORQ_API Block_Iterator : public std::iterator> { public: Block_Iterator (Encoder *enc, const Impl::Partition part, uint8_t sbn) :_enc (enc), _part (part), _sbn (sbn) {} Block operator*() { if (_sbn > _part.num (0)) return Block (_enc, _part.size (0), _sbn); return Block (_enc, _part.size (1), _sbn); } Block_Iterator& operator++() { ++_sbn; return *this; } Block_Iterator operator++ (const int i) const { Block_Iterator ret = *this; ret._sbn += i; return ret; } bool operator== (const Block_Iterator it) const { return it._sbn == _sbn; } bool operator!= (const Block_Iterator it) const { return it._sbn != _sbn; } private: Encoder *_enc; const Impl::Partition _part; uint8_t _sbn; }; static const uint64_t max_data = 946270874880; typedef uint64_t OTI_Common_Data; typedef uint32_t OTI_Scheme_Specific_Data; template class RAPTORQ_API Encoder { public: const uint16_t _symbol_size; Encoder (std::shared_ptr> data, const uint16_t min_subsymbol_size, const uint16_t symbol_size, const size_t max_memory) : _data (data), _symbol_size (symbol_size), _min_subsymbol (min_subsymbol_size), _mem (max_memory) { static_assert(std::is_unsigned::value, "RaptorQ::Encoder: can only be used with unsigned types"); // max size: between 2^39 and 2^40 if (data == nullptr || data->size() *sizeof(T) > max_data) return; interleave = std::unique_ptr> ( new Impl::Interleaver (data.get(), _min_subsymbol, _mem, _symbol_size)); } Block_Iterator begin () { return Block_Iterator (this, interleave->get_partition(), 0); } Block_Iterator end () { auto part = interleave->get_partition(); return Block_Iterator (this, part, part.num(0) + part.num(1)); } bool operator()() const { return interleave != nullptr; } OTI_Common_Data OTI_Common() const; OTI_Scheme_Specific_Data OTI_Scheme_Specific() const; void precompute (const uint8_t threads, const bool background); size_t precompute_max_memory (); bool encode (std::vector &output, uint32_t esi, uint8_t sbn); // id: 8-bit sbn + 24 bit esi bool encode (std::vector &output, uint32_t &id); void free (const uint8_t sbn); private: class RAPTORQ_LOCAL Locked_Encoder { public: Locked_Encoder (const Impl::Interleaver &symbols, const uint8_t SBN) :_enc (symbols, SBN) {} std::mutex _mtx; Impl::Encoder _enc; }; std::shared_ptr> _data; std::unique_ptr> interleave = nullptr; std::map> encoders; const size_t _mem; std::mutex _mtx; const uint16_t _min_subsymbol; static void precompute_block_all (Encoder *obj, const uint8_t threads); static void precompute_thread (Encoder *obj, uint8_t *sbn, const uint8_t single_sbn); }; template class RAPTORQ_API Decoder { public: // using shared pointers to avoid locking too much or // worrying about deleting used stuff. using Dec_ptr = std::shared_ptr>; // rfc 6330, pg 6 // easy explanation for OTI_* comes next. // we do NOT use bitfields as compilators are not actually forced to put // them in any particular order. meaning tey're useless. // //union OTI_Common_Data { // uint64_t raw; // struct { // uint64_t size:40; // uint8_t reserved:8; // uint16_t symbol_size:16; // }; //}; //union OTI_Scheme_Specific_Data { // uint32_t raw; // struct { // uint8_t source_blocks; // uint16_t sub_blocks; // uint8_t alignment; // }; //}; Decoder (const OTI_Common_Data common,const OTI_Scheme_Specific_Data scheme) { static_assert(std::is_same:: iterator_category>::value, "RaptorQ::Decoder needs an Input Iterator!\n"); // see the above commented bitfields for quick reference _symbol_size = static_cast (common); _sub_blocks = static_cast (scheme >> 8); _blocks = static_cast (common >> 24); assert (static_cast (scheme) <= sizeof(T) && "RaptorQ::Decoder: sizeof(T) must be <= alignment"); // (common >> 24) == total file size const uint64_t size = common >> 24; if (size > max_data) return; const uint64_t total_symbols = static_cast (ceil ( static_cast (size * sizeof(T)) / static_cast (_symbol_size))); part = Impl::Partition (total_symbols, static_cast (scheme >> 24)); //FIXME: check that the OSI and "part" agree on the data. } Decoder (uint16_t symbol_size, uint16_t sub_blocks, uint8_t blocks) :_symbol_size (symbol_size), _sub_blocks (sub_blocks), _blocks (blocks) {} uint32_t decode (InputIterator &start, const InputIterator end); uint32_t decode (InputIterator &start, const InputIterator end, const uint8_t sbn); // id: 8-bit sbn + 24 bit esi bool add_symbol (const std::vector &symbol, const uint32_t id); bool add_symbol (const std::vector &symbol, const uint32_t esi, const uint8_t sbn); void free (const uint8_t sbn); private: Impl::Partition part; uint16_t _symbol_size, _sub_blocks; uint8_t _blocks; std::map decoders; std::mutex _mtx; }; ///////////////// // // Encoder // ///////////////// template OTI_Common_Data Encoder::OTI_Common() const { OTI_Common_Data ret; // first 40 bits: data length. ret = _data->size() << 24; // 8 bits: reserved // last 16 bits: symbol size ret += _symbol_size; return ret; } template OTI_Scheme_Specific_Data Encoder::OTI_Scheme_Specific() const { OTI_Scheme_Specific_Data ret; // 8 bit: source blocks ret = interleave->blocks() << 24; // 16 bit: sub-blocks number (N) ret += interleave->sub_blocks() << 8; // 8 bit: alignment ret += sizeof(T); return ret; } template size_t Encoder::precompute_max_memory () { // give a good estimate on the amount of memory neede for the precomputation // of one block; // this will help you understand how many concurrent precomputations // you want to do :) if (interleave == nullptr) return 0; uint16_t symbols = interleave->source_symbols(0); uint16_t K_idx; for (K_idx = 0; K_idx < Impl::K_padded.size(); ++K_idx) { if (symbols < Impl::K_padded[K_idx]) break; } if (K_idx == Impl::K_padded.size()) return 0; auto S_H = Impl::S_H_W[K_idx]; uint16_t matrix_cols = Impl::K_padded[K_idx] + std::get<0> (S_H) + std::get<1> (S_H); // Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D. return matrix_cols * matrix_cols * 2 + _symbol_size * matrix_cols; } template void Encoder::precompute_thread (Encoder *obj, uint8_t *sbn, const uint8_t single_sbn) { // if "sbn" pointer is NOT nullptr, than we are a thread from // from a precompute_block_all. This means that we need to update // the value of sbn as soon as we get our work. // // if sbn == nullptr, then we have been called to work on a single // sbn, and not from "precompute_block_all". // This means we work on "single_sbn", and do not touch "sbn" uint8_t *sbn_ptr = sbn; if (sbn_ptr == nullptr) sbn_ptr = const_cast (&single_sbn); // call this from a thread, precomput all block symbols while (*sbn_ptr < obj->interleave->blocks()) { obj->_mtx.lock(); if (*sbn_ptr >= obj->interleave->blocks()) { obj->_mtx.unlock(); return; } auto it = obj->encoders.find (*sbn_ptr); if (it == obj->encoders.end()) { bool success; std::tie (it, success) = obj->encoders.insert ({*sbn_ptr, std::make_shared (*obj->interleave, *sbn_ptr) }); } auto enc_ptr = it->second; bool locked = enc_ptr->_mtx.try_lock(); if (sbn != nullptr) ++(*sbn); obj->_mtx.unlock(); if (locked) { // if not locked, someone else is already waiting // on this. so don't do the same work twice. enc_ptr->_enc.generate_symbols(); enc_ptr->_mtx.unlock(); } if (sbn == nullptr) return; } } template void Encoder::precompute (const uint8_t threads, const bool background) { if (background) { std::thread t (precompute_block_all, this, threads); t.detach(); } else { return precompute_block_all (this, threads); } } template void Encoder::precompute_block_all (Encoder *obj, const uint8_t threads) { // precompute all intermediate symbols, do it with more threads. if (obj->interleave == nullptr) return; std::vector t; uint8_t spawned = threads - 1; if (spawned == 0) spawned = std::thread::hardware_concurrency(); if (spawned > 0) t.reserve (spawned); uint8_t sbn = 0; // spawn n-1 threads for (uint8_t id = 0; id < spawned; ++id) t.emplace_back (precompute_thread, obj, &sbn, 0); // do the work ourselves precompute_thread (obj, &sbn, 0); // join other threads for (uint8_t id = 0; id < spawned; ++id) t[id].join(); } template bool Encoder::encode (std::vector &output, uint32_t &id) { const uint32_t mask_8 = static_cast (std::pow (2, 8)) - 1; const uint32_t mask = ~(mask_8 << 24); return encode (output, id & mask, static_cast (id & mask_8)); } template bool Encoder::encode (std::vector &output, uint32_t esi, uint8_t sbn) { if (sbn >= interleave->blocks()) return false; _mtx.lock(); auto it = encoders.find (sbn); if (it == encoders.end()) { bool success; std::tie (it, success) = encoders.emplace (sbn, std::make_shared (*interleave, sbn)); std::thread background (precompute_thread, this, nullptr, sbn); background.detach(); } auto enc_ptr = it->second; _mtx.unlock(); if (esi >= interleave->source_symbols (sbn)) { // make sure we generated the intermediate symbols enc_ptr->_mtx.lock(); enc_ptr->_enc.generate_symbols(); enc_ptr->_mtx.unlock(); } return enc_ptr->_enc.Enc (esi, output, 0); } template void Encoder::free (const uint8_t sbn) { _mtx.lock(); auto it = encoders.find (sbn); if (it != encoders.end()) encoders.erase (it); _mtx.unlock(); } ///////////////// // // Decoder // ///////////////// template void Decoder::free (const uint8_t sbn) { _mtx.lock(); auto it = decoders.find(sbn); if (it != decoders.end()) decoders.erase(it); _mtx.unlock(); } template bool Decoder::add_symbol (const std::vector &symbol, const uint32_t id) { union extract { uint32_t raw; struct { uint8_t sbn; uint32_t esi:24; }; } extracted; extracted.raw = id; return add_symbol (symbol, extracted.esi, extracted.sbn); } template bool Decoder::add_symbol (const std::vector &symbol, const uint32_t esi, const uint8_t sbn) { if (sbn >= _blocks) return false; _mtx.lock(); auto it = decoders.find (sbn); if (it == decoders.end()) { const uint16_t symbols = sbn < part.num (0) ? part.size(0) : part.size(1); decoders.insert ({sbn, std::make_shared> ( symbols, _symbol_size)}); it = decoders.find (sbn); } auto dec = it->second; _mtx.unlock(); return dec->add_symbol (esi, symbol); } template uint32_t Decoder::decode (InputIterator &start, const InputIterator end) { // TODO: incomplete decoding bool missing = false; for (uint8_t sbn = 0; sbn < _blocks; ++sbn) { _mtx.lock(); auto it = decoders.find (sbn); if (it == decoders.end()) { missing = true; continue; } auto dec = it->second; _mtx.unlock(); if (!dec->decode()) return 0; } if (missing) return 0; uint32_t written = 0; for (uint8_t sbn = 0; sbn < _blocks; ++sbn) { _mtx.lock(); auto it = decoders.find (sbn); if (it == decoders.end()) return written; auto dec = it->second; _mtx.unlock(); Impl::De_Interleaver de_interleaving ( dec->get_symbols(), _sub_blocks); written += de_interleaving (start, end); } return written; } template uint32_t Decoder::decode (InputIterator &start, const InputIterator end, const uint8_t sbn) { if (sbn >= _blocks) return 0; _mtx.lock(); auto it = decoders.find (sbn); if (it == decoders.end()) { _mtx.unlock(); return 0; } auto dec = it->second; _mtx.unlock(); if (!dec->decode()) return 0; Impl::De_Interleaver de_interleaving (dec->get_symbols(), _sub_blocks); return de_interleaving (start, end); } } //RaptorQ #endif