/*
* Copyright (c) 2015-2016, Luca Fulchir <luca@fulchir.it>, All rights reserved.
*
* This file is part of "libRaptorQ".
*
* libRaptorQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* libRaptorQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* and a copy of the GNU Lesser General Public License
* along with libRaptorQ. If not, see <http://www.gnu.org/licenses/>.
*/
/////////////////////
//
// These templates are just a wrapper around the
// implementation in the "Impl" namespace.
// So if you want to see what the algorithm looks like,
// you are in the wrong place
//
/////////////////////
#include "Interleaver.hpp"
#include "De_Interleaver.hpp"
#include "Encoder.hpp"
#include "Decoder.hpp"
#include "Shared_Computation/Decaying_LF.hpp"
#include <cassert>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <utility>
Symbol (Encoder<Rnd_It, Fwd_It> *enc, const uint32_t esi, const uint8_t sbn)
uint64_t operator() (Fwd_It &start, const Fwd_It end)
{ return _enc->encode (start, end, _esi, _sbn); }
uint32_t id() const
{
uint32_t ret = _sbn;
ret <<= 24;
return ret + _esi;
}
public std::iterator<std::input_iterator_tag, Symbol<Rnd_It, Fwd_It>>
Symbol_Iterator (Encoder<Rnd_It, Fwd_It> *enc, const uint32_t esi,
const uint8_t sbn)
Symbol_Iterator<Rnd_It, Fwd_It> ret (_enc, _esi + i, _sbn);
bool operator== (const Symbol_Iterator<Rnd_It, Fwd_It> &it) const
{ return it._esi == _esi && it._sbn == _sbn; }
bool operator!= (const Symbol_Iterator<Rnd_It, Fwd_It> &it) const
{
return it._esi != _esi || it._sbn != _sbn;
}
private:
Block (Encoder<Rnd_It, Fwd_It> *enc, const uint16_t symbols,
Symbol_Iterator<Rnd_It, Fwd_It> begin_source() const
{ return Symbol_Iterator<Rnd_It, Fwd_It> (_enc, 0, _sbn); }
Symbol_Iterator<Rnd_It, Fwd_It> end_source() const
{ return Symbol_Iterator<Rnd_It, Fwd_It> (_enc, _symbols, _sbn); }
Symbol_Iterator<Rnd_It, Fwd_It> begin_repair() const
// repair symbols start right after the source symbols
{ return Symbol_Iterator<Rnd_It, Fwd_It> (_enc, _symbols, _sbn); }
Symbol_Iterator<Rnd_It, Fwd_It> end_repair (const uint32_t max_repair)
{ return Symbol_Iterator<Rnd_It, Fwd_It> (_enc, _symbols + max_repair, _sbn); }
uint16_t symbols () const
{
return _enc->symbols (_sbn);
}
uint32_t block_size () const
{
return _enc->block_size (_sbn);
}
public std::iterator<std::input_iterator_tag, Block<Rnd_It, Fwd_It>>
Block_Iterator (Encoder<Rnd_It, Fwd_It> *enc, const Impl::Partition part,
if (_sbn < _part.num (0))
return Block<Rnd_It, Fwd_It> (_enc, _part.size (0), _sbn);
return Block<Rnd_It, Fwd_It> (_enc, _part.size (1), _sbn);
Block_Iterator ret = *this;
ret._sbn += i;
return ret;
}
bool operator== (const Block_Iterator &it) const
bool operator!= (const Block_Iterator &it) const
////////////////////
//// Free Functions
////////////////////
uint64_t shared_cache_size (const uint64_t shared_cache);
bool local_cache_size (const uint64_t local_cache);
uint64_t get_shared_cache_size();
uint64_t get_local_cache_size();
static const uint64_t max_data = 946270874880; // ~881 GB
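// (946270874880 == 56403 * 65535 * 256: max symbols per block *
//  max symbol size in bytes * max source blocks -- the RFC 6330 limit)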
typedef uint64_t OTI_Common_Data;
typedef uint32_t OTI_Scheme_Specific_Data;
bool RAPTORQ_API set_thread_pool (const size_t threads,
const uint16_t max_block_concurrency,
const Work_State exit_type);
namespace Impl {
// Maximum number of times a single block can be decoded at the same time.
// The decoder can be launched multiple times with different combinations
// of repair symbols. This can be useful since the decoding is
// probabilistic, and dropping a set of repair symbols *MIGHT* make things
// decodable again.
// Keep this low. 1, 2 or 3 should be ok.
static uint16_t max_block_decoder_concurrency = 1;
}
const uint16_t min_subsymbol_size,
const uint16_t symbol_size,
const size_t max_memory)
_min_subsymbol (min_subsymbol_size),
interleave (_data_from,
_data_to,
_min_subsymbol,
_mem,
_symbol_size)
{
IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder");
IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder");
auto _alignment = sizeof(typename
std::iterator_traits<Rnd_It>::value_type);
assert(_symbol_size >= _alignment &&
"RaptorQ: symbol_size must be >= alignment");
assert((_symbol_size % _alignment) == 0 &&
"RaptorQ: symbol_size must be multiple of alignment");
assert(min_subsymbol_size >= _alignment &&
"RaptorQ: minimum subsymbol must be at least alignment");
assert(min_subsymbol_size <= _symbol_size &&
"RaptorQ: minimum subsymbol must be at most symbol_size");
assert((min_subsymbol_size % _alignment) == 0 &&
"RaptorQ: minimum subsymbol must be multiple of alignment");
assert((_symbol_size % min_subsymbol_size == 0) &&
"RaptorQ: symbol size must be multiple of subsymbol size");
if (static_cast<uint64_t> (data_to - data_from) *
sizeof(typename std::iterator_traits<Rnd_It>::value_type)
> max_data) {
return; // too much data; skip the thread-pool setup
}
pool_lock = std::make_shared<std::pair<std::mutex,
std::condition_variable>> ();
pool_last_reported = -1;
use_pool = true;
exiting = false;
auto part = interleave.get_partition();
return Block_Iterator<Rnd_It, Fwd_It> (this, part,
operator bool() const { return interleave; }
OTI_Common_Data OTI_Common() const;
OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
// TODO: introduce memory limits on threading ?
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
uint64_t encode (Fwd_It &output, const Fwd_It end, const uint32_t esi,
uint64_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id);
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
uint32_t max_repair (const uint8_t sbn) const;
static void wait_threads (Encoder<Rnd_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
class Block_Work : public Impl::Pool_Work {
public:
std::weak_ptr<Impl::Encoder<Rnd_It, Fwd_It>> work;
std::weak_ptr<std::pair<std::mutex, std::condition_variable>> notify;
Work_Exit_Status do_work (Work_State *state) override;
~Block_Work() override {}
};
// TODO: tagged pointer
class Enc {
public:
Enc (const Impl::Interleaver<Rnd_It> &interleaver, const uint8_t sbn)
{
enc = std::make_shared<Impl::Encoder<Rnd_It, Fwd_It>> (interleaver,
sbn);
reported = false;
}
std::shared_ptr<Impl::Encoder<Rnd_It, Fwd_It>> enc;
bool reported;
std::pair<Error, uint8_t> get_report (const Compute flags);
std::shared_ptr<std::pair<std::mutex, std::condition_variable>> pool_lock;
std::deque<std::thread> pool_wait;
std::map<uint8_t, Enc> encoders;
const size_t _mem;
const Rnd_It _data_from, _data_to;
const uint16_t _symbol_size;
const Impl::Interleaver<Rnd_It> interleave;
bool use_pool, exiting;
int16_t pool_last_reported;
// rfc 6330, pg 6
// an easy explanation of the OTI_* layouts follows.
// we do NOT use bitfields, as compilers are not required to lay them out
// in any particular order (the layout is implementation-defined), which
// makes them useless here.
//
//union OTI_Common_Data {
// uint64_t raw;
// struct {
// uint64_t size:40;
// uint8_t reserved:8;
// uint16_t symbol_size:16;
// };
//};
//union OTI_Scheme_Specific_Data {
// uint32_t raw;
// struct {
// uint8_t source_blocks;
// uint16_t sub_blocks;
// uint8_t alignment;
// };
//};
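// Minimal sketch of the same packing done with plain shifts (it mirrors
// the constructors below; the variable names are illustrative only):
//   uint16_t symbol_size = static_cast<uint16_t> (common);
//   uint64_t size        = common >> 24;            // 40-bit transfer length
//   uint8_t  alignment   = static_cast<uint8_t>  (scheme);
//   uint16_t sub_blocks  = static_cast<uint16_t> (scheme >> 8);
//   uint8_t  blocks      = static_cast<uint8_t>  (scheme >> 24);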
Decoder (const OTI_Common_Data common, const OTI_Scheme_Specific_Data scheme)
IS_INPUT(In_It, "RaptorQ__v1::Decoder");
IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");
// see the above commented bitfields for quick reference
_symbol_size = static_cast<uint16_t> (common);
uint16_t tot_sub_blocks = static_cast<uint16_t> (scheme >> 8);
_alignment = static_cast<uint8_t> (scheme);
_sub_blocks = Impl::Partition (_symbol_size /
static_cast<uint8_t> (scheme),
tot_sub_blocks);
_blocks = static_cast<uint8_t> (scheme >> 24);
return;
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
_size / static_cast<double> (_symbol_size)));
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
pool_lock = std::make_shared<std::pair<std::mutex,
std::condition_variable>> ();
pool_last_reported = -1;
use_pool = true;
exiting = false;
Decoder (const uint64_t size, const uint16_t symbol_size,
const uint16_t sub_blocks,
const uint8_t blocks,
const uint8_t alignment)
:_size (size), _symbol_size (symbol_size), _blocks (blocks),
_alignment (alignment)
{
return;
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
_size / static_cast<double> (_symbol_size)));
_sub_blocks = Impl::Partition (_symbol_size / _alignment, sub_blocks);
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
pool_last_reported = -1;
use_pool = true;
exiting = false;
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
// result in BYTES
uint64_t decode_bytes (Fwd_It &start, const Fwd_It end, const uint8_t skip);
uint64_t decode_block_bytes (Fwd_It &start, const Fwd_It end,
const uint8_t skip,
const uint8_t sbn);
// result in ITERATORS
// last *might* be half written depending on data alignments
std::pair<size_t, uint8_t> decode_aligned (Fwd_It &start, const Fwd_It end,
const uint8_t skip);
std::pair<size_t, uint8_t> decode_block_aligned (Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
const uint8_t sbn);
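// Hedged usage sketch for the iterator-based decode (names illustrative;
// 'skip' is presumably the number of bytes already written into the first
// iterator by an earlier, partial call):
//   std::vector<uint32_t> out (dec.block_size (0) / sizeof(uint32_t), 0);
//   auto it = out.begin();
//   auto res = dec.decode_block_aligned (it, out.end(), 0, 0);
//   // res.first: iterators written; res.second: bytes written into the
//   // last, possibly half-filled iterator.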
Error add_symbol (In_It &start, const In_It end, const uint32_t id);
Error add_symbol (In_It &start, const In_It end, const uint32_t esi,
const uint8_t sbn);
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
// shared pointers are used to avoid excessive locking and to
// avoid worrying about freeing data still in use.
class Block_Work : public Impl::Pool_Work {
public:
std::weak_ptr<Impl::Decoder<In_It>> work;
std::weak_ptr<std::pair<std::mutex, std::condition_variable>> notify;
Work_Exit_Status do_work (Work_State *state) override;
~Block_Work() override {}
};
// TODO: tagged pointer
class Dec {
public:
Dec (const uint16_t symbols, const uint16_t symbol_size)
{
dec = std::make_shared<Impl::Decoder<In_It>> (symbols, symbol_size);
reported = false;
}
std::shared_ptr<Impl::Decoder<In_It>> dec;
bool reported;
};
static void wait_threads (Decoder<In_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
std::pair<Error, uint8_t> get_report (const Compute flags);
std::shared_ptr<std::pair<std::mutex, std::condition_variable>> pool_lock;
std::deque<std::thread> pool_wait;
std::map<uint8_t, Dec> decoders;
std::mutex _mtx;
/////////////////
//
// Encoder
//
/////////////////
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::~Encoder()
{
exiting = true; // stop notifying threads
pool_lock->second.notify_all();
for (auto &it : encoders) { // stop existing computations
auto ptr = it.second.enc;
if (ptr != nullptr)
ptr->stop();
}
}
template <typename Rnd_It, typename Fwd_It>
OTI_Common_Data Encoder<Rnd_It, Fwd_It>::OTI_Common() const
{
OTI_Common_Data ret = (static_cast<uint64_t> (_data_to - _data_from) *
sizeof(typename std::iterator_traits<Rnd_It>::value_type)) << 24;
ret += _symbol_size; // low 16 bits carry the symbol size
return ret;
}
template <typename Rnd_It, typename Fwd_It>
OTI_Scheme_Specific_Data Encoder<Rnd_It, Fwd_It>::OTI_Scheme_Specific() const
{
OTI_Scheme_Specific_Data ret =
static_cast<uint32_t> (interleave.blocks()) << 24;
ret += static_cast<uint32_t> (interleave.sub_blocks()) << 8;
ret += sizeof(typename std::iterator_traits<Rnd_It>::value_type);
return ret;
}
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::precompute_max_memory ()
{
// give a good estimate of the amount of memory needed for the
// precomputation of one block;
// this will help you understand how many concurrent precomputations
// you want to do :)
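// Hedged worked example: RFC 6330's systematic index table gives, for
// K' = 10 padded symbols, S = 7 LDPC and H = 10 HDPC rows, so
// matrix_cols = 10 + 7 + 10 = 27 and the estimate below becomes
// 27 * 27 * 2 + _symbol_size * 27 bytes.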
uint16_t symbols = interleave.source_symbols (0);
uint16_t K_idx;
for (K_idx = 0; K_idx < Impl::K_padded.size(); ++K_idx) {
if (symbols < Impl::K_padded[K_idx])
break;
}
if (K_idx == Impl::K_padded.size())
return 0;
auto S_H = Impl::S_H_W[K_idx];
uint16_t matrix_cols = Impl::K_padded[K_idx] + std::get<0> (S_H) +
std::get<1> (S_H);
// Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D.
return matrix_cols * matrix_cols * 2 + _symbol_size * matrix_cols;
}

template <typename Rnd_It, typename Fwd_It>
Work_Exit_Status Encoder<Rnd_It, Fwd_It>::Block_Work::do_work (
Work_State *state)
{
auto locked_enc = work.lock();
auto locked_notify = notify.lock();
if (locked_enc != nullptr && locked_notify != nullptr) {
// encoding always works. It's one of the few constants of the universe.
if (!locked_enc->generate_symbols (state))
return Work_Exit_Status::STOPPED; // only explanation.
std::lock_guard<std::mutex> mtx (locked_notify->first);
UNUSED(mtx);
locked_notify->second.notify_one();
}
return Work_Exit_Status::DONE;
}
template <typename Rnd_It, typename Fwd_It>
std::future<std::pair<Error, uint8_t>> Encoder<Rnd_It, Fwd_It>::compute (
const Compute flags)
{
using ret_t = std::pair<Error, uint8_t>;
std::promise<ret_t> p;
bool error = !interleave;
// need some flags
if (flags == Compute::NONE)
error = true;
// flag incompatibilities
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(Compute::NONE != (flags & (Compute::PARTIAL_ANY |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
(Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::COMPLETE) &&
Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::PARTIAL_ANY |
Compute::NO_POOL))) {
error = true;
}
if (Compute::NONE != (flags & Compute::NO_POOL)) {
std::unique_lock<std::mutex> lock (_mtx);
UNUSED(lock);
if (encoders.size() != 0) {
// You can only say you won't use the pool *before* you start
// encoding something!
error = true;
} else {
use_pool = false;
p.set_value ({Error::NONE, 0});
return p.get_future();
}
}
if (error) {
p.set_value ({Error::WRONG_INPUT, 0});
return p.get_future();
}
// flags are fine, add work to pool
std::unique_lock<std::mutex> lock (_mtx);
for (uint8_t block = 0; block < blocks(); ++block) {
auto enc = encoders.find (block);
if (enc == encoders.end()) {
bool success;
std::tie (enc, success) = encoders.emplace (
std::piecewise_construct,
std::forward_as_tuple (block),
std::forward_as_tuple (interleave, block));
assert (success == true);
std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
new Block_Work());
work->work = enc->second.enc;
work->notify = pool_lock;
Impl::Thread_Pool::get().add_work (std::move(work));
}
}
lock.unlock();
// spawn a thread that waits for the worker threads to exit,
// so that we can set_value on the future when needed.
auto future = p.get_future();
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (_mtx);
UNUSED(pool_wait_lock);
pool_wait.emplace_back (wait_threads, this, flags, std::move(p));
}
return future;
}

template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::wait_threads (Encoder<Rnd_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
{
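// The loop below re-checks 'exiting' around every blocking point and asks
// for a report both before and after the conditional wait, so that a
// notification firing between the check and the wait is never lost.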
do {
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
// pool is global (static), so wait only for our stuff.
std::unique_lock<std::mutex> lock (obj->pool_lock->first);
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
auto status = obj->get_report (flags);
if (status.first != Error::WORKING) {
p.set_value (status);
break;
}
obj->pool_lock->second.wait (lock); // conditional wait
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
status = obj->get_report (flags);
lock.unlock();
if (status.first != Error::WORKING) {
p.set_value (status);
break;
}
} while (true);
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (obj->_mtx);
UNUSED (lock);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
break;
}
}
}

template <typename Rnd_It, typename Fwd_It>
std::pair<Error, uint8_t> Encoder<Rnd_It, Fwd_It>::get_report (
const Compute flags)
{
if (Compute::NONE != (flags & Compute::COMPLETE) ||
Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING)) {
auto it = encoders.begin();
for (; it != encoders.end(); ++it) {
auto ptr = it->second.enc;
if (ptr != nullptr && !ptr->ready())
break;
}
if (it == encoders.end()) {
pool_last_reported = static_cast<int16_t> (encoders.size() - 1);
return {Error::NONE, pool_last_reported};
}
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(pool_last_reported < (it->first - 1))) {
pool_last_reported = it->first - 1;
return {Error::NONE, pool_last_reported};
}
return {Error::WORKING, 0};
}
if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
for (auto &it : encoders) {
if (!it.second.reported) {
auto ptr = it.second.enc;
if (ptr != nullptr && ptr->ready()) {
return {Error::NONE, it.first};
}
}
}
}
return {Error::WORKING, 0}; // should never be reached
}
template <typename Rnd_It, typename Fwd_It>
uint64_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
const uint32_t &id)
{
const uint32_t mask_8 = static_cast<uint32_t> (std::pow (2, 8)) - 1;
const uint32_t mask = ~(mask_8 << 24);
// sbn is the high byte, esi the low 24 bits (see Symbol::id above)
return encode (output, end, id & mask, static_cast<uint8_t> (id >> 24));
}
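// Example: sbn = 2, esi = 5 gives id = (2 << 24) | 5 == 0x02000005; the
// overload above splits that back into esi = 5 and sbn = 2.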
template <typename Rnd_It, typename Fwd_It>
uint64_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
const uint32_t esi, const uint8_t sbn)
{
std::unique_lock<std::mutex> lock (_mtx);
auto it = encoders.find (sbn);
if (use_pool) {
if (it == encoders.end())
return 0;
auto shared_enc = it->second.enc;
if (!shared_enc->ready())
return 0;
lock.unlock();
return shared_enc->Enc (esi, output, end);
} else {
if (it == encoders.end()) {
bool success;
std::tie (it, success) = encoders.emplace (std::make_pair (sbn,
Enc (interleave, sbn)));
auto shared_enc = it->second.enc;
lock.unlock();
Work_State state = Work_State::KEEP_WORKING;
shared_enc->generate_symbols (&state);
return shared_enc->Enc (esi, output, end);
} else {
auto shared_enc = it->second.enc;
lock.unlock();
if (!shared_enc->ready())
return 0;
return shared_enc->Enc (esi, output, end);
}
}
}
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::free (const uint8_t sbn)
{
std::unique_lock<std::mutex> lock (_mtx);
UNUSED(lock);
auto it = encoders.find (sbn);
if (it != encoders.end())
encoders.erase (it);
}
template <typename Rnd_It, typename Fwd_It>
uint8_t Encoder<Rnd_It, Fwd_It>::blocks() const
{ return interleave.blocks(); }
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::block_size (const uint8_t sbn) const
{ return interleave.source_symbols (sbn) * interleave.symbol_size(); }
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
{ return interleave.symbol_size(); }
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols (const uint8_t sbn) const
{ return interleave.source_symbols (sbn); }
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair (const uint8_t sbn) const
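// Hedged end-to-end sketch of the encoder API above (sizes, names and the
// repair count are illustrative; error handling omitted):
//   std::vector<uint32_t> input = ...; // data to protect
//   using It = std::vector<uint32_t>::iterator;
//   Encoder<It, It> enc (input.begin(), input.end(),
//                        8,          // min_subsymbol_size
//                        1024,       // symbol_size (multiple of alignment)
//                        50 << 20);  // max working memory per block
//   enc.compute (Compute::COMPLETE).wait();
//   std::vector<uint32_t> pkt (1024 / sizeof(uint32_t), 0);
//   for (uint8_t sbn = 0; sbn < enc.blocks(); ++sbn) {
//       for (uint32_t esi = 0; esi < enc.symbols (sbn) + 10u; ++esi) {
//           auto out = pkt.begin();   // esi >= symbols(sbn) => repair
//           enc.encode (out, pkt.end(), esi, sbn);
//       }
//   }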
/////////////////
//
// Decoder
//
/////////////////
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::free (const uint8_t sbn)
{
std::unique_lock<std::mutex> lock (_mtx);
UNUSED(lock);
auto it = decoders.find (sbn);
if (it != decoders.end())
decoders.erase (it);
}
template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
const uint32_t id)
{
// same layout as the encoder ids: sbn in the high byte, esi in the low 24 bits
const uint32_t mask = ~(static_cast<uint32_t> (0xFF) << 24);
return add_symbol (start, end, id & mask, static_cast<uint8_t> (id >> 24));
}

template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
const uint32_t esi, const uint8_t sbn)
{
std::unique_lock<std::mutex> lock (_mtx);
auto it = decoders.find (sbn);
if (it == decoders.end()) {
const uint16_t symbols = sbn < part.num (0) ?
part.size(0) : part.size(1);
bool success;
std::tie (it, success) = decoders.emplace (std::make_pair(sbn,
Dec (symbols, _symbol_size)));
assert (success);
}
auto dec = it->second.dec;
lock.unlock();
// (assumed Impl API) feed the symbol to the per-block decoder
auto err = dec->add_symbol (start, end, esi);
if (err != Error::NONE)
return err;
// automatically add work to pool if we use it and have enough data
lock.lock();
if (use_pool && dec->can_decode()) {
bool add_work = dec->add_concurrent (
Impl::max_block_decoder_concurrency);
if (add_work) {
std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
new Block_Work());
work->work = dec;
work->notify = pool_lock;
Impl::Thread_Pool::get().add_work (std::move(work));
}
}
return Error::NONE;
}
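// Hedged receive-side sketch matching the encoder example above (names
// illustrative): feed every packet, then extract the data.
//   Decoder<It, It> dec (oti_common, oti_scheme); // OTI from the encoder
//   for (auto &pkt : received) {
//       auto in = pkt.payload.begin();
//       dec.add_symbol (in, pkt.payload.end(), pkt.id); // 32-bit sbn|esi
//   }
//   auto out = output.begin();
//   uint64_t written = dec.decode_bytes (out, output.end(), 0);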
template <typename In_It, typename Fwd_It>
Work_Exit_Status Decoder<In_It, Fwd_It>::Block_Work::do_work (
Work_State *state)
{
auto locked_dec = work.lock();
auto locked_notify = notify.lock();
if (locked_dec != nullptr && locked_notify != nullptr) {
// initialize, do not lock yet
std::unique_lock<std::mutex> locked_guard (locked_notify->first,
std::defer_lock);
// (assumed Impl API) run the decoder; it reports one of the results below
auto ret = locked_dec->decode (state);
switch (ret) {
case Impl::Decoder<In_It>::Decoder_Result::DECODED:
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wattributes"
case Impl::Decoder<In_It>::Decoder_Result::NEED_DATA:
locked_dec->drop_concurrent();
return Work_Exit_Status::DONE;
case Impl::Decoder<In_It>::Decoder_Result::STOPPED:
return Work_Exit_Status::STOPPED;
case Impl::Decoder<In_It>::Decoder_Result::CAN_RETRY:
return Work_Exit_Status::REQUEUE;
}
}
return Work_Exit_Status::DONE;
}

template <typename In_It, typename Fwd_It>
std::future<std::pair<Error, uint8_t>> Decoder<In_It, Fwd_It>::compute (
const Compute flags)
{
using ret_t = std::pair<Error, uint8_t>;
std::promise<ret_t> p;
bool error = false;
// need some flags
if (flags == Compute::NONE)
error = true;
// flag incompatibilities
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(Compute::NONE != (flags & (Compute::PARTIAL_ANY |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
(Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::COMPLETE) &&
Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::PARTIAL_ANY |
Compute::NO_POOL))) {
error = true;
}
if (Compute::NONE != (flags & Compute::NO_POOL)) {
std::unique_lock<std::mutex> lock (_mtx);
UNUSED(lock);
if (decoders.size() != 0) {
// You can only say you won't use the pool *before* you start
// decoding something!
error = true;
} else {
use_pool = false;
p.set_value ({Error::NONE, 0});
return p.get_future();
}
}