Newer
Older
* Copyright (c) 2015-2016, Luca Fulchir<luca@fulchir.it>, All rights reserved.
*
* This file is part of "libRaptorQ".
*
* libRaptorQ is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation, either version 3
* of the License, or (at your option) any later version.
*
* libRaptorQ is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* and a copy of the GNU Lesser General Public License
* along with libRaptorQ. If not, see <http://www.gnu.org/licenses/>.
*/
/////////////////////
//
// These templates are just a wrapper around the
// So if you want to see what the algorithm looks like,
// you are in the wrong place
//
/////////////////////
#include "RaptorQ/v1/Interleaver.hpp"
#include "RaptorQ/v1/De_Interleaver.hpp"
#include "RaptorQ/v1/Decoder.hpp"
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/Shared_Computation/Decaying_LF.hpp"
#include "RaptorQ/v1/Thread_Pool.hpp"
#include <cassert>
#include <future>
typedef uint64_t RQ_OTI_Common_Data;
typedef uint32_t RQ_OTI_Scheme_Specific_Data;
// maximum times a single block can be decoded at the same time.
// the decoder can be launched multiple times with different combinations
// of repair symbols. This can be useful as the decoding is actually
// probabilistic, and dropping a set of repair symbols *MIGHT* make things
// decodable again.
// keep this low. 1, 2, 3 should be ok.
static uint16_t max_block_decoder_concurrency = 1;
static const uint64_t max_data = 946270874880; // ~881 GB
const uint16_t min_subsymbol_size,
const uint16_t symbol_size,
const size_t max_memory)
_min_subsymbol (min_subsymbol_size),
interleave (_data_from,
_data_to,
_min_subsymbol,
_mem,
_symbol_size)
IS_RANDOM(Rnd_It, "RFC6330__v1::Encoder");
IS_FORWARD(Fwd_It, "RFC6330__v1::Encoder");
auto _alignment = sizeof(typename
std::iterator_traits<Rnd_It>::value_type);
assert(_symbol_size >= _alignment &&
"RaptorQ: symbol_size must be >= alignment");
assert((_symbol_size % _alignment) == 0 &&
"RaptorQ: symbol_size must be multiple of alignment");
assert(min_subsymbol_size >= _alignment &&
"RaptorQ: minimum subsymbol must be at least aligment");
assert(min_subsymbol_size <= _symbol_size &&
"RaptorQ: minimum subsymbol must be at most symbol_size");
assert((min_subsymbol_size % _alignment) == 0 &&
"RaptorQ: minimum subsymbol must be multiple of alignment");
assert((_symbol_size % min_subsymbol_size == 0) &&
"RaptorQ: symbol size must be multiple of subsymbol size");
if (static_cast<uint64_t> (data_to - data_from) *
sizeof(typename std::iterator_traits<Rnd_It>::value_type)
> max_data) {
pool_lock = std::make_shared<std::pair<std::mutex,
std::condition_variable>> ();
pool_last_reported = -1;
use_pool = true;
exiting = false;
auto part = interleave.get_partition();
return Block_Iterator<Rnd_It, Fwd_It> (this, part,
operator bool() const { return interleave; }
RQ_OTI_Common_Data OTI_Common() const;
RQ_OTI_Scheme_Specific_Data OTI_Scheme_Specific() const;
// TODO: introduce memory limits on threading ?
std::future<std::pair<Error, uint8_t>> compute (const Compute flags);
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t esi,
size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id);
uint8_t blocks() const;
uint32_t block_size (const uint8_t sbn) const;
uint16_t symbol_size() const;
uint16_t symbols (const uint8_t sbn) const;
uint32_t max_repair (const uint8_t sbn) const;
static void wait_threads (Encoder<Rnd_It, Fwd_It> *obj, const Compute flags,
std::promise<std::pair<Error, uint8_t>> p);
class Block_Work : public Impl::Pool_Work {
public:
std::weak_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It>> work;
std::weak_ptr<std::pair<std::mutex, std::condition_variable>> notify;
Work_Exit_Status do_work (RaptorQ__v1::Work_State *state) override;
~Block_Work() override {}
};
// TODO: tagged pointer
class Enc {
Enc (Impl::Interleaver<Rnd_It> *interleaver, const uint8_t sbn)
enc = std::make_shared<
RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It>> (
std::shared_ptr<RaptorQ__v1::Impl::Raw_Encoder<Rnd_It, Fwd_It>> enc;
std::pair<Error, uint8_t> get_report (const Compute flags);
std::shared_ptr<std::pair<std::mutex, std::condition_variable>> pool_lock;
std::deque<std::thread> pool_wait;
std::map<uint8_t, Enc> encoders;
const size_t _mem;
const Rnd_It _data_from, _data_to;
const uint16_t _symbol_size;
bool use_pool, exiting;
int16_t pool_last_reported;
Loading full blame...