Newer
Older
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (_mtx);
UNUSED(pool_wait_lock);
pool_wait.emplace_back(wait_threads, this, flags, std::move(p));
}
return future;
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
do {
if (obj->exiting) { // make sure we can exit
break;
}
// pool is global (static), so wait only for our stuff.
std::unique_lock<std::mutex> lock (obj->pool_lock->first);
if (obj->exiting) { // make sure we can exit
break;
}
auto status = obj->get_report (flags);
if (Error::WORKING != status.first) {
p.set_value (status);
break;
obj->pool_lock->second.wait (lock); // conditional wait
if (obj->exiting) { // make sure we can exit
break;
}
status = obj->get_report (flags);
lock.unlock();
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
} while (true);
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (obj->_mtx);
UNUSED (lock);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
}
}
}
template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
const Compute flags)
{
if (Compute::COMPLETE == (flags & Compute::COMPLETE) ||
Compute::PARTIAL_FROM_BEGINNING ==
(flags & Compute::PARTIAL_FROM_BEGINNING)) {
// get first non-reported block.
for (;it != decoders.end(); ++it) {
if (pool_last_reported <= it->first)
break;
}
uint16_t reportable = 0;
// get last reportable block
for (; it != decoders.end(); ++it) {
auto ptr = it->second.dec;
if (ptr != nullptr && !ptr->ready())
break;
if (reportable > 0) {
pool_last_reported += reportable;
if (Compute::PARTIAL_FROM_BEGINNING ==
(flags & Compute::PARTIAL_FROM_BEGINNING)) {
return {Error::NONE, pool_last_reported};
} else {
// complete
if (pool_last_reported == _blocks)
return {Error::NONE, pool_last_reported};
}
} else if (Compute::PARTIAL_ANY == (flags & Compute::PARTIAL_ANY)) {
for (auto &it : decoders) {
if (!it.second.reported) {
auto ptr = it.second.dec;
if (ptr != nullptr && ptr->ready()) {
return {Error::NONE, it.first};
}
}
}
// can be reached if computing thread was stopped
return {Error::WORKING, 0};
}
template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode (Fwd_It &start,
const Fwd_It end,
const uint8_t skip)
{
// Decode from the beginning, up untill we can.
// return number of fwd_it written, plus skip bytes in case of
// blocks and iterators not being aligned.
uint8_t new_skip = skip;
for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
std::unique_lock<std::mutex> block_lock (_mtx);
return {written, new_skip};
auto dec_ptr = it->second.dec;
block_lock.unlock();
if (!dec_ptr->ready()) {
if (!use_pool && dec_ptr->can_decode()) {
Work_State state = Work_State::KEEP_WORKING;
auto ret = dec_ptr->decode (&state);
if (Impl::Decoder<In_It>::Decoder_Result::DECODED != ret)
return {written, new_skip};;
} else {
return {written, new_skip};
}
}
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
uint64_t tmp_written = de_interleaving (tmp_start, end, new_skip);
return {written, new_skip};
written += static_cast<size_t> (tmp_written);
new_skip = block_size (sbn) %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// if we ended decoding in the middle of a Fwd_It, do not advance
// start too much, or we will end up having additional zeros.
start = tmp_start;
} else {
// tmp_written > 0 due to earlier return
// moreover, RaptorQ handles at most 881GB per rfc, so
// casting uint64 to int64 is safe
// we can not do "--start" since it's a forward iterator
start += static_cast<int64_t> (tmp_written - 1);
}
uint64_t Decoder<In_It, Fwd_It>::decode_block (Fwd_It &start, const Fwd_It end,
std::shared_ptr<RaptorQ__v1::Impl::Decoder<In_It>> dec_ptr = nullptr;
std::unique_lock<std::mutex> lock (_mtx);
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
if (it == decoders.end())
return 0; // did not receiveany data yet.
if (use_pool) {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready())
return 0; // did not receive enough data, or could not decode yet.
} else {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready()) {
if (!dec_ptr->can_decode())
return 0;
Work_State keep_working = Work_State::KEEP_WORKING;
dec_ptr->decode (&keep_working);
if (!dec_ptr->ready())
return 0;
}
}
// decoder has decoded the block
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0) * _symbol_size;
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1) * _symbol_size;
}
return 0;
}
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0);
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1);
}
return 0;
}