Skip to content
RaptorQ.hpp 34.7 KiB
Newer Older
				std::unique_ptr<Block_Work> work =std::unique_ptr<Block_Work>(
															new Block_Work());
				work->work = dec;
				work->notify = pool_lock;
				Impl::Thread_Pool::get().add_work (std::move(work));
			}
		}
	}
	lock.unlock();

	// spawn thread waiting for other thread exit.
	// this way we can set_value to the future when needed.
	auto future = p.get_future();
	if (Compute::NONE != (flags & Compute::NO_BACKGROUND)) {
		wait_threads (this, flags, std::move(p));
	} else {
		std::unique_lock<std::mutex> pool_wait_lock (_mtx);
		UNUSED(pool_wait_lock);
		pool_wait.emplace_back(wait_threads, this, flags, std::move(p));

	}
	return future;
Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
void Decoder<In_It, Fwd_It>::wait_threads (Decoder<In_It, Fwd_It> *obj,
									const Compute flags,
									std::promise<std::pair<Error, uint8_t>> p)
Luker's avatar
Luker committed
{
	do {
		if (obj->exiting) {	// make sure we can exit
			p.set_value ({Error::NONE, 0});
			break;
		}
		// pool is global (static), so wait only for our stuff.
		std::unique_lock<std::mutex> lock (obj->pool_lock->first);
		if (obj->exiting) { // make sure we can exit
			p.set_value ({Error::NONE, 0});
			break;
		}
		auto status = obj->get_report (flags);
		if (Error::WORKING != status.first) {
			p.set_value (status);
			break;
Luker's avatar
Luker committed
		}

		obj->pool_lock->second.wait (lock); // conditional wait
		if (obj->exiting) {	// make sure we can exit
			p.set_value ({Error::NONE, 0});
			break;
		}
		status = obj->get_report (flags);
		lock.unlock();
		if (Error::WORKING != status.first) {
			p.set_value (status);
			break;
		}
	} while (true);

	// delete ourselves from the waiting thread vector.
	std::unique_lock<std::mutex> lock (obj->_mtx);
	UNUSED (lock);
	for (auto it = obj->pool_wait.begin(); it != obj->pool_wait.end(); ++it) {
		if (it->get_id() == std::this_thread::get_id()) {
			it->detach();
			obj->pool_wait.erase (it);
		}
	}
}

template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
														const Compute flags)
{
	if (Compute::NONE != (flags & Compute::COMPLETE) ||
			Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING)) {
		auto it = decoders.begin();
		for (; it != decoders.end(); ++it) {
			auto ptr = it->second.dec;
			if (ptr != nullptr && !ptr->ready())
				break;
		}
		if (it == decoders.end()) {
			pool_last_reported = static_cast<int16_t> (decoders.size() - 1);
			return {Error::NONE, pool_last_reported};
		}
		if (Compute::NONE != (flags & Compute::PARTIAL_FROM_BEGINNING) &&
									(pool_last_reported < (it->first - 1))) {
			pool_last_reported = it->first - 1;
			return {Error::NONE, pool_last_reported};
		}
		return {Error::WORKING, 0};
	}
	if (Compute::NONE != (flags & Compute::PARTIAL_ANY)) {
		for (auto &it : decoders) {
			if (!it.second.reported) {
				auto ptr = it.second.dec;
				if (ptr != nullptr && ptr->ready()) {
					return {Error::NONE, it.first};
				}
			}
		}
Luker's avatar
Luker committed
	}
	// can be reached if computing thread was stopped
	return {Error::WORKING, 0};
}

template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode (Fwd_It &start,
														const Fwd_It end,
														const uint8_t skip)
{
	// Decode from the beginning, up untill we can.

Luker's avatar
Luker committed
	uint64_t written = 0;
	uint8_t new_skip = skip;
	for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
		std::unique_lock<std::mutex> block_lock (_mtx);
Luker's avatar
Luker committed
		auto it = decoders.find (sbn);
		if (it == decoders.end())
			return {written, new_skip};
		auto dec_ptr = it->second.dec;
		block_lock.unlock();

		if (!dec_ptr->ready()) {
			if (!use_pool && dec_ptr->can_decode()) {
				Work_State state = Work_State::KEEP_WORKING;
				auto ret = dec_ptr->decode (&state);
				if (Impl::Decoder<In_It>::Decoder_Result::DECODED != ret)
					return {written, new_skip};;
			} else {
				return {written, new_skip};
			}
		}

		Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
													_sub_blocks, _alignment);
Luker's avatar
Luker committed
		auto tmp_start = start;
		uint64_t tmp_written = de_interleaving (tmp_start, end, new_skip);
Luker's avatar
Luker committed
		if (tmp_written == 0)
			return {written, new_skip};
		written += static_cast<size_t> (tmp_written);
		new_skip = block_size (sbn) %
				sizeof(typename std::iterator_traits<Fwd_It>::value_type);
Luker's avatar
Luker committed
		// if we ended decoding in the middle of a Fwd_It, do not advance
		// start too much, or we will end up having additional zeros.
		if (new_skip == 0) {
Luker's avatar
Luker committed
			start = tmp_start;
		} else {
			// tmp_written > 0 due to earlier return
			// moreover, RaptorQ handles at most 881GB per rfc, so
			// casting uint64 to int64 is safe
			// we can not do "--start" since it's a forward iterator
			start += static_cast<int64_t> (tmp_written - 1);
		}
		written += tmp_written;
Luker's avatar
Luker committed
	}
	return {written, new_skip};
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_block (Fwd_It &start, const Fwd_It end,
Luker's avatar
Luker committed
															const uint8_t sbn)
Luker's avatar
Luker committed
{
	if (sbn >= _blocks)
		return 0;

	std::shared_ptr<RaptorQ__v1::Impl::Decoder<In_It>> dec_ptr = nullptr;
	std::unique_lock<std::mutex> lock (_mtx);
Luker's avatar
Luker committed
	auto it = decoders.find (sbn);

	if (it == decoders.end())
		return 0;	// did not receiveany data yet.

	if (use_pool) {
		dec_ptr = it->second.dec;
		lock.unlock();
		if (!dec_ptr->ready())
			return 0;	// did not receive enough data, or could not decode yet.
	} else {
		dec_ptr = it->second.dec;
		lock.unlock();
		if (!dec_ptr->ready()) {
			if (!dec_ptr->can_decode())
				return 0;
			Work_State keep_working = Work_State::KEEP_WORKING;
			dec_ptr->decode (&keep_working);
			if (!dec_ptr->ready())
				return 0;
		}
	}
	// decoder has decoded the block
Luker's avatar
Luker committed

	Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
													_sub_blocks, _alignment);
Luker's avatar
Luker committed
	return de_interleaving (start, end);
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return static_cast<uint8_t> (part.num (0) + part.num (1));
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
Luker's avatar
Luker committed
{
	if (sbn < part.num (0)) {
		return part.size (0) * _symbol_size;
	} else if (sbn - part.num (0) < part.num (1)) {
		return part.size (1) * _symbol_size;
	}
	return 0;
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
Luker's avatar
Luker committed
{
	return _symbol_size;
}
Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
Luker's avatar
Luker committed
{
	if (sbn < part.num (0)) {
		return part.size (0);
	} else if (sbn - part.num (0) < part.num (1)) {
		return part.size (1);
	}
	return 0;
}
Luker's avatar
Luker committed

Luker's avatar
Luker committed
}	// RaptorQ__v1