Newer
Older
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (_mtx);
UNUSED(pool_wait_lock);
pool_wait.emplace_back(wait_threads, this, flags, std::move(p));
}
return future;
// Waiter routine: loops until obj->get_report (flags) yields something
// other than Error::WORKING, or until obj->exiting is set, and publishes the
// outcome through the promise before leaving the loop.
//
// obj:   decoder whose progress is being watched.
// flags: forwarded unchanged to obj->get_report().
// p:     promise that receives either the report or, when the decoder is
//        exiting, {Error::NONE, 0}.
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
do {
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
// pool is global (static), so wait only for our stuff.
// The lock is taken before querying the report, so the wait below is
// entered with the same lock held.
std::unique_lock<std::mutex> lock (obj->pool_lock->first);
// re-check after acquiring the lock: exiting may have been set meanwhile
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
auto status = obj->get_report (flags);
// anything but WORKING is a final answer for the caller
if (Error::WORKING != status.first) {
p.set_value (status);
break;
// nothing to report yet: sleep on pool_lock->second (presumably a
// condition variable -- TODO confirm) until somebody wakes us up.
obj->pool_lock->second.wait (lock); // conditional wait
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
// woken up: query the report again while still holding the lock,
// then release the lock before touching the promise.
status = obj->get_report (flags);
lock.unlock();
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
} while (true);
// delete ourselves from the waiting thread vector.
// obj->_mtx guards pool_wait; the entry whose id matches the current thread
// is detached and then erased.
std::unique_lock<std::mutex> lock (obj->_mtx);
UNUSED (lock);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
}
}
}
// Reports decoding progress according to the requested compute flags.
//
// flags: selects the reporting policy; COMPLETE, PARTIAL_FROM_BEGINNING and
//        PARTIAL_ANY are the bits tested here.
// returns: {Error::NONE, id} when there is something to report,
//          {Error::WORKING, 0} otherwise.
// Side effect: may update pool_last_reported.
template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
const Compute flags)
{
if (Compute::NONE != (flags & Compute::COMPLETE) ||
Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING)) {
// walk the decoders from the first one and stop at the first entry that
// has a decoder which is not ready; entries with a null decoder do not
// stop the scan.
auto it = decoders.begin();
for (; it != decoders.end(); ++it) {
auto ptr = it->second.dec;
if (ptr != nullptr && !ptr->ready())
break;
}
// the scan reached the end: report the index of the last entry.
if (it == decoders.end()) {
pool_last_reported = static_cast<int16_t> (decoders.size() - 1);
return {Error::NONE, pool_last_reported};
}
// partial report: the entry just before the one that stopped the scan is
// reported, but only if it lies past what was reported last time.
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(pool_last_reported < (it->first - 1))) {
pool_last_reported = it->first - 1;
return {Error::NONE, pool_last_reported};
}
return {Error::WORKING, 0};
}
if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
// report the first entry not yet flagged as reported whose decoder exists
// and is ready.
for (auto &it : decoders) {
if (!it.second.reported) {
auto ptr = it.second.dec;
if (ptr != nullptr && ptr->ready()) {
return {Error::NONE, it.first};
}
}
}
// can be reached if computing thread was stopped
return {Error::WORKING, 0};
}
// Writes the decoded data to the output range, block after block starting
// from block 0, stopping at the first block that is not available.
//
// start: output iterator; moved forward as data is written.
// end:   end of the output range, handed to the de-interleaver.
// skip:  initial value of new_skip, handed to the de-interleaver.
// returns: {written, new_skip}.
template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode (Fwd_It &start,
const Fwd_It end,
const uint8_t skip)
{
// Decode from the beginning, for as long as we can.
uint8_t new_skip = skip;
for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
// _mtx is held only while the block's decoder pointer is fetched.
std::unique_lock<std::mutex> block_lock (_mtx);
return {written, new_skip};
auto dec_ptr = it->second.dec;
block_lock.unlock();
if (!dec_ptr->ready()) {
// without a thread pool the block is decoded right here, provided the
// decoder says it can be decoded.
if (!use_pool && dec_ptr->can_decode()) {
Work_State state = Work_State::KEEP_WORKING;
auto ret = dec_ptr->decode (&state);
// decoding did not succeed: report what has been written so far.
if (Impl::Decoder<In_It>::Decoder_Result::DECODED != ret)
return {written, new_skip};;
} else {
// not ready, and either the pool is in charge or the block can not be
// decoded yet: report what has been written so far.
return {written, new_skip};
}
}
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
uint64_t tmp_written = de_interleaving (tmp_start, end, new_skip);
return {written, new_skip};
written += static_cast<size_t> (tmp_written);
// remainder of the block size over the size of one output element.
new_skip = block_size (sbn) %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// if we ended decoding in the middle of a Fwd_It, do not advance
// start too much, or we will end up having additional zeros.
start = tmp_start;
} else {
// tmp_written > 0 due to earlier return
// moreover, RaptorQ handles at most 881GB per rfc, so
// casting uint64 to int64 is safe
// we can not do "--start" since it's a forward iterator
start += static_cast<int64_t> (tmp_written - 1);
}
// Writes a single decoded block to the output range.
// Returns 0 whenever the block is not available (see the early returns
// below).
uint64_t Decoder<In_It, Fwd_It>::decode_block (Fwd_It &start, const Fwd_It end,
std::shared_ptr<RaptorQ__v1::Impl::Decoder<In_It>> dec_ptr = nullptr;
// _mtx is held while the decoders container is accessed and released as soon
// as a copy of the shared pointer has been taken.
std::unique_lock<std::mutex> lock (_mtx);
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
if (it == decoders.end())
return 0; // did not receive any data yet.
if (use_pool) {
// with a thread pool decoding is not started from here: the block is
// either ready already or there is nothing to return.
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready())
return 0; // did not receive enough data, or could not decode yet.
} else {
// without a thread pool the block is decoded synchronously, if possible.
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready()) {
if (!dec_ptr->can_decode())
return 0;
Work_State keep_working = Work_State::KEEP_WORKING;
dec_ptr->decode (&keep_working);
// the decode attempt may still have failed.
if (!dec_ptr->ready())
return 0;
}
}
// decoder has decoded the block
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
// Size of a source block: the number of symbols in the block multiplied by
// _symbol_size.
//
// Blocks come in two partitions: the first part.num (0) blocks hold
// part.size (0) symbols each, the following part.num (1) blocks hold
// part.size (1) symbols each.
//
// sbn: source block number.
// returns: the block size, or 0 when sbn lies outside both partitions.
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
{
	// The product is computed in uint32_t on purpose: if both operands are
	// 16 bit wide (as the uint16_t return types of symbols() and
	// symbol_size() suggest) they would otherwise be promoted to int, and
	// large blocks (e.g. 56403 symbols of 65535 bytes) would overflow it,
	// which is undefined behaviour.
	if (sbn < part.num (0)) {
		return static_cast<uint32_t> (part.size (0)) * _symbol_size;
	} else if (sbn - part.num (0) < part.num (1)) {
		return static_cast<uint32_t> (part.size (1)) * _symbol_size;
	}
	return 0;
}
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
// Number of symbols held by the given source block.
//
// sbn: source block number.
// returns: part.size (0) for blocks of the first partition, part.size (1)
//          for blocks of the second one, 0 for any other block number.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
{
	// first partition: block numbers [0, part.num (0))
	if (sbn < part.num (0))
		return part.size (0);

	// second partition: the next part.num (1) block numbers
	if (sbn - part.num (0) < part.num (1))
		return part.size (1);

	// no such block
	return 0;
}