/*
 * Copyright (c) 2015-2016, Luca Fulchir<luca@fulchir.it>, All rights reserved.
 *
 * This file is part of "libRaptorQ".
 *
 * libRaptorQ is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as
 * published by the Free Software Foundation, either version 3
 * of the License, or (at your option) any later version.
 *
 * libRaptorQ is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * and a copy of the GNU Lesser General Public License
 * along with libRaptorQ.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#include "RaptorQ/v1/common.hpp"
#include "RaptorQ/v1/RaptorQ_Iterators.hpp"
#include "RaptorQ/v1/Encoder.hpp"
#include "RaptorQ/v1/Decoder.hpp"
#include "RaptorQ/v1/Parameters.hpp"
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <future>
#include <iterator>
#include <limits>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>


namespace RaptorQ__v1 {
namespace Impl {


template <typename Rnd_It, typename Fwd_It>
class RAPTORQ_LOCAL Encoder
{
public:
	~Encoder();
    // used for precomputation
Luker's avatar
Luker committed
    Encoder (const uint16_t symbols, const size_t symbol_size);
Luker's avatar
Luker committed
    // with data at the beginning. Less work.
    Encoder (const Rnd_It data_from, const Rnd_It data_to,
Luker's avatar
Luker committed
													const size_t symbol_size);
Luker's avatar
Luker committed
	uint16_t symbols() const;
Luker's avatar
Luker committed
	size_t symbol_size() const;
Luker's avatar
Luker committed
	uint32_t max_repair() const;

    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> begin_source();
	RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> end_source();
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> begin_repair();
    RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> end_repair
Luker's avatar
Luker committed
                                                        (const uint32_t repair);
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	size_t add_data (Rnd_It &from, const Rnd_It to);
Luker's avatar
Luker committed
    //TODO: end_of_input!
Luker's avatar
Luker committed
	void clear_data();
Luker's avatar
Luker committed
	bool compute_sync();
Luker's avatar
Luker committed
	size_t needed_bytes();
Luker's avatar
Luker committed

    std::future<Error> compute();
Luker's avatar
Luker committed
    size_t encode (Fwd_It &output, const Fwd_It end, const uint32_t id);
Luker's avatar
Luker committed

private:
	enum class Data_State : uint8_t {
		NEED_DATA = 1,	// first constructor used. no interleaver until FULL
		FULL = 2,
		INIT = 3	// second constructor used: we already have the interleaver
	};

    std::unique_ptr<RFC6330__v1::Impl::Interleaver<Rnd_It>> interleaver;
	Raw_Encoder<Rnd_It, Fwd_It> encoder;
	DenseMtx precomputed;
	std::vector<typename std::iterator_traits<Rnd_It>::value_type> data;
Luker's avatar
Luker committed
	std::thread waiting;
Luker's avatar
Luker committed
	std::mutex data_mtx;
Luker's avatar
Luker committed
    const size_t _symbol_size;
	const uint16_t _symbols;
Luker's avatar
Luker committed
    Data_State state;
Luker's avatar
Luker committed


Luker's avatar
Luker committed
	static size_t real_symbol_size (const size_t symbol_size);
Luker's avatar
Luker committed
	static uint16_t calc_symbols (const Rnd_It data_from, const Rnd_It data_to,
Luker's avatar
Luker committed
													const size_t symbol_size);
Luker's avatar
Luker committed
	static void compute_thread (Encoder<Rnd_It, Fwd_It> *obj,
														std::promise<Error> p);
Luker's avatar
Luker committed
};

template <typename In_It, typename Fwd_It>
class RAPTORQ_LOCAL Decoder
{
public:

	enum class RAPTORQ_LOCAL Report : uint8_t {
Luker's avatar
Luker committed
		PARTIAL_FROM_BEGINNING = RQ_COMPUTE_PARTIAL_FROM_BEGINNING,
		PARTIAL_ANY = RQ_COMPUTE_PARTIAL_ANY,
		COMPLETE = RQ_COMPUTE_COMPLETE
	~Decoder();
Luker's avatar
Luker committed
    Decoder (const uint16_t symbols, const size_t symbol_size,
Luker's avatar
Luker committed
															const Report type);

Luker's avatar
Luker committed
	uint16_t symbols() const;
Luker's avatar
Luker committed
	size_t symbol_size() const;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> begin();
    RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> end();
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	Error add_symbol (In_It &from, const In_It to, const uint32_t esi);
Luker's avatar
Luker committed
    void end_of_input();
Luker's avatar
Luker committed

	bool can_decode() const;
	void stop();
Luker's avatar
Luker committed
	uint16_t needed_symbols() const;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
    void set_max_concurrency (const uint16_t max_threads);
    using Decoder_Result = typename Raw_Decoder<In_It>::Decoder_Result;
    Decoder_Result decode_once();
Luker's avatar
Luker committed
	std::pair<Error, uint16_t> poll();
Luker's avatar
Luker committed
	std::pair<Error, uint16_t> wait_sync();
	std::future<std::pair<Error, uint16_t>> wait();
Luker's avatar
Luker committed

Luker's avatar
Luker committed

	Error decode_symbol (Fwd_It &start, const Fwd_It end,const uint16_t esi);
Luker's avatar
Luker committed
	// return number of bytes written
Luker's avatar
Luker committed
	std::pair<size_t, size_t> decode_bytes (Fwd_It &start, const Fwd_It end,
									const size_t from_byte, const size_t skip);
Luker's avatar
Luker committed
private:
Luker's avatar
Luker committed
    uint16_t _max_threads;
	const uint16_t _symbols;
    const size_t _symbol_size;
Luker's avatar
Luker committed
	std::atomic<uint32_t> last_reported;
Luker's avatar
Luker committed
	const Report _type;
	RaptorQ__v1::Work_State work;
Luker's avatar
Luker committed
	Raw_Decoder<In_It> dec;
Luker's avatar
Luker committed
	// 2* symbols. Actually tracks available and reported symbols.
	// each symbol gets 2 bool: 1= available, 2=reported
Luker's avatar
Luker committed
	std::deque<std::atomic<bool>> symbols_tracker;
	std::mutex _mtx;
	std::condition_variable _cond;
	std::vector<std::thread> waiting;
Luker's avatar
Luker committed

	static void waiting_thread (Decoder<In_It, Fwd_It> *obj,
									std::promise<std::pair<Error, uint16_t>> p);
Luker's avatar
Luker committed
};


///////////////////
//// Encoder
///////////////////

template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::~Encoder()
{
Luker's avatar
Luker committed
	encoder.stop();
Luker's avatar
Luker committed
	if (waiting.joinable())
		waiting.join();
Luker's avatar
Luker committed
}

template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Encoder (const uint16_t symbols,
Luker's avatar
Luker committed
													const size_t symbol_size)
	: interleaver (nullptr), encoder (symbols), _symbol_size (symbol_size),
                                                            _symbols (symbols)
Luker's avatar
Luker committed
{
	IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder");
	IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder");
Luker's avatar
Luker committed
    state = Data_State::NEED_DATA;
Luker's avatar
Luker committed
    using T = typename std::iterator_traits<Rnd_It>::value_type;
    const size_t tot_size = symbols * symbol_size;
    const size_t it_size = (tot_size / sizeof(T)) +
                                        (tot_size % sizeof(T) == 0 ? 0 : 1);
    data.reserve (it_size);
Luker's avatar
Luker committed
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::real_symbol_size (const size_t symbol_size)
Luker's avatar
Luker committed
{
	using T = typename std::iterator_traits<Rnd_It>::value_type;
	return symbol_size * sizeof(T);
}

template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::calc_symbols (const Rnd_It data_from,
													const Rnd_It data_to,
Luker's avatar
Luker committed
													const size_t symbol_size)
Luker's avatar
Luker committed
{
	using T = typename std::iterator_traits<Rnd_It>::value_type;

	uint64_t size = static_cast<uint64_t> (data_to - data_from) * sizeof(T);
	uint16_t symbols = static_cast<uint16_t> (size / symbol_size);
	if ((size % symbol_size) != 0)
		++symbols;
	return symbols;
Luker's avatar
Luker committed
}

template <typename Rnd_It, typename Fwd_It>
Encoder<Rnd_It, Fwd_It>::Encoder (const Rnd_It data_from, const Rnd_It data_to,
Luker's avatar
Luker committed
                                                    const size_t symbol_size)
Luker's avatar
Luker committed
    : interleaver (new RFC6330__v1::Impl::Interleaver<Rnd_It> (data_from,
Luker's avatar
Luker committed
									data_to, real_symbol_size(symbol_size),
									SIZE_MAX, real_symbol_size(symbol_size))),
	  encoder (interleaver.get(), 0),
Luker's avatar
Luker committed
                                _symbol_size (real_symbol_size(symbol_size)),
                                _symbols (calc_symbols (data_from, data_to,
												real_symbol_size(symbol_size)))
Luker's avatar
Luker committed
{
	IS_RANDOM(Rnd_It, "RaptorQ__v1::Encoder");
	IS_FORWARD(Fwd_It, "RaptorQ__v1::Encoder");
Luker's avatar
Luker committed
    state = Data_State::INIT;
}

/// Number of source symbols of the block, as fixed at construction.
template <typename Rnd_It, typename Fwd_It>
uint16_t Encoder<Rnd_It, Fwd_It>::symbols() const
	{ return _symbols; }

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::symbol_size() const
Luker's avatar
Luker committed
{
	return _symbol_size;
}

template <typename Rnd_It, typename Fwd_It>
uint32_t Encoder<Rnd_It, Fwd_It>::max_repair() const
{
Luker's avatar
Luker committed
    // you can have up to 56403 symbols in a block
    // rfc6330 limits you to 992173 repair symbols
    // but limits are meant to be broken!
    // the limit sould be up to 4294967279 repair symbols 2^32-(_param.S + H)
    // but people might misuse the API, and call end_repair(max_repair),
    // which would overflow.
    // We are sorry for taking away from you that 0.0014% of repair symbols.
    auto _param = Parameters (_symbols);
    return static_cast<uint32_t> (std::numeric_limits<uint32_t>::max() -
                                                                    _param.L);
Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
Luker's avatar
Luker committed
                                        Encoder<Rnd_It, Fwd_It>::begin_source()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (this, 0);
Luker's avatar
Luker committed
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
Luker's avatar
Luker committed
                                       Encoder<Rnd_It, Fwd_It>::end_source()
Luker's avatar
Luker committed
{
	return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (this,
																	_symbols);
}

template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
Luker's avatar
Luker committed
                                        Encoder<Rnd_It, Fwd_It>::begin_repair()
Luker's avatar
Luker committed
{
	return end_source();
}

template <typename Rnd_It, typename Fwd_It>
RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It>
Luker's avatar
Luker committed
                    Encoder<Rnd_It, Fwd_It>::end_repair (const uint32_t repair)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Encoder::Symbol_Iterator<Rnd_It, Fwd_It> (nullptr,
Luker's avatar
Luker committed
                                                            _symbols + repair);
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::add_data (Rnd_It &from, const Rnd_It to)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	size_t written = 0;
Luker's avatar
Luker committed
	using T = typename std::iterator_traits<Rnd_It>::value_type;

	if (state != Data_State::NEED_DATA)
		return written;
Luker's avatar
Luker committed
	std::unique_lock<std::mutex> lock (data_mtx);
	RQ_UNUSED (lock);
Luker's avatar
Luker committed
	while (from != to) {
Luker's avatar
Luker committed
		data.emplace_back (*from);
		++from;
		++written;
Luker's avatar
Luker committed
		if (data.size() * sizeof (T) >=
							static_cast<uint64_t> (_symbols * _symbol_size)) {
Luker's avatar
Luker committed
			state = Data_State::FULL;
			break;
		}
	}
	return written;
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::clear_data()
{
	std::unique_lock<std::mutex> lock (data_mtx);
Luker's avatar
Luker committed
	encoder.clear_data();
Luker's avatar
Luker committed
	data.clear();
Luker's avatar
Luker committed
	state = Data_State::NEED_DATA;
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
bool Encoder<Rnd_It, Fwd_It>::compute_sync()
{
Luker's avatar
Luker committed
	static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING;
Luker's avatar
Luker committed
	if (state == Data_State::INIT) {
		return encoder.generate_symbols (&work);
	} else {
Luker's avatar
Luker committed
		if (precomputed.rows() != 0)
			return true;
Luker's avatar
Luker committed
		precomputed = encoder.get_precomputed (&work);
		return precomputed.rows() != 0;
	}
}

Luker's avatar
Luker committed
/// Body of the thread started by compute().
///
/// Does the same work as compute_sync() on "obj" and reports the outcome
/// through the promise: Error::NONE on success, Error::EXITING otherwise.
template <typename Rnd_It, typename Fwd_It>
void Encoder<Rnd_It, Fwd_It>::compute_thread (
							Encoder<Rnd_It, Fwd_It> *obj, std::promise<Error> p)
{
	static RaptorQ__v1::Work_State work = RaptorQ__v1::Work_State::KEEP_WORKING;
	bool done;
	if (obj->state == Data_State::INIT) {
		done = obj->encoder.generate_symbols (&work);
	} else {
		// build the precomputed matrix only if we do not have it yet.
		if (obj->precomputed.rows() == 0)
			obj->precomputed = obj->encoder.get_precomputed (&work);
		done = (obj->precomputed.rows() != 0);
	}
	// on failure the only possible reason is that we were asked to stop.
	p.set_value (done ? Error::NONE : Error::EXITING);
}

/// Starts the computation on a background thread.
///
/// Returns a future holding Error::NONE or Error::EXITING once the thread
/// is done. If a thread was already started, the returned future is
/// immediately ready with Error::WORKING.
///
/// NOTE(review): the thread is only joined by the destructor, so it stays
/// joinable after it has finished and every later call reports
/// Error::WORKING -- confirm this is intended.
template <typename Rnd_It, typename Fwd_It>
std::future<Error> Encoder<Rnd_It, Fwd_It>::compute()
{
	std::promise<Error> p;
	auto future = p.get_future();

	// only one waiting thread for the encoder
	if (waiting.joinable()) {
		p.set_value (Error::WORKING);
		// get_future() can be called only once per promise (a second call
		// throws std::future_error): hand back the one we already have.
		return future;
	}
	waiting = std::thread (compute_thread, this, std::move(p));
	return future;
}

template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::encode (Fwd_It &output, const Fwd_It end,
Luker's avatar
Luker committed
															const uint32_t id)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	// returns number of iterators written
Luker's avatar
Luker committed
	switch (state) {
	case Data_State::INIT:
Luker's avatar
Luker committed
		if (!encoder.ready())
Luker's avatar
Luker committed
			return 0;
		return encoder.Enc (id, output, end);
	case Data_State::NEED_DATA:
Luker's avatar
Luker committed
		break;
Luker's avatar
Luker committed
	case Data_State::FULL:
Luker's avatar
Luker committed
		if (!encoder.ready()) {
Luker's avatar
Luker committed
			if (precomputed.rows() == 0) {
				return 0;
Luker's avatar
Luker committed
			} else if (interleaver == nullptr) {
Luker's avatar
Luker committed
				interleaver = std::unique_ptr<
							RFC6330__v1::Impl::Interleaver<Rnd_It>> (
									new RFC6330__v1::Impl::Interleaver<Rnd_It> (
													data.begin(), data.end(),
													_symbol_size, SIZE_MAX,
																_symbol_size));
			}
Luker's avatar
Luker committed
			encoder.generate_symbols (precomputed, interleaver.get());
Luker's avatar
Luker committed
		}
		return encoder.Enc (id, output, end);
	}
Luker's avatar
Luker committed
	return 0;
Luker's avatar
Luker committed
template <typename Rnd_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Encoder<Rnd_It, Fwd_It>::needed_bytes()
Luker's avatar
Luker committed
{
	using T = typename std::iterator_traits<Rnd_It>::value_type;
Luker's avatar
Luker committed
	return (_symbols * _symbol_size) - (data.size() * sizeof(T));
Luker's avatar
Luker committed
}
Luker's avatar
Luker committed

///////////////////
//// Decoder
///////////////////

/// Requests abort, wakes up every waiting thread and blocks until all the
/// threads started by wait() have removed themselves from "waiting".
template <typename In_It, typename Fwd_It>
Decoder<In_It, Fwd_It>::~Decoder ()
{
	// "waiting" is modified by the exiting threads with _mtx held, so it is
	// only inspected under the same lock (the predicate runs with the lock
	// held). The flag is raised under the lock as well, so a thread that
	// checks it with the lock held can not miss it.
	std::unique_lock<std::mutex> lock (_mtx);
	work = RaptorQ__v1::Work_State::ABORT_COMPUTATION;
	_cond.notify_all();
	// wait threads to exit
	_cond.wait (lock, [this] { return waiting.empty(); });
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Decoder<In_It, Fwd_It>::Decoder (const uint16_t symbols,
Luker's avatar
Luker committed
								const size_t symbol_size, const Report type)
Luker's avatar
Luker committed
	:_symbols (symbols), _symbol_size (symbol_size), _type (type),
Luker's avatar
Luker committed
													dec (_symbols, symbol_size)
Luker's avatar
Luker committed
{
	IS_INPUT(In_It, "RaptorQ__v1::Decoder");
	IS_FORWARD(Fwd_It, "RaptorQ__v1::Decoder");
Luker's avatar
Luker committed

	last_reported.store (0);
	symbols_tracker = std::deque<std::atomic<bool>> (2 * _symbols);
	for(uint32_t idx = 0; idx < 2 * _symbols; ++idx)
		symbols_tracker[idx] = false;
Luker's avatar
Luker committed
	work = RaptorQ__v1::Work_State::KEEP_WORKING;
Luker's avatar
Luker committed
    _max_threads = 2;
Luker's avatar
Luker committed
}

/// Number of source symbols of the block, as given to the constructor.
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols() const
	{ return _symbols; }

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Decoder<In_It, Fwd_It>::symbol_size() const
Luker's avatar
Luker committed
{
	return _symbol_size;
Luker's avatar
Luker committed
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It>
Luker's avatar
Luker committed
                                                Decoder<In_It, Fwd_It>::begin()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> (this, 0);
Luker's avatar
Luker committed
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It>
Luker's avatar
Luker committed
                                                Decoder<In_It, Fwd_It>::end()
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return RaptorQ__v1::It::Decoder::Symbol_Iterator<In_It, Fwd_It> (nullptr,
Luker's avatar
Luker committed
																_symbols);
Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::needed_symbols() const
{
	return dec.needed_symbols();
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Error Decoder<In_It, Fwd_It>::add_symbol (In_It &from, const In_It to,
Luker's avatar
Luker committed
															const uint32_t esi)
{
	auto ret = dec.add_symbol (from, to, esi);
	if (ret == Error::NONE && esi < _symbols) {
Luker's avatar
Luker committed
		symbols_tracker [2 * esi].store (true);
		_cond.notify_all();
	}
	return ret;
}


template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<Error, uint16_t> Decoder<In_It, Fwd_It>::poll ()
Luker's avatar
Luker committed
	uint32_t idx;
	uint32_t last;
	bool expected = false;
	switch (_type) {
	case Report::PARTIAL_FROM_BEGINNING:
Luker's avatar
Luker committed
		// report the number of symbols that are known, starting from
		// the beginning.
		last = last_reported.load();
		idx = last;
		for (; idx < symbols_tracker.size(); idx += 2) {
			if (symbols_tracker[idx].load() == true) {
				++idx;
				if (symbols_tracker[idx].load() == false)
					symbols_tracker[idx].store (true);
				--idx;
Luker's avatar
Luker committed
		idx /= 2;
		if (idx > last) {
			while (!last_reported.compare_exchange_weak (last, idx)) {
				// expected is now "last_reported.load()"
				if (last >= idx) {
					// other thread already reported more than us.
					// do not report things twice.
					if (dec.ready()) {
						last_reported.store (_symbols);
						return {Error::NONE, _symbols};
					}
Luker's avatar
Luker committed
                    if (dec.threads() > 0)
						return {Error::WORKING, 0};
Luker's avatar
Luker committed
					return {Error::NEED_DATA, 0};
				}
				// else we can report the new stuff
			}
			return {Error::NONE, idx};
		}
		// nothing to report
		if (dec.ready()) {
			last_reported.store (_symbols);
			return {Error::NONE, _symbols};
Luker's avatar
Luker committed
		}
Luker's avatar
Luker committed
        if (dec.threads() > 0)
			return {Error::WORKING, 0};
Luker's avatar
Luker committed
		return {Error::NEED_DATA, 0};
	case Report::PARTIAL_ANY:
Luker's avatar
Luker committed
		// report the first available, not yet reported.
		// or return {NONE, _symbols} if all have been reported
		if (dec.ready())
			return {Error::NONE, _symbols};
		for (idx = 0; idx < static_cast<uint32_t> (symbols_tracker.size());
																	idx += 2) {
			if (symbols_tracker[idx].load() == true) {
				++idx;
				if (symbols_tracker[idx].load() == false) {
					expected = false;
					if (symbols_tracker[idx].
									compare_exchange_strong (expected, true)) {
						return {Error::NONE, idx / 2};
					}	// else some other thread raced us, keep trying other
						// symbols
Luker's avatar
Luker committed
				}
Luker's avatar
Luker committed
		if (dec.ready())
Luker's avatar
Luker committed
			return {Error::NONE, _symbols};
Luker's avatar
Luker committed
        if (dec.threads() > 0)
			return {Error::WORKING, 0};
Luker's avatar
Luker committed
		return {Error::NEED_DATA, 0};
	case Report::COMPLETE:
Luker's avatar
Luker committed
        auto init = last_reported.load();
		idx = init * 2;
Luker's avatar
Luker committed
		for (; idx < symbols_tracker.size(); idx += 2) {
			if (symbols_tracker[idx].load() == false) {
Luker's avatar
Luker committed
                idx /= 2;
                while (!last_reported.compare_exchange_weak(init, idx))
                    idx = std::max(init, idx);
Luker's avatar
Luker committed
				if (dec.threads() > 0)
					return {Error::WORKING, 0};
				return {Error::NEED_DATA, 0};
			}
		}
Luker's avatar
Luker committed
		last_reported.store (_symbols);
		return {Error::NONE, 0};
	}
	return {Error::WORKING, 0};
}

/// Body of wait() and wait_sync(): tries to decode and polls, sleeping on
/// the condition variable between attempts, until there is something to
/// report or the decoder is asked to stop.
///
/// The promise is satisfied exactly once: with the result of poll(), or
/// with {Error::EXITING, 0} when the loop ended because of an abort.
/// Before returning, the thread removes itself from obj->waiting (if it is
/// in there) and notifies, so that the destructor can proceed.
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::waiting_thread (Decoder<In_It, Fwd_It> *obj,
									std::promise<std::pair<Error, uint16_t>> p)
{
	bool reported = false;
	while (obj->work == RaptorQ__v1::Work_State::KEEP_WORKING) {
		const bool compute = obj->dec.add_concurrent (obj->_max_threads);
		if (compute) {
			obj->decode_once();
			std::unique_lock<std::mutex> lock (obj->_mtx);
			obj->dec.drop_concurrent();
			lock.unlock();
			obj->_cond.notify_all(); // notify other waiting threads
		}
		std::unique_lock<std::mutex> lock (obj->_mtx);
		// poll() does not actually need to be locked, but we use the
		// lock-wait mechanism to signal the arrival of new symbols,
		// so that we retry only when we get new data.
		const auto res = obj->poll();
		if (res.first == Error::NONE || (obj->dec.end_of_input == true &&
											!obj->dec.can_decode()  &&
											obj->dec.threads() == 0 &&
											res.first == Error::NEED_DATA)) {
			p.set_value (res);
			reported = true;
			break;
		}
		// an abort requested while we were busy has already been notified:
		// do not go to sleep waiting for a notification that will not
		// come again.
		if (obj->work != RaptorQ__v1::Work_State::KEEP_WORKING)
			break;
		obj->_cond.wait (lock);
	}

	// setting the value twice throws std::future_error: report EXITING only
	// if nothing has been reported, even when the abort arrived right after
	// a successful report.
	if (!reported)
		p.set_value ({Error::EXITING, 0});

	std::lock_guard<std::mutex> lock (obj->_mtx);
	const auto self = std::this_thread::get_id();
	for (auto th = obj->waiting.begin(); th != obj->waiting.end(); ++th) {
		if (th->get_id() == self) {
			th->detach();
			obj->waiting.erase (th);
			break;
		}
	}
	// notify exit to destructor. Done with the lock still held: once the
	// lock is released the destructor can complete, and "obj" must not be
	// touched anymore.
	obj->_cond.notify_all();
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<Error, uint16_t> Decoder<In_It, Fwd_It>::wait_sync ()
{
	std::promise<std::pair<Error, uint16_t>> p;
Luker's avatar
Luker committed
	auto fut = p.get_future();
	waiting_thread (this, std::move(p));
	fut.wait();
	return fut.get();
}

/// Asynchronous version of wait_sync(): starts waiting_thread() on a new
/// thread and returns the future that will hold its report.
template <typename In_It, typename Fwd_It>
std::future<std::pair<Error, uint16_t>> Decoder<In_It, Fwd_It>::wait ()
{
	std::promise<std::pair<Error, uint16_t>> p;
	auto f = p.get_future();
	// the threads remove themselves from "waiting" with _mtx held: adding
	// a new one has to be done under the same lock. This also guarantees
	// that the new thread finds itself in the list when it exits.
	std::lock_guard<std::mutex> lock (_mtx);
	waiting.emplace_back (waiting_thread, this, std::move(p));
	return f;
}

template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::end_of_input()
    { dec.end_of_input = true; }

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
bool Decoder<In_It, Fwd_It>::can_decode() const
Luker's avatar
Luker committed
    { return dec.can_decode(); }
Luker's avatar
Luker committed

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
void Decoder<In_It, Fwd_It>::set_max_concurrency (const uint16_t max_threads)
    { _max_threads = max_threads; }

/// Runs one decoding attempt on the calling thread and returns its result.
///
/// When the block gets decoded, the symbols not reported yet are marked as
/// available (unless the Report type is COMPLETE) and last_reported is set
/// to symbols().
template <typename In_It, typename Fwd_It>
typename Decoder<In_It, Fwd_It>::Decoder_Result
										Decoder<In_It, Fwd_It>::decode_once()
{
	const auto res = dec.decode (&work);
	if (res == Decoder_Result::DECODED) {
		std::lock_guard<std::mutex> lock (_mtx);
		if (_type != Report::COMPLETE) {
			// last_reported counts symbols, while the tracker has two
			// entries per symbol and the "available" flags are the even
			// ones: starting from the plain symbol count would set the
			// "reported" flags whenever that count is odd.
			size_t id = 2 * static_cast<size_t> (last_reported.load());
			for (; id < symbols_tracker.size(); id += 2)
				symbols_tracker[id].store (true);
		}
		last_reported.store (_symbols);
	}
	return res;
}

template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::stop()
{
	work = RaptorQ__v1::Work_State::ABORT_COMPUTATION;
	_cond.notify_all();
Luker's avatar
Luker committed
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<size_t, size_t> Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start,
Luker's avatar
Luker committed
													const Fwd_It end,
Luker's avatar
Luker committed
													const size_t from_byte,
Luker's avatar
Luker committed
													const size_t skip)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	using T = typename std::iterator_traits<Fwd_It>::value_type;
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	if (skip >= sizeof(T) || from_byte >=
Luker's avatar
Luker committed
							static_cast<size_t> (_symbols * _symbol_size)) {
Luker's avatar
Luker committed
		return {0, 0};
Luker's avatar
Luker committed
	}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	auto decoded = dec.get_symbols();

	uint16_t esi = static_cast<uint16_t> (from_byte /
Luker's avatar
Luker committed
											static_cast<size_t> (_symbol_size));
Luker's avatar
Luker committed
	uint16_t byte = static_cast<uint16_t> (from_byte %
Luker's avatar
Luker committed
											static_cast<size_t> (_symbol_size));
Luker's avatar
Luker committed

Luker's avatar
Luker committed
	size_t offset_al = skip;
Luker's avatar
Luker committed
	T element = static_cast<T> (0);
	if (skip != 0) {
		uint8_t *p = reinterpret_cast<uint8_t *> (&*start);
		for (size_t keep = 0; keep < skip; ++keep) {
			element += static_cast<T> (*(p++)) << keep * 8;
		}
	}
Luker's avatar
Luker committed
	size_t written = 0;
Luker's avatar
Luker committed
	while (start != end && esi < _symbols && dec.has_symbol (esi)) {
Luker's avatar
Luker committed
		element += static_cast<T> (static_cast<uint8_t> ((*decoded)(esi, byte)))
Luker's avatar
Luker committed
															<< offset_al * 8;
		++offset_al;
		if (offset_al == sizeof(T)) {
			*start = element;
Luker's avatar
Luker committed
			++start;
Luker's avatar
Luker committed
			written += offset_al;
			offset_al = 0;
			element = static_cast<T> (0);
		}
		++byte;
		if (byte == decoded->cols()) {
			byte = 0;
			++esi;
		}
	}
	if (start != end && offset_al != 0) {
		// we have more stuff in "element", but not enough to fill
Luker's avatar
Luker committed
		// the iterator. Do not overwrite additional data of the iterator.
		uint8_t *out = reinterpret_cast<uint8_t *> (&*start);
		uint8_t *in = reinterpret_cast<uint8_t *> (&element);
		for (size_t idx = 0; idx < offset_al; ++idx, ++out, ++in)
			*out = *in;
Luker's avatar
Luker committed
		written += offset_al;
	}
Luker's avatar
Luker committed
	return {written, offset_al};
Luker's avatar
Luker committed
}
Luker's avatar
Luker committed

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
Error Decoder<In_It, Fwd_It>::decode_symbol (Fwd_It &start, const Fwd_It end,
Luker's avatar
Luker committed
															const uint16_t esi)
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	auto start_copy = start;
Luker's avatar
Luker committed
	size_t esi_byte = esi * _symbol_size;
Luker's avatar
Luker committed

	auto pair = decode_bytes (start_copy, end, esi_byte, 0);
	if (pair.first == _symbol_size) {
		start = start_copy;
		return Error::NONE;
	}
	return Error::NEED_DATA;
Luker's avatar
Luker committed
}

}   // namespace Impl
}   // namespace RaptorQ__v1