Skip to content
RFC.hpp 39.5 KiB
Newer Older
Luker's avatar
Luker committed
			break;
Luker's avatar
Luker committed
    lock.unlock();
Luker's avatar
Luker committed
	_notify->notify_all();
}

template <typename In_It, typename Fwd_It>
std::pair<Error, uint8_t> Decoder<In_It, Fwd_It>::get_report (
Luker's avatar
Luker committed
                                                            const Compute flags)
Luker's avatar
Luker committed
    if (decoders.size() == 0)
        return {Error::WORKING, 0};
Luker's avatar
Luker committed
	if (Compute::COMPLETE == (flags & Compute::COMPLETE) ||
			Compute::PARTIAL_FROM_BEGINNING ==
									(flags & Compute::PARTIAL_FROM_BEGINNING)) {
        uint16_t reportable = 0;
        uint16_t next_expected = static_cast<uint16_t> (pool_last_reported + 1);
        std::unique_lock<std::mutex> dec_lock (_mtx);
        auto it = decoders.lower_bound (static_cast<uint8_t> (next_expected));

Luker's avatar
Luker committed
		// get last reportable block
		for (; it != decoders.end(); ++it) {
            auto id = it->first;
            if (id != next_expected)
                break; // not consecutive
			auto ptr = it->second.dec;
            if (ptr == nullptr) {
                assert(false && "RFC6330: decoder should never be nullptr.");
                break;
            }
            if (!ptr->ready()) {
                if (ptr->is_stopped())
                    return {Error::EXITING, 0};
                if (ptr->end_of_input && ptr->threads() == 0)
                    return {Error::NEED_DATA, 0};
                break; // still working
            }
Luker's avatar
Luker committed
			++reportable;
            ++next_expected;
        dec_lock.unlock();
Luker's avatar
Luker committed
		if (reportable > 0) {
			pool_last_reported += reportable;
			if (Compute::PARTIAL_FROM_BEGINNING ==
									(flags & Compute::PARTIAL_FROM_BEGINNING)) {
Luker's avatar
Luker committed
				return {Error::NONE, static_cast<uint8_t>(pool_last_reported)};
Luker's avatar
Luker committed
			} else {
				// complete
				if (pool_last_reported == _blocks - 1)
Luker's avatar
Luker committed
					return {Error::NONE,
                                    static_cast<uint8_t>(pool_last_reported)};
Luker's avatar
Luker committed
			}
Luker's avatar
Luker committed
	} else if (Compute::PARTIAL_ANY == (flags & Compute::PARTIAL_ANY)) {
        // FIXME: locking might not be necessary. map emplace/erase do not
        // invalidate other pointers.
        auto undecodable = decoders.end();
        std::unique_lock<std::mutex> dec_lock (_mtx);
        RQ_UNUSED(dec_lock);
        for (auto it = decoders.begin(); it != decoders.end(); ++it) {
			if (!it->second.reported) {
				auto ptr = it->second.dec;
                if (ptr == nullptr) {
                    assert(false && "RFC6330: decoder should never be nullptr");
                    break;
                }
                if (ptr->ready()) {
                    it->second.reported = true;
                    return {Error::NONE, it->first};
                }
                if (ptr->is_stopped())
                    return {Error::EXITING, 0};
                // first return all decodable blocks
                // then return the ones we can not decode.
                if (ptr->end_of_input && ptr->threads() == 0)
                    undecodable = it;
        if (undecodable != decoders.end()) {
            undecodable->second.reported = true;
            return {Error::NEED_DATA, undecodable->first};
        }
Luker's avatar
Luker committed
	}
	// can be reached if computing thread was stopped
	return {Error::WORKING, 0};
}

template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::decode_bytes (Fwd_It &start, const Fwd_It end,
														const uint8_t skip)
{
	// Decode from the beginning, up untill we can.
	// return number of BYTES written, starting at "start + skip" bytes
	//
	// in case the last iterator is only half written, "start" will
	// point to the half-written iterator.
Luker's avatar
Luker committed
	uint64_t written = 0;
	uint8_t new_skip = skip;
	for (uint8_t sbn = 0; sbn < blocks(); ++sbn) {
		std::unique_lock<std::mutex> block_lock (_mtx);
Luker's avatar
Luker committed
		auto it = decoders.find (sbn);
		if (it == decoders.end())
			return written;
		auto dec_ptr = it->second.dec;
		block_lock.unlock();

		if (!dec_ptr->ready()) {
			if (!use_pool && dec_ptr->can_decode()) {
				RaptorQ__v1::Work_State state =
										RaptorQ__v1::Work_State::KEEP_WORKING;
				auto ret = dec_ptr->decode (&state);
Luker's avatar
Luker committed
				if (RaptorQ__v1::Impl::Raw_Decoder<In_It>::
											Decoder_Result::DECODED != ret) {
					return written;
				return written;
			}
		}

		Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
													_sub_blocks, _alignment);
Luker's avatar
Luker committed
		size_t max_bytes = block_size (sbn);
		if (sbn == (blocks() - 1)) {
			// size of the data (_size) is different from the sum of the size of
			// all blocks. get the real size, so we do not write more.
			// we obviously need to consider this only for the last block.
			uint64_t all_blocks = 0;
			for (uint8_t id = 0; id < blocks(); ++id)
				all_blocks += block_size (sbn);
Luker's avatar
Luker committed
			const size_t diff = static_cast<size_t> (all_blocks - _size);
			max_bytes -= diff;
		}
Luker's avatar
Luker committed
		auto tmp_start = start;
		uint64_t bytes_written = de_interleaving (tmp_start, end, max_bytes,
																	new_skip);
		written += bytes_written;
		uint64_t bytes_and_skip = new_skip + bytes_written;
		new_skip = bytes_and_skip %
Luker's avatar
Luker committed
					sizeof(typename std::iterator_traits<Fwd_It>::value_type);
		if (bytes_written == 0)
			return written;
		//new_skip = block_size (sbn) %
		//			sizeof(typename std::iterator_traits<Fwd_It>::value_type);
Luker's avatar
Luker committed
		// if we ended decoding in the middle of a Fwd_It, do not advance
		// start too much, or we will end up having additional zeros.
		if (new_skip == 0) {
Luker's avatar
Luker committed
			start = tmp_start;
		} else {
			uint64_t it_written = bytes_and_skip /
					sizeof(typename std::iterator_traits<Fwd_It>::value_type);
			// RaptorQ handles at most 881GB per rfc, so
Luker's avatar
Luker committed
			// casting uint64 to int64 is safe
			// we can not do "--start" since it's a forward iterator
Luker's avatar
Luker committed
			#pragma clang diagnostic push
			#pragma clang diagnostic ignored "-Wshorten-64-to-32"
			#pragma clang diagnostic ignored "-Wsign-conversion"
			start += std::max (static_cast<uint64_t>(0), it_written - 1);
			#pragma clang diagnostic pop
Luker's avatar
Luker committed
		}
Luker's avatar
Luker committed
	}
	return written;
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
size_t Decoder<In_It, Fwd_It>::decode_block_bytes (Fwd_It &start,
															const Fwd_It end,
															const uint8_t skip,
Luker's avatar
Luker committed
															const uint8_t sbn)
Luker's avatar
Luker committed
{
	if (sbn >= _blocks)
		return 0;

Luker's avatar
Luker committed
	std::shared_ptr<RaptorQ__v1::Impl::Raw_Decoder<In_It>> dec_ptr = nullptr;
	std::unique_lock<std::mutex> lock (_mtx);
Luker's avatar
Luker committed
	auto it = decoders.find (sbn);

	if (it == decoders.end())
		return 0;	// did not receiveany data yet.

	if (use_pool) {
		dec_ptr = it->second.dec;
		lock.unlock();
		if (!dec_ptr->ready())
			return 0;	// did not receive enough data, or could not decode yet.
	} else {
		dec_ptr = it->second.dec;
		lock.unlock();
		if (!dec_ptr->ready()) {
			if (!dec_ptr->can_decode())
				return 0;
			RaptorQ__v1::Work_State keep_working =
										RaptorQ__v1::Work_State::KEEP_WORKING;
			dec_ptr->decode (&keep_working);
			if (!dec_ptr->ready())
				return 0;
		}
	}
	// decoder has decoded the block
Luker's avatar
Luker committed

	Impl::De_Interleaver<Fwd_It> de_interleaving (dec_ptr->get_symbols(),
													_sub_blocks, _alignment);
Luker's avatar
Luker committed
	size_t max_bytes = block_size (sbn);
	if (sbn == (blocks() - 1)) {
		// size of the data (_size) is different from the sum of the size of
		// all blocks. get the real size, so we do not write more.
		// we obviously need to consider this only for the last block.
		uint64_t all_blocks = 0;
		for (uint8_t id = 0; id < blocks(); ++id)
			all_blocks += block_size (sbn);
		const uint64_t diff = all_blocks - _size;
Luker's avatar
Luker committed
		max_bytes -= static_cast<size_t>(diff);
	}
	return de_interleaving (start, end, max_bytes, skip);
}

template <typename In_It, typename Fwd_It>
Luker's avatar
Luker committed
std::pair<uint64_t, uint8_t> Decoder<In_It, Fwd_It>::decode_aligned (
														Fwd_It &start,
														const Fwd_It end,
														const uint8_t skip)
{
	const uint64_t bytes = decode_bytes (start, end, skip);
	const uint64_t skip_and_bytes = skip + bytes;
	const uint64_t iterators = skip_and_bytes /
					sizeof(typename std::iterator_traits<Fwd_It>::value_type);
	const uint8_t new_skip = skip_and_bytes %
					sizeof(typename std::iterator_traits<Fwd_It>::value_type);
	return {iterators, new_skip};
}

template <typename In_It, typename Fwd_It>
std::pair<size_t, uint8_t> Decoder<In_It, Fwd_It>::decode_block_aligned (
														Fwd_It &start,
														const Fwd_It end,
														const uint8_t skip,
														const uint8_t sbn)
{
Luker's avatar
Luker committed
	const size_t bytes = decode_block_bytes (start, end, skip, sbn);
	const size_t skip_and_bytes = skip + bytes;
	const size_t iterators = skip_and_bytes /
					sizeof(typename std::iterator_traits<Fwd_It>::value_type);
	const uint8_t new_skip = skip_and_bytes %
					sizeof(typename std::iterator_traits<Fwd_It>::value_type);
	return {iterators, new_skip};
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint64_t Decoder<In_It, Fwd_It>::bytes() const
Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint8_t Decoder<In_It, Fwd_It>::blocks() const
Luker's avatar
Luker committed
{
Luker's avatar
Luker committed
	return static_cast<uint8_t> (part.num (0) + part.num (1));
Luker's avatar
Luker committed
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint32_t Decoder<In_It, Fwd_It>::block_size (const uint8_t sbn) const
Luker's avatar
Luker committed
{
	if (sbn < part.num (0)) {
		return part.size (0) * _symbol_size;
	} else if (sbn - part.num (0) < part.num (1)) {
		return part.size (1) * _symbol_size;
	}
	return 0;
}

Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbol_size() const
Luker's avatar
Luker committed
{
	return _symbol_size;
}
Luker's avatar
Luker committed
template <typename In_It, typename Fwd_It>
uint16_t Decoder<In_It, Fwd_It>::symbols (const uint8_t sbn) const
Luker's avatar
Luker committed
{
	if (sbn < part.num (0)) {
		return part.size (0);
	} else if (sbn - part.num (0) < part.num (1)) {
		return part.size (1);
	}
	return 0;
}
Luker's avatar
Luker committed

}   // namespace Impl
}	// namespace RFC6330__v1
Luker's avatar
Luker committed