You've already forked UnrealEngineUWP
mirror of
https://github.com/izzy2lost/UnrealEngineUWP.git
synced 2026-03-26 18:15:20 -07:00
* Implemented RunProcess which can be used to spawn and wait on raw processes (not detoured). (Note, calling program needs to report file system modifications since it is not known to UBA) * Changed so UBAExecutor is using UBA for non-detoured processes as well. This will give correct profiling tracking and reuse existing code. * Fixed handle leak in windows * Fixed race condition on linux since wordexp() is n ot thread safe * Added tests * New binaries [CL 30216046 by henrik karlsson in ue5-main branch]
1634 lines
47 KiB
C++
1634 lines
47 KiB
C++
// Copyright Epic Games, Inc. All Rights Reserved.
|
|
|
|
#include "UbaSessionClient.h"
|
|
#include "UbaNetworkClient.h"
|
|
#include "UbaNetworkMessage.h"
|
|
#include "UbaProcess.h"
|
|
#include "UbaProtocol.h"
|
|
#include "UbaStorage.h"
|
|
|
|
namespace uba
|
|
{
|
|
SessionClient::SessionClient(const SessionClientCreateInfo& info)
|
|
: Session(info, TC("UbaSessionClient"), true)
|
|
, m_client(info.client)
|
|
, m_name(info.name.data)
|
|
, m_waitToSendEvent(false)
|
|
, m_loop(true)
|
|
{
|
|
m_maxProcessCount = info.maxProcessCount;
|
|
m_dedicated = info.dedicated;
|
|
m_useStorage = info.useStorage;
|
|
m_defaultPriorityClass = info.defaultPriorityClass;
|
|
m_outputStatsThresholdMs = info.outputStatsThresholdMs;
|
|
m_maxIdleSeconds = info.maxIdleSeconds;
|
|
m_disableCustomAllocator = info.disableCustomAllocator;
|
|
m_useBinariesAsVersion = info.useBinariesAsVersion;
|
|
m_memWaitLoadPercent = info.memWaitLoadPercent;
|
|
m_memKillLoadPercent = info.memKillLoadPercent;
|
|
|
|
if (m_name.IsEmpty())
|
|
{
|
|
tchar buf[256];
|
|
if (GetComputerNameW(buf, sizeof_array(buf)))
|
|
m_name.Appendf(TC("%s"), buf);
|
|
}
|
|
|
|
m_processWorkingDir.Append(m_rootDir).Append(TC("empty"));// + TC("empty\\");
|
|
m_storage.CreateDirectory(m_processWorkingDir.data);
|
|
m_processWorkingDir.EnsureEndsWithSlash();
|
|
|
|
if (info.killRandom)
|
|
{
|
|
Guid g;
|
|
CreateGuid(g);
|
|
m_killRandomIndex = 10 + g.data1 % 30;
|
|
}
|
|
|
|
m_client.RegisterOnConnected([this]()
|
|
{
|
|
Connect();
|
|
});
|
|
|
|
m_client.RegisterOnDisconnected([this]()
|
|
{
|
|
m_loop = false;
|
|
});
|
|
|
|
m_nameToHashTableMem.Init(NameToHashMemSize);
|
|
}
|
|
|
|
SessionClient::~SessionClient()
|
|
{
|
|
CancelAllProcessesAndWait();
|
|
m_loop = false;
|
|
m_waitToSendEvent.Set();
|
|
m_loopThread.Wait();
|
|
}
|
|
|
|
bool SessionClient::Wait(u32 milliseconds, Event* wakeupEvent)
|
|
{
|
|
return m_loopThread.Wait(milliseconds, wakeupEvent);
|
|
}
|
|
|
|
void SessionClient::SetIsTerminating(const tchar* reason, u64 delayMs)
|
|
{
|
|
m_terminationTime = GetTime() + MsToTime(delayMs);
|
|
m_terminationReason = reason;
|
|
}
|
|
|
|
u64 SessionClient::GetBestPing()
|
|
{
|
|
return m_bestPing;
|
|
}
|
|
|
|
bool SessionClient::RetrieveCasFile(CasKey& outNewKey, u64& outSize, const CasKey& casKey, const tchar* hint, bool willBeUsedUncompressed, bool allowProxy)
|
|
{
|
|
TimerScope s(m_stats.storageRetrieve);
|
|
CasKey tempKey = casKey;
|
|
|
|
#if UBA_USE_SPARSEFILE
|
|
willBeUsedUncompressed = false;
|
|
#endif
|
|
|
|
if (willBeUsedUncompressed)
|
|
tempKey = AsCompressed(casKey, false);
|
|
//else
|
|
// willBeUsedUncompressed = !IsCompressed(casKey);
|
|
|
|
Storage::RetrieveResult result;
|
|
bool res = m_storage.RetrieveCasFile(result, tempKey, hint, nullptr, 1, allowProxy);
|
|
outNewKey = result.casKey;
|
|
outSize = result.size;
|
|
return res;
|
|
}
|
|
|
|
bool SessionClient::GetCasKeyForFile(CasKey& out, u32 processId, const StringBufferBase& fileName, const StringKey& fileNameKey)
|
|
{
|
|
TimerScope waitTimer(Stats().waitGetFileMsg);
|
|
ScopedWriteLock lock(m_nameToHashLookupLock);
|
|
auto insres = m_nameToHashLookup.try_emplace(fileNameKey);
|
|
HashRec& rec = insres.first->second;
|
|
lock.Leave();
|
|
ScopedWriteLock lock2(rec.lock);
|
|
if (rec.key == CasKeyZero)//!rec.serverTime)
|
|
{
|
|
waitTimer.Cancel();
|
|
|
|
// These will never succeed
|
|
if (fileName.StartsWith(m_sessionBinDir.data) || fileName.StartsWith(TC("c:\\noenvironment")) || fileName.StartsWith(m_processWorkingDir.data))
|
|
{
|
|
out = CasKeyZero;
|
|
return true;
|
|
}
|
|
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_GetFileFromServer, writer);
|
|
writer.WriteU32(processId);
|
|
writer.WriteString(fileName);
|
|
writer.WriteStringKey(fileNameKey);
|
|
|
|
StackBinaryReader<128> reader;
|
|
if (!msg.Send(reader, Stats().getFileMsg))
|
|
return false;
|
|
|
|
rec.key = reader.ReadCasKey();
|
|
rec.serverTime = reader.ReadU64();
|
|
}
|
|
out = rec.key;
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::EnsureBinaryFile(StringBufferBase& out, StringBufferBase& outVirtual, u32 processId, const StringBufferBase& fileName, const StringKey& fileNameKey, const tchar* applicationDir)
|
|
{
|
|
CasKey casKey;
|
|
u32 fileAttributes = DefaultAttributes(); // TODO: This is wrong.. need to retrieve from server if this is executable or not
|
|
if (fileName[1] == ':')
|
|
{
|
|
if (!GetCasKeyForFile(casKey, processId, fileName, fileNameKey))
|
|
return false;
|
|
const tchar* lastSlash = TStrrchr(fileName.data, PathSeparator);
|
|
outVirtual.Append(lastSlash + 1);
|
|
}
|
|
else
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_EnsureBinaryFile, writer);
|
|
writer.WriteU32(processId);
|
|
writer.WriteString(fileName);
|
|
writer.WriteStringKey(fileNameKey);
|
|
writer.WriteString(applicationDir);
|
|
|
|
StackBinaryReader<1024> reader;
|
|
if (!msg.Send(reader, Stats().getBinaryMsg))
|
|
return false;
|
|
|
|
casKey = reader.ReadCasKey();
|
|
reader.ReadString(outVirtual);
|
|
}
|
|
|
|
if (casKey == CasKeyZero)
|
|
{
|
|
out.Append(fileName);
|
|
return true;
|
|
}
|
|
bool willBeUsedUncompressed = true;
|
|
CasKey newKey;
|
|
u64 fileSize;
|
|
if (!RetrieveCasFile(newKey, fileSize, casKey, outVirtual.data, willBeUsedUncompressed))
|
|
UBA_ASSERTF(false, TC("Casfile not found for %s using %s"), outVirtual.data, CasKeyString(casKey).str);
|
|
StringBuffer<> destFile;
|
|
if (fileName[1] == ':')
|
|
destFile.AppendFileName(fileName.data);
|
|
else
|
|
destFile.Append(fileName);
|
|
|
|
StringBuffer<> applicationDirLower;
|
|
applicationDirLower.Append(applicationDir).MakeLower();
|
|
KeyToString keyStr(ToStringKey(applicationDirLower));
|
|
|
|
return WriteBinFile(out, destFile.data, newKey, keyStr, fileAttributes);
|
|
}
|
|
|
|
bool SessionClient::PrepareProcess(const ProcessStartInfo& startInfo, bool isChild, StringBufferBase& outRealApplication, const tchar*& outRealWorkingDir)
|
|
{
|
|
outRealApplication.Clear();
|
|
outRealWorkingDir = m_processWorkingDir.data;
|
|
return EnsureApplicationEnvironment(outRealApplication, 0, startInfo.application);
|
|
}
|
|
|
|
u64 SessionClient::GetMemoryMapAlignment(const tchar* fileName, u64 fileNameLen) const
|
|
{
|
|
u64 alignment = Session::GetMemoryMapAlignment(fileName, fileNameLen);
|
|
if (!alignment && !m_useStorage && m_allowMemoryMaps)
|
|
return 64 * 1024;
|
|
return alignment;
|
|
}
|
|
|
|
bool SessionClient::EnsureApplicationEnvironment(StringBufferBase& out, u32 processId, const tchar* application)
|
|
{
|
|
StringBuffer<> applicationDir;
|
|
applicationDir.AppendDir(application);
|
|
KeyToString keyStr(ToStringKeyLower(applicationDir));
|
|
|
|
UBA_ASSERT(application && *application);
|
|
ScopedWriteLock lock(m_handledApplicationEnvironmentsLock);
|
|
auto insres = m_handledApplicationEnvironments.insert(application);
|
|
if (insres.second)
|
|
{
|
|
auto failGuard = MakeGuard([&]() { m_handledApplicationEnvironments.erase(insres.first); });
|
|
|
|
StackBinaryReader<SendMaxSize> reader;
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_GetApplication, writer);
|
|
writer.WriteU32(processId);
|
|
writer.WriteString(application);
|
|
if (!msg.Send(reader, m_stats.getApplicationMsg))
|
|
return false;
|
|
}
|
|
|
|
u32 serverSystemPathLen = reader.ReadU32();
|
|
u32 moduleCount = reader.ReadU32();
|
|
if (moduleCount == 0)
|
|
return m_logger.Error(TC("Application %s not found"), application);
|
|
|
|
while (moduleCount--)
|
|
{
|
|
StringBuffer<> moduleFile;
|
|
reader.ReadString(moduleFile);
|
|
u32 fileAttributes = reader.ReadU32();
|
|
bool isSystem = reader.ReadBool();
|
|
|
|
CasKey casKey = reader.ReadCasKey();
|
|
if (casKey == CasKeyZero)
|
|
return m_logger.Error(TC("Bad CasKey for %s (%s)"), moduleFile.data, CasKeyString(casKey).str);
|
|
|
|
if (isSystem)
|
|
{
|
|
StringBuffer<> localSystemModule;
|
|
localSystemModule.Append(m_systemPath).Append(moduleFile.data + serverSystemPathLen);
|
|
if (FileExists(m_logger, localSystemModule.data))
|
|
continue;
|
|
}
|
|
|
|
CasKey newCasKey;
|
|
bool willBeUsedUncompressed = true;
|
|
u64 fileSize;
|
|
if (!RetrieveCasFile(newCasKey, fileSize, casKey, moduleFile.data, willBeUsedUncompressed))
|
|
return m_logger.Error(TC("Casfile not found for %s (%s)"), moduleFile.data, CasKeyString(casKey).str);
|
|
|
|
const tchar* moduleName = moduleFile.data;
|
|
if (const tchar* lastSeparator = TStrrchr(moduleFile.data, PathSeparator))
|
|
moduleName = lastSeparator + 1;
|
|
|
|
StringBuffer<> temp;
|
|
if (!WriteBinFile(temp, moduleName, newCasKey, keyStr, fileAttributes))
|
|
return false;
|
|
}
|
|
|
|
failGuard.Cancel();
|
|
}
|
|
out.Append(m_sessionBinDir).Append(keyStr).Append(PathSeparator).AppendFileName(application);
|
|
return true;
|
|
}
|
|
|
|
void* SessionClient::GetProcessEnvironmentVariables()
|
|
{
|
|
UBA_ASSERT(!m_environmentVariables.empty());
|
|
return m_environmentVariables.data();
|
|
}
|
|
|
|
bool SessionClient::WriteBinFile(StringBufferBase& out, const tchar* binaryName, const CasKey& casKey, const KeyToString& applicationDir, u32 fileAttributes)
|
|
{
|
|
UBA_ASSERT(fileAttributes);
|
|
|
|
out.Append(m_sessionBinDir);
|
|
out.Append(applicationDir).Append(PathSeparator);
|
|
|
|
StringBuffer<> lower;
|
|
lower.Append(applicationDir).Append(PathSeparator).Append(binaryName);
|
|
lower.MakeLower();
|
|
ScopedWriteLock lock(m_binFileLock);
|
|
|
|
auto insres = m_writtenBinFiles.try_emplace(lower.data, casKey);
|
|
if (!insres.second)
|
|
{
|
|
out.Append(binaryName);
|
|
if (insres.first->second != casKey)
|
|
return m_logger.Error(TC("Writing same binary file %s multiple times but with different data! (Current: %s Previous: %s)"), out.data, CasKeyString(casKey).str, CasKeyString(insres.first->second).str);
|
|
return true;
|
|
}
|
|
|
|
m_storage.CreateDirectory(out.data);
|
|
out.Append(binaryName);
|
|
|
|
//if (GetFileAttributesW(out.data) != INVALID_FILE_ATTRIBUTES)
|
|
// return true;
|
|
|
|
if (TStrchr(binaryName, PathSeparator))
|
|
{
|
|
StringBuffer<> binaryDir;
|
|
binaryDir.AppendDir(out);
|
|
if (!m_storage.CreateDirectory(binaryDir.data))
|
|
return false;
|
|
}
|
|
return m_storage.CopyOrLink(casKey, out.data, fileAttributes);
|
|
}
|
|
|
|
bool SessionClient::CreateFile(CreateFileResponse& out, const CreateFileMessage& msg, const tchar* virtualApplicationDir)
|
|
{
|
|
const StringBufferBase& fileName = msg.fileName;
|
|
StringKey fileNameKey = msg.fileNameKey;
|
|
|
|
if ((msg.access & FileAccess_Write) != 0)
|
|
{
|
|
if (fileName.EndsWith(TC(".o.tmp")) || fileName.EndsWith(TC(".obj.tmp"))) // This is clang's temporary object file that is moved into place
|
|
{
|
|
#if PLATFORM_WINDOWS
|
|
out.fileName.Append(m_sessionDir).Append(TC("temp\\")).Append(KeyToString(fileNameKey));
|
|
#else
|
|
out.fileName.Append(fileName);
|
|
#endif
|
|
return true;
|
|
}
|
|
|
|
return Session::CreateFile(out, msg, virtualApplicationDir);
|
|
}
|
|
|
|
CasKey casKey;
|
|
|
|
if (!GetCasKeyForFile(casKey, msg.process.m_id, fileName, fileNameKey))
|
|
return false;
|
|
|
|
// Not finding a file is a valid path. Some applications try with a path and if fails try another path
|
|
if (casKey == CasKeyZero)
|
|
{
|
|
//m_logger.Warning(TC("No casfile found for %s"), fileName.data);
|
|
out.directoryTableSize = GetDirectoryTableSize();
|
|
out.mappedFileTableSize = GetFileMappingSize();
|
|
out.fileName.Append(fileName);
|
|
return true;
|
|
}
|
|
|
|
StringBuffer<> newName;
|
|
bool isDir = casKey == CasKeyIsDirectory;
|
|
u64 fileSize = InvalidValue;
|
|
CasKey newCasKey;
|
|
u64 memoryMapAlignment = GetMemoryMapAlignment(fileName.data, fileName.count);
|
|
|
|
if (isDir)
|
|
{
|
|
newName.Append(TC("$d"));
|
|
}
|
|
else if (casKey != CasKeyZero)
|
|
{
|
|
if (m_useStorage || memoryMapAlignment == 0)
|
|
{
|
|
bool willBeUsedUncompressed = memoryMapAlignment == 0;
|
|
if (!RetrieveCasFile(newCasKey, fileSize, casKey, fileName.data, willBeUsedUncompressed, !IsRarelyRead(msg.process, fileName)))
|
|
return m_logger.Error(TC("Error retrieving cas entry %s (%s)"), CasKeyString(casKey).str, fileName.data);
|
|
|
|
#if !UBA_USE_SPARSEFILE
|
|
if (!m_storage.GetCasFileName(newName, newCasKey))
|
|
return false;
|
|
#else
|
|
if (!memoryMapAlignment)
|
|
memoryMapAlignment = 4096;
|
|
MemoryMap map;
|
|
if (!CreateMemoryMapFromView(map, fileNameKey, fileName.data, newCasKey, memoryMapAlignment))
|
|
return false;
|
|
newName.Append(map.name);
|
|
fileSize = map.size;
|
|
#endif
|
|
}
|
|
else
|
|
{
|
|
StorageStats& stats = m_storage.Stats();
|
|
TimerScope ts(stats.ensureCas);
|
|
|
|
ScopedWriteLock lookupLock(m_fileMappingTableLookupLock);
|
|
auto insres = m_fileMappingTableLookup.try_emplace(fileNameKey);
|
|
FileMappingEntry& entry = insres.first->second;
|
|
lookupLock.Leave();
|
|
|
|
ScopedWriteLock entryCs(entry.lock);
|
|
ts.Leave();
|
|
|
|
if (entry.handled)
|
|
{
|
|
if (!entry.success)
|
|
return false;
|
|
}
|
|
else
|
|
{
|
|
casKey = AsCompressed(casKey, false);
|
|
entry.handled = true;
|
|
Storage::RetrieveResult result;
|
|
if (!m_storage.RetrieveCasFile(result, casKey, fileName.data, &m_fileMappingBuffer, memoryMapAlignment, !IsRarelyRead(msg.process, fileName)))
|
|
return m_logger.Error(TC("Error retrieving cas entry %s (%s)"), CasKeyString(casKey).str, fileName.data);
|
|
entry.success = true;
|
|
entry.size = result.size;
|
|
entry.mapping = result.view.handle;
|
|
entry.mappingOffset = result.view.offset;
|
|
}
|
|
|
|
fileSize = entry.size;
|
|
if (entry.mapping.IsValid())
|
|
Storage::GetMappingString(newName, entry.mapping, entry.mappingOffset);
|
|
else
|
|
newName.Append(entry.isDir ? TC("$d") : TC("$f"));
|
|
}
|
|
}
|
|
|
|
UBA_ASSERTF(!newName.IsEmpty(), TC("No casfile available for %s using %s"), fileName.data, CasKeyString(casKey).str);
|
|
|
|
if (newName[0] != '^')
|
|
{
|
|
if (!isDir && memoryMapAlignment)
|
|
{
|
|
MemoryMap map;
|
|
if (!CreateMemoryMapFromFile(map, fileNameKey, newName.data, IsCompressed(newCasKey), memoryMapAlignment))
|
|
return false;
|
|
|
|
fileSize = map.size;
|
|
newName.Clear().Append(map.name);
|
|
}
|
|
else if (!IsRarelyRead(msg.process, fileName))
|
|
{
|
|
AddFileMapping(fileNameKey, fileName.data, newName.data, fileSize);
|
|
}
|
|
}
|
|
|
|
out.directoryTableSize = GetDirectoryTableSize();
|
|
out.mappedFileTableSize = GetFileMappingSize();
|
|
out.fileName.Append(newName);
|
|
out.size = fileSize;
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::SendFiles(ProcessImpl& process, Timer& sendFiles)
|
|
{
|
|
StorageStatsScope storageStatsScope(process.m_storageStats);
|
|
for (auto& pair : process.m_writtenFiles)
|
|
{
|
|
TimerScope timer(sendFiles);
|
|
#ifdef _DEBUG
|
|
if (!pair.second.mappingHandle.IsValid())
|
|
m_logger.Warning(TC("%s is not using file mapping"), pair.first.c_str());
|
|
#endif
|
|
if (!SendFile(process, pair.second, pair.first.c_str()))
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::SendFile(ProcessImpl& process, WrittenFile& source, const tchar* destination)
|
|
{
|
|
bool keepMappingInMemory = IsWindows && !IsRarelyReadAfterWritten(process, destination, TStrlen(destination));
|
|
|
|
CasKey casKey;
|
|
{
|
|
TimerScope ts(m_stats.storageSend);
|
|
bool deferCreation = false;
|
|
if (!m_storage.StoreCasFile(casKey, source.key, source.name.c_str(), source.mappingHandle, 0, source.mappingWritten, destination, deferCreation, keepMappingInMemory))
|
|
return false;
|
|
}
|
|
UBA_ASSERTF(casKey != CasKeyZero, TC("Failed to store cas file for %s (destination %s)"), source.name.c_str(), destination);
|
|
|
|
CloseFileMapping(source.mappingHandle);
|
|
source.mappingHandle = {};
|
|
|
|
StackBinaryReader<128> reader;
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_SendFileToServer, writer);
|
|
writer.WriteU32(process.GetId());
|
|
writer.WriteString(destination);
|
|
writer.WriteStringKey(source.key);
|
|
writer.WriteU32(source.attributes);
|
|
writer.WriteCasKey(casKey);
|
|
if (!msg.Send(reader, Stats().sendFileMsg))
|
|
return m_logger.Error(TC("Failed to send file %s to server"), source.name.c_str());
|
|
}
|
|
if (!reader.ReadBool())
|
|
return m_logger.Error(TC("Server failed to receive file %s"), source.name.c_str());
|
|
return true;
|
|
}
|
|
|
|
void RemoveWrittenFile(ProcessImpl& process, const TString& name);
|
|
|
|
bool SessionClient::DeleteFile(DeleteFileResponse& out, const DeleteFileMessage& msg)
|
|
{
|
|
// TODO: Deleting output files should also delete them on disk (for now they will leak until process shutdown)
|
|
RemoveWrittenFile(msg.process, msg.fileName.data);
|
|
|
|
bool sendDelete = true;
|
|
if (msg.closeId != 0)
|
|
{
|
|
UBA_ASSERTF(false, TC("This has not been tested properly"));
|
|
ScopedWriteLock lock(m_activeFilesLock);
|
|
sendDelete = m_activeFiles.erase(msg.closeId) == 0;
|
|
}
|
|
|
|
{
|
|
ScopedWriteLock lock(m_outputFilesLock);
|
|
sendDelete = m_outputFiles.erase(msg.fileName.data) == 0 && sendDelete;
|
|
}
|
|
|
|
bool isTemp = StartsWith(msg.fileName.data, m_tempPath.data);
|
|
if (isTemp)
|
|
sendDelete = false;
|
|
|
|
if (!sendDelete)
|
|
{
|
|
if (!m_allowMemoryMaps && isTemp)
|
|
{
|
|
out.result = uba::DeleteFileW(msg.fileName.data);
|
|
out.errorCode = GetLastError();
|
|
return true;
|
|
}
|
|
out.result = true;
|
|
out.errorCode = ERROR_SUCCESS;
|
|
return true;
|
|
}
|
|
|
|
// TODO: Cache this if it becomes noisy
|
|
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage networkMsg(m_client, ServiceId, SessionMessageType_DeleteFile, writer);
|
|
writer.WriteStringKey(msg.fileNameKey);
|
|
writer.WriteString(msg.fileName);
|
|
StackBinaryReader<1024> reader;
|
|
if (!networkMsg.Send(reader, Stats().deleteFileMsg))
|
|
return false;
|
|
out.result = reader.ReadBool();
|
|
out.errorCode = reader.ReadU32();
|
|
if (out.result)
|
|
if (!SendUpdateDirectoryTable())
|
|
return false;
|
|
out.directoryTableSize = GetDirectoryTableSize();
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::CopyFile(CopyFileResponse& out, const CopyFileMessage& msg)
|
|
{
|
|
ScopedWriteLock lock(m_outputFilesLock);
|
|
auto findIt = m_outputFiles.find(msg.fromName.data);
|
|
if (findIt == m_outputFiles.end())
|
|
{
|
|
lock.Leave();
|
|
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage networkMsg(m_client, ServiceId, SessionMessageType_CopyFile, writer);
|
|
writer.WriteStringKey(msg.fromKey);
|
|
writer.WriteString(msg.fromName);
|
|
writer.WriteStringKey(msg.toKey);
|
|
writer.WriteString(msg.toName);
|
|
StackBinaryReader<1024> reader;
|
|
if (!networkMsg.Send(reader, Stats().copyFileMsg))
|
|
return false;
|
|
out.fromName.Append(msg.fromName);
|
|
out.toName.Append(msg.toName);
|
|
out.closeId = ~0u;
|
|
out.errorCode = reader.ReadU32();
|
|
if (!out.errorCode)
|
|
if (!SendUpdateDirectoryTable())
|
|
return false;
|
|
out.directoryTableSize = GetDirectoryTableSize();
|
|
return true;
|
|
}
|
|
lock.Leave();
|
|
|
|
out.fromName.Append(findIt->second);
|
|
|
|
CreateFileMessage writeMsg { msg.process };
|
|
writeMsg.fileName.Append(msg.toName.data);
|
|
writeMsg.fileNameKey = msg.toKey;
|
|
writeMsg.access = FileAccess_Write;
|
|
CreateFileResponse writeOut;
|
|
if (!CreateFile(writeOut, writeMsg, nullptr))
|
|
return false;
|
|
|
|
out.toName.Append(writeOut.fileName);
|
|
out.closeId = writeOut.closeId;
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::MoveFile(MoveFileResponse& out, const MoveFileMessage& msg)
|
|
{
|
|
const tchar* fromName = msg.fromName.data;
|
|
|
|
{
|
|
ScopedWriteLock lock(msg.process.m_writtenFilesLock);
|
|
auto& writtenFiles = msg.process.m_writtenFiles;
|
|
auto findIt = writtenFiles.find(fromName);
|
|
if (findIt != writtenFiles.end())
|
|
{
|
|
auto insres = writtenFiles.try_emplace(msg.toName.data);
|
|
UBA_ASSERT(insres.second);
|
|
insres.first->second = findIt->second;
|
|
insres.first->second.owner = &msg.process;
|
|
writtenFiles.erase(findIt);
|
|
}
|
|
}
|
|
|
|
bool sendMove = true;
|
|
{
|
|
ScopedWriteLock lock(m_outputFilesLock);
|
|
auto findIt = m_outputFiles.find(msg.fromName.data);
|
|
if (findIt != m_outputFiles.end())
|
|
{
|
|
auto insres = m_outputFiles.try_emplace(msg.toName.data);
|
|
UBA_ASSERT(insres.second);
|
|
insres.first->second = findIt->second;
|
|
m_outputFiles.erase(findIt);
|
|
sendMove = false;
|
|
}
|
|
}
|
|
|
|
if (!sendMove)
|
|
{
|
|
out.result = true;
|
|
out.errorCode = ERROR_SUCCESS;
|
|
return true;
|
|
}
|
|
|
|
out.result = uba::MoveFileExW(fromName, msg.toName.data, 0);
|
|
out.errorCode = GetLastError();
|
|
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::Chmod(ChmodResponse& out, const ChmodMessage& msg)
|
|
{
|
|
const tchar* fromName = msg.fileName.data;
|
|
|
|
{
|
|
ScopedWriteLock lock(msg.process.m_writtenFilesLock);
|
|
auto& writtenFiles = msg.process.m_writtenFiles;
|
|
auto findIt = writtenFiles.find(fromName);
|
|
if (findIt != writtenFiles.end())
|
|
{
|
|
bool executable = false;
|
|
#if !PLATFORM_WINDOWS
|
|
if (msg.fileMode & S_IXUSR)
|
|
executable = true;
|
|
#endif
|
|
findIt->second.attributes = DefaultAttributes(executable);
|
|
out.errorCode = 0;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
UBA_ASSERTF(false, TC("Code path not implemented.. should likely send message to server"));
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::CreateDirectory(CreateDirectoryResponse& out, const CreateDirectoryMessage& msg)
|
|
{
|
|
// TODO: Cache this if it becomes noisy
|
|
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage networkMsg(m_client, ServiceId, SessionMessageType_CreateDirectory, writer);
|
|
writer.WriteString(msg.name);
|
|
StackBinaryReader<1024> reader;
|
|
if (!networkMsg.Send(reader, Stats().createDirMsg))
|
|
return false;
|
|
out.result = reader.ReadBool();
|
|
out.errorCode = reader.ReadU32();
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::GetFullFileName(GetFullFileNameResponse& out, const GetFullFileNameMessage& msg, const tchar* virtualApplicationDir)
|
|
{
|
|
ScopedWriteLock lock(m_nameToNameLookupLock);
|
|
auto insres = m_nameToNameLookup.try_emplace(msg.fileName.data);
|
|
NameRec& rec = insres.first->second;
|
|
lock.Leave();
|
|
ScopedWriteLock lock2(rec.lock);
|
|
|
|
if (rec.handled)
|
|
{
|
|
out.fileName.Append(rec.name.c_str());
|
|
out.virtualFileName.Append(rec.virtualName.c_str());
|
|
return true;
|
|
}
|
|
rec.handled = true;
|
|
|
|
if (!EnsureBinaryFile(out.fileName, out.virtualFileName, msg.process.m_id, msg.fileName, msg.fileNameKey, virtualApplicationDir))
|
|
return false;
|
|
|
|
rec.name = out.fileName.data;
|
|
rec.virtualName = out.virtualFileName.data;
|
|
out.mappedFileTableSize = AddFileMapping(msg.fileNameKey, msg.fileName.data, out.fileName.data);
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::GetListDirectoryInfo(ListDirectoryResponse& out, tchar* dirName, const StringKey& dirKey)
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_ListDirectory, writer);
|
|
writer.WriteU32(m_sessionId);
|
|
writer.WriteString(dirName);
|
|
writer.WriteStringKey(dirKey);
|
|
|
|
StackBinaryReader<SendMaxSize> reader;
|
|
|
|
if (!msg.Send(reader, Stats().listDirMsg))
|
|
return false;
|
|
|
|
u32 tableOffset = reader.ReadU32();
|
|
|
|
u32 oldTableSize = GetDirectoryTableSize();
|
|
if (!UpdateDirectoryTableFromServer(reader))
|
|
return false;
|
|
u32 newTableSize = GetDirectoryTableSize();
|
|
|
|
// Ask for a refresh of hashes straight away since they will likely be asked for by the process doing this query
|
|
if (oldTableSize != newTableSize)
|
|
m_waitToSendEvent.Set();
|
|
|
|
out.tableOffset = tableOffset;
|
|
out.tableSize = newTableSize;
|
|
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::WriteFileToDisk(ProcessImpl& process, WrittenFile& file)
|
|
{
|
|
// Do nothing, we will send the data to the host when process is finished
|
|
return true;
|
|
}
|
|
|
|
|
|
struct SessionClient::ActiveUpdateDirectoryEntry
|
|
{
|
|
Event done;
|
|
u32 readPos = 0;
|
|
ActiveUpdateDirectoryEntry* prev = nullptr;
|
|
ActiveUpdateDirectoryEntry* next = nullptr;
|
|
|
|
static bool Wait(SessionClient& client, ActiveUpdateDirectoryEntry*& first, ScopedWriteLock& lock, u32 readPos)
|
|
{
|
|
ActiveUpdateDirectoryEntry item;
|
|
item.next = first;
|
|
if (item.next)
|
|
item.next->prev = &item;
|
|
item.readPos = readPos;
|
|
first = &item;
|
|
item.done.Create(true);
|
|
lock.Leave();
|
|
while (!item.done.IsSet(10000))
|
|
{
|
|
client.m_logger.Error(TC("Timed out waiting for update directory message"));
|
|
return false;
|
|
}
|
|
lock.Enter();
|
|
if (item.prev)
|
|
item.prev->next = item.next;
|
|
else
|
|
first = item.next;
|
|
if (item.next)
|
|
item.next->prev = item.prev;
|
|
return true;
|
|
}
|
|
|
|
static void UpdateReadPosMatching(ActiveUpdateDirectoryEntry*& first, u32 readPos)
|
|
{
|
|
for (auto i = first; i; i = i->next)
|
|
{
|
|
if (i->readPos != readPos)
|
|
continue;
|
|
i->done.Set();
|
|
break;
|
|
}
|
|
}
|
|
|
|
static void UpdateReadPosLess(ActiveUpdateDirectoryEntry*& first, u32 readPos)
|
|
{
|
|
for (auto i = first; i; i = i->next)
|
|
if (i->readPos <= readPos)
|
|
i->done.Set();
|
|
}
|
|
};
|
|
|
|
bool SessionClient::UpdateDirectoryTableFromServer(BinaryReader& reader)
|
|
{
|
|
auto& dirTable = m_directoryTable;
|
|
|
|
bool isFirst = true;
|
|
while (true)
|
|
{
|
|
if (!isFirst)
|
|
{
|
|
reader.SetPosition(0);
|
|
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_GetDirectoriesFromServer, writer);
|
|
writer.WriteU32(m_sessionId);
|
|
if (!msg.Send(reader, Stats().getDirsMsg))
|
|
return false;
|
|
}
|
|
|
|
u32 readPos = reader.ReadU32();
|
|
|
|
u8* pos = dirTable.m_memory + readPos;
|
|
u32 toRead = u32(reader.GetLeft());
|
|
|
|
ScopedWriteLock lock(m_directoryTableLock);
|
|
|
|
if (toRead == 0)
|
|
{
|
|
// We might share this position with others
|
|
if (readPos > dirTable.m_memorySize)
|
|
if (!ActiveUpdateDirectoryEntry::Wait(*this, m_firstEmptyWait, lock, readPos))
|
|
return false;
|
|
return true;
|
|
}
|
|
|
|
reader.ReadBytes(pos, toRead);
|
|
|
|
if (readPos != m_directoryTableMemPos)
|
|
if (!ActiveUpdateDirectoryEntry::Wait(*this, m_firstReadWait, lock, readPos))
|
|
return false;
|
|
|
|
m_directoryTableMemPos += toRead;
|
|
|
|
ActiveUpdateDirectoryEntry::UpdateReadPosMatching(m_firstReadWait, m_directoryTableMemPos);
|
|
|
|
if (reader.GetPosition() < m_client.GetMessageMaxSize() - m_client.GetMessageReceiveHeaderSize())
|
|
{
|
|
dirTable.ParseDirectoryTable(m_directoryTableMemPos);
|
|
ActiveUpdateDirectoryEntry::UpdateReadPosLess(m_firstEmptyWait, m_directoryTableMemPos);
|
|
break;
|
|
}
|
|
isFirst = false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::UpdateNameToHashTableFromServer(BinaryReader& reader)
|
|
{
|
|
u32 serverTableSize = 0;
|
|
bool isFirst = true;
|
|
u32 readStartPos = u32(m_nameToHashTableMem.writtenSize);
|
|
u32 localTableSize = readStartPos;
|
|
u64 serverTime = 0;
|
|
while (true)
|
|
{
|
|
if (isFirst)
|
|
{
|
|
serverTableSize = reader.ReadU32();
|
|
isFirst = false;
|
|
}
|
|
else
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_GetNameToHashFromServer, writer);
|
|
writer.WriteU32(serverTableSize);
|
|
writer.WriteU32(localTableSize);
|
|
|
|
reader.SetPosition(0);
|
|
if (!msg.Send(reader, Stats().getHashesMsg))
|
|
return false;
|
|
}
|
|
serverTime = reader.ReadU64();
|
|
|
|
u8* pos = m_nameToHashTableMem.memory + localTableSize;
|
|
|
|
u32 left = u32(reader.GetLeft());
|
|
u32 toRead = serverTableSize - localTableSize;
|
|
|
|
bool needMore = left < toRead;
|
|
if (needMore)
|
|
toRead = left;
|
|
|
|
m_nameToHashTableMem.AllocateNoLock(toRead, 1, TC("NameToHashTable"));
|
|
reader.ReadBytes(pos, toRead);
|
|
localTableSize += toRead;
|
|
|
|
if (!needMore)
|
|
break;
|
|
}
|
|
|
|
u32 addCount = 0;
|
|
BinaryReader r(m_nameToHashTableMem.memory, readStartPos, NameToHashMemSize);
|
|
ScopedWriteLock lock(m_nameToHashLookupLock);
|
|
while (r.GetPosition() < localTableSize)
|
|
{
|
|
StringKey name = r.ReadStringKey();
|
|
CasKey hash = r.ReadCasKey();
|
|
|
|
HashRec& rec = m_nameToHashLookup[name];
|
|
ScopedWriteLock lock2(rec.lock);
|
|
if (serverTime < rec.serverTime)
|
|
continue;
|
|
rec.key = hash;
|
|
rec.serverTime = serverTime;
|
|
++addCount;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void SessionClient::Connect()
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_Connect, writer);
|
|
writer.WriteString(m_name.data);
|
|
writer.WriteU32(SessionNetworkVersion);
|
|
|
|
CasKey keys[2];
|
|
if (m_useBinariesAsVersion)
|
|
{
|
|
StringBuffer<> dir;
|
|
GetDirectoryOfCurrentModule(m_logger, dir);
|
|
u64 dirCount = dir.count;
|
|
dir.Append(PathSeparator).Append(UBA_AGENT_EXECUTABLE);
|
|
m_storage.CalculateCasKey(keys[0], dir.data);
|
|
dir.Resize(dirCount).Append(PathSeparator).Append(UBA_DETOURS_LIBRARY);
|
|
m_storage.CalculateCasKey(keys[1], dir.data);
|
|
}
|
|
|
|
writer.WriteCasKey(keys[0]);
|
|
writer.WriteCasKey(keys[1]);
|
|
|
|
writer.WriteU32(m_maxProcessCount);
|
|
writer.WriteBool(m_dedicated);
|
|
|
|
StringBuffer<> info;
|
|
GetSystemInfo(info);
|
|
writer.WriteString(info);
|
|
|
|
StackBinaryReader<SendMaxSize> reader;
|
|
|
|
if (!msg.Send(reader, m_stats.connectMsg))
|
|
return;
|
|
|
|
m_connected = reader.ReadBool();
|
|
if (!m_connected)
|
|
{
|
|
StringBuffer<> str;
|
|
reader.ReadString(str);
|
|
m_logger.Error(str.data);
|
|
|
|
CasKey exeKey = reader.ReadCasKey();
|
|
CasKey dllKey = reader.ReadCasKey();
|
|
m_client.InvokeVersionMismatch(exeKey, dllKey);
|
|
return;
|
|
}
|
|
|
|
{
|
|
CasKey detoursBinaryKey = reader.ReadCasKey();
|
|
Storage::RetrieveResult result;
|
|
if (!m_storage.RetrieveCasFile(result, AsCompressed(detoursBinaryKey, false), UBA_DETOURS_LIBRARY))
|
|
return;
|
|
KeyToString dir(StringKeyZero);
|
|
StringBuffer<> detoursFile;
|
|
if (!WriteBinFile(detoursFile, UBA_DETOURS_LIBRARY, detoursBinaryKey, dir, DefaultAttributes()))
|
|
return;
|
|
|
|
#if PLATFORM_WINDOWS
|
|
char dll[1024];
|
|
sprintf_s(dll, sizeof(dll), "%ls", detoursFile.data);
|
|
m_detoursLibrary = dll;
|
|
#else
|
|
m_detoursLibrary = detoursFile.data;
|
|
#endif
|
|
}
|
|
|
|
bool resetCas = reader.ReadBool();
|
|
if (resetCas)
|
|
m_storage.Reset();
|
|
|
|
m_sessionId = reader.ReadU32();
|
|
m_uiLanguage = reader.ReadU32();
|
|
m_detailedTrace = reader.ReadBool();
|
|
|
|
BuildEnvironmentVariables(reader);
|
|
|
|
m_loop = true;
|
|
m_loopThread.Start([this]() { ThreadCreateProcessLoop(); return 0; });
|
|
}
|
|
|
|
void SessionClient::BuildEnvironmentVariables(BinaryReader& reader)
|
|
{
|
|
TString tempStr;
|
|
while (true)
|
|
{
|
|
tempStr = reader.ReadString();
|
|
if (tempStr.empty())
|
|
break;
|
|
m_environmentVariables.insert(m_environmentVariables.end(), tempStr.begin(), tempStr.end());
|
|
m_environmentVariables.push_back(0);
|
|
}
|
|
|
|
#if PLATFORM_WINDOWS
|
|
AddEnvironmentVariableNoLock(TC("Path"), TC("c:\\noenvironment"));
|
|
AddEnvironmentVariableNoLock(TC("TEMP"), m_tempPath.data);
|
|
AddEnvironmentVariableNoLock(TC("TMP"), m_tempPath.data);
|
|
#else
|
|
AddEnvironmentVariableNoLock(TC("TMPDIR"), m_tempPath.data);
|
|
#endif
|
|
|
|
StringBuffer<> v;
|
|
for (auto& var : m_localEnvironmentVariables)
|
|
if (GetEnvironmentVariableW(var, v.data, v.capacity))
|
|
AddEnvironmentVariableNoLock(var, v.data);
|
|
|
|
m_environmentVariables.push_back(0);
|
|
}
|
|
|
|
struct SessionClient::InternalProcessStartInfo : ProcessStartInfo
|
|
{
|
|
u32 processId = 0;
|
|
float weight = 1.0f;
|
|
TString descriptionBuf;
|
|
TString applicationBuf;
|
|
TString argumentsBuf;
|
|
TString workingDirBuf;
|
|
};
|
|
|
|
|
|
bool SessionClient::SendProcessAvailable(Vector<InternalProcessStartInfo>& out, float availableWeight, bool& outRemoteExecutionEnabled)
|
|
{
|
|
StackBinaryWriter<32> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_ProcessAvailable, writer);
|
|
writer.WriteU32(m_sessionId);
|
|
writer.WriteU32(*(u32*)&availableWeight);
|
|
|
|
StackBinaryReader<SendMaxSize> reader;
|
|
if (!msg.Send(reader, m_stats.procAvailableMsg))
|
|
{
|
|
if (m_loop)
|
|
m_logger.Error(TC("Failed to send ProcessAvailable message"));
|
|
return false;
|
|
}
|
|
while (true)
|
|
{
|
|
u32 processId = reader.ReadU32();
|
|
if (processId == 0)
|
|
break;
|
|
if (processId == SessionProcessAvailableResponse_Disconnect)
|
|
{
|
|
m_logger.Info(TC("Got disconnect request from host"));
|
|
return false;
|
|
}
|
|
if (processId == SessionProcessAvailableResponse_RemoteExecutionDisabled)
|
|
{
|
|
outRemoteExecutionEnabled = false;
|
|
break;
|
|
}
|
|
out.push_back({});
|
|
InternalProcessStartInfo& info = out.back();
|
|
info.processId = processId;
|
|
info.descriptionBuf = reader.ReadString();
|
|
info.applicationBuf = reader.ReadString();
|
|
info.argumentsBuf = reader.ReadString();
|
|
info.workingDirBuf = reader.ReadString();
|
|
u32 weight32 = reader.ReadU32();
|
|
info.outputStatsThresholdMs = reader.ReadU64();
|
|
info.weight = *(float*)&weight32;
|
|
|
|
Replace(info.applicationBuf.data(), '/', PathSeparator);
|
|
|
|
}
|
|
|
|
if (!out.empty())
|
|
if (!SendUpdateDirectoryTable())
|
|
return false;
|
|
|
|
return true;
|
|
}
|
|
|
|
void SessionClient::SendReturnProcess(u32 processId, const tchar* reason)
|
|
{
|
|
StackBinaryWriter<1024> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_ProcessReturned, writer);
|
|
writer.WriteU32(processId);
|
|
writer.WriteString(reason);
|
|
StackBinaryReader<32> reader;
|
|
if (!msg.Send(reader, m_stats.procReturnedMsg))
|
|
return;
|
|
}
|
|
|
|
bool SessionClient::SendUpdateDirectoryTable()
|
|
{
|
|
StackBinaryWriter<32> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_GetDirectoriesFromServer, writer);
|
|
writer.WriteU32(m_sessionId);
|
|
writer.WriteU32(~u32(0));
|
|
StackBinaryReader<SendMaxSize> reader;
|
|
if (!msg.Send(reader, Stats().getDirsMsg))
|
|
return false;
|
|
return UpdateDirectoryTableFromServer(reader);
|
|
}
|
|
|
|
bool SessionClient::SendUpdateNameToHashTable()
|
|
{
|
|
StackBinaryWriter<32> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_GetNameToHashFromServer, writer);
|
|
writer.WriteU32(~u32(0));
|
|
|
|
ScopedWriteLock lock(m_nameToHashMemLock);
|
|
writer.WriteU32(u32(m_nameToHashTableMem.writtenSize));
|
|
|
|
StackBinaryReader<SendMaxSize> reader;
|
|
if (!msg.Send(reader, Stats().getHashesMsg))
|
|
return false;
|
|
return UpdateNameToHashTableFromServer(reader);
|
|
}
|
|
|
|
void SessionClient::SendPing(u64 memAvail, u64 memTotal)
|
|
{
|
|
u64 time = GetTime();
|
|
if (TimeToMs(time - m_lastPingSendTime) < 2000) // Ping every ~2 seconds... this is so server can disconnect a client quickly if no ping is coming
|
|
return;
|
|
|
|
float cpuLoad = UpdateCpuLoad();
|
|
|
|
StackBinaryWriter<128> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_Ping, writer);
|
|
writer.WriteU32(m_sessionId);
|
|
writer.WriteU64(m_lastPing);
|
|
writer.WriteU64(memAvail);
|
|
writer.WriteU64(memTotal);
|
|
writer.WriteU32(*(u32*)&cpuLoad);
|
|
StackBinaryReader<32> reader;
|
|
time = GetTime();
|
|
if (!msg.Send(reader, m_stats.pingMsg))
|
|
m_loop = false;
|
|
u64 newTime = GetTime();
|
|
m_lastPing = newTime - time;
|
|
m_lastPingSendTime = newTime;
|
|
|
|
if (m_lastPing < m_bestPing || m_bestPing == 0)
|
|
m_bestPing = m_lastPing;
|
|
|
|
m_storage.Ping();
|
|
}
|
|
|
|
void SessionClient::SendSummary()
|
|
{
|
|
StackBinaryWriter<SendMaxSize> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_Summary, writer);
|
|
|
|
writer.WriteU32(m_sessionId);
|
|
|
|
WriteSummary(writer, [&](Logger& logger)
|
|
{
|
|
PrintSummary(logger);
|
|
m_storage.PrintSummary(logger);
|
|
m_client.PrintSummary(logger);
|
|
SystemStats::GetGlobal().Print(logger, true);
|
|
});
|
|
|
|
msg.Send();
|
|
}
|
|
|
|
void SessionClient::ThreadCreateProcessLoop()
|
|
{
|
|
struct ProcessRec
|
|
{
|
|
ProcessRec(ProcessImpl* impl) : handle(impl) {}
|
|
ProcessHandle handle;
|
|
ReaderWriterLock lock;
|
|
Atomic<bool> isKilled;
|
|
Atomic<bool> isDone;
|
|
float weight = 1.0f;
|
|
};
|
|
List<ProcessRec> activeProcesses;
|
|
|
|
u64 lastWaitTime = 0;
|
|
u64 waitForMemoryPressureStartTime = 0;
|
|
|
|
constexpr u64 waitTimeToSpawnAfterKillMs = 5 * 1000;
|
|
|
|
u64 memAvail;
|
|
u64 memTotal;
|
|
GetMemoryInfo(memAvail, memTotal);
|
|
|
|
SendPing(memAvail, memTotal);
|
|
|
|
u64 memRequiredToSpawn = u64(double(memTotal) * double(100 - m_memWaitLoadPercent) / 100.0);
|
|
u64 memRequiredFree = u64(double(memTotal) * double(100 - m_memKillLoadPercent) / 100.0);
|
|
|
|
bool remoteExecutionEnabled = true;
|
|
|
|
float maxWeight = float(m_maxProcessCount);
|
|
float activeWeight = 0;
|
|
ReaderWriterLock activeWeightLock;
|
|
|
|
u64 idleStartTime = GetTime();
|
|
u32 processRequestCount = 0;
|
|
|
|
auto RemoveInactiveProcesses = [&]()
|
|
{
|
|
for (auto it=activeProcesses.begin();it!=activeProcesses.end();)
|
|
{
|
|
ProcessRec& r = *it;
|
|
if (!r.isDone)
|
|
{
|
|
++it;
|
|
continue;
|
|
}
|
|
it = activeProcesses.erase(it);
|
|
}
|
|
|
|
if (remoteExecutionEnabled && m_terminationReason)
|
|
{
|
|
remoteExecutionEnabled = false;
|
|
m_logger.Info(TC("%s. Will stop scheduling processes and send failing processes back for retry"), m_terminationReason);
|
|
}
|
|
|
|
if (!activeProcesses.empty())
|
|
{
|
|
idleStartTime = GetTime();
|
|
processRequestCount = 0;
|
|
}
|
|
else if (remoteExecutionEnabled)
|
|
{
|
|
u32 idleTime = u32(TimeToS(GetTime() - idleStartTime));
|
|
if (idleTime > m_maxIdleSeconds)
|
|
{
|
|
m_logger.Info(TC("Session has been idle longer than max idle time (%u seconds). Disconnecting (Did %u process requests during idle)"), m_maxIdleSeconds, processRequestCount);
|
|
m_waitToSendEvent.Set();
|
|
remoteExecutionEnabled = false;
|
|
}
|
|
}
|
|
};
|
|
|
|
|
|
while (m_loop)
|
|
{
|
|
u32 waitTimeoutMs = 3000;
|
|
|
|
FlushDeadProcesses();
|
|
|
|
GetMemoryInfo(memAvail, memTotal);
|
|
if (memAvail < memRequiredFree)
|
|
{
|
|
for (auto it = activeProcesses.rbegin(); it != activeProcesses.rend(); ++it)
|
|
{
|
|
ProcessRec& rec = *it;
|
|
ScopedWriteLock lock(rec.lock);
|
|
if (rec.isKilled || rec.isDone)
|
|
continue;
|
|
rec.handle.Cancel(true);
|
|
rec.isKilled = true;
|
|
SendReturnProcess(rec.handle.GetId(), TC("Running out of memory"));
|
|
++m_stats.killCount;
|
|
m_logger.Warning(TC("Killed process due to memory pressure (Available: %s Total: %s)"), BytesToText(memAvail).str, BytesToText(memTotal).str);
|
|
break;
|
|
}
|
|
lastWaitTime = GetTime();
|
|
}
|
|
|
|
bool canSpawn = TimeToMs(GetTime() - lastWaitTime) > waitTimeToSpawnAfterKillMs;
|
|
if (!canSpawn)
|
|
waitTimeoutMs = 500;
|
|
|
|
bool firstCall = true;
|
|
|
|
while (remoteExecutionEnabled && canSpawn && m_loop)
|
|
{
|
|
float availableWeight;
|
|
{
|
|
ScopedReadLock lock(activeWeightLock);
|
|
if (activeWeight >= maxWeight)
|
|
break;
|
|
availableWeight = maxWeight - activeWeight;
|
|
}
|
|
|
|
if (!firstCall)
|
|
GetMemoryInfo(memAvail, memTotal);
|
|
|
|
if (memAvail < memRequiredToSpawn)
|
|
{
|
|
if (waitForMemoryPressureStartTime == 0)
|
|
{
|
|
m_logger.Info(TC("Delaying spawn due to memory pressure (Available: %s Total: %s)"), BytesToText(memAvail).str, BytesToText(memTotal).str);
|
|
waitForMemoryPressureStartTime = GetTime();
|
|
}
|
|
break;
|
|
}
|
|
|
|
if (waitForMemoryPressureStartTime)
|
|
{
|
|
u64 waitTime = GetTime() - waitForMemoryPressureStartTime;
|
|
m_logger.Info(TC("Waited %s for memory pressure to go down (Available: %s Total: %s)"), TimeToText(waitTime).str, BytesToText(memAvail).str, BytesToText(memTotal).str);
|
|
m_stats.waitMemPressure += waitTime;
|
|
waitForMemoryPressureStartTime = 0;
|
|
lastWaitTime = GetTime();
|
|
waitTimeoutMs = 200;
|
|
availableWeight = Min(availableWeight, 1.0f);
|
|
}
|
|
|
|
Vector<InternalProcessStartInfo> startInfos;
|
|
if (!SendProcessAvailable(startInfos, availableWeight, remoteExecutionEnabled))
|
|
{
|
|
m_loop = false;
|
|
break;
|
|
}
|
|
++processRequestCount;
|
|
|
|
if (!remoteExecutionEnabled)
|
|
{
|
|
m_logger.Info(TC("Got remote execution disabled response from host (will finish %llu active processes)"), startInfos.size() + activeProcesses.size());
|
|
}
|
|
|
|
if (startInfos.empty())
|
|
{
|
|
canSpawn = false;
|
|
waitTimeoutMs = 200;
|
|
}
|
|
|
|
SendUpdateNameToHashTable();
|
|
|
|
for (InternalProcessStartInfo& startInfo : startInfos)
|
|
{
|
|
startInfo.description = startInfo.descriptionBuf.c_str();
|
|
startInfo.application = startInfo.applicationBuf.c_str();
|
|
startInfo.arguments = startInfo.argumentsBuf.c_str();
|
|
startInfo.workingDir = startInfo.workingDirBuf.c_str();
|
|
|
|
startInfo.uiLanguage = int(m_uiLanguage);
|
|
startInfo.priorityClass = m_defaultPriorityClass;
|
|
startInfo.useCustomAllocator = !m_disableCustomAllocator;
|
|
startInfo.outputStatsThresholdMs = m_outputStatsThresholdMs != 0 ? m_outputStatsThresholdMs : startInfo.outputStatsThresholdMs;
|
|
|
|
StringBuffer<> logFile;
|
|
if (m_logToFile)
|
|
{
|
|
logFile.Append(m_sessionLogDir.data);
|
|
GetNameFromArguments(logFile, startInfo.arguments, true);
|
|
logFile.Append(TC(".log"));
|
|
startInfo.logFile = logFile.data;
|
|
}
|
|
|
|
StringBuffer<> realApplication;
|
|
if (!EnsureApplicationEnvironment(realApplication, startInfo.processId, startInfo.application))
|
|
{
|
|
m_logger.Error(TC("Failed to ensure application environment for %s"), startInfo.application);
|
|
SendReturnProcess(startInfo.processId, TC("Failed to ensure application environment"));
|
|
m_loop = false;
|
|
break;
|
|
}
|
|
|
|
void* env = GetProcessEnvironmentVariables();
|
|
|
|
auto process = new ProcessImpl(*this, startInfo.processId, nullptr);
|
|
|
|
activeProcesses.emplace_back(process);
|
|
ProcessRec* rec = &activeProcesses.back();
|
|
|
|
rec->weight = startInfo.weight;
|
|
|
|
{
|
|
ScopedWriteLock lock(activeWeightLock);
|
|
activeWeight += rec->weight;
|
|
}
|
|
|
|
struct ExitedRec
|
|
{
|
|
ExitedRec(SessionClient& s, ReaderWriterLock& l, float& w, ProcessRec* r) : session(s), activeWeightLock(l), activeWeight(w), rec(r) {}
|
|
SessionClient& session;
|
|
ReaderWriterLock& activeWeightLock;
|
|
float& activeWeight;
|
|
ProcessRec* rec;
|
|
};
|
|
|
|
ExitedRec* exitedRec = new ExitedRec(*this, activeWeightLock, activeWeight, rec);
|
|
startInfo.exitedUserData = exitedRec;
|
|
startInfo.exitedFunc = [](void* userData, const ProcessHandle& h)
|
|
{
|
|
auto er = (ExitedRec*)userData;
|
|
SessionClient& session = er->session;
|
|
ReaderWriterLock& activeWeightLock = er->activeWeightLock;
|
|
float& activeWeight = er->activeWeight;
|
|
ProcessRec* rec = er->rec;
|
|
delete er;
|
|
|
|
#if UBA_DEBUG_LOG_ENABLED
|
|
if (false)
|
|
{
|
|
if (const tchar* logFile = h.GetStartInfo().logFile)
|
|
{
|
|
WrittenFile f;
|
|
f.name = logFile;
|
|
StringBuffer<> dest;
|
|
if (const tchar* lastSlash = TStrrchr(logFile, PathSeparator))
|
|
logFile = lastSlash + 1;
|
|
dest.Append(TC("<log>")).Append(logFile);
|
|
f.key = ToStringKeyLower(dest);
|
|
session.SendFile(*(ProcessImpl*)h.m_process, f, dest.data);
|
|
}
|
|
}
|
|
#endif
|
|
|
|
auto decreaseWeight = MakeGuard([&]()
|
|
{
|
|
ScopedWriteLock weightLock(activeWeightLock);
|
|
activeWeight -= rec->weight;
|
|
session.m_waitToSendEvent.Set();
|
|
});
|
|
|
|
ScopedWriteLock lock(rec->lock);
|
|
if (rec->isKilled)
|
|
{
|
|
rec->isDone = true;
|
|
return;
|
|
}
|
|
auto& process = *(ProcessImpl*)h.m_process;
|
|
|
|
if (session.m_killRandomIndex != ~0u && session.m_killRandomCounter++ == session.m_killRandomIndex)
|
|
{
|
|
session.m_loop = false;
|
|
session.m_logger.Info(TC("Killed random process (%s)"), process.m_description.c_str());
|
|
rec->isDone = true;
|
|
return;
|
|
}
|
|
|
|
u32 exitCode = process.m_exitCode;
|
|
|
|
if (exitCode != 0)
|
|
{
|
|
if (GetTime() >= session.m_terminationTime)
|
|
{
|
|
if (session.m_loop)
|
|
session.SendReturnProcess(rec->handle.GetId(), session.m_terminationReason);
|
|
rec->isDone = true;
|
|
return;
|
|
}
|
|
|
|
// TODO: Remove this once m_isTerminating is working properly
|
|
if (IsAWSTermination(process))
|
|
{
|
|
session.m_loop = false;
|
|
session.m_logger.Info(TC("Got error that is most likely because AWS termination. If not, this should be checked (%s)"), process.m_description.c_str());
|
|
rec->isDone = true;
|
|
return;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
// Should we decrease weight before or after sending files?
|
|
//decreaseWeight.Execute();
|
|
|
|
if (!session.SendFiles(process, process.m_processStats.sendFiles))
|
|
{
|
|
const tchar* desc = TC("Failed to send output files to host");
|
|
session.m_logger.Error(desc);
|
|
if (session.m_loop)
|
|
session.SendReturnProcess(rec->handle.GetId(), desc);
|
|
rec->isDone = true;
|
|
return;
|
|
}
|
|
}
|
|
|
|
decreaseWeight.Execute();
|
|
|
|
if (process.IsCancelled())
|
|
{
|
|
if (session.m_loop)
|
|
session.SendReturnProcess(rec->handle.GetId(), TC("Cancelled"));
|
|
rec->isDone = true;
|
|
return;
|
|
}
|
|
|
|
StackBinaryWriter<SendMaxSize> writer;
|
|
NetworkMessage msg(session.m_client, ServiceId, SessionMessageType_ProcessFinished, writer);
|
|
writer.WriteU32(process.m_id);
|
|
writer.WriteU32(exitCode);
|
|
writer.WriteU32(session.CountLogLines(process));
|
|
session.WriteLogLines(writer, process);
|
|
|
|
// Must be written last
|
|
process.m_processStats.Write(writer);
|
|
process.m_sessionStats.Write(writer);
|
|
process.m_storageStats.Write(writer);
|
|
process.m_systemStats.Write(writer);
|
|
|
|
StackBinaryReader<16> reader;
|
|
if (!msg.Send(reader, session.m_stats.procFinishedMsg) && session.m_loop)
|
|
session.m_logger.Error(TC("Failed to send ProcessFinished message!"));
|
|
|
|
// TODO: These should be removed and instead added in TraceReader (so it will update over time)
|
|
session.m_stats.stats.Add(process.m_sessionStats);
|
|
session.m_storage.AddStats(process.m_storageStats);
|
|
|
|
if (session.m_processFinished)
|
|
session.m_processFinished(&process);
|
|
|
|
rec->isDone = true;
|
|
session.m_waitToSendEvent.Set();
|
|
};
|
|
|
|
process->Start(startInfo, realApplication.data, m_processWorkingDir.data, true, env, true, true);
|
|
}
|
|
|
|
RemoveInactiveProcesses();
|
|
|
|
firstCall = false;
|
|
}
|
|
|
|
SendPing(memAvail, memTotal);
|
|
|
|
m_waitToSendEvent.IsSet(waitTimeoutMs);
|
|
|
|
RemoveInactiveProcesses();
|
|
|
|
if (activeProcesses.empty() && !remoteExecutionEnabled)
|
|
break;
|
|
}
|
|
|
|
CancelAllProcessesAndWait(); // If we got the exit from server there is no point sending anything more back.. cancel everything
|
|
|
|
u32 retry = 0;
|
|
while (true)
|
|
{
|
|
if (retry++ == 100)
|
|
{
|
|
m_logger.Error(TC("This should never happen!"));
|
|
break;
|
|
}
|
|
RemoveInactiveProcesses();
|
|
if (activeProcesses.empty())
|
|
break;
|
|
m_waitToSendEvent.IsSet(100);
|
|
};
|
|
|
|
SendSummary();
|
|
}
|
|
|
|
u32 SessionClient::CountLogLines(ProcessImpl& process)
|
|
{
|
|
u32 count = u32(process.m_logLines.size());
|
|
for (auto& child : process.m_childProcesses)
|
|
count += CountLogLines(*(ProcessImpl*)child.m_process);
|
|
return count;
|
|
}
|
|
|
|
void SessionClient::WriteLogLines(BinaryWriter& writer, ProcessImpl& process)
|
|
{
|
|
for (auto& child : process.m_childProcesses)
|
|
WriteLogLines(writer, *(ProcessImpl*)child.m_process);
|
|
for (auto& line : process.m_logLines)
|
|
{
|
|
writer.WriteString(line.text);
|
|
writer.WriteByte(line.type);
|
|
}
|
|
}
|
|
|
|
bool SessionClient::IsAWSTermination(ProcessImpl& process)
|
|
{
|
|
// This seems to be AWS termination error code... let's just shutdown the entire agent
|
|
if (process.m_exitCode == 4 || (process.m_exitCode == 1 && Contains(process.m_realApplication.c_str(), TC("link.exe"))))
|
|
return true;
|
|
//for (auto c : process.m_childProcesses)
|
|
// if (IsAWSTermination(*(ProcessImpl*)c.m_process))
|
|
// return true;
|
|
return false;
|
|
}
|
|
|
|
bool SessionClient::AllocFailed(Process& process, const tchar* allocType, u32 error)
|
|
{
|
|
//StackBinaryWriter<32> writer;
|
|
//NetworkMessage msg(m_client, ServiceId, SessionMessageType_VirtualAllocFailed, writer);
|
|
//if (!msg.Send())
|
|
// m_logger.Error(TC("Failed to send VirtualAllocFailed message!"));
|
|
return Session::AllocFailed(process, allocType, error);
|
|
}
|
|
|
|
void SessionClient::PrintSessionStats(Logger& logger)
|
|
{
|
|
Session::PrintSessionStats(logger);
|
|
m_stats.Print(logger);
|
|
}
|
|
|
|
bool SessionClient::CustomMessage(BinaryReader& reader, BinaryWriter& writer)
|
|
{
|
|
StackBinaryWriter<SendMaxSize> msgWriter;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_Custom, msgWriter);
|
|
|
|
u32 recvSize = reader.ReadU32();
|
|
msgWriter.WriteU32(recvSize);
|
|
msgWriter.WriteBytes(reader.GetPositionData(), recvSize);
|
|
|
|
BinaryReader msgReader(writer.GetData(), 0);
|
|
if (!msg.Send(msgReader, m_stats.customMsg))
|
|
return false;
|
|
|
|
u32 responseSize = msgReader.ReadU32();
|
|
writer.AllocWrite(4ull + responseSize);
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::FlushWrittenFiles(ProcessImpl& process)
|
|
{
|
|
ScopedWriteLock lock(process.m_writtenFilesLock);
|
|
if (!SendFiles(process, process.m_processStats.sendFiles))
|
|
return false;
|
|
process.m_writtenFiles.clear();
|
|
return true;
|
|
}
|
|
|
|
bool SessionClient::UpdateEnvironment(ProcessImpl& process, const tchar* reason)
|
|
{
|
|
StackBinaryWriter<SendMaxSize> writer;
|
|
NetworkMessage msg(m_client, ServiceId, SessionMessageType_UpdateEnvironment, writer);
|
|
writer.WriteU32(process.m_id);
|
|
writer.WriteString(reason);
|
|
process.m_processStats.Write(writer);
|
|
process.m_sessionStats.Write(writer);
|
|
process.m_storageStats.Write(writer);
|
|
process.m_systemStats.Write(writer);
|
|
StackBinaryReader<32> reader;
|
|
if (!msg.Send(reader, m_stats.customMsg))
|
|
return false;
|
|
return SendUpdateDirectoryTable();
|
|
}
|
|
}
|