Skip to content
Commits on Source (2)
......@@ -85,7 +85,8 @@ public:
Raw_Decoder (Raw_Decoder&&) = default;
Raw_Decoder& operator= (Raw_Decoder&&) = default;
Error add_symbol (In_It &start, const In_It end, const uint32_t esi);
Error add_symbol (In_It &start, const In_It end, const uint32_t esi,
bool padded);
Decoder_Result decode (Work_State *thread_keep_working);
DenseMtx* get_symbols();
bool has_symbol (const uint16_t symbol) const;
......@@ -241,10 +242,14 @@ bool Raw_Decoder<In_It>::has_symbol (const uint16_t symbol) const
template <typename In_It>
Error Raw_Decoder<In_It>::add_symbol (In_It &start, const In_It end,
const uint32_t esi)
const uint32_t esi, bool padded)
{
// true if added succesfully
// only the last symbol from the RFC interface should be padded,
// but we do not track here which one that is, so we leave that to
// the rfc interface
// if we were lucky to get a random access iterator, quickly check that
// the we have enough data for the symbol.
if (std::is_same<typename std::iterator_traits<In_It>::iterator_category,
......@@ -277,8 +282,14 @@ Error Raw_Decoder<In_It>::add_symbol (In_It &start, const In_It end,
}
// input iterator might reach end before we get enough data
// for the symbol.
if (col != source_symbols.cols())
return Error::WRONG_INPUT;
if (col != source_symbols.cols()) {
if (padded) {
while (col != source_symbols.cols())
source_symbols (static_cast<int32_t>(esi), col++) = 0;
} else {
return Error::WRONG_INPUT;
}
}
} else {
Vect v = Vect (source_symbols.cols());
for (; start != end && col != source_symbols.cols(); ++start) {
......
......@@ -142,6 +142,8 @@ public:
const uint8_t sbn);
// id: 8-bit sbn + 24 bit esi
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id);
size_t encode_packet (Fwd_It &output, const Fwd_It end, const uint32_t &id);
void free (const uint8_t sbn);
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
......@@ -165,7 +167,6 @@ private:
~Block_Work() override;
};
// TODO: tagged pointer
class Enc {
public:
Enc (Impl::Interleaver<Rnd_It> *interleaver, const uint8_t sbn)
......@@ -334,6 +335,8 @@ public:
Error add_symbol (In_It &start, const In_It end, const uint32_t id);
Error add_symbol (In_It &start, const In_It end, const uint32_t esi,
const uint8_t sbn);
Error add_packet (In_It &start, const In_It end);
uint8_t blocks_ready();
bool is_ready();
bool is_block_ready (const uint8_t block);
......@@ -356,7 +359,7 @@ private:
Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
~Block_Work() override;
};
// TODO: tagged pointer
class RAPTORQ_LOCAL Dec {
public:
Dec (const RaptorQ__v1::Block_Size symbols, const uint16_t symbol_size,
......@@ -684,9 +687,11 @@ template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
const uint32_t &id)
{
const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t> (id);
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
return encode (output, end, id & mask, static_cast<uint8_t> (id >> 24));
return encode (output, end, host_id & mask,
static_cast<uint8_t> (host_id >> 24));
}
template <typename Rnd_It, typename Fwd_It>
......@@ -733,6 +738,109 @@ size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
}
}
template <typename Rnd_It, typename Fwd_It>
size_t Encoder<Rnd_It, Fwd_It>::encode_packet (Fwd_It &output, const Fwd_It end,
const uint32_t &id)
{
// RFC packet, section 4.4.2 page 11
// return the size of the packet in BYTES
using T = typename std::iterator_traits<Fwd_It>::value_type;
// each packet has an header of 32 bits. We can not start writing the
// encoded symbols in the middle of an iterator yet.
if (sizeof(T) > sizeof(uint32_t) || sizeof(T) == 3) {
assert (false && "libRaptorQ: sorry, encde_packets can only be used "
"with types of at most 32 bits for now\n");
return 0;
}
// first of all, check if we have enough space
size_t max_pkt_len;
if (std::is_same<typename std::iterator_traits<Fwd_It>::iterator_category,
std::random_access_iterator_tag>::value) {
// we were lucky with a random iterator.
max_pkt_len = sizeof(T) * (end - output);
} else {
max_pkt_len = 0;
auto out_copy = output;
while (out_copy != end) {
max_pkt_len += sizeof(T);
++out_copy;
}
}
if (max_pkt_len <= sizeof(uint32_t))
return 0; // we can only write the header, or not even that.
max_pkt_len -= sizeof(uint32_t);
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t> (id);
const uint8_t sbn = host_id >> 24;
uint32_t symbol = host_id & mask;
const uint32_t source_symbols = symbols (sbn);
const bool only_source = (symbol >= source_symbols);
size_t symbols_to_write = 0;
while (max_pkt_len >= _symbol_size) {
if (only_source && symbol >= source_symbols)
break;
uint32_t real_symbol_size;
if (!only_source) {
real_symbol_size = _symbol_size;
} else if (sbn == (blocks() - 1) && symbol == source_symbols - 1) {
// the last symbol the last block can have less bytes than
// a full symbol, and we do not need to send the padding bytes.
real_symbol_size = block_size (sbn) % _symbol_size;
if (real_symbol_size == 0)
real_symbol_size = _symbol_size;
}
if (max_pkt_len <= real_symbol_size)
break;
max_pkt_len -= real_symbol_size;
++symbol;
++symbols_to_write;
}
if (symbols_to_write == 0)
return 0;
// ok, now we can finally start writing something.
// write the header
const uint8_t *p = reinterpret_cast<const uint8_t*> (&id);
uint8_t *p_out = reinterpret_cast<uint8_t*> (&*output);
// manual loop unrolling ftw
*(p_out++) = *(p++);
if (sizeof(T) == 1) {
++output;
p_out = reinterpret_cast<uint8_t*> (&*output);
}
*(p_out++) = *(p++);
if (sizeof(T) == 2 || sizeof(T) == 1) {
++output;
p_out = reinterpret_cast<uint8_t*> (&*output);
}
*(p_out++) = *(p++);
if (sizeof(T) == 1) {
++output;
p_out = reinterpret_cast<uint8_t*> (&*output);
}
*(p_out++) = *(p++);
// sizeof(T) can only be 1,2,4, so we can safely just increment output here
++output;
// FINALLY write the symbols
size_t written = sizeof(uint32_t);
symbol = static_cast<uint8_t> (id >> 24);
while (symbols_to_write > 0) {
size_t tmp_written = encode (output, end, symbol, sbn);
if (tmp_written == 0)
return written == sizeof(uint32_t) ? 0 : written;
++symbol;
}
return written;
}
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::free (const uint8_t sbn)
......@@ -835,9 +943,10 @@ template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
const uint32_t id)
{
const uint32_t _id = RaptorQ__v1::Impl::Endian::b_to_h (id);
const uint32_t esi = _id & 0x00FFFFFF;
const uint8_t sbn = _id >> 24;
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
const uint32_t host_id = RaptorQ__v1::Impl::Endian::b_to_h (id);
const uint32_t esi = host_id & mask;
const uint8_t sbn = host_id >> 24;
return add_symbol (start, end, esi, sbn);
}
......@@ -871,7 +980,11 @@ Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
auto dec = it->second.dec;
lock.unlock();
auto err = dec->add_symbol (start, end, real_esi);
// the last symbol in a block can have less size than the symbol size,
// in which case we should add padding
const bool add_padding = (esi == syms);
auto err = dec->add_symbol (start, end, real_esi, add_padding);
if (err != Error::NONE)
return err;
// automatically add work to pool if we use it and have enough data
......@@ -891,6 +1004,97 @@ Error Decoder<In_It, Fwd_It>::add_symbol (In_It &start, const In_It end,
return Error::NONE;
}
template <typename In_It, typename Fwd_It>
Error Decoder<In_It, Fwd_It>::add_packet (In_It &start, const In_It end)
{
// RFC packet, section 4.4.2 page 11
using T = typename std::iterator_traits<Fwd_It>::value_type;
// each packet has an header of 32 bits. We can not start writing the
// encoded symbols in the middle of an iterator yet.
if (sizeof(T) > sizeof(uint32_t) || sizeof(T) == 3) {
assert (false && "libRaptorQ: sorry, encde_packets can only be used "
"with types of at most 32 bits for now\n");
return Error::INITIALIZATION;
}
// first of all, check the packet length
size_t pkt_len;
if (std::is_same<typename std::iterator_traits<Fwd_It>::iterator_category,
std::random_access_iterator_tag>::value) {
// we were lucky with a random iterator.
pkt_len = sizeof(T) * (end - start);
} else {
pkt_len = 0;
auto start_copy = start;
while (start_copy != end) {
pkt_len += sizeof(T);
++start_copy;
}
}
if (pkt_len < (sizeof(uint32_t) + 1))
return Error::NEED_DATA;
const uint8_t *p = reinterpret_cast<const uint8_t*> (&*start);
uint32_t symbol_id;
uint8_t *p_out = reinterpret_cast<uint8_t*> (&symbol_id);
// manual loop unrolling ftw
*(p_out++) = *(p++);
if (sizeof(T) == 1) {
++start;
p_out = reinterpret_cast<uint8_t*> (&*start);
}
*(p_out++) = *(p++);
if (sizeof(T) == 2 || sizeof(T) == 1) {
++start;
p_out = reinterpret_cast<uint8_t*> (&*start);
}
*(p_out++) = *(p++);
if (sizeof(T) == 1) {
++start;
p_out = reinterpret_cast<uint8_t*> (&*start);
}
*(p_out++) = *(p++);
// sizeof(T) can only be 1,2,4, so we can safely just increment start here
++start;
constexpr uint32_t mask = ~(static_cast<uint32_t>(0xFF) << 24);
const uint32_t host_symbol_id = RaptorQ__v1::Impl::Endian::b_to_h<uint32_t>(
symbol_id);
const uint8_t sbn = host_symbol_id >> 24;
uint32_t symbol = host_symbol_id & mask;
bool only_source = symbol >= symbols (sbn);
pkt_len -= sizeof(uint32_t);
while (pkt_len > 0) {
// we can finally start reading the symbols
uint32_t symbol_length = _symbol_size;
if (only_source && sbn == (blocks() - 1) &&
symbol == (symbols (sbn) - 1)) {
// the last symbol might have much shorter size
// get the real symbol size
symbol_length = block_size (sbn) % _symbol_size;
if (symbol_length == 0)
symbol_length = _symbol_size;
}
if (pkt_len < symbol_length)
break;
auto cp_start = start;
auto err = add_symbol (cp_start, end, symbol, sbn);
if (err != Error::NONE && err != Error::NOT_NEEDED)
return Error::NONE;
start = cp_start;
pkt_len -= symbol_length;
++symbol;
if (only_source && symbol >= symbols (sbn))
return Error::NONE;
}
return Error::WRONG_INPUT;
}
template <typename In_It, typename Fwd_It>
std::vector<bool> Decoder<In_It, Fwd_It>::end_of_input (
const Fill_With_Zeros fill)
......
......@@ -73,7 +73,8 @@ public:
{
if (_enc == nullptr)
return 0;
return _enc->encode (start, end, _id);
return _enc->encode (start, end,
RaptorQ__v1::Impl::Endian::h_to_b<uint32_t> (_id));
}
uint32_t id() const
{ return RaptorQ__v1::Impl::Endian::h_to_b<uint32_t> (_id); }
......
......@@ -609,7 +609,7 @@ Error Decoder<In_It, Fwd_It>::add_symbol (In_It &from, const In_It to,
{
if (symbols_tracker.size() == 0)
return Error::INITIALIZATION;
auto ret = dec.add_symbol (from, to, esi);
auto ret = dec.add_symbol (from, to, esi, false);
if (ret == Error::NONE) {
if (esi < _symbols)
symbols_tracker [2 * esi].store (true);
......
......@@ -70,7 +70,7 @@ template<typename T>
constexpr T rev (const T in, const T acc, const uint8_t i)
{
return (i >= sizeof(T)) ? acc :
rev<T> (in >> 8, (acc << 8) | (in & T{0xFF}), i + 1);
rev<T> (in >> 8, (acc << 8) | (in & T{0xFF}), i + 1);
}
template<typename T>
......