Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/*
* Copyright (c) 2015, Luca Fulchir<luca@fulchir.it>, All rights reserved.
*
* This file is part of "libRaptorQ".
*
* libRaptorQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* libRaptorQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* and a copy of the GNU Lesser General Public License
* along with libRaptorQ. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef RAPTORQ_HPP
#define RAPTORQ_HPP
/////////////////////
//
// These templates are just a wrapper around the
// functionalities offered by the RaptorQ::Impl namespace
// So if you want to see what the algorithm looks like,
// you are in the wrong place
//
/////////////////////
#include "Interleaver.hpp"
#include "De_Interleaver.hpp"
#include "Encoder.hpp"
#include "Decoder.hpp"
#include <array>
#include <cassert>
#include <cmath>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <thread>
#include <tuple>
#include <type_traits>
#include <utility>
#include <vector>
namespace RaptorQ {
template <typename T>
class RAPTORQ_API Symbol
{
public:
Symbol (Encoder<T> *enc, const uint32_t esi, const uint8_t sbn)
: _enc (enc), _esi (esi), _sbn (sbn) {}
std::vector<T> operator()()
{
std::vector<T> ret;
(*this) (ret, 0);
return ret;
}
bool operator() (std::vector<T> &output, const size_t offset = 0)
{
return _enc->encode (output, _esi, _sbn);
}
bool operator() (T *output);
private:
Encoder<T> *_enc;
const uint32_t _esi;
const uint8_t _sbn;
};
Symbol_Iterator (Encoder<T> *enc, const uint32_t esi, const uint8_t sbn)
: _enc (enc), _esi (esi), _sbn (sbn) {}
Symbol<T> operator*()
return it._esi != _esi || it._sbn != _sbn;
}
private:
Encoder<T> *_enc;
uint32_t _esi;
const uint8_t _sbn;
};
template <typename T>
class RAPTORQ_API Block
{
public:
Block (Encoder<T> *enc, const uint16_t symbols, const uint8_t sbn)
: _enc (enc), _symbols (symbols), _sbn (sbn) {}
Symbol_Iterator<T> begin_source() const
{
return Symbol_Iterator<T> (_enc, 0, _sbn);
Symbol_Iterator<T> end_repair (const uint32_t max_repair) const
uint32_t max_r = max_repair;
if (max_repair >= std::pow (2, 20) - _symbols)
max_r = std::pow (2, 20) - _symbols;
return Symbol_Iterator<T> (_enc, _symbols + max_r, _sbn);
template <typename T>
class RAPTORQ_API Block_Iterator :
public std::iterator<std::input_iterator_tag, Block<T>>
Block_Iterator (Encoder<T> *enc, const Impl::Partition part, uint8_t sbn)
:_enc (enc), _part (part), _sbn (sbn) {}
Block<T> operator*()
if (_sbn > _part.num (0))
return Block<T> (_enc, _part.size (0), _sbn);
return Block<T> (_enc, _part.size (1), _sbn);
Block_Iterator ret = *this;
ret._sbn += i;
return ret;
}
bool operator== (const Block_Iterator it) const
{
return it._sbn == _sbn;
uint8_t _sbn;
};
static const uint64_t max_data = 946270874880;
typedef uint64_t OTI_Common_Data;
typedef uint32_t OTI_Scheme_Specific_Data;
template <typename T>
class RAPTORQ_API Encoder
{
public:
const uint16_t _symbol_size;
Encoder (std::shared_ptr<std::vector<T>> data,
const uint16_t min_subsymbol_size,
const uint16_t symbol_size,
const size_t max_memory)
: _data (data), _symbol_size (symbol_size),
_min_subsymbol (min_subsymbol_size), _mem (max_memory)
{
static_assert(std::is_unsigned<T>::value,
"RaptorQ::Encoder: can only be used with unsigned types");
// max size: between 2^39 and 2^40
if (data == nullptr || data->size() *sizeof(T) > max_data)
return;
interleave = std::unique_ptr<Impl::Interleaver<T>> (
new Impl::Interleaver<T> (data.get(),
return Block_Iterator<T> (this, interleave->get_partition(), 0);
auto part = interleave->get_partition();
return Block_Iterator<T> (this, part, part.num(0) + part.num(1));
bool operator()() const { return interleave != nullptr; }
OTI_Common_Data OTI_Common() const;
OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
void precompute (const uint8_t threads, const bool background);
bool encode (std::vector<T> &output, uint32_t esi, uint8_t sbn);
bool encode (std::vector<T> &output, uint32_t &id);
void free (const uint8_t sbn);
class RAPTORQ_LOCAL Locked_Encoder
{
public:
Locked_Encoder (const Impl::Interleaver<T> &symbols, const uint8_t SBN)
:_enc (symbols, SBN)
{}
std::mutex _mtx;
Impl::Encoder<T> _enc;
};
std::shared_ptr<std::vector<T>> _data;
std::unique_ptr<Impl::Interleaver<T>> interleave = nullptr;
std::map<uint8_t, std::shared_ptr<Locked_Encoder>> encoders;
const size_t _mem;
std::mutex _mtx;
const uint16_t _min_subsymbol;
static void precompute_block_all (Encoder<T> *obj, const uint8_t threads);
static void precompute_thread (Encoder<T> *obj, uint8_t *sbn,
const uint8_t single_sbn);
};
template <typename T, typename InputIterator>
class RAPTORQ_API Decoder
{
public:
// using shared pointers to avoid locking too much or
// worrying about deleting used stuff.
using Dec_ptr = std::shared_ptr<RaptorQ::Impl::Decoder<T>>;
// rfc 6330, pg 6
// easy explanation for OTI_* comes next.
// we do NOT use bitfields as compilators are not actually forced to put
// them in any particular order. meaning tey're useless.
//
//union OTI_Common_Data {
// uint64_t raw;
// struct {
// uint64_t size:40;
// uint8_t reserved:8;
// uint16_t symbol_size:16;
// };
//};
//union OTI_Scheme_Specific_Data {
// uint32_t raw;
// struct {
// uint8_t source_blocks;
// uint16_t sub_blocks;
// uint8_t alignment;
// };
//};
Decoder (const OTI_Common_Data common,const OTI_Scheme_Specific_Data scheme)
// see the above commented bitfields for quick reference
_symbol_size = static_cast<uint16_t> (common);
_sub_blocks = static_cast<uint16_t> (scheme >> 8);
_blocks = static_cast<uint8_t> (common >> 24);
assert (static_cast<uint8_t> (scheme) <= sizeof(T) &&
// (common >> 24) == total file size
const uint64_t size = common >> 24;
if (size > max_data)
return;
const uint64_t total_symbols = static_cast<uint64_t> (ceil (
part = Impl::Partition (total_symbols,
static_cast<uint8_t> (scheme >> 24));
//FIXME: check that the OSI and "part" agree on the data.
Decoder (uint16_t symbol_size, uint16_t sub_blocks, uint8_t blocks)
:_symbol_size (symbol_size), _sub_blocks (sub_blocks), _blocks (blocks)
{}
uint32_t decode (InputIterator &start, const InputIterator end);
uint32_t decode (InputIterator &start, const InputIterator end,
const uint8_t sbn);
// id: 8-bit sbn + 24 bit esi
bool add_symbol (const std::vector<T> &symbol, const uint32_t id);
bool add_symbol (const std::vector<T> &symbol, const uint32_t esi,
const uint8_t sbn);
void free (const uint8_t sbn);
private:
Impl::Partition part;
std::map<uint8_t, Dec_ptr> decoders;
std::mutex _mtx;
};
/////////////////
//
// Encoder
//
/////////////////
template <typename T>
OTI_Common_Data Encoder<T>::OTI_Common() const
template <typename T>
OTI_Scheme_Specific_Data Encoder<T>::OTI_Scheme_Specific() const
template <typename T>
size_t Encoder<T>::precompute_max_memory ()
{
// give a good estimate on the amount of memory neede for the precomputation
// of one block;
// this will help you understand how many concurrent precomputations
// you want to do :)
if (interleave == nullptr)
return 0;
uint16_t symbols = interleave->source_symbols(0);
uint16_t K_idx;
for (K_idx = 0; K_idx < Impl::K_padded.size(); ++K_idx) {
if (symbols < Impl::K_padded[K_idx])
break;
}
if (K_idx == Impl::K_padded.size())
return 0;
auto S_H = Impl::S_H_W[K_idx];
uint16_t matrix_cols = Impl::K_padded[K_idx] + std::get<0> (S_H) +
std::get<1> (S_H);
// Rough memory estimate: Matrix A, matrix X (=> *2) and matrix D.
return matrix_cols * matrix_cols * 2 + _symbol_size * matrix_cols;
template <typename T>
void Encoder<T>::precompute_thread (Encoder<T> *obj, uint8_t *sbn,
const uint8_t single_sbn)
// if "sbn" pointer is NOT nullptr, than we are a thread from
// from a precompute_block_all. This means that we need to update
// the value of sbn as soon as we get our work.
//
// if sbn == nullptr, then we have been called to work on a single
// sbn, and not from "precompute_block_all".
// This means we work on "single_sbn", and do not touch "sbn"
uint8_t *sbn_ptr = sbn;
if (sbn_ptr == nullptr)
sbn_ptr = const_cast<uint8_t*> (&single_sbn);
while (*sbn_ptr < obj->interleave->blocks()) {
obj->_mtx.lock();
if (*sbn_ptr >= obj->interleave->blocks()) {
obj->_mtx.unlock();
return;
}
auto it = obj->encoders.find (*sbn_ptr);
if (it == obj->encoders.end()) {
std::tie (it, success) = obj->encoders.insert ({*sbn_ptr,
std::make_shared<Locked_Encoder> (*obj->interleave, *sbn_ptr)
});
bool locked = enc_ptr->_mtx.try_lock();
if (sbn != nullptr)
++(*sbn);
obj->_mtx.unlock();
if (locked) { // if not locked, someone else is already waiting
// on this. so don't do the same work twice.
enc_ptr->_enc.generate_symbols();
enc_ptr->_mtx.unlock();
template <typename T>
void Encoder<T>::precompute (const uint8_t threads, const bool background)
{
if (background) {
std::thread t (precompute_block_all, this, threads);
t.detach();
} else {
return precompute_block_all (this, threads);
}
}
template <typename T>
void Encoder<T>::precompute_block_all (Encoder<T> *obj, const uint8_t threads)
{
// precompute all intermediate symbols, do it with more threads.
return;
std::vector<std::thread> t;
uint8_t spawned = threads - 1;
if (spawned == 0)
spawned = std::thread::hardware_concurrency();
if (spawned > 0)
t.reserve (spawned);
uint8_t sbn = 0;
// spawn n-1 threads
for (uint8_t id = 0; id < spawned; ++id)
// do the work ourselves
precompute_thread (obj, &sbn, 0);
// join other threads
for (uint8_t id = 0; id < spawned; ++id)
t[id].join();
}
template <typename T>
bool Encoder<T>::encode (std::vector<T> &output, uint32_t &id)
const uint32_t mask_8 = static_cast<uint32_t> (std::pow (2, 8)) - 1;
const uint32_t mask = ~(mask_8 << 24);
return encode (output, id & mask, static_cast<uint8_t> (id & mask_8));
template <typename T>
bool Encoder<T>::encode (std::vector<T> &output, uint32_t esi, uint8_t sbn)
_mtx.lock();
auto it = encoders.find (sbn);
if (it == encoders.end()) {
bool success;
std::tie (it, success) = encoders.emplace (sbn,
std::make_shared<Locked_Encoder> (*interleave, sbn));
std::thread background (precompute_thread, this, nullptr, sbn);
background.detach();
if (esi >= interleave->source_symbols (sbn)) {
// make sure we generated the intermediate symbols
enc_ptr->_mtx.lock();
enc_ptr->_enc.generate_symbols();
enc_ptr->_mtx.unlock();
}
template <typename T>
void Encoder<T>::free (const uint8_t sbn)
{
_mtx.lock();
auto it = encoders.find (sbn);
if (it != encoders.end())
encoders.erase (it);
_mtx.unlock();
}
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
/////////////////
//
// Decoder
//
/////////////////
template <typename T, typename InputIterator>
void Decoder<T, InputIterator>::free (const uint8_t sbn)
{
_mtx.lock();
auto it = decoders.find(sbn);
if (it != decoders.end())
decoders.erase(it);
_mtx.unlock();
}
template <typename T, typename InputIterator>
bool Decoder<T, InputIterator>::add_symbol (const std::vector<T> &symbol,
const uint32_t id)
{
union extract {
uint32_t raw;
struct {
uint8_t sbn;
uint32_t esi:24;
};
} extracted;
extracted.raw = id;
return add_symbol (symbol, extracted.esi, extracted.sbn);
}
template <typename T, typename InputIterator>
bool Decoder<T, InputIterator>::add_symbol (const std::vector<T> &symbol,
const uint32_t esi,
const uint8_t sbn)
{
if (sbn >= _blocks)
return false;
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end()) {
const uint16_t symbols = sbn < part.num (0) ?
part.size(0) : part.size(1);
decoders.insert ({sbn, std::make_shared<Impl::Decoder<T>> (
symbols, _symbol_size)});
it = decoders.find (sbn);
}
auto dec = it->second;
_mtx.unlock();
return dec->add_symbol (esi, symbol);
}
template <typename T, typename InputIterator>
uint32_t Decoder<T, InputIterator>::decode (InputIterator &start,
const InputIterator end)
{
// TODO: incomplete decoding
bool missing = false;
for (uint8_t sbn = 0; sbn < _blocks; ++sbn) {
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end()) {
missing = true;
continue;
}
auto dec = it->second;
_mtx.unlock();
if (!dec->decode())
return 0;
}
if (missing)
return 0;
uint32_t written = 0;
for (uint8_t sbn = 0; sbn < _blocks; ++sbn) {
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end())
return written;
auto dec = it->second;
_mtx.unlock();
Impl::De_Interleaver<T, InputIterator> de_interleaving (
dec->get_symbols(), _sub_blocks);
written += de_interleaving (start, end);
}
return written;
}
template <typename T, typename InputIterator>
uint32_t Decoder<T, InputIterator>::decode (InputIterator &start,
const InputIterator end, const uint8_t sbn)
{
if (sbn >= _blocks)
return 0;
_mtx.lock();
auto it = decoders.find (sbn);
if (it == decoders.end()) {
_mtx.unlock();
return 0;
}
auto dec = it->second;
_mtx.unlock();
if (!dec->decode())
return 0;
Impl::De_Interleaver<T, InputIterator> de_interleaving (dec->get_symbols(),
_sub_blocks);
return de_interleaving (start, end);
}
} //RaptorQ
#endif