Newer
Older
* Copyright (c) 2015-2018, Luca Fulchir<luca@fulchir.it>, All rights reserved.
*
* This file is part of "libRaptorQ".
*
* libRaptorQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* libRaptorQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* and a copy of the GNU Lesser General Public License
* along with libRaptorQ. If not, see <http://www.gnu.org/licenses/>.
*/
// These templates are just a wrapper around the
// functionalities offered by the RaptorQ__v1::Impl namespace
// So if you want to see what the algorithm looks like,
// you are in the wrong place
#include "RaptorQ/v1/block_sizes.hpp"
#include "RaptorQ/v1/Interleaver.hpp"
#include "RaptorQ/v1/De_Interleaver.hpp"
#include "RaptorQ/v1/Decoder.hpp"
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/RFC_Iterators.hpp"
#include "RaptorQ/v1/Shared_Computation/Decaying_LF.hpp"
#include "RaptorQ/v1/Thread_Pool.hpp"
#include <cassert>
#include <future>
constexpr uint64_t max_data = RFC6330_max_data; // ~881 GB
template <typename Rnd_It, typename Fwd_It>
template <typename In_It, typename Fwd_It>
class RAPTORQ_LOCAL Decoder;
} // namespace Impl
#ifdef RQ_HEADER_ONLY
template <typename Rnd_It, typename Fwd_It>
using Encoder = Impl::Encoder<Rnd_It, Fwd_It>;
template <typename Rnd_It, typename Fwd_It>
using Decoder = Impl::Decoder<Rnd_It, Fwd_It>;
Encoder() = delete;
Encoder (const Encoder&) = delete;
Encoder& operator= (const Encoder&) = delete;
Encoder (Encoder&&) = delete;
Encoder& operator= (Encoder&&) = delete;
~Encoder();
Encoder (const Rnd_It data_from, const Rnd_It data_to,
const uint16_t min_subsymbol_size,
const uint16_t symbol_size,
const size_t max_sub_block)
: _max_sub_blk (max_sub_block), _data_from (data_from),
_data_to (data_to),
_symbol_size (symbol_size),
_min_subsymbol (min_subsymbol_size),
interleave (_data_from,
_data_to,
_min_subsymbol,
_symbol_size)
{
IS_RANDOM(Rnd_It, "RFC6330__v1::Encoder");
IS_FORWARD(Fwd_It, "RFC6330__v1::Encoder");
auto _alignment = sizeof(typename
std::iterator_traits<Rnd_It>::value_type);
RQ_UNUSED(_alignment); // used only for asserts
assert(_symbol_size >= _alignment &&
"RaptorQ: symbol_size must be >= alignment");
assert((_symbol_size % _alignment) == 0 &&
"RaptorQ: symbol_size must be multiple of alignment");
assert(min_subsymbol_size >= _alignment &&
"RaptorQ: minimum subsymbol must be at least aligment");
assert(min_subsymbol_size <= _symbol_size &&
"RaptorQ: minimum subsymbol must be at most symbol_size");
assert((min_subsymbol_size % _alignment) == 0 &&
"RaptorQ: minimum subsymbol must be multiple of alignment");
assert((_symbol_size % min_subsymbol_size == 0) &&
"RaptorQ: symbol size must be multiple of subsymbol size");
// max size: ~881 GB
if (static_cast<uint64_t> (data_to - data_from) *
sizeof(typename std::iterator_traits<Rnd_It>::value_type)
> max_data) {
return;
}
_pool_notify = std::make_shared<std::condition_variable>();
_pool_mtx = std::make_shared<std::mutex>();
pool_last_reported = -1;
use_pool = true;
exiting = false;
}
It::Encoder::Block_Iterator<Rnd_It, Fwd_It> begin ()
{ return It::Encoder::Block_Iterator<Rnd_It, Fwd_It> (this, 0); }
const It::Encoder::Block_Iterator<Rnd_It, Fwd_It> end ()
{ return It::Encoder::Block_Iterator<Rnd_It, Fwd_It> (this, blocks()); }
RFC6330_OTI_Common_Data OTI_Common() const;
RFC6330_OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
size_t precompute_max_memory ();
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t esi,
const uint8_t sbn);
// id: 8-bit sbn + 24 bit esi
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t id);
size_t encode_packet (Fwd_It &output, const Fwd_It end, const uint32_t id);
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
Block_Size extended_symbols (const uint8_t sbn) const;
static void wait_threads (Encoder<Rnd_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
class Block_Work final : public Impl::Pool_Work {
public:
std::weak_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It,
RaptorQ__v1::Impl::with_interleaver>> work;
Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
~Block_Work() override;
};
class Enc {
public:
Enc (Impl::Interleaver<Rnd_It> *interleaver, const uint8_t sbn)
{
enc = std::make_shared<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It,
Fwd_It, RaptorQ__v1::Impl::with_interleaver>> (
interleaver, sbn);
reported = false;
}
std::shared_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It,
RaptorQ__v1::Impl::with_interleaver>> enc;
std::pair<Error, uint8_t> get_report (const Compute flags);
std::shared_ptr<std::condition_variable> _pool_notify;
std::shared_ptr<std::mutex> _pool_mtx;
const Rnd_It _data_from, _data_to;
const uint16_t _symbol_size;
const uint16_t _min_subsymbol;
Impl::Interleaver<Rnd_It> interleave;
bool use_pool, exiting;
int16_t pool_last_reported;
// rfc 6330, pg 6
// easy explanation for OTI_* comes next.
// we do NOT use bitfields as compilators are not actually forced to put
// them in any particular order. meaning tey're useless.
//
//union OTI_Common_Data {
// uint64_t raw;
// struct {
// uint64_t size:40;
// uint8_t reserved:8;
// uint16_t symbol_size:16;
// };
//};
//union OTI_Scheme_Specific_Data {
// uint32_t raw;
// struct {
// uint8_t source_blocks;
// uint16_t sub_blocks;
// uint8_t alignment;
// };
//};
Decoder (const Decoder&) = delete;
Decoder& operator= (const Decoder&) = delete;
Decoder (Decoder&&) = delete;
Decoder& operator= (Decoder&&) = delete;
// _size > max_data means improper initialization.
IS_INPUT(In_It, "RaptorQ__v1::Decoder");
IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");
RFC6330_OTI_Common_Data _common =
RaptorQ__v1::Impl::Endian::b_to_h<RFC6330_OTI_Common_Data>
(common);
RFC6330_OTI_Common_Data _scheme =
RaptorQ__v1::Impl::Endian::b_to_h<RFC6330_OTI_Scheme_Specific_Data>
(scheme);
_symbol_size = static_cast<uint16_t> (_common);
_size = _common >> 24;
uint16_t tot_sub_blocks = static_cast<uint16_t> (_scheme >> 8);
_alignment = static_cast<uint8_t> (_scheme);
_blocks = static_cast<uint8_t> (_scheme >> 24);
if (_size > max_data || _size % _alignment != 0 ||
_symbol_size % _alignment != 0) {
_size = std::numeric_limits<uint64_t>::max();
return;
}
_sub_blocks = Impl::Partition (_symbol_size / _alignment,
tot_sub_blocks);
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
_size / static_cast<double> (_symbol_size)));
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
pool_last_reported = -1;
_pool_notify = std::make_shared<std::condition_variable>();
_pool_mtx = std::make_shared<std::mutex>();
use_pool = true;
exiting = false;
}
Decoder (const uint64_t size, const uint16_t symbol_size,
const uint16_t sub_blocks,
const uint8_t blocks,
const uint8_t alignment)
:_size (size), _symbol_size (symbol_size), _blocks (blocks),
_alignment(alignment)
{
// _size > max_data means improper initialization.
if (_size > max_data || _size % _alignment != 0 ||
_symbol_size % _alignment != 0) {
// really, not all the possible tests are here.
// but the RFC sucks really bad... input validation is a pain...
// please use the RAW API...
_size = std::numeric_limits<uint64_t>::max();
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
_size / static_cast<double> (_symbol_size)));
_sub_blocks = Impl::Partition (_symbol_size / _alignment, sub_blocks);
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
_pool_notify = std::make_shared<std::condition_variable>();
_pool_mtx = std::make_shared<std::mutex>();
pool_last_reported = -1;
use_pool = true;
exiting = false;
}
It::Decoder::Block_Iterator<In_It, Fwd_It> begin ()
{ return It::Decoder::Block_Iterator<In_It, Fwd_It> (this, 0); }
const It::Decoder::Block_Iterator<In_It, Fwd_It> end ()
{ return It::Decoder::Block_Iterator<In_It, Fwd_It> (this, blocks()); }
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
// if you can tell there is no more input, we can avoid locking
// forever and return an error, or if you wish we can fill
// everythin with zero, and return you the bitmask of which bytes
// we have and which we do not
std::vector<bool> end_of_input (const Fill_With_Zeros fill,
const uint8_t block);
std::vector<bool> end_of_input (const Fill_With_Zeros fill);
uint64_t decode_symbol (Fwd_It &start, const Fwd_It end, const uint16_t esi,
const uint8_t sbn);
uint64_t decode_bytes (Fwd_It &start, const Fwd_It end, const uint8_t skip);
size_t decode_block_bytes (Fwd_It &start, const Fwd_It end,
const uint8_t skip,
const uint8_t sbn);
// result in ITERATORS
// last *might* be half written depending on data alignments
Decoder_written decode_aligned (Fwd_It &start, const Fwd_It end,
Decoder_written decode_block_aligned (Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
const uint8_t sbn);
// id: 8-bit sbn + 24 bit esi
Error add_symbol (In_It &start, const In_It end, const uint32_t id);
Error add_symbol (In_It &start, const In_It end, const uint32_t esi,
const uint8_t sbn);
uint8_t blocks_ready();
bool is_ready();
bool is_block_ready (const uint8_t block);
void free (const uint8_t sbn);
uint64_t bytes() const;
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
Block_Size extended_symbols (const uint8_t sbn) const;
// using shared pointers to avoid locking too much or
// worrying about deleting used stuff.
class RAPTORQ_LOCAL Block_Work final : public Impl::Pool_Work {
public:
std::weak_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> work;
std::weak_ptr<std::condition_variable> notify;
Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
~Block_Work() override;
};
Dec (const RaptorQ__v1::Block_Size symbols, const uint16_t symbol_size,
const uint16_t padding_symbols)
{
dec = std::make_shared<RaptorQ__v1::Impl::Raw_Decoder<In_It>> (
reported = false;
}
std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec;
bool reported;
};
static void wait_threads (Decoder<In_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
std::pair<Error, uint8_t> get_report (const Compute flags);
std::shared_ptr<std::condition_variable> _pool_notify;
uint64_t _size;
Impl::Partition part, _sub_blocks;
std::map<uint8_t, Dec> decoders;
std::mutex _mtx;
uint16_t _symbol_size;
int16_t pool_last_reported;
uint8_t _blocks, _alignment;
bool use_pool, exiting;
/////////////////
//
// Encoder
//
/////////////////
exiting = true; // stop notifying thread
std::unique_lock<std::mutex> enc_lock (_mtx);
for (auto &it : encoders) { // stop existing computations
auto ptr = it.second.enc;
if (ptr != nullptr)
ptr->stop();
}
enc_lock.unlock();
_pool_notify->notify_all();
while (pool_wait.size() != 0) {
RFC6330_OTI_Common_Data Encoder<Rnd_It, Fwd_It>::OTI_Common() const
// first 40 bits: data length.
ret = (static_cast<uint64_t> (_data_to - _data_from) *
sizeof(typename std::iterator_traits<Rnd_It>::value_type)) << 24;
// 8 bits: reserved
// last 16 bits: symbol size
ret += _symbol_size;
return RaptorQ__v1::Impl::Endian::h_to_b<RFC6330_OTI_Common_Data> (ret);
RFC6330_OTI_Scheme_Specific_Data Encoder<Rnd_It, Fwd_It>::OTI_Scheme_Specific()
const
// 8 bit: source blocks
ret = static_cast<uint32_t> (interleave.blocks()) << 24;
// 16 bit: sub-blocks number (N)
ret += static_cast<uint32_t> (interleave.sub_blocks()) << 8;
// 8 bit: alignment
ret += sizeof(typename std::iterator_traits<Rnd_It>::value_type);
return RaptorQ__v1::Impl::Endian::h_to_b<RFC6330_OTI_Scheme_Specific_Data> (
ret);
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::precompute_max_memory ()
// give a good estimate on the amount of memory neede for the precomputation
// of one block;
// this will help you understand how many concurrent precomputations
// you want to do :)
if (!interleave)
return 0;
uint16_t symbols = interleave.source_symbols (0);
uint16_t K_idx;
for (K_idx = 0; K_idx < RaptorQ__v1::Impl::K_padded.size(); ++K_idx) {
if (symbols < RaptorQ__v1::Impl::K_padded[K_idx])
break;
}
if (K_idx == RaptorQ__v1::Impl::K_padded.size())
return 0;
auto S_H_W = RaptorQ__v1::Impl::S_H_W[K_idx];
enum Tup { S = 0, H = 1, W = 2 };
uint16_t matrix_cols = RaptorQ__v1::Impl::K_padded[K_idx] +
std::get<Tup::S> (S_H_W) +
std::get<Tup::H> (S_H_W);
// Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D.
return matrix_cols * matrix_cols * 2 + _symbol_size * matrix_cols;
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Block_Work::~Block_Work()
{
// cleanup. have we benn called before the computation finished?
auto locked_enc = work.lock();
auto locked_notify = notify.lock();
auto locked_mtx = lock.lock();
std::unique_lock<std::mutex> p_lock (*locked_mtx);
RQ_UNUSED(p_lock);
Work_Exit_Status Encoder<Rnd_It, Fwd_It>::Block_Work::do_work (
auto locked_enc = work.lock();
auto locked_notify = notify.lock();
auto locked_mtx = lock.lock();
// encoding always works. It's one of the few constants of the universe.
if (!locked_enc->generate_symbols (state))
return Work_Exit_Status::STOPPED; // or maybe not so constant
work.reset();
std::unique_lock<std::mutex> p_lock (*locked_mtx);
RQ_UNUSED(p_lock);
locked_notify->notify_all();
}
return Work_Exit_Status::DONE;
}
template <typename Rnd_It, typename Fwd_It>
std::future<std::pair<Error, uint8_t>> Encoder<Rnd_It, Fwd_It>::compute (
using ret_t = std::pair<Error, uint8_t>;
std::promise<ret_t> p;
bool error = !interleave;
// need some flags
if (flags == Compute::NONE)
error = true;
// flag incompatibilities
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(Compute::NONE != (flags & (Compute::PARTIAL_ANY |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
(Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::COMPLETE) &&
Compute::NONE != (flags &(Compute::PARTIAL_FROM_BEGINNING |
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
error = true;
}
if (Compute::NONE != (flags & Compute::NO_POOL)) {
std::unique_lock<std::mutex> lock (_mtx);
RQ_UNUSED(lock);
if (encoders.size() != 0) {
// You can only say you won't use the pool *before* you start
// decoding something!
error = true;
} else {
use_pool = false;
p.set_value ({Error::NONE, 0});
return p.get_future();
}
}
if (error) {
p.set_value ({Error::WRONG_INPUT, 0});
return p.get_future();
}
// flags are fine, add work to pool
std::unique_lock<std::mutex> lock (_mtx);
for (uint8_t block = 0; block < blocks(); ++block) {
auto enc = encoders.find (block);
if (enc == encoders.end()) {
bool success;
std::tie (enc, success) = encoders.emplace (
std::piecewise_construct,
std::forward_as_tuple (block),
std::forward_as_tuple (&interleave, block));
assert (success == true);
std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
new Block_Work());
work->work = enc->second.enc;
work->notify = _pool_notify;
Thread_Pool::get().add_work (std::move(work));
}
}
lock.unlock();
// spawn thread waiting for other thread exit.
// this way we can set_value to the future when needed.
auto future = p.get_future();
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (*_pool_mtx);
RQ_UNUSED(pool_wait_lock);
pool_wait.emplace_back (wait_threads, this, flags, std::move(p));
void Encoder<Rnd_It, Fwd_It>::wait_threads (Encoder<Rnd_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
if (obj->exiting) {
p.set_value ({Error::EXITING, 0});
break;
}
auto status = obj->get_report (flags);
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
_notify->wait (lock);
lock.unlock();
}
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
break;
}
}
std::pair<Error, uint8_t> Encoder<Rnd_It, Fwd_It>::get_report (
if (encoders.size() == 0)
return {Error::WORKING, 0};
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
if (Compute::NONE != (flags & Compute::COMPLETE) ||
Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING)) {
auto it = encoders.begin();
for (; it != encoders.end(); ++it) {
auto ptr = it->second.enc;
if (ptr != nullptr) {
if (!ptr->ready()) {
if (ptr->is_stopped())
return{Error::EXITING, 0};
break;
}
}
}
if (it == encoders.end()) {
pool_last_reported = static_cast<int16_t> (encoders.size() - 1);
return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
}
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(pool_last_reported < (it->first - 1))) {
pool_last_reported = it->first - 1;
return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
}
return {Error::WORKING, 0};
}
if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
for (auto &it : encoders) {
if (!it.second.reported) {
auto ptr = it.second.enc;
if (ptr != nullptr) {
if (ptr->ready())
return {Error::NONE, it.first};
if (ptr->is_stopped())
return{Error::EXITING, 0};
}
}
}
}
return {Error::WORKING, 0}; // should never be reached
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t> (id);
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
return encode (output, end, host_id & mask,
static_cast<uint8_t> (host_id >> 24));
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
const uint32_t syms = this->symbols (sbn);
const uint32_t padding = static_cast<uint16_t>(this->extended_symbols (sbn))
- syms;
const uint32_t real_esi = esi < syms ? esi : esi + padding;
std::unique_lock<std::mutex> lock (_mtx);
auto it = encoders.find (sbn);
if (use_pool) {
if (it == encoders.end())
return 0;
auto shared_enc = it->second.enc;
if (!shared_enc->ready())
return 0;
lock.unlock();
} else {
if (it == encoders.end()) {
bool success;
std::tie (it, success) = encoders.emplace (std::make_pair (sbn,
Enc (&interleave, sbn)));
auto shared_enc = it->second.enc;
lock.unlock();
RaptorQ__v1::Work_State state =
RaptorQ__v1::Work_State::KEEP_WORKING;
shared_enc->generate_symbols (&state);
} else {
auto shared_enc = it->second.enc;
lock.unlock();
if (!shared_enc->ready())
return 0;
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::encode_packet (Fwd_It &output, const Fwd_It end,
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
{
// RFC packet, section 4.4.2 page 11
// return the size of the packet in BYTES
using T = typename std::iterator_traits<Fwd_It>::value_type;
// each packet has an header of 32 bits. We can not start writing the
// encoded symbols in the middle of an iterator yet.
if (sizeof(T) > sizeof(uint32_t) || sizeof(T) == 3) {
assert (false && "libRaptorQ: sorry, encde_packets can only be used "
"with types of at most 32 bits for now\n");
return 0;
}
// first of all, check if we have enough space
size_t max_pkt_len;
if (std::is_same<typename std::iterator_traits<Fwd_It>::iterator_category,
std::random_access_iterator_tag>::value) {
// we were lucky with a random iterator.
max_pkt_len = sizeof(T) * (end - output);
} else {
max_pkt_len = 0;
auto out_copy = output;
while (out_copy != end) {
max_pkt_len += sizeof(T);
++out_copy;
}
}
if (max_pkt_len <= sizeof(uint32_t))
return 0; // we can only write the header, or not even that.
max_pkt_len -= sizeof(uint32_t);
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t> (id);
const uint8_t sbn = host_id >> 24;
uint32_t symbol = host_id & mask;
const uint32_t source_symbols = symbols (sbn);
const bool only_source = (symbol >= source_symbols);
size_t symbols_to_write = 0;
while (max_pkt_len >= _symbol_size) {
if (only_source && symbol >= source_symbols)
break;
uint32_t real_symbol_size;
if (!only_source) {
real_symbol_size = _symbol_size;
} else if (sbn == (blocks() - 1) && symbol == source_symbols - 1) {
// the last symbol the last block can have less bytes than
// a full symbol, and we do not need to send the padding bytes.
real_symbol_size = block_size (sbn) % _symbol_size;
if (real_symbol_size == 0)
real_symbol_size = _symbol_size;
}
if (max_pkt_len <= real_symbol_size)
break;
max_pkt_len -= real_symbol_size;
++symbol;
++symbols_to_write;
}
if (symbols_to_write == 0)
return 0;
// ok, now we can finally start writing something.
// write the header
const uint8_t *p = reinterpret_cast<const uint8_t*> (&id);
uint8_t *p_out = reinterpret_cast<uint8_t*> (&*output);
// manual loop unrolling ftw
*(p_out++) = *(p++);
if (sizeof(T) == 1) {
++output;
p_out = reinterpret_cast<uint8_t*> (&*output);
}
*(p_out++) = *(p++);
if (sizeof(T) == 2 || sizeof(T) == 1) {
++output;
p_out = reinterpret_cast<uint8_t*> (&*output);
}
*(p_out++) = *(p++);
if (sizeof(T) == 1) {
++output;
p_out = reinterpret_cast<uint8_t*> (&*output);
}
*(p_out++) = *(p++);
// sizeof(T) can only be 1,2,4, so we can safely just increment output here
++output;
// FINALLY write the symbols
size_t written = sizeof(uint32_t);
symbol = static_cast<uint8_t> (id >> 24);
while (symbols_to_write > 0) {
size_t tmp_written = encode (output, end, symbol, sbn);
if (tmp_written == 0)
return written == sizeof(uint32_t) ? 0 : written;
++symbol;
}
return written;
}
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::free (const uint8_t sbn)
std::unique_lock<std::mutex> lock (_mtx);
RQ_UNUSED(lock);
auto it = encoders.find (sbn);
if (it != encoders.end())
encoders.erase (it);
template <typename Rnd_It, typename Fwd_It>
uint8_t Encoder<Rnd_It, Fwd_It>::blocks() const
if (!interleave)
return 0;
return interleave.blocks();
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::block_size (const uint8_t sbn) const
if (!interleave)
return 0;
return interleave.source_symbols (sbn) * interleave.symbol_size();
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
if (!interleave)
return 0;
return interleave.symbol_size();
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols (const uint8_t sbn) const
if (!interleave)
return 0;
return interleave.source_symbols (sbn);
template <typename Rnd_It, typename Fwd_It>
Block_Size Encoder<Rnd_It, Fwd_It>::extended_symbols (const uint8_t sbn) const
{
if (!interleave)
// outside of the enum, but you should have checked the
// initialization anyway. not relly nice either way
return static_cast<Block_Size> (0);
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair (const uint8_t sbn) const
if (!interleave)
return 0;
return static_cast<uint32_t> (std::pow (2, 20)) -
/////////////////
//
// Decoder
//
/////////////////
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::~Decoder()
{
for (auto &it : decoders) { // stop existing computations
auto ptr = it.second.dec;
if (ptr != nullptr)
ptr->stop();
}
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::free (const uint8_t sbn)
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h (id);
const uint32_t esi = host_id & mask;
const uint8_t sbn = host_id >> 24;
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
if (!operator bool())
return Error::INITIALIZATION;
const uint16_t syms = this->symbols (sbn);
const Block_Size b_size = this->extended_symbols (sbn);
// we might have padding symbols. add thse to the esi.
const uint16_t padding = static_cast<uint16_t> (b_size) - syms;
const uint32_t real_esi = esi < syms ? esi : esi + padding;
bool added_decoder = false;
std::unique_lock<std::mutex> lock (_mtx);
auto it = decoders.find (sbn);
if (it == decoders.end()) {
bool success;
std::tie (it, success) = decoders.emplace (std::make_pair(sbn,
// the last symbol in a block can have less size than the symbol size,
// in which case we should add padding
const bool add_padding = (esi == syms);
auto err = dec->add_symbol (start, end, real_esi, add_padding);
if (err != Error::NONE)
return err;
// automatically add work to pool if we use it and have enough data
std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
RQ_UNUSED(pool_lock);
if (use_pool && dec->can_decode()) {
bool add_work = dec->add_concurrent (max_block_decoder_concurrency);
if (add_work) {
std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
new Block_Work());
work->work = dec;
work->notify = _pool_notify;