Thread-Safe Containers

Prodigy Engine supports multi-threading which can be used in a variety of use cases. The engine also supports handling multiple threads such as the main thread, generic work thread, and a log thread. This requires the use of asynchronous data structures that are thread-safe for both reading and writing data. The 2 thread-safe containers used in Prodigy is the Async Queue and the Async MPSC Ring Buffer.

Below are the implementations used for the 2 data structures starting with the asynchronous queue:

Async Queue

The specifications for the Async Queue are as follows:

  • Be able to insert items and remove items from the queue
  • Handle insert and remove from multiple threads
  • Use locks to design a simple Async Queue
  • Be templated to store objects of the desired type

The implementation is as follows:

template <typename TYPE>
class AsyncQueue
{
public:
	AsyncQueue();
	~AsyncQueue();


	void EnqueueLocked(TYPE const &element);
	bool DequeueLocked(TYPE* out);

	int GetLength() const;

private:
	std::queue<TYPE> m_queue;
	std::mutex m_mutex;
};

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
int AsyncQueue<TYPE>::GetLength() const
{
	std::lock_guard<std::mutex> mutexLock(m_mutex);
	return m_queue.size();
}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
AsyncQueue<TYPE>::AsyncQueue()
{

}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
AsyncQueue<TYPE>::~AsyncQueue()
{

}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
void AsyncQueue<TYPE>::EnqueueLocked(TYPE const& element)
{
	std::lock_guard<std::mutex> mutexLock(m_mutex);
	m_queue.push(element);
}

//------------------------------------------------------------------------------------------------------------------------------
template <typename TYPE>
bool AsyncQueue<TYPE>::DequeueLocked(TYPE *out)
{
	std::lock_guard<std::mutex> mutexLock(m_mutex);
	if (m_queue.empty()) 
	{
		return false;
	}
	else 
	{
		*out = m_queue.front();
		m_queue.pop();
		return true;
	}
}

With this templated implementation, the Async Queue is able to store and use objects of different types and can handle read or write requests from any thread.

Note: The async queue implementation uses blocked enqueue and dequeue which is a design choice that blocks the thread from execution till the enqueue or dequeue takes place.

Async MPSC Ring Buffer

In the case of the Profiler used in Prodigy Engine, I needed to have a multiple producer single consumer asynchronous ring buffer that would be used to allocate ProfilerSample objects. This asynchronous ring buffer would be given either a fixed size of memory or be allowed to use a block allocator to assign more memory if and when required.

Below is the implementation for the same:

struct RingBufferMeta_T
{
	uint bufferObjectSize : 31;
	uint isBufferObjectUnlocked : 1;
};

//------------------------------------------------------------------------------------------------------------------------------
// This is a Multiple Producer Single Consumer Async Ring Buffer. 
//------------------------------------------------------------------------------------------------------------------------------
class MPSCRingBuffer
{
	public:
		MPSCRingBuffer();
		~MPSCRingBuffer();

		bool			InitializeBuffer(size_t sizeInBytes);
		void			ReleaseBuffer();    

		//Instantly returns, either receive valid data pointer or a nullptr
		void*			TryLockWrite(size_t size);
		//Block until there room to write onto buffer
		void*			LockWrite(size_t size);
		void			UnlockWrite(void* ptr);

		size_t			GetWritableSpace() const;
		
		void*			TryLockRead(size_t* outSize);
		void*			LockRead(size_t* outSize);
		void			UnlockRead(void* ptr);

	private:
		byte*			m_buffer = nullptr;
		size_t			m_byteSize = 0;

		uint			m_writeHead = 0;
		uint			m_readHead = 0;

		std::mutex		m_lock;
};
bool MPSCRingBuffer::InitializeBuffer(size_t sizeInBytes)
{
	if (m_buffer == nullptr)
	{
		m_buffer = (byte*)malloc(sizeInBytes);
		m_byteSize = sizeInBytes;
		return true;
	}
	else
	{
		ReleaseBuffer();
		return InitializeBuffer(sizeInBytes);
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void MPSCRingBuffer::ReleaseBuffer()
{
	if (m_buffer != nullptr) 
	{
		free(m_buffer);
		m_buffer = nullptr;
		m_byteSize = 0U;
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::TryLockWrite(size_t writeSize)
{
	ASSERT_OR_DIE(writeSize < (1 << 31), "The size of object to write exceeded the total size of the MPSC ring buffer");

	size_t metaDataSize = sizeof(RingBufferMeta_T);
	size_t totalSize = 2 * metaDataSize + writeSize;

	//If the space available is less than the size of meta data + write data, return nullptr
	std::scoped_lock lock(m_lock);
	if (GetWritableSpace() < totalSize) 
	{
		return nullptr;
	}

	//We have enough space in the buffer so let's write meta and alloc the required space
	uint newHead = (uint)(m_writeHead + totalSize);

	if (newHead > m_byteSize) 
	{
		//Buffer needs to wrap, so let's write a skip meta buffer so the read head will wrap at this point
		RingBufferMeta_T* skipBufferEntry = (RingBufferMeta_T*)(m_buffer + m_writeHead);
		skipBufferEntry->bufferObjectSize = 0;  // 0 means skip; 
		skipBufferEntry->isBufferObjectUnlocked = 1;

		m_writeHead = 0;
	}

	//ASfter we wrapped we would like to check if the buffer has space again
	if (GetWritableSpace() < totalSize) 
	{
		return nullptr;
	}

	//My write usable buffer
	byte* usableBuffer = m_buffer + m_writeHead;

	RingBufferMeta_T* head = (RingBufferMeta_T*)usableBuffer;
	head->bufferObjectSize = (uint)writeSize;
	head->isBufferObjectUnlocked = 0;

	//uint usedHead = m_writeHead;
	m_writeHead += (int)totalSize;
	//we moved by 2 meta data in size so let's get back by 1 meta data size (TotalSize is 2*metaDataSize + writeSize)
	m_writeHead -= (int)metaDataSize;

	return head + 1;
}

//------------------------------------------------------------------------------------------------------------------------------
// No matter how long it takes, this call will lock the thread and make it wait for a write operation
//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::LockWrite(size_t size)
{
	void* ptr = TryLockWrite(size);
	while (ptr == nullptr) 
	{
		std::this_thread::yield();
		ptr = TryLockWrite(size);
	}

	return ptr;
}

//------------------------------------------------------------------------------------------------------------------------------
size_t MPSCRingBuffer::GetWritableSpace() const
{
	// size_t remaining = (m_read_head - m_write_head - 1) % m_size;  
	size_t remaining = 0;
	if (m_writeHead >= m_readHead) 
	{
		remaining = m_byteSize - m_writeHead;
		remaining += m_readHead;
	}
	else 
	{
		remaining = m_readHead - m_writeHead;
	}

	return remaining;
}

//------------------------------------------------------------------------------------------------------------------------------
void MPSCRingBuffer::UnlockWrite(void* ptr)
{
	RingBufferMeta_T* writeHead = (RingBufferMeta_T*)ptr;
	--writeHead;

	writeHead->isBufferObjectUnlocked = 1;
}

//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::TryLockRead(size_t* outSize)
{
	std::scoped_lock lock(m_lock);

	while (true) 
	{
		//If the buffer is empty return
		if (m_readHead == m_writeHead) 
		{
			return nullptr;
		}

		//Cast to Meta object and figure how big the buffer is to return it
		RingBufferMeta_T* readMeta = (RingBufferMeta_T*)(m_buffer + m_readHead);
		if (readMeta->isBufferObjectUnlocked) 
		{
			if (readMeta->bufferObjectSize == 0) 
			{
				// Wrap around case
				m_readHead = 0;
			}
			else 
			{
				// valid read case
				*outSize = readMeta->bufferObjectSize;

				// SINGLE CONSUMER CASE - nothing else happens
				void* returnBuffer = readMeta + 1;
				return returnBuffer;
			}
		}
		else 
		{
			// only one consumer - this shouldn't happen
			return nullptr;
		}
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void* MPSCRingBuffer::LockRead(size_t* outSize)
{
	void* ptr = TryLockRead(outSize);
	while (ptr == nullptr)
	{
		std::this_thread::yield();
		ptr = TryLockRead(outSize);
	}

	return ptr;
}

//------------------------------------------------------------------------------------------------------------------------------
void MPSCRingBuffer::UnlockRead(void* ptr)
{
	std::scoped_lock lock(m_lock);

	RingBufferMeta_T* readHead = (RingBufferMeta_T*)ptr;
	readHead--;

	ASSERT_RECOVERABLE(((m_buffer + m_readHead) == (byte*)readHead), "The read head for MPSC Async Ring Buffer is invalid");

	m_readHead += sizeof(RingBufferMeta_T) + readHead->bufferObjectSize;
}

With this implementation, I was able to use the MPSC Ring Buffer in my profiler. Note however that similar to the case of Async Queue, the MPSC Ring Buffer also has to block write and read calls.