Newer
Older
}
template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
if (decoders.size() == 0)
return {Error::WORKING, 0};
if (Compute::COMPLETE == (flags & Compute::COMPLETE) ||
Compute::PARTIAL_FROM_BEGINNING ==
(flags & Compute::PARTIAL_FROM_BEGINNING)) {
uint16_t reportable = 0;
uint16_t next_expected = static_cast<uint16_t> (pool_last_reported + 1);
std::unique_lock<std::mutex> dec_lock (_mtx);
auto it = decoders.lower_bound (static_cast<uint8_t> (next_expected));
auto id = it->first;
if (id != next_expected)
break; // not consecutive
if (ptr == nullptr) {
assert(false && "RFC6330: decoder should never be nullptr.");
break;
}
if (!ptr->ready()) {
if (ptr->is_stopped())
return {Error::EXITING, 0};
if (ptr->end_of_input && ptr->threads() == 0)
return {Error::NEED_DATA, 0};
break; // still working
}
if (reportable > 0) {
pool_last_reported += reportable;
if (Compute::PARTIAL_FROM_BEGINNING ==
(flags & Compute::PARTIAL_FROM_BEGINNING)) {
return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
return {Error::NONE,
static_cast<uint8_t>(pool_last_reported)};
} else if (Compute::PARTIAL_ANY == (flags & Compute::PARTIAL_ANY)) {
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
// FIXME: locking might not be necessary. map emplace/erase do not
// invalidate other pointers.
auto undecodable = decoders.end();
std::unique_lock<std::mutex> dec_lock (_mtx);
RQ_UNUSED(dec_lock);
for (auto it = decoders.begin(); it != decoders.end(); ++it) {
if (!it->second.reported) {
auto ptr = it->second.dec;
if (ptr == nullptr) {
assert(false && "RFC6330: decoder should never be nullptr");
break;
}
if (ptr->ready()) {
it->second.reported = true;
return {Error::NONE, it->first};
}
if (ptr->is_stopped())
return {Error::EXITING, 0};
// first return all decodable blocks
// then return the ones we can not decode.
if (ptr->end_of_input && ptr->threads() == 0)
undecodable = it;
if (undecodable != decoders.end()) {
undecodable->second.reported = true;
return {Error::NEED_DATA, undecodable->first};
}
// can be reached if computing thread was stopped
return {Error::WORKING, 0};
}
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start, const Fwd_It end,
const uint8_t skip)
{
// Decode from the beginning, up untill we can.
// return number of BYTES written, starting at "start + skip" bytes
//
// in case the last iterator is only half written, "start" will
// point to the half-written iterator.
uint8_t new_skip = skip;
for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
std::unique_lock<std::mutex> block_lock (_mtx);
auto dec_ptr = it->second.dec;
block_lock.unlock();
if (!dec_ptr->ready()) {
if (!use_pool && dec_ptr->can_decode()) {
RaptorQ__v1::Work_State state =
RaptorQ__v1::Work_State::KEEP_WORKING;
Decoder_Result::DECODED != ret) {
}
}
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
if (sbn == (blocks() - 1)) {
// size of the data (_size) is different from the sum of the size of
// all blocks. get the real size, so we do not write more.
// we obviously need to consider this only for the last block.
uint64_t all_blocks = 0;
for (uint8_t id = 0; id < blocks(); ++id)
all_blocks += block_size (sbn);
const size_t diff = static_cast<size_t> (all_blocks - _size);
uint64_t bytes_written = de_interleaving (tmp_start, end, max_bytes,
new_skip);
written += bytes_written;
uint64_t bytes_and_skip = new_skip + bytes_written;
new_skip = bytes_and_skip %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
if (bytes_written == 0)
return written;
//new_skip = block_size (sbn) %
// sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// if we ended decoding in the middle of a Fwd_It, do not advance
// start too much, or we will end up having additional zeros.
uint64_t it_written = bytes_and_skip /
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
// RaptorQ handles at most 881GB per rfc, so
// casting uint64 to int64 is safe
// we can not do "--start" since it's a forward iterator
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wshorten-64-to-32"
#pragma clang diagnostic ignored "-Wsign-conversion"
start += std::max (static_cast<uint64_t>(0), it_written - 1);
#pragma clang diagnostic pop
size_t Decoder<In_It, Fwd_It>::decode_block_bytes (Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec_ptr = nullptr;
std::unique_lock<std::mutex> lock (_mtx);
if (it == decoders.end())
return 0; // did not receiveany data yet.
if (use_pool) {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready())
return 0; // did not receive enough data, or could not decode yet.
} else {
dec_ptr = it->second.dec;
lock.unlock();
if (!dec_ptr->ready()) {
if (!dec_ptr->can_decode())
return 0;
RaptorQ__v1::Work_State keep_working =
RaptorQ__v1::Work_State::KEEP_WORKING;
dec_ptr->decode (&keep_working);
if (!dec_ptr->ready())
return 0;
}
}
// decoder has decoded the block
Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
if (sbn == (blocks() - 1)) {
// size of the data (_size) is different from the sum of the size of
// all blocks. get the real size, so we do not write more.
// we obviously need to consider this only for the last block.
uint64_t all_blocks = 0;
for (uint8_t id = 0; id < blocks(); ++id)
all_blocks += block_size (sbn);
const uint64_t diff = all_blocks - _size;
}
return de_interleaving (start, end, max_bytes, skip);
}
template <typename In_It, typename Fwd_It>
std::pair<uint64_t, uint8_t> Decoder<In_It, Fwd_It>::decode_aligned (
Fwd_It &start,
const Fwd_It end,
const uint8_t skip)
{
const uint64_t bytes = decode_bytes (start, end, skip);
const uint64_t skip_and_bytes = skip + bytes;
const uint64_t iterators = skip_and_bytes /
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
const uint8_t new_skip = skip_and_bytes %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
return {iterators, new_skip};
}
template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode_block_aligned (
Fwd_It &start,
const Fwd_It end,
const uint8_t skip,
const uint8_t sbn)
{
const size_t bytes = decode_block_bytes (start, end, skip, sbn);
const size_t skip_and_bytes = skip + bytes;
const size_t iterators = skip_and_bytes /
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
const uint8_t new_skip = skip_and_bytes %
sizeof(typename std::iterator_traits<Fwd_It>::value_type);
return {iterators, new_skip};
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0) * _symbol_size;
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1) * _symbol_size;
}
return 0;
}
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0);
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1);
}
return 0;
}
} // namespace Impl
} // namespace RFC6330__v1