/*
 * Copyright (c) 2015-2016, Luca Fulchir<luca@fulchir.it>, All rights reserved.
 *
 * This file is part of "libRaptorQ".
 *
 * libRaptorQ is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 3
 * of the License, or (at your option) any later version.
 *
 * libRaptorQ is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * and a copy of the GNU Lesser General Public License
 * along with libRaptorQ.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "RaptorQ/v1/common.hpp"
#include "RaptorQ/v1/RaptorQ_Iterators.hpp"
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/Decoder.hpp"
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cmath>
#include <condition_variable>
#include <cstdint>
#include <future>
#include <iterator>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>


namespace RaptorQ__v1 {
namespace Impl {


template <typename Rnd_It, typename Fwd_It>
class RAPTORQ_LOCAL Encoder
{
public:
	~Encoder();
    // used for precomputation
    Encoder (const uint16_t symbols, const uint16_t symbol_size);
    // with data at the beginning. Less work.
    Encoder (const Rnd_It data_from, const Rnd_It data_to,
Luker's avatar
Luker committed
													const uint16_t symbol_size);
	uint16_t symbols() const;
	uint16_t symbol_size() const;
	uint32_t max_repair() const;

    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> begin_source();
	RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> end_source();
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> begin_repair();
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> end_repair
Luker's avatar
Luker committed
														(const uint32_t repair);
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	uint64_t add_data (Rnd_It &from, const Rnd_It to);
Luker's avatar
Luker committed
	void clear_data();
Luker's avatar
Luker committed
	bool compute_sync();
Luker's avatar
Luker committed
	uint64_t needed_bytes();
Luker's avatar
Luker committed

    std::future<Error> compute();
    uint64_t encode (Fwd_It &output, const Fwd_It end, const uint32_t &id);

private:
	enum class Data_State : uint8_t {
		NEED_DATA = 1,	// first constructor used. no interleaver until FULL
		FULL = 2,
		INIT = 3	// second constructor used: we already have the interleaver
	};

    std::unique_ptr<RFC6330__v1::Impl::Interleaver<Rnd_It>> interleaver;
	Raw_Encoder<Rnd_It, Fwd_It> encoder;
	DenseMtx precomputed;
	std::vector<typename std::iterator_traits<Rnd_It>::value_type> data;
Luker's avatar
Luker committed
	std::thread waiting;
Luker's avatar
Luker committed
	std::mutex data_mtx;
Luker's avatar
Luker committed
	const uint16_t _symbols, _symbol_size;
Luker's avatar
Luker committed
    Data_State state;
Luker's avatar
Luker committed


	static uint16_t real_symbol_size (const uint16_t symbol_size);
	static uint16_t calc_symbols (const Rnd_It data_from, const Rnd_It data_to,
													const uint16_t symbol_size);
Luker's avatar
Luker committed
	static void compute_thread (Encoder<Rnd_It, Fwd_It> *obj,
														std::promise<Error> p);
Luker's avatar
Luker committed
};

template <typename In_It, typename Fwd_It>
class RAPTORQ_LOCAL Decoder
{
public:

	enum class RAPTORQ_LOCAL Report : uint8_t {
		PARTIAL_FROM_BEGINNING = 1,
		PARTIAL_ANY = 2,
		COMPLETE = 3
	};

	~Decoder();
Luker's avatar
Luker committed
    Decoder (const uint64_t bytes, const uint16_t symbol_size,
															const Report type);

Luker's avatar
Luker committed
	uint16_t symbols() const;
	uint16_t symbol_size() const;

Luker's avatar
Luker committed
	RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> begin ();
    RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> end ();
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	Error add_symbol (In_It &from, const In_It to, const uint32_t esi);
Luker's avatar
Luker committed
	using Decoder_Result = typename Raw_Decoder<In_It>::Decoder_Result;

	bool can_decode() const;
	Decoder_Result decode();
	void stop();

Luker's avatar
Luker committed
	std::pair<Error, uint16_t> poll();
Luker's avatar
Luker committed
	std::pair<Error, uint16_t> wait_sync();
	std::future<std::pair<Error, uint16_t>> wait();
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	// returns number of iterators written
	uint64_t decode_symbol (Fwd_It &start, const Fwd_It end,const uint16_t esi);
Luker's avatar
Luker committed
	// return number of bytes written
Luker's avatar
Luker committed
	std::pair<uint64_t, size_t> decode_bytes (Fwd_It &start, const Fwd_It end,
Luker's avatar
Luker committed
									const size_t from_byte, const size_t skip);
private:
Luker's avatar
Luker committed
	const uint64_t _bytes;
	const uint16_t _symbols, _symbol_size;
	int32_t last_reported;
	const Report _type;
	RaptorQ__v1::Work_State work;
Luker's avatar
Luker committed
	Raw_Decoder<In_It> dec;
Luker's avatar
Luker committed
	// 2* symbols. Actually tracks available and reported symbols.
	// each symbol gets 2 bool: 1= available, 2=reported
	std::vector<bool> symbols_tracker;
	std::mutex _mtx;
	std::condition_variable _cond;
	std::vector<std::thread> waiting;
Luker's avatar
Luker committed

	static uint16_t get_symbols (const uint64_t bytes,
													const uint16_t symbol_size);
	static void waiting_thread (Decoder<In_It, Fwd_It> *obj,
									std::promise<std::pair<Error, uint16_t>> p);
Luker's avatar
Luker committed
};


///////////////////
//// Encoder
///////////////////

// Calls stop() on the raw encoder, then joins the thread started by
// compute(), if there is one.
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::~Encoder()
{
	// stop() comes first (presumably it aborts a running computation),
	// so the join below is not left waiting on a full computation.
	encoder.stop();
	if (!waiting.joinable())
		return;
	waiting.join();
}

// Constructor without input data: no interleaver is created and the object
// starts in the NEED_DATA state.
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Encoder (const uint16_t symbols,
													const uint16_t symbol_size)
	: interleaver (nullptr), encoder (symbols), _symbols (symbols),
						_symbol_size (symbol_size),
						state (Data_State::NEED_DATA)
{
	IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder");
	IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder");
}

// Scales "symbol_size" by the size of one input element.
// The result is narrowed back to uint16_t.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::real_symbol_size (const uint16_t symbol_size)
{
	typedef typename std::iterator_traits<Rnd_It>::value_type Element;
	const size_t scaled = symbol_size * sizeof(Element);
	return static_cast<uint16_t> (scaled);
}

// Number of symbols of "symbol_size" bytes needed to hold the bytes in
// [data_from, data_to), rounding up. The count is kept in 16 bits.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::calc_symbols (const Rnd_It data_from,
													const Rnd_It data_to,
													const uint16_t symbol_size)
{
	typedef typename std::iterator_traits<Rnd_It>::value_type Element;

	const uint64_t bytes = static_cast<uint64_t> (data_to - data_from) *
															sizeof(Element);
	const uint16_t whole = static_cast<uint16_t> (bytes / symbol_size);
	// one more symbol for a trailing, partially filled one
	const bool partial = (bytes % symbol_size) != 0;
	return static_cast<uint16_t> (whole + (partial ? 1 : 0));
}

// Constructor with all the input available: the interleaver is built right
// away over [data_from, data_to) and the object starts in the INIT state.
// "symbol_size" is converted to bytes through real_symbol_size().
template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Encoder (const Rnd_It data_from, const Rnd_It data_to,
											const uint16_t symbol_size)
	: interleaver (new RFC6330__v1::Impl::Interleaver<Rnd_It> (
									data_from, data_to,
									real_symbol_size (symbol_size), SIZE_MAX,
									real_symbol_size (symbol_size))),
	  encoder (interleaver.get(), 0),
	  _symbols (calc_symbols (data_from, data_to,
											real_symbol_size (symbol_size))),
	  _symbol_size (real_symbol_size (symbol_size)),
	  state (Data_State::INIT)
{
	IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder");
	IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder");
}

// Number of symbols fixed at construction.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols() const
{
	return this->_symbols;
}

// Symbol size fixed at construction.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
{
	return this->_symbol_size;
}

// Returns 2^20 minus the number of source symbols.
// NOTE(review): presumably this is the amount of repair symbol ids left
// after the source ones - confirm against the Raw_Encoder limits.
template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair() const
{
	// integer shift instead of the floating point std::pow (2, 20)
	return (static_cast<uint32_t> (1) << 20) - _symbols;
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
Luker's avatar
Luker committed
										Encoder<Rnd_It, Fwd_It>::begin_source()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (this, 0);
Luker's avatar
Luker committed
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
Luker's avatar
Luker committed
										Encoder<Rnd_It, Fwd_It>::end_source()
{
	return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (this,
																	_symbols);
}

// Repair symbols start right after the source symbols, so this is the same
// iterator end_source() returns.
template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
										Encoder<Rnd_It, Fwd_It>::begin_repair()
{
	return this->end_source();
}

// End iterator for a range of "repair" repair symbols: its id is
// symbols() + repair. It is built without an encoder pointer (nullptr).
template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
					Encoder<Rnd_It, Fwd_It>::end_repair (const uint32_t repair)
{
	typedef RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> Sym_It;
	return Sym_It (nullptr, _symbols + repair);
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
uint64_t Encoder<Rnd_It, Fwd_It>::add_data (Rnd_It &from, const Rnd_It to)
Luker's avatar
Luker committed
{
	uint64_t written = 0;
	using T = typename std::iterator_traits<Rnd_It>::value_type;

	if (state != Data_State::NEED_DATA)
		return written;
Luker's avatar
Luker committed
	std::unique_lock<std::mutex> lock (data_mtx);
	RQ_UNUSED (lock);
Luker's avatar
Luker committed
	while (from != to) {
Luker's avatar
Luker committed
		data.emplace_back (*from);
		++from;
		++written;
Luker's avatar
Luker committed
		if ((data.size() * sizeof (T) >= _symbols * _symbol_size)) {
			state = Data_State::FULL;
			break;
		}
	}
	return written;
}

Luker's avatar
Luker committed
// Drops the buffered input and goes back to the NEED_DATA state.
// The precomputed matrix is left untouched.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::clear_data()
{
	std::lock_guard<std::mutex> lock (data_mtx);
	RQ_UNUSED (lock);
	encoder.clear_data();
	// encode() only builds an interleaver when there is none: drop the old
	// one, or it would keep referring to the data that is being discarded.
	interleaver.reset();
	data.clear();
	state = Data_State::NEED_DATA;
}

// Runs the computation in the calling thread.
// INIT state: generates the symbols and returns the raw encoder's result.
// Other states: fetches the precomputed matrix unless it is already there,
// and returns whether that matrix is non-empty.
template <typename Rnd_It, typename Fwd_It>
bool Encoder<Rnd_It, Fwd_It>::compute_sync()
{
	static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING;

	if (state == Data_State::INIT)
		return encoder.generate_symbols (&work);

	if (precomputed.rows() == 0)
		precomputed = encoder.get_precomputed (&work);
	return precomputed.rows() != 0;
}

// Body of the thread started by compute(): same work as compute_sync(), with
// the outcome delivered through the promise (Error::NONE on success,
// Error::EXITING otherwise).
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::compute_thread (
							Encoder<Rnd_It, Fwd_It> *obj, std::promise<Error> p)
{
	static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING;

	bool success;
	if (obj->state == Data_State::INIT) {
		success = obj->encoder.generate_symbols (&work) ? true : false;
	} else {
		// reuse a previous precomputation when there is one
		if (obj->precomputed.rows() == 0)
			obj->precomputed = obj->encoder.get_precomputed (&work);
		success = obj->precomputed.rows() != 0;
	}
	// a failure can only mean that we were asked to stop
	p.set_value (success ? Error::NONE : Error::EXITING);
}

// Starts the computation in a background thread.
//
// Returns a future that receives the outcome. If a thread was already
// started (and not yet joined), no new thread is created and the returned
// future is immediately ready with Error::WORKING.
// NOTE(review): the thread stays joinable after it has finished, so a
// second call keeps answering WORKING - confirm this is intended.
template <typename Rnd_It, typename Fwd_It>
std::future<Error> Encoder<Rnd_It, Fwd_It>::compute()
{
	std::promise<Error> p;
	auto future = p.get_future();

	// only one waiting thread for the encoder
	if (waiting.joinable()) {
		p.set_value (Error::WORKING);
		// get_future() may be called only once per promise: hand out the
		// future obtained above instead of asking for a second one.
		return future;
	}
	waiting = std::thread (compute_thread, this, std::move(p));
	return future;
}

// Writes the symbol "id" into [output, end).
//
// Returns what Raw_Encoder::Enc() returns (number of iterators written), or
// 0 when nothing can be produced yet: the data is still missing, or neither
// the symbols nor the precomputed matrix are available.
template <typename Rnd_It, typename Fwd_It>
uint64_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
															const uint32_t &id)
{
	// returns number of iterators written
	switch (state) {
	case Data_State::INIT:
		if (!encoder.ready())
			return 0;
		return encoder.Enc (id, output, end);
	case Data_State::NEED_DATA:
		break;
	case Data_State::FULL:
		if (!encoder.ready()) {
			if (precomputed.rows() == 0)
				return 0;
			// The data arrived through add_data(): build the interleaver
			// over the internal buffer the first time it is needed.
			if (interleaver == nullptr) {
				interleaver.reset (
						new RFC6330__v1::Impl::Interleaver<Rnd_It> (
												data.begin(), data.end(),
												_symbol_size, SIZE_MAX,
												_symbol_size));
			}
			encoder.generate_symbols (precomputed, interleaver.get());
		}
		return encoder.Enc (id, output, end);
	}
	return 0;
}

// Bytes still missing before the internal buffer holds
// symbols * symbol_size bytes; 0 once that amount has been reached.
template <typename Rnd_It, typename Fwd_It>
uint64_t Encoder<Rnd_It, Fwd_It>::needed_bytes()
{
	using T = typename std::iterator_traits<Rnd_It>::value_type;

	// "data" is filled and cleared under data_mtx
	std::lock_guard<std::mutex> lock (data_mtx);
	RQ_UNUSED (lock);
	// Widen before multiplying: uint16_t * uint16_t is computed as int and
	// can overflow.
	const uint64_t capacity = static_cast<uint64_t> (_symbols) * _symbol_size;
	const uint64_t stored = static_cast<uint64_t> (data.size()) * sizeof(T);
	// add_data() can overshoot by less than one element: do not wrap around
	return stored >= capacity ? 0 : capacity - stored;
}


///////////////////
//// Decoder
///////////////////

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
uint16_t Decoder<In_It, Fwd_It>::get_symbols (const uint64_t bytes,
													const uint16_t symbol_size)
Luker's avatar
Luker committed
{
	uint16_t symbols = static_cast<uint16_t> (bytes / symbol_size);
	if (bytes % symbol_size != 0)
		++symbols;
Luker's avatar
Luker committed
	return symbols;
}

// Asks every thread started by wait() to exit and blocks until all of them
// have removed themselves from "waiting".
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::~Decoder ()
{
	// Publish the abort request under the mutex the waiters use, and read
	// "waiting" only while holding it.
	std::unique_lock<std::mutex> lock (_mtx);
	work = RaptorQ__v1::Work_State::ABORT_COMPUTATION;
	while (!waiting.empty()) {
		_cond.notify_all();
		// Bounded wait + new notification: a waiter that was not yet
		// blocked on the condition variable when we notified is woken up
		// on the next round instead of hanging the destructor.
		_cond.wait_for (lock, std::chrono::milliseconds (10));
	}
}

// Sets up a decoder for "bytes" bytes of data split in symbols of
// "symbol_size" bytes. Nothing is available or reported at the start.
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::Decoder (const uint64_t bytes,
								const uint16_t symbol_size, const Report type)
	: _bytes (bytes), _symbols (get_symbols (bytes, symbol_size)),
		_symbol_size (symbol_size), last_reported (-1), _type (type),
		work (RaptorQ__v1::Work_State::KEEP_WORKING),
		dec (_symbols, symbol_size),
		// two flags per symbol: available, reported
		symbols_tracker (2 * _symbols, false)
{
	IS_INPUT(In_It, "RaptorQ__v1::Decoder");
	IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");
}

// Number of source symbols, computed at construction.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols() const
{
	return this->_symbols;
}

// Symbol size given at construction.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
{
	return this->_symbol_size;
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It>
Luker's avatar
Luker committed
												Decoder<In_It, Fwd_It>::begin()
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> (this, 0);
Luker's avatar
Luker committed
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It>
												Decoder<In_It, Fwd_It>::end()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> (nullptr,
Luker's avatar
Luker committed
																	_symbols);
Luker's avatar
Luker committed

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &from, const In_It to,
Luker's avatar
Luker committed
															const uint32_t esi)
{
	auto ret = dec.add_symbol (from, to, esi);
	if (ret == Error::NONE && esi < _symbols) {
		symbols_tracker [2 * esi] = true;
		std::unique_lock<std::mutex> lock (_mtx);
		RQ_UNUSED (lock);
		_cond.notify_all();
	}
	return ret;
}


template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<Error, uint16_t> Decoder<In_It, Fwd_It>::poll ()
Luker's avatar
Luker committed
	std::unique_lock<std::mutex> lock (_mtx, std::defer_lock);
Luker's avatar
Luker committed
	uint32_t id;
Luker's avatar
Luker committed
	uint32_t to_report;
	switch (_type) {
	case Report::PARTIAL_FROM_BEGINNING:
Luker's avatar
Luker committed
		lock.lock();
Luker's avatar
Luker committed
		id = static_cast<uint32_t> (std::max (0, last_reported));
Luker's avatar
Luker committed
		to_report = 0;
		for (; id < symbols_tracker.size(); id += 2) {
			if (symbols_tracker[id] == true) {
				++id;
Luker's avatar
Luker committed
				if (symbols_tracker[id] == false) {
Luker's avatar
Luker committed
					symbols_tracker[id] = true;
					++to_report;
Luker's avatar
Luker committed
				}
Luker's avatar
Luker committed
		if (to_report > 0 || dec.ready()) {
Luker's avatar
Luker committed
			last_reported += to_report;
			return {Error::NONE, last_reported};
		}
		if (!dec.can_decode())
			return {Error::NEED_DATA, 0};
		return {Error::WORKING, 0};
	case Report::PARTIAL_ANY:
Luker's avatar
Luker committed
		for (id = 0; id < symbols_tracker.size(); id += 2) {
			if (symbols_tracker[id] == true) {
				++id;
Luker's avatar
Luker committed
				lock.lock();
				if (symbols_tracker[id] == false) {
					symbols_tracker[id] = true;
					return {Error::NONE, id / 2};
Luker's avatar
Luker committed
				}
Luker's avatar
Luker committed
		if (dec.ready())
			return {Error::NONE, 0};
		if (dec.can_decode())
			return {Error::NEED_DATA, 0};
		return {Error::WORKING, 0};
	case Report::COMPLETE:
Luker's avatar
Luker committed
		// "last_reported" is only used in this function, so now we can
		// make it mean "last index in symbols_tracker", instead of
		// "last reported symbol number"
		id = static_cast<uint32_t> (std::max (0, last_reported));
		for (; id < symbols_tracker.size(); id += 2) {
			if (symbols_tracker[id] == false) {
				if (dec.can_decode())
					return {Error::WORKING, 0};
				return {Error::NEED_DATA, 0};
			}
		}
Luker's avatar
Luker committed
		last_reported = static_cast<int32_t> (symbols_tracker.size());
		return {Error::NONE, 0};
	}
	return {Error::WORKING, 0};
}

// Blocks until poll() has something final to say (Error::NONE or
// Error::NEED_DATA) or until the decoder is asked to stop, then fulfils the
// promise exactly once. Used directly by wait_sync() and as thread body by
// wait().
//
// Before returning, a thread started by wait() detaches and removes itself
// from obj->waiting and notifies the condition variable, which is what the
// destructor waits for.
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::waiting_thread (Decoder<In_It, Fwd_It> *obj,
									std::promise<std::pair<Error, uint16_t>> p)
{
	bool answered = false;	// a promise can be satisfied only once

	std::unique_lock<std::mutex> lock (obj->_mtx);
	// return on not enough data or finished computation.
	while (obj->work == RaptorQ__v1::Work_State::KEEP_WORKING) {
		// poll() takes _mtx on its own: calling it with the mutex held
		// would lock a non-recursive mutex twice from the same thread.
		lock.unlock();
		const auto res = obj->poll();
		lock.lock();
		if (res.first == Error::NONE || res.first == Error::NEED_DATA) {
			p.set_value (res);
			answered = true;
			break;
		}
		if (obj->work != RaptorQ__v1::Work_State::KEEP_WORKING)
			break;
		// A notification sent while poll() was running (mutex released)
		// would be missed by an unbounded wait: use a bounded one and
		// poll again.
		obj->_cond.wait_for (lock, std::chrono::milliseconds (100));
	}
	if (!answered)
		p.set_value ({Error::EXITING, 0});

	// "lock" is still held: unregister this thread, if it is registered.
	for (auto th = obj->waiting.begin(); th != obj->waiting.end(); ++th) {
		if (std::this_thread::get_id() == th->get_id()) {
			th->detach();
			obj->waiting.erase (th);
			break;
		}
	}
	obj->_cond.notify_all(); // notify exit to destructor
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<Error, uint16_t> Decoder<In_It, Fwd_It>::wait_sync ()
{
	std::promise<std::pair<Error, uint16_t>> p;
Luker's avatar
Luker committed
	auto fut = p.get_future();
	waiting_thread (this, std::move(p));
	fut.wait();
	return fut.get();
}

// Asynchronous variant of wait_sync(): starts a thread running
// waiting_thread() and returns the future tied to its promise.
template <typename In_It, typename Fwd_It>
std::future<std::pair<Error, uint16_t>> Decoder<In_It, Fwd_It>::wait ()
{
	std::promise<std::pair<Error, uint16_t>> p;
	auto f = p.get_future();
	// Threads remove themselves from "waiting" while holding _mtx: take it
	// here too, so the new thread can not look for its own entry before
	// the entry has been inserted.
	std::lock_guard<std::mutex> lock (_mtx);
	RQ_UNUSED (lock);
	waiting.emplace_back (waiting_thread, this, std::move(p));
	return f;
}

// Forwards to the raw decoder's can_decode().
template <typename In_It, typename Fwd_It>
bool Decoder<In_It, Fwd_It>::can_decode() const
{
	return this->dec.can_decode();
}

// Runs the raw decoder. On success every source symbol is marked as
// available (unless the Report type is COMPLETE) and the threads blocked on
// the condition variable are woken up.
// Returns the raw decoder's result.
template <typename In_It, typename Fwd_It>
typename Decoder<In_It, Fwd_It>::Decoder_Result Decoder<In_It, Fwd_It>::decode()
{
	auto res = dec.decode (&work);
	if (res == Decoder_Result::DECODED) {
		std::lock_guard<std::mutex> lock (_mtx);
		RQ_UNUSED (lock);
		if (_type != Report::COMPLETE) {
			// "available" flags live at the even indexes. Always start
			// from 0: "last_reported" is a symbol number in the partial
			// modes and may be odd, which would set the "reported" flags
			// instead.
			for (size_t idx = 0; idx < symbols_tracker.size(); idx += 2)
				symbols_tracker[idx] = true;
		}
		_cond.notify_all();
	}
	return res;
}

// Requests the abort of the computation and wakes up every thread blocked
// on the condition variable.
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::stop()
{
	// Change the flag while holding the mutex used by the waiters, so the
	// change and the notification can not slip between a waiter's check of
	// the flag and its wait.
	std::lock_guard<std::mutex> lock (_mtx);
	RQ_UNUSED (lock);
	work = RaptorQ__v1::Work_State::ABORT_COMPUTATION;
	_cond.notify_all();
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<uint64_t, size_t> Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start,
													const Fwd_It end,
													const uint64_t from_byte,
													const size_t skip)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	using T = typename std::iterator_traits<Fwd_It>::value_type;
	if (!dec.ready() || skip >= sizeof(T))	// !dec.ready()
Luker's avatar
Luker committed
		return {0, 0};
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	auto decoded = dec.get_symbols();

	uint16_t esi = static_cast<uint16_t> (from_byte /
									static_cast<uint64_t> (decoded->cols()));
	uint16_t byte = static_cast<uint16_t> (from_byte %
									static_cast<uint64_t> (decoded->cols()));

Luker's avatar
Luker committed
	size_t offset_al = skip;
Luker's avatar
Luker committed
	T element = static_cast<T> (0);
	if (skip != 0) {
		uint8_t *p = reinterpret_cast<uint8_t *> (&*start);
		for (size_t keep = 0; keep < skip; ++keep) {
			element += static_cast<T> (*(p++)) << keep * 8;
		}
	}
Luker's avatar
Luker committed
	uint64_t written = 0;
Luker's avatar
Luker committed
	while (start != end && esi < decoded->rows() &&
									_bytes > from_byte + written + offset_al) {
		element += static_cast<T> (static_cast<uint8_t> ((*decoded)(esi, byte)))
Luker's avatar
Luker committed
															<< offset_al * 8;
		++offset_al;
		if (offset_al == sizeof(T)) {
			*start = element;
Luker's avatar
Luker committed
			++start;
Luker's avatar
Luker committed
			written += offset_al;
			offset_al = 0;
			element = static_cast<T> (0);
		}
		++byte;
		if (byte == decoded->cols()) {
			byte = 0;
			++esi;
		}
	}
	if (start != end && offset_al != 0) {
		// we have more stuff in "element", but not enough to fill
		// the iterator.
		*start = element;
		written += offset_al;
	}
Luker's avatar
Luker committed
	return {written, offset_al};
Luker's avatar
Luker committed
}
Luker's avatar
Luker committed

// Copies decoded data into [start, end), starting from the first byte of
// symbol "esi".
//
// The symbol size must be a multiple of the size of one output element.
// Returns the number of bytes written divided by the symbol size; 0 when
// the decoder is not ready.
// NOTE(review): decode_bytes() stops at "end", not at the end of the
// symbol - confirm that callers size the output for exactly one symbol.
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_symbol (Fwd_It &start, const Fwd_It end,
															const uint16_t esi)
{
	assert ((_symbol_size %
			sizeof(typename std::iterator_traits<Fwd_It>::value_type)) == 0);
	if (!dec.ready())
		return 0;
	// Widen before multiplying: uint16_t * uint16_t is computed as int and
	// can overflow.
	const uint64_t esi_byte = static_cast<uint64_t> (esi) * _symbol_size;
	auto pair = decode_bytes (start, end, esi_byte, 0);
	assert (pair.second == 0 );
	return pair.first / _symbol_size;
}

}   // namespace Impl
}   // namespace RaptorQ__v1