Apart from supporting asynchronous data structures for multi-threading in games, Prodigy Engine also provides a Job System that can be used for the same. This job system abstracts some logic and context from game allowing for a better end-user experience.
The class breakdown of the job system is as follows:
- Job class
- Job Category
- Job System
The intent of this breakdown is to allow the Job System to handle the threads and work performed by each thread while the job category class defines the category of work and which thread it needs to be executed on. The job class acts as a base class that can be inherited from to create desired jobs that would be handled by the system.
Below is the implementation of the Job System:
class JobSystem { public: static JobSystem* CreateInstance(); static JobSystem* GetInstance(); static void DestroyInstance(); // negative here means as many as you can MINUS the value (8 cores, -1 is 7... -2 would be 6. -20 would be 1) // MINIMUM 1 unless explicitly saying 0; void Startup(int numGenericThreads = -1, int numCategories = JOB_CATEGORY_CORE_COUNT); void Shutdown(); void Run(Job* job); bool ProcessCategoryForTimeInMS(int category, uint ms); // process until no more jobs, or until 'ms' has passed bool ProcessCategory(int category); // process until no more jobs, return number of jobs executed void ProcessFinishJobsForCategory(int category); void AddJobForCategory(Job* job, int category); void AddFinishedJobForCategory(Job* job, int category); private: static void GenericThreadWork(); Semaphore m_genericJobsSemaphore; void WaitForWork() { m_genericJobsSemaphore.Acquire(); } void SignalWork() { m_genericJobsSemaphore.Release(1); } JobCategory* m_categories; int m_numCategories = JOB_CATEGORY_CORE_COUNT; std::vector<std::thread> m_genericThreads; bool m_isRunning; };
JobSystem* gJobSystem = nullptr; //------------------------------------------------------------------------------------------------------------------------------ JobSystem* JobSystem::CreateInstance() { if (gJobSystem == nullptr) { gJobSystem = new JobSystem(); } gJobSystem->Startup(-2, gJobSystem->m_numCategories); return gJobSystem; } //------------------------------------------------------------------------------------------------------------------------------ JobSystem* JobSystem::GetInstance() { if (gJobSystem == nullptr) { JobSystem::CreateInstance(); } return gJobSystem; } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::DestroyInstance() { if (gJobSystem != nullptr) { gJobSystem->Shutdown(); delete gJobSystem; } } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::Startup(int numGenericThreads /*= -1*/, int numCategories /*= JOB_CATEGORY_CORE_COUNT*/) { m_genericJobsSemaphore.Create(0, 1); m_isRunning = true; //Create required number of JobCategories m_categories = new JobCategory[numCategories]; // figure out thread count int coreCount = std::thread::hardware_concurrency(); int numThreadsToMake = coreCount; if (numGenericThreads < 0) { numThreadsToMake += numGenericThreads; //Case where numThreadsToMake is still negative if (numThreadsToMake < 0) numThreadsToMake = 1; } else { numThreadsToMake = numGenericThreads; } for (int threadIndex = 0; threadIndex < numThreadsToMake; threadIndex++) { //Make these threads run the generic work task m_genericThreads.emplace_back(&GenericThreadWork); } } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::Shutdown() { m_isRunning = false; SignalWork(); for (int threadIndex = 0; threadIndex < m_genericThreads.size(); threadIndex++) { m_genericThreads[threadIndex].join(); } } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::Run(Job* job) { job->TryStart(); } //------------------------------------------------------------------------------------------------------------------------------ bool JobSystem::ProcessCategoryForTimeInMS(int category, uint ms) { double startTime = GetCurrentTimeSeconds() * 1000; double endTime = 0; bool elapsed = false; bool result = false; while (!elapsed) { result = ProcessCategory(category); if (!result) { elapsed = true; } endTime = GetCurrentTimeSeconds() * 1000; if (endTime > startTime + ms) { elapsed = true; } } return result; } //------------------------------------------------------------------------------------------------------------------------------ bool JobSystem::ProcessCategory(int category) { Job* job = nullptr; job = m_categories[category].TryDequeue(); if (job != nullptr) { job->Execute(); job->FinishJob(); return true; } return false; } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::ProcessFinishJobsForCategory(int category) { Job* job = m_categories[category].TryDequeueFinished(); while (job) { job->m_finishCallback(job); delete job; job = nullptr; job = m_categories[category].TryDequeueFinished(); } } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::AddJobForCategory(Job* job, int category) { m_categories[category].Enqueue(job); SignalWork(); } //------------------------------------------------------------------------------------------------------------------------------ void JobSystem::AddFinishedJobForCategory(Job* job, int category) { m_categories[category].EnqueueFinished(job); } //------------------------------------------------------------------------------------------------------------------------------ STATIC void JobSystem::GenericThreadWork() { //This is the work the generic thread needs to do JobSystem* system = JobSystem::GetInstance(); while (system->m_isRunning) { system->WaitForWork(); while (system->ProcessCategoryForTimeInMS(JOB_GENERIC, 5)); system->ProcessFinishJobsForCategory(JOB_GENERIC); Sleep(0); } }
//------------------------------------------------------------------------------------------------------------------------------ enum eJobCategory : int { JOB_GENERIC = 0, JOB_MAIN, JOB_RENDER, JOB_CATEGORY_CORE_COUNT, };
For this system to work, it also needs to Job Category class as well as the Job class to define the actual behavior of the work being carried out. Below is the implementations used for these classes:
class JobCategory { public: void Enqueue(Job* job); Job* TryDequeue(); Job* Dequeue(); void EnqueueFinished(Job* job); Job* TryDequeueFinished(); private: AsyncQueue<Job*> m_pendingQueue; AsyncQueue<Job*> m_finishQueue; };
//------------------------------------------------------------------------------------------------------------------------------ void JobCategory::Enqueue(Job* job) { m_pendingQueue.EnqueueLocked(job); } //------------------------------------------------------------------------------------------------------------------------------ Job* JobCategory::TryDequeue() { Job* jobReturned = nullptr; m_pendingQueue.DequeueLocked(&jobReturned); return jobReturned; } //------------------------------------------------------------------------------------------------------------------------------ Job* JobCategory::Dequeue() { Job* jobReturned = nullptr; bool result = false; while (!result) { result = m_pendingQueue.DequeueLocked(&jobReturned); } return jobReturned; } //------------------------------------------------------------------------------------------------------------------------------ void JobCategory::EnqueueFinished(Job* job) { m_finishQueue.EnqueueLocked(job); } //------------------------------------------------------------------------------------------------------------------------------ Job* JobCategory::TryDequeueFinished() { Job* jobReturned = nullptr; m_finishQueue.DequeueLocked(&jobReturned); return jobReturned; }
class Job { friend class JobSystem; public: Job(); ~Job(); using finishCallback = std::function<void(Job*)>; void AddSuccessor(Job* job); void AddPredecessor(Job* job); void Dispatch(); //NOTE: // To add a methods to callback // job->set_callback( [=]() { this->apply_path( job->path ); } ); // Otherwise we can simply bind static functions or stand alone functions with no extra lambda magic void SetFinishCallback(finishCallback callBack); void SetJobCategory(eJobCategory type); eJobCategory GetJobCategory(); virtual void Execute() = 0; private: bool TryStart(); void EnqueueForCategoryInSystem(); void EnqueueFinishedForCategoryInSystem(); void FinishJob(); int m_category = JOB_GENERIC; std::vector<Job*> m_successors; std::atomic<int> m_predecessorCount; finishCallback m_finishCallback; std::string m_event; std::mutex m_mutex; };
//------------------------------------------------------------------------------------------------------------------------------ Job::Job() { m_predecessorCount = 1; } //------------------------------------------------------------------------------------------------------------------------------ Job::~Job() { } //------------------------------------------------------------------------------------------------------------------------------ void Job::AddSuccessor(Job* job) { job->AddPredecessor(this); //m_successors.push_back(job); } //------------------------------------------------------------------------------------------------------------------------------ void Job::AddPredecessor(Job* job) { m_predecessorCount++; { std::scoped_lock lock(m_mutex); job->m_successors.push_back(this); } } //------------------------------------------------------------------------------------------------------------------------------ void Job::Dispatch() { TryStart(); } //------------------------------------------------------------------------------------------------------------------------------ void Job::SetFinishCallback(finishCallback callBack) { m_finishCallback = callBack; } //------------------------------------------------------------------------------------------------------------------------------ void Job::SetJobCategory(eJobCategory type) { m_category = type; } //------------------------------------------------------------------------------------------------------------------------------ eJobCategory Job::GetJobCategory() { return (eJobCategory)m_category; } //------------------------------------------------------------------------------------------------------------------------------ bool Job::TryStart() { int newNumPredecessors = --m_predecessorCount; ASSERT_RECOVERABLE(newNumPredecessors >= 0, "The number of predecessors is lesser than 0!"); if (0 == newNumPredecessors) { EnqueueForCategoryInSystem(); return true; } return false; } //------------------------------------------------------------------------------------------------------------------------------ void Job::EnqueueForCategoryInSystem() { JobSystem* system = JobSystem::GetInstance(); system->AddJobForCategory(this, m_category); } //------------------------------------------------------------------------------------------------------------------------------ void Job::EnqueueFinishedForCategoryInSystem() { JobSystem* system = JobSystem::GetInstance(); system->AddFinishedJobForCategory(this, m_category); } //------------------------------------------------------------------------------------------------------------------------------ void Job::FinishJob() { //Try Start all your successors std::vector<Job*>::iterator itr = m_successors.begin(); while (itr != m_successors.end()) { (*itr)->TryStart(); itr++; } if (m_finishCallback != nullptr) { EnqueueFinishedForCategoryInSystem(); } else { delete this; } }
With this framework in place, Prodigy Engine can enqueue jobs under a specific category to the job system. The support to be able to execute enqueued jobs under desired categories also allow for control over execution for specific categories as desired by the user.
Here is an example of a job incorporated using the Job base class highlighted above:
//------------------------------------------------------------------------------------------------------------------------------ class MandleBrotJob : public Job { public: MandleBrotJob(Image* image, uint rowNum, uint maxIterations); ~MandleBrotJob(); void Execute(); private: Image* m_image = nullptr; uint m_rowNum = 0U; uint m_maxIterations = 0U; }; //------------------------------------------------------------------------------------------------------------------------------ class UpdateTextureRowJob : public Job { public: UpdateTextureRowJob(Image* image, Texture2D* texture2D, uint rowNum); ~UpdateTextureRowJob(); void Execute(); private: uint m_rowNum = 0U; Image* m_image = nullptr; Texture2D* m_textureToUpdate = nullptr; };
//------------------------------------------------------------------------------------------------------------------------------ MandleBrotJob::MandleBrotJob(Image* image, uint rowNum, uint maxIterations) { m_rowNum = rowNum; m_image = image; m_maxIterations = maxIterations; } //------------------------------------------------------------------------------------------------------------------------------ MandleBrotJob::~MandleBrotJob() { } //------------------------------------------------------------------------------------------------------------------------------ static uint CheckMandlebrotSet(float x0, float y0, uint const MAX_ITERATIONS) { float x = 0.0f; float y = 0.0f; uint iteration = 0; while ((((x * x) + (y * y)) < 4.0f) && (iteration < MAX_ITERATIONS)) { float xtemp = x * x - y * y + x0; y = 2.0f * x * y + y0; x = xtemp; iteration++; } return iteration; } //------------------------------------------------------------------------------------------------------------------------------ void MandleBrotJob::Execute() { IntVec2 imageDim = m_image->GetImageDimensions(); // column stuff // scale image height to -1 to 1 range for mandelbrot float y0 = ((float)m_rowNum / (float)imageDim.y) * 2.0f - 1.0f; for (uint x = 0; x < (uint)imageDim.x; ++x) { //scale image width float x0 = ((float)x / (float)imageDim.x) * 3.5f - 2.5f; uint iterations = CheckMandlebrotSet(x0, y0, m_maxIterations); if (iterations == m_maxIterations) { m_image->SetTexelColor(x, m_rowNum, Rgba::BLACK); } else { m_image->SetTexelColor(x, m_rowNum, Rgba::WHITE); } } } //------------------------------------------------------------------------------------------------------------------------------ UpdateTextureRowJob::UpdateTextureRowJob(Image* image, Texture2D* texture2D, uint rowNum) { m_rowNum = rowNum; m_image = image; m_textureToUpdate = texture2D; } //------------------------------------------------------------------------------------------------------------------------------ UpdateTextureRowJob::~UpdateTextureRowJob() { } //------------------------------------------------------------------------------------------------------------------------------ void UpdateTextureRowJob::Execute() { // map the image ID3D11DeviceContext* d3dContext = g_renderContext->GetDXContext(); D3D11_MAPPED_SUBRESOURCE resource; ZeroMemory(&resource, sizeof(D3D11_MAPPED_SUBRESOURCE)); HRESULT hr = d3dContext->Map(m_textureToUpdate->m_handle, 0, D3D11_MAP_WRITE_DISCARD, 0U, &resource); if (SUCCEEDED(hr)) { // we're mapped! Copy over void* buffer = m_image->GetRawPointerToRow(m_rowNum); void* texBuffer = (unsigned char*)resource.pData + m_rowNum * resource.RowPitch; memcpy(texBuffer, buffer, m_image->GetImageDimensions().x * m_image->GetBytesPerPixel()); // unlock the resource (we're done writing) d3dContext->Unmap(m_textureToUpdate->m_handle, 0); } else { ERROR_RECOVERABLE("Couldn't write image from d3d Resource"); } }
Retrospectives
What went well 🙂
- Job system can run desired categories
- Implementation of job system and categories allow for further development and adding new job queues
What went wrong 🙁
- The current structure only allows jobs to have 1 callback after job execution
- Due to blocking calls, the job system may not be performance optimal
What I would do differently 🙂
- Definitely add support for more than 1 callbacks
- Implement a job profiler interface for profiling job times, system times and context switching time