Newer
Older
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
std::unique_ptr<Block_Work> work =std::unique_ptr<Block_Work>(
new Block_Work());
work->work = dec;
work->notify = pool_lock;
Impl::Thread_Pool::get().add_work (std::move(work));
}
}
}
lock.unlock();
// spawn thread waiting for other thread exit.
// this way we can set_value to the future when needed.
auto future = p.get_future();
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (_mtx);
UNUSED(pool_wait_lock);
pool_wait.emplace_back(wait_threads, this, flags, std::move(p));
}
return future;
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
do {
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
// pool is global (static), so wait only for our stuff.
std::unique_lock<std::mutex> lock (obj->pool_lock->first);
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
auto status = obj->get_report (flags);
if (Error::WORKING != status.first) {
p.set_value (status);
break;
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
obj->pool_lock->second.wait (lock); // conditional wait
if (obj->exiting) { // make sure we can exit
p.set_value ({Error::NONE, 0});
break;
}
status = obj->get_report (flags);
lock.unlock();
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
} while (true);
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (obj->_mtx);
UNUSED (lock);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
}
}
}
template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
const Compute flags)
{
if (Compute::NONE != (flags & Compute::COMPLETE) ||
Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING)) {
auto it = decoders.begin();
for (; it != decoders.end(); ++it) {
auto ptr = it->second.dec;
if (ptr != nullptr && !ptr->ready())
break;
}
if (it == decoders.end()) {
pool_last_reported = static_cast<int16_t> (decoders.size() - 1);
return {Error::NONE, pool_last_reported};
}
if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
(pool_last_reported < (it->first - 1))) {
pool_last_reported = it->first - 1;
return {Error::NONE, pool_last_reported};
}
return {Error::WORKING, 0};
}
if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
for (auto &it : decoders) {
if (!it.second.reported) {
auto ptr = it.second.dec;
if (ptr != nullptr && ptr->ready()) {
return {Error::NONE, it.first};
}
}
}
// can be reached if computing thread was stopped
return {Error::WORKING, 0};
}
template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode (Fwd_It &start,
const Fwd_It end,
const uint8_t skip)
{
// Decode from the beginning, up untill we can.
uint8_t new_skip = skip;
for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
std::unique_lock<std::mutex> block_lock (_mtx);
return {written, new_skip};
auto dec_ptr = it->second.dec;
block_lock.unlock();
if (!dec_ptr->ready()) {
if (!use_pool && dec_ptr->can_decode()) {
Work_State state = Work_State::KEEP_WORKING;
auto ret = dec_ptr->decode (&state);
if (Impl::Decoder<In_It>::Decoder_Result::DECODED != ret)
return {written, new_skip};;
} else {
return {written, new_skip};
}
}
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
uint64_t tmp_written = de_interleaving (tmp_start, end, new_skip);
return {written, new_skip};
written += static_cast<size_t> (tmp_written);
new_skip = block_size (sbn) %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// if we ended decoding in the middle of a Fwd_It, do not advance
// start too much, or we will end up having additional zeros.
start = tmp_start;
} else {
// tmp_written > 0 due to earlier return
// moreover, RaptorQ handles at most 881GB per rfc, so
// casting uint64 to int64 is safe
// we can not do "--start" since it's a forward iterator
start += static_cast<int64_t> (tmp_written - 1);
}
uint64_t Decoder<In_It, Fwd_It>::decode_block (Fwd_It &start, const Fwd_It end,
std::shared_ptr<RaptorQ__v1::Impl::Decoder<In_It>> dec_ptr = nullptr;
std::unique_lock<std::mutex> lock (_mtx);
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
if (it == decoders.end())
return 0; // did not receiveany data yet.
if (use_pool) {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready())
return 0; // did not receive enough data, or could not decode yet.
} else {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready()) {
if (!dec_ptr->can_decode())
return 0;
Work_State keep_working = Work_State::KEEP_WORKING;
dec_ptr->decode (&keep_working);
if (!dec_ptr->ready())
return 0;
}
}
// decoder has decoded the block
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0) * _symbol_size;
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1) * _symbol_size;
}
return 0;
}
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0);
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1);
}
return 0;
}