Newer
Older
if (error) {
p.set_value ({Error::WRONG_INPUT, 0});
return p.get_future();
}
// do not add work to the pool to save up memory.
// let "add_symbol craete the Decoders as needed.
// spawn thread waiting for other thread exit.
// this way we can set_value to the future when needed.
auto future = p.get_future();
if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
wait_threads (this, flags, std::move(p));
} else {
std::unique_lock<std::mutex> pool_wait_lock (_mtx);
UNUSED(pool_wait_lock);
pool_wait.emplace_back(wait_threads, this, flags, std::move(p));
}
return future;
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
const Compute flags,
std::promise<std::pair<Error, uint8_t>> p)
do {
if (obj->exiting) { // make sure we can exit
break;
}
// pool is global (static), so wait only for our stuff.
std::unique_lock<std::mutex> lock (obj->pool_lock->first);
if (obj->exiting) { // make sure we can exit
break;
}
auto status = obj->get_report (flags);
if (Error::WORKING != status.first) {
p.set_value (status);
break;
obj->pool_lock->second.wait (lock); // conditional wait
if (obj->exiting) { // make sure we can exit
break;
}
status = obj->get_report (flags);
lock.unlock();
if (Error::WORKING != status.first) {
p.set_value (status);
break;
}
} while (true);
// delete ourselves from the waiting thread vector.
std::unique_lock<std::mutex> lock (obj->_mtx);
UNUSED (lock);
for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
if (it->get_id() == std::this_thread::get_id()) {
it->detach();
obj->pool_wait.erase (it);
}
}
}
template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
const Compute flags)
{
if (Compute::COMPLETE == (flags & Compute::COMPLETE) ||
Compute::PARTIAL_FROM_BEGINNING ==
(flags & Compute::PARTIAL_FROM_BEGINNING)) {
// get first non-reported block.
for (;it != decoders.end(); ++it) {
if (pool_last_reported <= it->first)
break;
}
uint16_t reportable = 0;
// get last reportable block
for (; it != decoders.end(); ++it) {
auto ptr = it->second.dec;
if (ptr != nullptr && !ptr->ready())
break;
if (reportable > 0) {
pool_last_reported += reportable;
if (Compute::PARTIAL_FROM_BEGINNING ==
(flags & Compute::PARTIAL_FROM_BEGINNING)) {
return {Error::NONE, pool_last_reported};
} else {
// complete
if (pool_last_reported == _blocks)
return {Error::NONE, pool_last_reported};
}
} else if (Compute::PARTIAL_ANY == (flags & Compute::PARTIAL_ANY)) {
for (auto &it : decoders) {
if (!it.second.reported) {
auto ptr = it.second.dec;
if (ptr != nullptr && ptr->ready()) {
return {Error::NONE, it.first};
}
}
}
// can be reached if computing thread was stopped
return {Error::WORKING, 0};
}
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start, const Fwd_It end,
const uint8_t skip)
{
// Decode from the beginning, up untill we can.
// return number of BYTES written, starting at "start + skip" bytes
//
// in case the last iterator is only half written, "start" will
// point to the half-written iterator.
uint8_t new_skip = skip;
for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
std::unique_lock<std::mutex> block_lock (_mtx);
auto dec_ptr = it->second.dec;
block_lock.unlock();
if (!dec_ptr->ready()) {
if (!use_pool && dec_ptr->can_decode()) {
Work_State state = Work_State::KEEP_WORKING;
auto ret = dec_ptr->decode (&state);
if (Impl::Decoder<In_It>::Decoder_Result::DECODED != ret)
}
}
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
// FIXME FIXME FIXME FIXME FIXME:
// the size of the user data is *NOT* always aligned with the
// symbol, and not with the iterator.
// Therefore we need to limit the amount of written data
// when writing the last block. limit with data size, not block size.
uint64_t max_bytes = block_size (sbn);
if (sbn == (blocks() - 1)) {
// size of the data (_size) is different from the sum of the size of
// all blocks. get the real size, so we do not write more.
// we obviously need to consider this only for the last block.
uint64_t all_blocks = 0;
for (uint8_t id = 0; id < blocks(); ++id)
all_blocks += block_size (sbn);
const uint64_t diff = all_blocks - _size;
max_bytes -= diff;
}
uint64_t bytes_written = de_interleaving (tmp_start, end, max_bytes,
new_skip);
written += bytes_written;
uint64_t bytes_and_skip = new_skip + bytes_written;
new_skip = bytes_and_skip %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
if (bytes_written == 0)
return written;
//new_skip = block_size (sbn) %
// sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// if we ended decoding in the middle of a Fwd_It, do not advance
// start too much, or we will end up having additional zeros.
uint64_t it_written = bytes_and_skip /
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// RaptorQ handles at most 881GB per rfc, so
// casting uint64 to int64 is safe
// we can not do "--start" since it's a forward iterator
start += std::max (static_cast<int64_t> (0),
static_cast<int64_t> (it_written - 1));
uint64_t Decoder<In_It, Fwd_It>::decode_block_bytes (Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
std::shared_ptr<RaptorQ__v1::Impl::Decoder<In_It>> dec_ptr = nullptr;
std::unique_lock<std::mutex> lock (_mtx);
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
if (it == decoders.end())
return 0; // did not receiveany data yet.
if (use_pool) {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready())
return 0; // did not receive enough data, or could not decode yet.
} else {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready()) {
if (!dec_ptr->can_decode())
return 0;
Work_State keep_working = Work_State::KEEP_WORKING;
dec_ptr->decode (&keep_working);
if (!dec_ptr->ready())
return 0;
}
}
// decoder has decoded the block
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
uint64_t max_bytes = block_size (sbn);
if (sbn == (blocks() - 1)) {
// size of the data (_size) is different from the sum of the size of
// all blocks. get the real size, so we do not write more.
// we obviously need to consider this only for the last block.
uint64_t all_blocks = 0;
for (uint8_t id = 0; id < blocks(); ++id)
all_blocks += block_size (sbn);
const uint64_t diff = all_blocks - _size;
max_bytes -= diff;
}
return de_interleaving (start, end, max_bytes, skip);
}
template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode_aligned (
Fwd_It &start,
const Fwd_It end,
const uint8_t skip)
{
const uint64_t bytes = decode_bytes (start, end, skip);
const uint64_t skip_and_bytes = skip + bytes;
const uint64_t iterators = skip_and_bytes /
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
const uint8_t new_skip = skip_and_bytes %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
return {iterators, new_skip};
}
template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode_block_aligned (
Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
const uint8_t sbn)
{
const uint64_t bytes = decode_block_bytes (start, end, skip, sbn);
const uint64_t skip_and_bytes = skip + bytes;
const uint64_t iterators = skip_and_bytes /
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
const uint8_t new_skip = skip_and_bytes %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
return {iterators, new_skip};
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0) * _symbol_size;
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1) * _symbol_size;
}
return 0;
}
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0);
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1);
}
return 0;
}