You've already forked UnrealEngineUWP
mirror of
https://github.com/izzy2lost/UnrealEngineUWP.git
synced 2026-03-26 18:15:20 -07:00
* Added support for adding high priorty work * Changed so cache server puts bucket maintenance work at high priority [CL 34058891 by henrik karlsson in ue5-main branch]
197 lines
3.8 KiB
C++
197 lines
3.8 KiB
C++
// Copyright Epic Games, Inc. All Rights Reserved.
|
|
|
|
#include "UbaWorkManager.h"
|
|
#include "UbaEvent.h"
|
|
#include "UbaPlatform.h"
|
|
#include "UbaThread.h"
|
|
|
|
namespace uba
|
|
{
|
|
u32 WorkManager::TrackWorkStart(const tchar* desc)
|
|
{
|
|
if (auto t = m_workTracker.load())
|
|
return t->TrackWorkStart(desc);
|
|
return 0;
|
|
}
|
|
|
|
void WorkManager::TrackWorkEnd(u32 id)
|
|
{
|
|
if (auto t = m_workTracker.load())
|
|
return t->TrackWorkEnd(id);
|
|
}
|
|
|
|
|
|
struct WorkManagerImpl::Worker
|
|
{
|
|
Worker(WorkManagerImpl& manager) : m_workAvailable(false)
|
|
{
|
|
m_loop = true;
|
|
manager.PushWorker(this);
|
|
m_thread.Start([&]() { ThreadWorker(manager); return 0; });
|
|
}
|
|
~Worker()
|
|
{
|
|
}
|
|
|
|
void Stop()
|
|
{
|
|
m_loop = false;
|
|
m_workAvailable.Set();
|
|
m_thread.Wait();
|
|
}
|
|
|
|
void ThreadWorker(WorkManagerImpl& manager)
|
|
{
|
|
while (true)
|
|
{
|
|
if (!m_workAvailable.IsSet())
|
|
break;
|
|
if (!m_loop)
|
|
break;
|
|
|
|
while (true)
|
|
{
|
|
while (true)
|
|
{
|
|
SCOPED_WRITE_LOCK(manager.m_workLock, lock);
|
|
if (manager.m_work.empty())
|
|
break;
|
|
Work work = manager.m_work.front();
|
|
manager.m_work.pop_front();
|
|
lock.Leave();
|
|
|
|
TrackWorkScope tws(manager, work.desc.c_str());
|
|
work.func();
|
|
}
|
|
|
|
SCOPED_WRITE_LOCK(manager.m_availableWorkersLock, lock1);
|
|
SCOPED_READ_LOCK(manager.m_workLock, lock2);
|
|
if (!manager.m_work.empty())
|
|
continue;
|
|
|
|
manager.PushWorkerNoLock(this);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
Worker* m_nextWorker = nullptr;
|
|
Worker* m_prevWorker = nullptr;
|
|
Atomic<bool> m_loop;
|
|
Event m_workAvailable;
|
|
Thread m_thread;
|
|
};
|
|
|
|
WorkManagerImpl::WorkManagerImpl(u32 workerCount)
|
|
{
|
|
m_workers.resize(workerCount);
|
|
m_activeWorkerCount = workerCount;
|
|
for (u32 i = 0; i != workerCount; ++i)
|
|
m_workers[i] = new Worker(*this);
|
|
}
|
|
|
|
WorkManagerImpl::~WorkManagerImpl()
|
|
{
|
|
for (auto worker : m_workers)
|
|
worker->Stop();
|
|
for (auto worker : m_workers)
|
|
delete worker;
|
|
}
|
|
|
|
|
|
void WorkManagerImpl::AddWork(const Function<void()>& work, u32 count, const tchar* desc, bool highPriority)
|
|
{
|
|
SCOPED_WRITE_LOCK(m_workLock, lock);
|
|
bool trackWork = m_workTracker.load();
|
|
for (u32 i = 0; i != count; ++i)
|
|
{
|
|
if (highPriority)
|
|
{
|
|
m_work.push_front({ work });
|
|
if (trackWork)
|
|
m_work.front().desc = desc;
|
|
}
|
|
else
|
|
{
|
|
m_work.push_back({ work });
|
|
if (trackWork)
|
|
m_work.back().desc = desc;
|
|
}
|
|
}
|
|
lock.Leave();
|
|
|
|
SCOPED_WRITE_LOCK(m_availableWorkersLock, lock2);
|
|
while (count--)
|
|
{
|
|
Worker* worker = PopWorkerNoLock();
|
|
if (!worker)
|
|
break;
|
|
worker->m_workAvailable.Set();
|
|
}
|
|
}
|
|
|
|
u32 WorkManagerImpl::GetWorkerCount()
|
|
{
|
|
return u32(m_workers.size());
|
|
}
|
|
|
|
void WorkManagerImpl::PushWorker(Worker* worker)
|
|
{
|
|
SCOPED_WRITE_LOCK(m_availableWorkersLock, lock);
|
|
PushWorkerNoLock(worker);
|
|
}
|
|
|
|
void WorkManagerImpl::PushWorkerNoLock(Worker* worker)
|
|
{
|
|
if (m_firstAvailableWorker)
|
|
m_firstAvailableWorker->m_prevWorker = worker;
|
|
worker->m_prevWorker = nullptr;
|
|
worker->m_nextWorker = m_firstAvailableWorker;
|
|
m_firstAvailableWorker = worker;
|
|
--m_activeWorkerCount;
|
|
}
|
|
|
|
void WorkManagerImpl::DoWork(u32 count)
|
|
{
|
|
while (count--)
|
|
{
|
|
SCOPED_WRITE_LOCK(m_workLock, lock);
|
|
if (m_work.empty())
|
|
break;
|
|
Work work = m_work.front();
|
|
m_work.pop_front();
|
|
lock.Leave();
|
|
|
|
TrackWorkScope tws(*this, work.desc.c_str());
|
|
work.func();
|
|
}
|
|
}
|
|
|
|
void WorkManagerImpl::FlushWork()
|
|
{
|
|
while (true)
|
|
{
|
|
SCOPED_READ_LOCK(m_workLock, lock);
|
|
bool workEmpty = m_work.empty();
|
|
lock.Leave();
|
|
if (workEmpty)
|
|
break;
|
|
Sleep(5);
|
|
}
|
|
while (m_activeWorkerCount)
|
|
Sleep(5);
|
|
}
|
|
|
|
WorkManagerImpl::Worker* WorkManagerImpl::PopWorkerNoLock()
|
|
{
|
|
Worker* worker = m_firstAvailableWorker;
|
|
if (!worker)
|
|
return nullptr;
|
|
m_firstAvailableWorker = worker->m_nextWorker;
|
|
if (m_firstAvailableWorker)
|
|
m_firstAvailableWorker->m_prevWorker = nullptr;
|
|
++m_activeWorkerCount;
|
|
return worker;
|
|
}
|
|
}
|