/*
 * Copyright (c) 2015-2016, Luca Fulchir<luca@fulchir.it>, All rights reserved.
*
* This file is part of "libRaptorQ".
*
* libRaptorQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* libRaptorQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* and a copy of the GNU Lesser General Public License
* along with libRaptorQ. If not, see <http://www.gnu.org/licenses/>.
*/
// These templates are just a wrapper around the
// functionalities offered by the RaptorQ__v1::Impl namespace
// So if you want to see what the algorithm looks like,
// you are in the wrong place
#include "RaptorQ/v1/block_sizes.hpp"
#include "RaptorQ/v1/Interleaver.hpp"
#include "RaptorQ/v1/De_Interleaver.hpp"
#include "RaptorQ/v1/Decoder.hpp"
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/Shared_Computation/Decaying_LF.hpp"
#include "RaptorQ/v1/Thread_Pool.hpp"
#include <cassert>
#include <future>
// OTI: Object Transmission Information, exchanged out-of-band (RFC 6330).
// Common part layout: 40 bit transfer length | 8 reserved | 16 bit symbol size
// (see the commented OTI_Common_Data union below).
typedef uint64_t RQ_OTI_Common_Data;
// Scheme-specific part: 8 bit source blocks | 16 bit sub-blocks | 8 bit alignment
// (see the commented OTI_Scheme_Specific_Data union below).
typedef uint32_t RQ_OTI_Scheme_Specific_Data;
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
~Encoder();
Encoder (const Rnd_It data_from, const Rnd_It data_to,
const uint16_t min_subsymbol_size,
const uint16_t symbol_size,
const size_t max_memory)
: _mem (max_memory), _data_from (data_from), _data_to (data_to),
_symbol_size (symbol_size),
_min_subsymbol (min_subsymbol_size),
interleave (_data_from,
_data_to,
_min_subsymbol,
_mem,
_symbol_size)
{
IS_RANDOM(Rnd_It, "RFC6330__v1::Encoder");
IS_FORWARD(Fwd_It, "RFC6330__v1::Encoder");
auto _alignment = sizeof(typename
std::iterator_traits<Rnd_It>::value_type);
RQ_UNUSED(_alignment); // used only for asserts
assert(_symbol_size >= _alignment &&
"RaptorQ: symbol_size must be >= alignment");
assert((_symbol_size % _alignment) == 0 &&
"RaptorQ: symbol_size must be multiple of alignment");
assert(min_subsymbol_size >= _alignment &&
"RaptorQ: minimum subsymbol must be at least aligment");
assert(min_subsymbol_size <= _symbol_size &&
"RaptorQ: minimum subsymbol must be at most symbol_size");
assert((min_subsymbol_size % _alignment) == 0 &&
"RaptorQ: minimum subsymbol must be multiple of alignment");
assert((_symbol_size % min_subsymbol_size == 0) &&
"RaptorQ: symbol size must be multiple of subsymbol size");
// max size: ~881 GB
if (static_cast<uint64_t> (data_to - data_from) *
sizeof(typename std::iterator_traits<Rnd_It>::value_type)
> max_data) {
return;
}
_pool_notify = std::make_shared<std::condition_variable>();
_pool_mtx = std::make_shared<std::mutex>();
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
pool_last_reported = -1;
use_pool = true;
exiting = false;
}
// Iterator pointing at the first source block (sbn 0) of this encoder.
Block_Iterator<Rnd_It, Fwd_It> begin ()
{
    const auto partition = interleave.get_partition();
    return Block_Iterator<Rnd_It, Fwd_It> (this, partition, 0);
}
// One-past-the-last block iterator. The partition has two halves, so the
// total number of blocks is num(0) + num(1).
const Block_Iterator<Rnd_It, Fwd_It> end ()
{
    const auto partition = interleave.get_partition();
    const uint8_t total_blocks = static_cast<uint8_t> (partition.num (0) +
                                                       partition.num (1));
    return Block_Iterator<Rnd_It, Fwd_It> (this, partition, total_blocks);
}
// true if construction succeeded (the interleaver accepted the input)
operator bool() const { return interleave; }
// RFC 6330 Object Transmission Information, common part
RQ_OTI_Common_Data OTI_Common() const;
// RFC 6330 Object Transmission Information, scheme-specific part
RQ_OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
// TODO: introduce memory limits on threading ?
// Queue precomputation of all blocks on the thread pool; the future
// resolves to (Error, sbn) according to the reporting 'flags'.
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
// rough estimate (bytes) of the memory needed to precompute one block
size_t precompute_max_memory ();
// Write the symbol (sbn, esi) into 'output'; returns iterators written (0 on error).
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t esi,
const uint8_t sbn);
// id: 8-bit sbn + 24 bit esi
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id);
// drop the per-block encoder (and its memory) for block 'sbn'
void free (const uint8_t sbn);
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
uint32_t max_repair (const uint8_t sbn) const;
// thread body spawned by compute(): blocks until get_report() has an answer
static void wait_threads (Encoder<Rnd_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
// Unit of work for the thread pool. Holds only a weak_ptr so that
// destroying the Encoder cancels pending work instead of keeping it alive.
class Block_Work final : public Impl::Pool_Work {
public:
std::weak_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It,
RaptorQ__v1::Impl::with_interleaver>> work;
// NOTE(review): definitions later in the file also use Block_Work::notify
// and Block_Work::lock; those declarations appear lost from this
// (truncated) view — TODO confirm against the full header.
Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
~Block_Work() override;
};
// TODO: tagged pointer
// Per-block encoder holder: owns the Raw_Encoder for one source block and
// remembers whether its completion has already been reported to the user.
class Enc {
public:
    Enc (Impl::Interleaver<Rnd_It> *interleaver, const uint8_t sbn)
    {
        // the Raw_Encoder works on a single source block (sbn)
        enc = std::make_shared<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It,
                        Fwd_It, RaptorQ__v1::Impl::with_interleaver>> (
                                                            interleaver, sbn);
        reported = false;
    }
    std::shared_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It,
                            RaptorQ__v1::Impl::with_interleaver>> enc;
    // FIX: 'reported' was assigned in the constructor but never declared,
    // and the class was missing its closing brace (corrupted chunk).
    bool reported;
};
// report readiness/completion of blocks according to 'flags'
std::pair<Error, uint8_t> get_report (const Compute flags);
// shared with Block_Work and wait_threads so they survive destruction races
std::shared_ptr<std::condition_variable> _pool_notify;
std::shared_ptr<std::mutex> _pool_mtx;
const size_t _mem; // memory cap handed to the interleaver
const Rnd_It _data_from, _data_to; // input data range
const uint16_t _symbol_size; // bytes per symbol
const uint16_t _min_subsymbol; // minimum sub-symbol size
Impl::Interleaver<Rnd_It> interleave;
bool use_pool, exiting;
int16_t pool_last_reported; // last sbn reported as done (-1: none yet)
// rfc 6330, pg 6
// easy explanation for OTI_* comes next.
// we do NOT use bitfields as compilers are not actually forced to put
// them in any particular order. meaning they're useless.
//
//union OTI_Common_Data {
// uint64_t raw;
// struct {
// uint64_t size:40;
// uint8_t reserved:8;
// uint16_t symbol_size:16;
// };
//};
//union OTI_Scheme_Specific_Data {
// uint32_t raw;
// struct {
// uint8_t source_blocks;
// uint16_t sub_blocks;
// uint8_t alignment;
// };
//};
~Decoder();
// Build a Decoder from the OTI values received out-of-band from the
// encoder (see the commented OTI_* unions above for the bit layout).
Decoder (const RQ_OTI_Common_Data common,
const RQ_OTI_Scheme_Specific_Data scheme)
{
IS_INPUT(In_It, "RaptorQ__v1::Decoder");
IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");
// see the above commented bitfields for quick reference
_symbol_size = static_cast<uint16_t> (common); // low 16 bits of 'common'
uint16_t tot_sub_blocks = static_cast<uint16_t> (scheme >> 8);
_alignment = static_cast<uint8_t> (scheme); // low 8 bits of 'scheme'
// sub-block partition: symbols-per-alignment units split in N sub-blocks
_sub_blocks = Impl::Partition (_symbol_size /
static_cast<uint8_t> (scheme),
tot_sub_blocks);
_blocks = static_cast<uint8_t> (scheme >> 24); // high byte: source blocks
_size = common >> 24;
// (common >> 24) == total file size
if (_size > max_data)
return; // oversize: leave the decoder unusable (no partitions set up)
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
_size / static_cast<double> (_symbol_size)));
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
pool_last_reported = -1;
_pool_notify = std::make_shared<std::condition_variable>();
_pool_mtx = std::make_shared<std::mutex>();
use_pool = true;
exiting = false;
}
// Build a Decoder from already-unpacked parameters (same information as
// the OTI constructor above, without the bit packing).
Decoder (const uint64_t size, const uint16_t symbol_size,
const uint16_t sub_blocks,
const uint8_t blocks,
const uint8_t alignment)
:_size (size), _symbol_size (symbol_size), _blocks (blocks),
_alignment(alignment)
{
if (_size > max_data)
return; // oversize: leave the decoder unusable (no partitions set up)
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
_size / static_cast<double> (_symbol_size)));
// NOTE(review): assumes symbol_size and alignment are nonzero —
// TODO confirm callers validate this (division below).
_sub_blocks = Impl::Partition (_symbol_size / _alignment, sub_blocks);
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
_pool_notify = std::make_shared<std::condition_variable>();
_pool_mtx = std::make_shared<std::mutex>();
pool_last_reported = -1;
use_pool = true;
exiting = false;
}
// Queue decoding work / reporting; the future resolves per 'flags'.
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
// if you can tell there is no more input, we can avoid locking
// forever and return an error.
void end_of_input (const uint8_t block);
void end_of_input();
// result in BYTES
uint64_t decode_bytes (Fwd_It &start, const Fwd_It end, const uint8_t skip);
size_t decode_block_bytes (Fwd_It &start, const Fwd_It end,
const uint8_t skip,
const uint8_t sbn);
// result in ITERATORS
// last *might* be half written depending on data alignments
std::pair<uint64_t, uint8_t> decode_aligned (Fwd_It &start,const Fwd_It end,
const uint8_t skip);
std::pair<size_t, uint8_t> decode_block_aligned (Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
const uint8_t sbn);
// id: 8-bit sbn + 24 bit esi
Error add_symbol (In_It &start, const In_It end, const uint32_t id);
Error add_symbol (In_It &start, const In_It end, const uint32_t esi,
const uint8_t sbn);
// drop the per-block decoder (and its symbols) for block 'sbn'
void free (const uint8_t sbn);
uint64_t bytes() const;
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
// using shared pointers to avoid locking too much or
// worrying about deleting used stuff.
// Unit of work for the thread pool: weak references only, so deleting
// the Decoder cancels pending work instead of keeping it alive.
class RAPTORQ_LOCAL Block_Work final : public Impl::Pool_Work {
public:
std::weak_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> work;
std::weak_ptr<std::condition_variable> notify;
// NOTE(review): the do_work definition below also uses a 'lock' member
// not declared here — appears lost from this (truncated) view; confirm.
Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
~Block_Work() override;
};
// TODO: tagged pointer
// Per-block decoder holder: owns the Raw_Decoder for one source block and
// remembers whether its completion has already been reported to the user.
class RAPTORQ_LOCAL Dec {
public:
    Dec (const RaptorQ__v1::Block_Size symbols, const uint16_t symbol_size)
    {
        // FIX: the make_shared call was truncated mid-line in this chunk
        // (arguments, closing parenthesis and semicolon were missing).
        dec = std::make_shared<RaptorQ__v1::Impl::Raw_Decoder<In_It>> (
                                                    symbols, symbol_size);
        reported = false;
    }
    std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec;
    bool reported;
};
// thread body spawned by compute(): blocks until get_report() has an answer
static void wait_threads (Decoder<In_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
// report decoded/failed blocks according to 'flags'
std::pair<Error, uint8_t> get_report (const Compute flags);
// shared with Block_Work and wait_threads so they survive destruction races
std::shared_ptr<std::condition_variable> _pool_notify;
// NOTE(review): _pool_mtx and pool_wait are used by the definitions below
// but not declared in this (truncated) view — TODO confirm full header.
uint64_t _size; // total transfer size in bytes
Impl::Partition part, _sub_blocks; // block / sub-block partitioning
std::map<uint8_t, Dec> decoders; // per-block decoders, keyed by sbn
std::mutex _mtx; // guards 'decoders'
uint16_t _symbol_size; // bytes per symbol
int16_t pool_last_reported; // last sbn reported as done (-1: none yet)
uint8_t _blocks, _alignment;
bool use_pool, exiting;
/////////////////
//
// Encoder
//
/////////////////
exiting = true; // stop notifying thread
std::unique_lock<std::mutex> enc_lock (_mtx);
for (auto &it : encoders) { // stop existing computations
auto ptr = it.second.enc;
if (ptr != nullptr)
ptr->stop();
}
enc_lock.unlock();
_pool_notify->notify_all();
while (pool_wait.size() != 0) {
// Pack the RFC 6330 OTI common part:
// 40 bit transfer length | 8 reserved bits | 16 bit symbol size.
// Returns 0 if the encoder was not correctly initialized.
// FIX: the template header, opening and closing braces of this definition
// were lost in this (corrupted) chunk; restored here.
template <typename Rnd_It, typename Fwd_It>
RQ_OTI_Common_Data Encoder<Rnd_It, Fwd_It>::OTI_Common() const
{
    if (!interleave)
        return 0;
    RQ_OTI_Common_Data ret;
    // first 40 bits: data length in bytes.
    ret = (static_cast<uint64_t> (_data_to - _data_from) *
            sizeof(typename std::iterator_traits<Rnd_It>::value_type)) << 24;
    // 8 bits: reserved
    // last 16 bits: symbol size
    ret += _symbol_size;
    return ret;
}
// Pack the RFC 6330 OTI scheme-specific part:
// 8 bit source blocks | 16 bit sub-blocks (N) | 8 bit alignment.
// Returns 0 if the encoder was not correctly initialized.
// FIX: the template header, opening and closing braces of this definition
// were lost in this (corrupted) chunk; restored here.
template <typename Rnd_It, typename Fwd_It>
RQ_OTI_Scheme_Specific_Data Encoder<Rnd_It, Fwd_It>::OTI_Scheme_Specific() const
{
    if (!interleave)
        return 0;
    RQ_OTI_Scheme_Specific_Data ret;
    // 8 bit: source blocks
    ret = static_cast<uint32_t> (interleave.blocks()) << 24;
    // 16 bit: sub-blocks number (N)
    ret += static_cast<uint32_t> (interleave.sub_blocks()) << 8;
    // 8 bit: alignment
    ret += sizeof(typename std::iterator_traits<Rnd_It>::value_type);
    return ret;
}
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::precompute_max_memory ()
{
    // give a good estimate on the amount of memory needed for the
    // precomputation of one block; this will help you understand how many
    // concurrent precomputations you want to do :)
    if (!interleave)
        return 0;
    const uint16_t symbols = interleave.source_symbols (0);
    // find the first padded symbol count K' that fits 'symbols'
    // NOTE(review): '<' skips an exact match (takes the next bigger K');
    // harmless for an upper-bound estimate, but '<=' may be intended.
    uint16_t K_idx;
    for (K_idx = 0; K_idx < RaptorQ__v1::Impl::K_padded.size(); ++K_idx) {
        if (symbols < RaptorQ__v1::Impl::K_padded[K_idx])
            break;
    }
    if (K_idx == RaptorQ__v1::Impl::K_padded.size())
        return 0;
    const auto S_H = RaptorQ__v1::Impl::S_H_W[K_idx];
    const uint16_t matrix_cols = RaptorQ__v1::Impl::K_padded[K_idx] +
                                                        std::get<0> (S_H) +
                                                        std::get<1> (S_H);
    // Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D.
    // FIX: compute in size_t — matrix_cols can exceed 46340, so the old
    // plain-int multiplication could overflow (undefined behavior).
    return static_cast<size_t> (matrix_cols) * matrix_cols * 2 +
            static_cast<size_t> (_symbol_size) * matrix_cols;
}
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Block_Work::~Block_Work()
{
// cleanup. have we benn called before the computation finished?
auto locked_enc = work.lock();
auto locked_notify = notify.lock();
auto locked_mtx = lock.lock();
std::unique_lock<std::mutex> p_lock (*locked_mtx);
RQ_UNUSED(p_lock);
Work_Exit_Status Encoder<Rnd_It, Fwd_It>::Block_Work::do_work (
auto locked_enc = work.lock();
auto locked_notify = notify.lock();
auto locked_mtx = lock.lock();
// encoding always works. It's one of the few constants of the universe.
if (!locked_enc->generate_symbols (state))
return Work_Exit_Status::STOPPED; // or maybe not so constant
work.reset();
std::unique_lock<std::mutex> p_lock (*locked_mtx);
RQ_UNUSED(p_lock);
locked_notify->notify_all();
}
return Work_Exit_Status::DONE;
}
template <typename Rnd_It, typename Fwd_It>
std::future<std::pair<Error, uint8_t>> Encoder<Rnd_It, Fwd_It>::compute (
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
using ret_t = std::pair<Error, uint8_t>;
std::promise<ret_t> p;
bool error = !interleave;
// need some flags
if (flags == Compute::NONE)
error = true;
// flag incompatibilities
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(Compute::NONE != (flags & (Compute::PARTIAL_ANY |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
(Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::COMPLETE) &&
Compute::NONE != (flags &(Compute::PARTIAL_FROM_BEGINNING |
Compute::PARTIAL_ANY |
Compute::NO_POOL))) {
error = true;
}
if (Compute::NONE != (flags & Compute::NO_POOL)) {
std::unique_lock<std::mutex> lock (_mtx);
RQ_UNUSED(lock);
if (encoders.size() != 0) {
// You can only say you won't use the pool *before* you start
// decoding something!
error = true;
} else {
use_pool = false;
p.set_value ({Error::NONE, 0});
return p.get_future();
}
}
if (error) {
p.set_value ({Error::WRONG_INPUT, 0});
return p.get_future();
}
// flags are fine, add work to pool
std::unique_lock<std::mutex> lock (_mtx);
for (uint8_t block = 0; block < blocks(); ++block) {
auto enc = encoders.find (block);
if (enc == encoders.end()) {
bool success;
std::tie (enc, success) = encoders.emplace (
std::piecewise_construct,
std::forward_as_tuple (block),
std::forward_as_tuple (&interleave, block));
assert (success == true);
std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
new Block_Work());
work->work = enc->second.enc;
work->notify = _pool_notify;
Thread_Pool::get().add_work (std::move(work));
}
}
lock.unlock();
// spawn thread waiting for other thread exit.
// this way we can set_value to the future when needed.
auto future = p.get_future();
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (*_pool_mtx);
RQ_UNUSED(pool_wait_lock);
pool_wait.emplace_back(wait_threads, this, flags, std::move(p));
}
return future;
void Encoder<Rnd_It, Fwd_It>::wait_threads (Encoder<Rnd_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
if (obj->exiting) {
p.set_value ({Error::EXITING, 0});
break;
}
auto status = obj->get_report (flags);
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
_notify->wait (lock);
lock.unlock();
}
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
break;
}
}
std::pair<Error, uint8_t> Encoder<Rnd_It, Fwd_It>::get_report (
if (encoders.size() == 0)
return {Error::WORKING, 0};
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
if (Compute::NONE != (flags & Compute::COMPLETE) ||
Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING)) {
auto it = encoders.begin();
for (; it != encoders.end(); ++it) {
auto ptr = it->second.enc;
if (ptr != nullptr) {
if (!ptr->ready()) {
if (ptr->is_stopped())
return{Error::EXITING, 0};
break;
}
}
}
if (it == encoders.end()) {
pool_last_reported = static_cast<int16_t> (encoders.size() - 1);
return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
}
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(pool_last_reported < (it->first - 1))) {
pool_last_reported = it->first - 1;
return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
}
return {Error::WORKING, 0};
}
if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
for (auto &it : encoders) {
if (!it.second.reported) {
auto ptr = it.second.enc;
if (ptr != nullptr) {
if (ptr->ready())
return {Error::NONE, it.first};
if (ptr->is_stopped())
return{Error::EXITING, 0};
}
}
}
}
return {Error::WORKING, 0}; // should never be reached
// Encode one symbol addressed by the packed id (8-bit sbn in the high
// byte, 24-bit esi in the low three bytes — see the class declaration).
// FIX 1: the template header and braces were lost in this chunk; restored.
// FIX 2: the sbn was wrongly extracted as the LOW byte (id & 0xFF, which
// is part of the esi); per the documented id layout it is id >> 24.
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
                                                        const uint32_t &id)
{
    const uint32_t mask_8 = 0xFF; // (was pow(2,8)-1: same value, no float math)
    const uint32_t mask = ~(mask_8 << 24); // keep the low 24 esi bits
    return encode (output, end, id & mask, static_cast<uint8_t> (id >> 24));
}
// Encode symbol 'esi' of block 'sbn' into 'output'.
// Returns the number of iterators written, 0 on any error (bad sbn,
// block not yet precomputed when using the pool, ...).
// FIX: the template header, parameter tail, opening and closing braces
// were lost in this (corrupted) chunk; restored around the intact body.
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
                                                        const uint32_t esi,
                                                        const uint8_t sbn)
{
    if (sbn >= interleave.blocks())
        return 0;
    std::unique_lock<std::mutex> lock (_mtx);
    auto it = encoders.find (sbn);
    if (use_pool) {
        // pool mode: the block must already exist and be precomputed
        if (it == encoders.end())
            return 0;
        auto shared_enc = it->second.enc;
        if (!shared_enc->ready())
            return 0;
        lock.unlock();
        return shared_enc->Enc (esi, output, end);
    } else {
        if (it == encoders.end()) {
            // no pool: lazily create and synchronously precompute the block
            bool success;
            std::tie (it, success) = encoders.emplace (std::make_pair (sbn,
                                                    Enc (&interleave, sbn)));
            auto shared_enc = it->second.enc;
            lock.unlock();
            RaptorQ__v1::Work_State state =
                                    RaptorQ__v1::Work_State::KEEP_WORKING;
            shared_enc->generate_symbols (&state);
            return shared_enc->Enc (esi, output, end);
        } else {
            auto shared_enc = it->second.enc;
            lock.unlock();
            if (!shared_enc->ready())
                return 0;
            return shared_enc->Enc (esi, output, end);
        }
    }
}
// Drop the per-block encoder (and its generated symbols) for block 'sbn'.
// FIX: braces of this definition were lost in this chunk; restored.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::free (const uint8_t sbn)
{
    std::unique_lock<std::mutex> lock (_mtx);
    RQ_UNUSED(lock);
    auto it = encoders.find (sbn);
    if (it != encoders.end())
        encoders.erase (it);
}
// Number of source blocks; 0 if the encoder failed to initialize.
// FIX: braces of this definition were lost in this chunk; restored.
template <typename Rnd_It, typename Fwd_It>
uint8_t Encoder<Rnd_It, Fwd_It>::blocks() const
{
    if (!interleave)
        return 0;
    return interleave.blocks();
}
// Size in bytes of block 'sbn' (source symbols * symbol size);
// 0 if the encoder failed to initialize.
// FIX: braces of this definition were lost in this chunk; restored.
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::block_size (const uint8_t sbn) const
{
    if (!interleave)
        return 0;
    return interleave.source_symbols (sbn) * interleave.symbol_size();
}
// Symbol size in bytes; 0 if the encoder failed to initialize.
// FIX: braces of this definition were lost in this chunk; restored.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
{
    if (!interleave)
        return 0;
    return interleave.symbol_size();
}
// Number of source symbols in block 'sbn'; 0 if not initialized.
// FIX: braces of this definition were lost in this chunk; restored.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols (const uint8_t sbn) const
{
    if (!interleave)
        return 0;
    return interleave.source_symbols (sbn);
}
// Maximum number of repair symbols for block 'sbn': the 20-bit esi space
// minus the source symbols (RFC 6330 esi is 24 bit, but the library caps
// repair symbols at 2^20 here — kept as in the original).
// FIX: braces of this definition were lost in this chunk; restored.
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair (const uint8_t sbn) const
{
    if (!interleave)
        return 0;
    // 1 << 20 instead of pow(2, 20): same value, no floating point round-trip
    return static_cast<uint32_t> (1u << 20) -
                                            interleave.source_symbols (sbn);
}
/////////////////
//
// Decoder
//
/////////////////
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::~Decoder()
{
    // NOTE(review): the tail of this destructor was lost in this chunk.
    // Restored by mirroring the Encoder destructor visible above
    // (set 'exiting', stop computations, wake waiting threads) —
    // confirm against the full header.
    exiting = true; // make wait_threads() return Error::EXITING
    std::unique_lock<std::mutex> dec_lock (_mtx);
    for (auto &it : decoders) { // stop existing computations
        auto ptr = it.second.dec;
        if (ptr != nullptr)
            ptr->stop();
    }
    dec_lock.unlock();
    _pool_notify->notify_all();
}
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::free (const uint8_t sbn)
// Add a symbol addressed by the packed id: 8-bit sbn in the high byte,
// 24-bit esi in the low three bytes (see the class declaration).
// FIX: the template header, parameter tail, braces and the forwarding
// call were lost in this (corrupted) chunk; restored here.
template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
                                                        const uint32_t id)
{
    uint32_t esi = (id << 8 ) >> 8; // clear the high byte: 24-bit esi
    uint8_t sbn = id >> 24;         // high byte: source block number
    return add_symbol (start, end, esi, sbn);
}
// Add symbol 'esi' of block 'sbn' to the decoder, lazily creating the
// per-block decoder and queueing decode work on the pool when possible.
// FIX: the template header, parameter tail, opening and closing braces
// were lost in this (corrupted) chunk; restored around the intact body.
template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
                                                        const uint32_t esi,
                                                        const uint8_t sbn)
{
    if (sbn >= _blocks)
        return Error::WRONG_INPUT;
    std::unique_lock<std::mutex> lock (_mtx);
    auto it = decoders.find (sbn);
    if (it == decoders.end()) {
        // lazily build the per-block decoder with this block's symbol count
        const uint16_t symbols = sbn < part.num (0) ?
                                                    part.size(0) : part.size(1);
        bool success;
        std::tie (it, success) = decoders.emplace (std::make_pair(sbn,
                Dec (static_cast<RaptorQ__v1::Block_Size>(symbols), _symbol_size)));
        assert (success);
    }
    auto dec = it->second.dec;
    lock.unlock();
    auto err = dec->add_symbol (start, end, esi);
    if (err != Error::NONE)
        return err;
    // automatically add work to pool if we use it and have enough data
    std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
    RQ_UNUSED(pool_lock);
    if (use_pool && dec->can_decode()) {
        bool add_work = dec->add_concurrent (max_block_decoder_concurrency);
        if (add_work) {
            std::unique_ptr<Block_Work> work = std::unique_ptr<Block_Work>(
                                                            new Block_Work());
            work->work = dec;
            work->notify = _pool_notify;
            Impl::Thread_Pool::get().add_work (std::move(work));
        }
    }
    return Error::NONE;
}
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::end_of_input()
{
// Mark every block as "no more data will arrive", then wake the pool
// waiters so blocked decoders can give up instead of waiting forever.
// Lock order (pool mutex first, then the decoders map) must match the
// other member functions to avoid deadlock.
std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
std::unique_lock<std::mutex> dec_lock (_mtx);
for (auto &it : decoders)
it.second.dec->end_of_input = true;
dec_lock.unlock();
pool_lock.unlock();
// notify after unlocking so woken threads can grab the mutex immediately
_pool_notify->notify_all();
}
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::end_of_input (const uint8_t block)
{
// Single-block variant of end_of_input(): mark only 'block' as finished.
// Same lock order as above (pool mutex, then decoders map).
std::unique_lock<std::mutex> pool_lock (*_pool_mtx);
std::unique_lock<std::mutex> dec_lock (_mtx);
auto it = decoders.find(block);
if (it != decoders.end()) {
it->second.dec->end_of_input = true;
dec_lock.unlock();
pool_lock.unlock();
// notify after unlocking so woken threads can grab the mutex immediately
_pool_notify->notify_all();
}
// unknown block: nothing to do (the RAII locks release on scope exit)
}
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::Block_Work::~Block_Work()
{
// have we been called before the computation finished?
auto locked_dec = work.lock();
auto locked_notify = notify.lock();
auto locked_mtx = lock.lock();
std::unique_lock<std::mutex> p_lock (*locked_mtx);
RQ_UNUSED(p_lock);
template <typename In_It, typename Fwd_It>
Work_Exit_Status Decoder<In_It, Fwd_It>::Block_Work::do_work (
auto locked_notify = notify.lock();
auto locked_mtx = lock.lock();
std::unique_lock<std::mutex> p_lock (*locked_mtx, std::defer_lock);
switch (ret) {
case RaptorQ__v1::Impl::Raw_Decoder<In_It>::Decoder_Result::DECODED:
work.reset();
return Work_Exit_Status::DONE;
case RaptorQ__v1::Impl::Raw_Decoder<In_It>::Decoder_Result::NEED_DATA:
p_lock.lock();
if (locked_dec->can_decode()) {
// check again to avoid race between threads
return Work_Exit_Status::REQUEUE;
} else {
locked_dec->drop_concurrent();
if (locked_dec->end_of_input && locked_dec->threads() == 0)
locked_notify->notify_all();
p_lock.unlock();
work.reset();
return Work_Exit_Status::DONE;
}
case RaptorQ__v1::Impl::Raw_Decoder<In_It>::Decoder_Result::STOPPED:
p_lock.lock();
if (locked_dec->ready()) // did an other thread stop us?
locked_dec->drop_concurrent();
work.reset();
return Work_Exit_Status::DONE;
return Work_Exit_Status::STOPPED;
case RaptorQ__v1::Impl::Raw_Decoder<In_It>::Decoder_Result::CAN_RETRY:
return Work_Exit_Status::REQUEUE;
}
}
return Work_Exit_Status::DONE;
std::future<std::pair<Error, uint8_t>> Decoder<In_It, Fwd_It>::compute (
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
using ret_t = std::pair<Error, uint8_t>;
std::promise<ret_t> p;
bool error = false;
// need some flags
if (flags == Compute::NONE)
error = true;
// flag incompatibilities
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(Compute::NONE != (flags & (Compute::PARTIAL_ANY |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::PARTIAL_ANY) &&
(Compute::NONE != (flags & (Compute::PARTIAL_FROM_BEGINNING |
Compute::COMPLETE |
Compute::NO_POOL)))) {
error = true;
} else if (Compute::NONE != (flags & Compute::COMPLETE) &&
Compute::NONE != (flags &(Compute::PARTIAL_FROM_BEGINNING |
Compute::PARTIAL_ANY |
Compute::NO_POOL))) {
error = true;
}
if (Compute::NONE != (flags & Compute::NO_POOL)) {
std::unique_lock<std::mutex> lock (_mtx);
RQ_UNUSED(lock);
if (decoders.size() != 0) {
// You can only say you won't use the pool *before* you start
// decoding something!
error = true;
} else {
use_pool = false;
p.set_value ({Error::NONE, 0});
return p.get_future();
}
}
if (error) {
p.set_value ({Error::WRONG_INPUT, 0});
return p.get_future();
}
// do not add work to the pool to save up memory.
// let "add_symbol craete the Decoders as needed.
// spawn thread waiting for other thread exit.
// this way we can set_value to the future when needed.
auto future = p.get_future();
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (*_pool_mtx);
RQ_UNUSED(pool_wait_lock);
pool_wait.emplace_back (wait_threads, this, flags, std::move(p));
}
return future;
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::EXITING, 0});
break;
}
auto status = obj->get_report (flags);
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
_notify->wait (lock);
lock.unlock();
}
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (*obj->_pool_mtx);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);