Asynchronous Job System

Apart from supporting asynchronous data structures for multi-threading in games, Prodigy Engine also provides a Job System that can be used for the same. This job system abstracts some logic and context from game allowing for a better end-user experience.

The class breakdown of the job system is as follows:

  • Job class
  • Job Category
  • Job System

The intent of this breakdown is to allow the Job System to handle the threads and work performed by each thread while the job category class defines the category of work and which thread it needs to be executed on. The job class acts as a base class that can be inherited from to create desired jobs that would be handled by the system.

Below is the implementation of the Job System:

class JobSystem
{
public:

	static JobSystem*	CreateInstance();
	static JobSystem*	GetInstance();
	static void			DestroyInstance();

	// negative here means as many as you can MINUS the value (8 cores, -1 is 7... -2 would be 6.  -20 would be 1)
	// MINIMUM 1 unless explicitly saying 0; 
	void				Startup(int numGenericThreads = -1, int numCategories = JOB_CATEGORY_CORE_COUNT);
	void				Shutdown();

	void				Run(Job* job);

	bool				ProcessCategoryForTimeInMS(int category, uint ms); // process until no more jobs, or until 'ms' has passed 
	bool				ProcessCategory(int category); // process until no more jobs, return number of jobs executed
	
	void				ProcessFinishJobsForCategory(int category);

	void				AddJobForCategory(Job* job, int category);
	void				AddFinishedJobForCategory(Job* job, int category);

private:
	static void			GenericThreadWork();

	Semaphore			m_genericJobsSemaphore;

	void				WaitForWork()	{ m_genericJobsSemaphore.Acquire(); }
	void				SignalWork()	{ m_genericJobsSemaphore.Release(1); }

	JobCategory*				m_categories;
	int							m_numCategories = JOB_CATEGORY_CORE_COUNT;

	std::vector<std::thread>	m_genericThreads;

	bool m_isRunning;
};
JobSystem* gJobSystem = nullptr;

//------------------------------------------------------------------------------------------------------------------------------
JobSystem* JobSystem::CreateInstance()
{
	if (gJobSystem == nullptr)
	{
		gJobSystem = new JobSystem();
	}

	gJobSystem->Startup(-2, gJobSystem->m_numCategories);

	return gJobSystem;
}

//------------------------------------------------------------------------------------------------------------------------------
JobSystem* JobSystem::GetInstance()
{
	if (gJobSystem == nullptr)
	{
		JobSystem::CreateInstance();
	}
	
	return gJobSystem;
}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::DestroyInstance()
{
	if (gJobSystem != nullptr)
	{
		gJobSystem->Shutdown();
		delete gJobSystem;
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::Startup(int numGenericThreads /*= -1*/, int numCategories /*= JOB_CATEGORY_CORE_COUNT*/)
{
	m_genericJobsSemaphore.Create(0, 1);
	m_isRunning = true;

	//Create required number of JobCategories
	m_categories = new JobCategory[numCategories];

	// figure out thread count
	int coreCount = std::thread::hardware_concurrency();
	int numThreadsToMake = coreCount;
	if (numGenericThreads < 0)
	{
		numThreadsToMake += numGenericThreads;

		//Case where numThreadsToMake is still negative
		if (numThreadsToMake < 0)
			numThreadsToMake = 1;
	}
	else
	{
		numThreadsToMake = numGenericThreads;
	}

	for (int threadIndex = 0; threadIndex < numThreadsToMake; threadIndex++)
	{
		//Make these threads run the generic work task
		m_genericThreads.emplace_back(&GenericThreadWork);
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::Shutdown()
{
	m_isRunning = false;
	SignalWork();

	for (int threadIndex = 0; threadIndex < m_genericThreads.size(); threadIndex++)
	{
		m_genericThreads[threadIndex].join();
	}

}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::Run(Job* job)
{
	job->TryStart();
}

//------------------------------------------------------------------------------------------------------------------------------
bool JobSystem::ProcessCategoryForTimeInMS(int category, uint ms)
{
	double startTime = GetCurrentTimeSeconds() * 1000;
	double endTime = 0;
	bool elapsed = false;
	bool result = false;

	while (!elapsed)
	{
		result = ProcessCategory(category);
		if (!result)
		{
			elapsed = true;
		}

		endTime = GetCurrentTimeSeconds() * 1000;
		if (endTime > startTime + ms)
		{
			elapsed = true;
		}
	}

	return result;
}

//------------------------------------------------------------------------------------------------------------------------------
bool JobSystem::ProcessCategory(int category)
{
	Job* job = nullptr;

	job = m_categories[category].TryDequeue();

	if (job != nullptr)
	{
		job->Execute();
		job->FinishJob();

		return true;
	}

	return false;
}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::ProcessFinishJobsForCategory(int category)
{
	Job* job = m_categories[category].TryDequeueFinished();

	while (job)
	{
		job->m_finishCallback(job);

		delete job;
		job = nullptr;

		job = m_categories[category].TryDequeueFinished();
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::AddJobForCategory(Job* job, int category)
{
	m_categories[category].Enqueue(job);
	SignalWork();
}

//------------------------------------------------------------------------------------------------------------------------------
void JobSystem::AddFinishedJobForCategory(Job* job, int category)
{
	m_categories[category].EnqueueFinished(job);
}

//------------------------------------------------------------------------------------------------------------------------------
STATIC void JobSystem::GenericThreadWork()
{
	//This is the work the generic thread needs to do
	JobSystem* system = JobSystem::GetInstance();
	while (system->m_isRunning)
	{
		system->WaitForWork();
		while (system->ProcessCategoryForTimeInMS(JOB_GENERIC, 5));

		system->ProcessFinishJobsForCategory(JOB_GENERIC);
		Sleep(0);
	}
}
//------------------------------------------------------------------------------------------------------------------------------
enum eJobCategory : int
{
	JOB_GENERIC = 0,
	JOB_MAIN,
	JOB_RENDER,

	JOB_CATEGORY_CORE_COUNT,
};

For this system to work, it also needs to Job Category class as well as the Job class to define the actual behavior of the work being carried out. Below is the implementations used for these classes:

class JobCategory
{
public:
	void				Enqueue(Job* job);
	Job*				TryDequeue();
	Job*				Dequeue();	
        
void				EnqueueFinished(Job* job);
	Job*				TryDequeueFinished();

private:
	AsyncQueue<Job*>	m_pendingQueue;
	AsyncQueue<Job*>	m_finishQueue;
};
//------------------------------------------------------------------------------------------------------------------------------
void JobCategory::Enqueue(Job* job)
{
	m_pendingQueue.EnqueueLocked(job);
}

//------------------------------------------------------------------------------------------------------------------------------
Job* JobCategory::TryDequeue()
{
	Job* jobReturned = nullptr;
	m_pendingQueue.DequeueLocked(&jobReturned);

	return jobReturned;
}

//------------------------------------------------------------------------------------------------------------------------------
Job* JobCategory::Dequeue()
{
	Job* jobReturned = nullptr;
	bool result = false;

	while (!result)
	{
		result = m_pendingQueue.DequeueLocked(&jobReturned);
	}

	return jobReturned;
}

//------------------------------------------------------------------------------------------------------------------------------
void JobCategory::EnqueueFinished(Job* job)
{
	m_finishQueue.EnqueueLocked(job);
}

//------------------------------------------------------------------------------------------------------------------------------
Job* JobCategory::TryDequeueFinished()
{
	Job* jobReturned = nullptr;
	m_finishQueue.DequeueLocked(&jobReturned);

	return jobReturned;
}
class Job
{
	friend class JobSystem;

public:
	Job();
	~Job();

	using finishCallback = std::function<void(Job*)>;

	void				AddSuccessor(Job* job);
	void				AddPredecessor(Job* job);
	void				Dispatch();

	//NOTE: 
	//	To add a methods to callback
	//	job->set_callback( [=]() { this->apply_path( job->path ); } ); 
	//	Otherwise we can simply bind static functions or stand alone functions with no extra lambda magic
	void				SetFinishCallback(finishCallback callBack);

	void				SetJobCategory(eJobCategory type);
	eJobCategory		GetJobCategory();
	virtual void		Execute() = 0;

private:
	bool				TryStart();
	void				EnqueueForCategoryInSystem();
	void				EnqueueFinishedForCategoryInSystem();
	void				FinishJob();

	int 				m_category = JOB_GENERIC;

	std::vector<Job*> 	m_successors;
	std::atomic<int> 	m_predecessorCount;

	finishCallback		m_finishCallback;
	std::string 		m_event;

	std::mutex			m_mutex;
};
//------------------------------------------------------------------------------------------------------------------------------
Job::Job()
{
	m_predecessorCount = 1;
}

//------------------------------------------------------------------------------------------------------------------------------
Job::~Job()
{

}

//------------------------------------------------------------------------------------------------------------------------------
void Job::AddSuccessor(Job* job)
{
	job->AddPredecessor(this);
	//m_successors.push_back(job);
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::AddPredecessor(Job* job)
{
	m_predecessorCount++;

	{
		std::scoped_lock lock(m_mutex);
		job->m_successors.push_back(this);
	}
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::Dispatch()
{
	TryStart();
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::SetFinishCallback(finishCallback callBack)
{
	m_finishCallback = callBack;
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::SetJobCategory(eJobCategory type)
{
	m_category = type;
}

//------------------------------------------------------------------------------------------------------------------------------
eJobCategory Job::GetJobCategory()
{
	return (eJobCategory)m_category;
}

//------------------------------------------------------------------------------------------------------------------------------
bool Job::TryStart()
{
	int newNumPredecessors = --m_predecessorCount;
	ASSERT_RECOVERABLE(newNumPredecessors >= 0, "The number of predecessors is lesser than 0!");

	if (0 == newNumPredecessors) 
	{
		EnqueueForCategoryInSystem();
		return true;
	}

	return false;
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::EnqueueForCategoryInSystem()
{
	JobSystem* system = JobSystem::GetInstance();

	system->AddJobForCategory(this, m_category);
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::EnqueueFinishedForCategoryInSystem()
{
	JobSystem* system = JobSystem::GetInstance();
	
	system->AddFinishedJobForCategory(this, m_category);
}

//------------------------------------------------------------------------------------------------------------------------------
void Job::FinishJob()
{
	//Try Start all your successors
	std::vector<Job*>::iterator itr = m_successors.begin();
	while (itr != m_successors.end())
	{
		(*itr)->TryStart();
		itr++;
	}
	
	if (m_finishCallback != nullptr) 
	{
		EnqueueFinishedForCategoryInSystem();
	}
	else 
	{
		delete this;
	}
}

With this framework in place, Prodigy Engine can enqueue jobs under a specific category to the job system. The support to be able to execute enqueued jobs under desired categories also allow for control over execution for specific categories as desired by the user.

Here is an example of a job incorporated using the Job base class highlighted above:

//------------------------------------------------------------------------------------------------------------------------------
class MandleBrotJob : public Job 
{
public:
	MandleBrotJob(Image* image, uint rowNum, uint maxIterations);
	~MandleBrotJob();
	void			Execute();

private:
	Image*		m_image = nullptr;
	uint		m_rowNum = 0U;
	uint		m_maxIterations = 0U;
};

//------------------------------------------------------------------------------------------------------------------------------
class UpdateTextureRowJob : public Job 
{
public:
	UpdateTextureRowJob(Image* image, Texture2D* texture2D, uint rowNum);
	~UpdateTextureRowJob();
	void			Execute();

private:
	uint			m_rowNum = 0U;
	Image*			m_image = nullptr;
	Texture2D*		m_textureToUpdate = nullptr;
};
//------------------------------------------------------------------------------------------------------------------------------
MandleBrotJob::MandleBrotJob(Image* image, uint rowNum, uint maxIterations)
{
	m_rowNum = rowNum;
	m_image = image;
	m_maxIterations = maxIterations;
}

//------------------------------------------------------------------------------------------------------------------------------
MandleBrotJob::~MandleBrotJob()
{

}

//------------------------------------------------------------------------------------------------------------------------------
static uint CheckMandlebrotSet(float x0, float y0, uint const MAX_ITERATIONS)
{

	float x = 0.0f;
	float y = 0.0f;

	uint iteration = 0;
	while ((((x * x) + (y * y)) < 4.0f) && (iteration < MAX_ITERATIONS)) 
	{
		float xtemp = x * x - y * y + x0;
		y = 2.0f * x * y + y0;
		x = xtemp;
		iteration++;
	}

	return iteration;
}

//------------------------------------------------------------------------------------------------------------------------------
void MandleBrotJob::Execute()
{

	IntVec2 imageDim = m_image->GetImageDimensions();

	// column stuff
	// scale image height to -1 to 1 range for mandelbrot
	float y0 = ((float)m_rowNum / (float)imageDim.y) * 2.0f - 1.0f;

	for (uint x = 0; x < (uint)imageDim.x; ++x)
	{
		//scale image width 
		float x0 = ((float)x / (float)imageDim.x) * 3.5f - 2.5f;

		uint iterations = CheckMandlebrotSet(x0, y0, m_maxIterations);

		if (iterations == m_maxIterations) 
		{
			m_image->SetTexelColor(x, m_rowNum, Rgba::BLACK);
		}
		else 
		{
			m_image->SetTexelColor(x, m_rowNum, Rgba::WHITE);
		}
	}
}

//------------------------------------------------------------------------------------------------------------------------------
UpdateTextureRowJob::UpdateTextureRowJob(Image* image, Texture2D* texture2D, uint rowNum)
{
	m_rowNum = rowNum;
	m_image = image;
	m_textureToUpdate = texture2D;
}

//------------------------------------------------------------------------------------------------------------------------------
UpdateTextureRowJob::~UpdateTextureRowJob()
{

}

//------------------------------------------------------------------------------------------------------------------------------
void UpdateTextureRowJob::Execute()
{
	// map the image
	ID3D11DeviceContext* d3dContext = g_renderContext->GetDXContext();

	D3D11_MAPPED_SUBRESOURCE resource;
	ZeroMemory(&resource, sizeof(D3D11_MAPPED_SUBRESOURCE));

	HRESULT hr = d3dContext->Map(m_textureToUpdate->m_handle, 0, D3D11_MAP_WRITE_DISCARD, 0U, &resource);
	if (SUCCEEDED(hr))
	{
		// we're mapped!  Copy over
		void* buffer = m_image->GetRawPointerToRow(m_rowNum);
		void* texBuffer = (unsigned char*)resource.pData + m_rowNum * resource.RowPitch;

		memcpy(texBuffer, buffer, m_image->GetImageDimensions().x * m_image->GetBytesPerPixel());

		// unlock the resource (we're done writing)
		d3dContext->Unmap(m_textureToUpdate->m_handle, 0);
	}
	else
	{
		ERROR_RECOVERABLE("Couldn't write image from d3d Resource");
	}
}

Retrospectives

What went well 🙂

  • Job system can run desired categories
  • Implementation of job system and categories allow for further development and adding new job queues

What went wrong 🙁

  • The current structure only allows jobs to have 1 callback after job execution
  • Due to blocking calls, the job system may not be performance optimal

What I would do differently 🙂

  • Definitely add support for more than 1 callbacks
  • Implement a job profiler interface for profiling job times, system times and context switching time