Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/*
* Copyright (c) 2015, Luca Fulchir<luca@fulchir.it>, All rights reserved.
*
* This file is part of "libRaptorQ".
*
* libRaptorQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* libRaptorQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* and a copy of the GNU Lesser General Public License
* along with libRaptorQ. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RAPTORQ_HPP
#define RAPTORQ_HPP
/////////////////////
//
// These templates are just a wrapper around the
// functionalities offered by the RaptorQ::Impl namespace
// So if you want to see what the algorithm looks like,
// you are in the wrong place
//
/////////////////////
#include "Interleaver.hpp"
#include "De_Interleaver.hpp"
#include "Encoder.hpp"
#include "Decoder.hpp"
#include <map>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>
namespace RaptorQ {
Symbol (Encoder<Rnd_It, Out_It> *enc, const uint32_t esi, const uint8_t sbn)
uint32_t id()
{
uint32_t ret = _sbn;
ret <<= 24;
return ret + _sbn;
}
public std::iterator<std::input_iterator_tag, Symbol<Rnd_It, Out_It>>
Symbol_Iterator (Encoder<Rnd_It, Out_It> *enc, const uint32_t esi,
const uint8_t sbn)
bool operator== (const Symbol_Iterator<Rnd_It, Out_It> &it) const
bool operator!= (const Symbol_Iterator<Rnd_It, Out_It> &it) const
return it._esi != _esi || it._sbn != _sbn;
}
private:
Block (Encoder<Rnd_It, Out_It> *enc, const uint16_t symbols,
const uint8_t sbn)
return Symbol_Iterator<Rnd_It, Out_It> (_enc, 0, _sbn);
return Symbol_Iterator<Rnd_It, Out_It> (_enc, _symbols, _sbn);
return Symbol_Iterator<Rnd_It, Out_It> (_enc, _symbols, _sbn);
Symbol_Iterator<Rnd_It, Out_It> end_repair (const uint32_t max_repair)
const
return Symbol_Iterator<Rnd_It, Out_It> (_enc, _symbols + max_r,_sbn);
uint32_t max_repair()
{
return _enc->max_repair (_sbn);
}
public std::iterator<std::input_iterator_tag, Block<Rnd_It, Out_It>>
Block_Iterator (Encoder<Rnd_It, Out_It> *enc, const Impl::Partition part,
uint8_t sbn)
return Block<Rnd_It, Out_It> (_enc, _part.size (0), _sbn);
return Block<Rnd_It, Out_It> (_enc, _part.size (1), _sbn);
Block_Iterator ret = *this;
ret._sbn += i;
return ret;
}
bool operator== (const Block_Iterator &it) const
bool operator!= (const Block_Iterator &it) const
uint8_t _sbn;
};
static const uint64_t max_data = 946270874880;
typedef uint64_t OTI_Common_Data;
typedef uint32_t OTI_Scheme_Specific_Data;
const uint16_t min_subsymbol_size,
const uint16_t symbol_size,
const size_t max_memory)
IS_RANDOM(Rnd_It, "RaptorQ::Encoder");
IS_OUTPUT(Out_It, "RaptorQ::Encoder");
interleave = std::unique_ptr<Impl::Interleaver<Rnd_It>> (
new Impl::Interleaver<Rnd_It> (_data_from, _data_to,
return Block_Iterator<Rnd_It, Out_It> (this,
interleave->get_partition(), 0);
bool operator()() const { return interleave != nullptr; }
OTI_Common_Data OTI_Common() const;
OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
void precompute (const uint8_t threads, const bool background);
uint64_t encode (Out_It &output, const Out_It end, const uint32_t esi,
const uint8_t sbn);
uint64_t encode (Out_It &output, const Out_It end, const uint32_t &id);
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
uint32_t max_repair (const uint8_t sbn) const;
Locked_Encoder (const Impl::Interleaver<Rnd_It> &symbols,
const uint8_t SBN)
std::unique_ptr<Impl::Interleaver<Rnd_It>> interleave = nullptr;
std::map<uint8_t, std::shared_ptr<Locked_Encoder>> encoders;
const size_t _mem;
const Rnd_It _data_from, _data_to;
const uint16_t _symbol_size;
static void precompute_block_all (Encoder<Rnd_It, Out_It> *obj,
const uint8_t threads);
static void precompute_thread (Encoder<Rnd_It, Out_It> *obj, uint8_t *sbn,
class RAPTORQ_API Decoder
{
public:
// using shared pointers to avoid locking too much or
// worrying about deleting used stuff.
using Dec_ptr = std::shared_ptr<RaptorQ::Impl::Decoder<In_It>>;
// rfc 6330, pg 6
// easy explanation for OTI_* comes next.
// we do NOT use bitfields as compilators are not actually forced to put
// them in any particular order. meaning tey're useless.
//
//union OTI_Common_Data {
// uint64_t raw;
// struct {
// uint64_t size:40;
// uint8_t reserved:8;
// uint16_t symbol_size:16;
// };
//};
//union OTI_Scheme_Specific_Data {
// uint32_t raw;
// struct {
// uint8_t source_blocks;
// uint16_t sub_blocks;
// uint8_t alignment;
// };
//};
Decoder (const OTI_Common_Data common,const OTI_Scheme_Specific_Data scheme)
IS_INPUT(In_It, "RaptorQ::Decoder");
IS_OUTPUT(Out_It, "RaptorQ::Decoder");
// see the above commented bitfields for quick reference
_symbol_size = static_cast<uint16_t> (common);
_sub_blocks = static_cast<uint16_t> (scheme >> 8);
_blocks = static_cast<uint8_t> (scheme >> 24);
// (common >> 24) == total file size
const uint64_t size = common >> 24;
if (size > max_data)
using T_in = typename std::iterator_traits<In_It>::value_type;
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
Decoder (const uint64_t size, const uint16_t symbol_size,
const uint16_t sub_blocks, const uint8_t blocks)
:_symbol_size (symbol_size), _sub_blocks (sub_blocks), _blocks (blocks)
{
if (size > max_data)
return;
using T_in = typename std::iterator_traits<In_It>::value_type;
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
static_cast<double> (size * sizeof(T_in)) /
static_cast<double> (_symbol_size)));
part = Impl::Partition (total_symbols, static_cast<uint8_t> (_blocks));
}
uint64_t decode (Out_It &start, const Out_It end);
uint64_t decode (Out_It &start, const Out_It end, const uint8_t sbn);
bool add_symbol (In_It &start, const In_It end, const uint32_t id);
bool add_symbol (In_It &start, const In_It end, const uint32_t esi,
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
std::map<uint8_t, Dec_ptr> decoders;
std::mutex _mtx;
};
/////////////////
//
// Encoder
//
/////////////////
template <typename Rnd_It, typename Out_It>
OTI_Common_Data Encoder<Rnd_It, Out_It>::OTI_Common() const
template <typename Rnd_It, typename Out_It>
OTI_Scheme_Specific_Data Encoder<Rnd_It, Out_It>::OTI_Scheme_Specific() const
ret += sizeof(typename std::iterator_traits<Rnd_It>::value_type);
template <typename Rnd_It, typename Out_It>
size_t Encoder<Rnd_It, Out_It>::precompute_max_memory ()
{
// give a good estimate on the amount of memory neede for the precomputation
// of one block;
// this will help you understand how many concurrent precomputations
// you want to do :)
if (interleave == nullptr)
return 0;
uint16_t symbols = interleave->source_symbols(0);
uint16_t K_idx;
for (K_idx = 0; K_idx < Impl::K_padded.size(); ++K_idx) {
if (symbols < Impl::K_padded[K_idx])
break;
}
if (K_idx == Impl::K_padded.size())
return 0;
auto S_H = Impl::S_H_W[K_idx];
uint16_t matrix_cols = Impl::K_padded[K_idx] + std::get<0> (S_H) +
std::get<1> (S_H);
// Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D.
return matrix_cols * matrix_cols * 2 + _symbol_size * matrix_cols;
template <typename Rnd_It, typename Out_It>
void Encoder<Rnd_It, Out_It>::precompute_thread (Encoder<Rnd_It, Out_It> *obj,
uint8_t *sbn,
// if "sbn" pointer is NOT nullptr, than we are a thread from
// from a precompute_block_all. This means that we need to update
// the value of sbn as soon as we get our work.
//
// if sbn == nullptr, then we have been called to work on a single
// sbn, and not from "precompute_block_all".
// This means we work on "single_sbn", and do not touch "sbn"
uint8_t *sbn_ptr = sbn;
if (sbn_ptr == nullptr)
sbn_ptr = const_cast<uint8_t*> (&single_sbn);
while (*sbn_ptr < obj->interleave->blocks()) {
obj->_mtx.lock();
if (*sbn_ptr >= obj->interleave->blocks()) {
obj->_mtx.unlock();
return;
}
auto it = obj->encoders.find (*sbn_ptr);
if (it == obj->encoders.end()) {
bool locked = enc_ptr->_mtx.try_lock();
if (sbn != nullptr)
++(*sbn);
obj->_mtx.unlock();
if (locked) { // if not locked, someone else is already waiting
// on this. so don't do the same work twice.
enc_ptr->_enc.generate_symbols();
enc_ptr->_mtx.unlock();
template <typename Rnd_It, typename Out_It>
void Encoder<Rnd_It, Out_It>::precompute (const uint8_t threads,
const bool background)
{
if (background) {
std::thread t (precompute_block_all, this, threads);
t.detach();
} else {
return precompute_block_all (this, threads);
}
}
template <typename Rnd_It, typename Out_It>
void Encoder<Rnd_It, Out_It>::precompute_block_all (
Encoder<Rnd_It, Out_It> *obj,
const uint8_t threads)
{
// precompute all intermediate symbols, do it with more threads.
ssize_t spawned = threads - 1;
if (spawned == -1)
spawned = std::thread::hardware_concurrency();
if (spawned > 0)
t.reserve (static_cast<size_t> (spawned));
for (int8_t id = 0; id < spawned; ++id)
// do the work ourselves
precompute_thread (obj, &sbn, 0);
// join other threads
for (uint8_t id = 0; id < spawned; ++id)
t[id].join();
}
template <typename Rnd_It, typename Out_It>
uint64_t Encoder<Rnd_It, Out_It>::encode (Out_It &output, const Out_It end,
const uint32_t mask_8 = static_cast<uint32_t> (std::pow (2, 8)) - 1;
const uint32_t mask = ~(mask_8 << 24);
return encode (output, end, id & mask, static_cast<uint8_t> (id & mask_8));
template <typename Rnd_It, typename Out_It>
uint64_t Encoder<Rnd_It, Out_It>::encode (Out_It &output, const Out_It end,
_mtx.lock();
auto it = encoders.find (sbn);
if (it == encoders.end()) {
bool success;
std::tie (it, success) = encoders.emplace (sbn,
std::make_shared<Locked_Encoder> (*interleave, sbn));
std::thread background (precompute_thread, this, nullptr, sbn);
background.detach();
if (esi >= interleave->source_symbols (sbn)) {
// make sure we generated the intermediate symbols
enc_ptr->_mtx.lock();
enc_ptr->_enc.generate_symbols();
enc_ptr->_mtx.unlock();
}
template <typename Rnd_It, typename Out_It>
void Encoder<Rnd_It, Out_It>::free (const uint8_t sbn)
{
_mtx.lock();
auto it = encoders.find (sbn);
if (it != encoders.end())
encoders.erase (it);
_mtx.unlock();
}
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
template <typename Rnd_It, typename Out_It>
uint8_t Encoder<Rnd_It, Out_It>::blocks() const
{
return interleave->blocks();
}
template <typename Rnd_It, typename Out_It>
uint32_t Encoder<Rnd_It, Out_It>::block_size (const uint8_t sbn) const
{
return interleave->source_symbols (sbn) * interleave->symbol_size();
}
template <typename Rnd_It, typename Out_It>
uint16_t Encoder<Rnd_It, Out_It>::symbol_size() const
{
if (interleave == nullptr)
return 0;
return interleave->symbol_size();
}
template <typename Rnd_It, typename Out_It>
uint16_t Encoder<Rnd_It, Out_It>::symbols (const uint8_t sbn) const
{
if (interleave == nullptr)
return 0;
return interleave->source_symbols (sbn);
}
template <typename Rnd_It, typename Out_It>
uint32_t Encoder<Rnd_It, Out_It>::max_repair (const uint8_t sbn) const
{
return static_cast<uint32_t> (std::pow (2, 20)) -
interleave->source_symbols (sbn);
/////////////////
//
// Decoder
//
/////////////////
template <typename In_It, typename Out_It>
void Decoder<In_It, Out_It>::free (const uint8_t sbn)
{
_mtx.lock();
auto it = decoders.find(sbn);
if (it != decoders.end())
decoders.erase(it);
_mtx.unlock();
}
template <typename In_It, typename Out_It>
bool Decoder<In_It, Out_It>::add_symbol (In_It &start, const In_It end,
template <typename In_It, typename Out_It>
bool Decoder<In_It, Out_It>::add_symbol (In_It &start, const In_It end,
const uint32_t esi,
const uint8_t sbn)
{
if (sbn >= _blocks)
return false;
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end()) {
const uint16_t symbols = sbn < part.num (0) ?
part.size(0) : part.size(1);
decoders.insert ({sbn, std::make_shared<Impl::Decoder<In_It>> (
symbols, _symbol_size)});
it = decoders.find (sbn);
}
auto dec = it->second;
_mtx.unlock();
uint64_t Decoder<In_It, Out_It>::decode (Out_It &start, const Out_It end)
{
for (uint8_t sbn = 0; sbn < _blocks; ++sbn) {
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end()) {
}
auto dec = it->second;
_mtx.unlock();
if (!dec->decode())
return 0;
}
uint32_t written = 0;
for (uint8_t sbn = 0; sbn < _blocks; ++sbn) {
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end())
return written;
auto dec = it->second;
_mtx.unlock();
dec->get_symbols(), _sub_blocks);
written += de_interleaving (start, end);
}
return written;
}
uint64_t Decoder<In_It, Out_It>::decode (Out_It &start, const Out_It end,
{
if (sbn >= _blocks)
return 0;
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end()) {
_mtx.unlock();
return 0;
}
auto dec = it->second;
_mtx.unlock();
if (!dec->decode())
return 0;
Impl::De_Interleaver<Out_It> de_interleaving (dec->get_symbols(),
template <typename In_It, typename Out_It>
uint8_t Decoder<In_It, Out_It>::blocks() const
{
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
}
template <typename In_It, typename Out_It>
uint32_t Decoder<In_It, Out_It>::block_size (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0) * _symbol_size;
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1) * _symbol_size;
}
return 0;
}
template <typename In_It, typename Out_It>
uint16_t Decoder<In_It, Out_It>::symbol_size() const
{
return _symbol_size;
}
template <typename In_It, typename Out_It>
uint16_t Decoder<In_It, Out_It>::symbols (const uint8_t sbn) const
{
if (sbn < part.num (0)) {
return part.size (0);
} else if (sbn - part.num (0) < part.num (1)) {
return part.size (1);
}
return 0;
}