Files
UnrealEngineUWP/Engine/Source/Runtime/MessagingRpc/Private/MessageRpcServer.cpp
mihnea balta cfe83027c3 Fixing lots of places which created USTRUCTs with operator new and passed them to FMessageEndpoint::Publish or Send, which crashes when ASAN is used.
The messaging system destroys these objects with FMemory::Free, which has different alignment logic than operator new when ASAN is used, resulting in a crash.

#jira none
#rnx
#rb Jerome.Delattre, Matt.Peters

#ROBOMERGE-SOURCE: CL 17116813 in //UE5/Main/...
#ROBOMERGE-BOT: STARSHIP (Main -> Release-Engine-Test) (v855-17104924)

[CL 17116851 by mihnea balta in ue5-release-engine-test branch]
2021-08-10 10:58:07 -04:00

204 lines
5.2 KiB
C++

// Copyright Epic Games, Inc. All Rights Reserved.
#include "MessageRpcServer.h"
#include "Async/IAsyncProgress.h"
#include "Containers/ArrayBuilder.h"
#include "Containers/Ticker.h"
#include "IMessageRpcHandler.h"
#include "MessageEndpoint.h"
#include "MessageEndpointBuilder.h"
#include "RpcMessage.h"
#include "MessageRpcDefines.h"
#include "MessageRpcMessages.h"
/* FMessageRpcServer structors
*****************************************************************************/
/**
 * Default constructor.
 *
 * Delegates to the builder-based constructor, using an endpoint builder
 * created with the debug name "FMessageRpcServer".
 */
FMessageRpcServer::FMessageRpcServer()
	: FMessageRpcServer(
		FMessageEndpoint::Builder(TEXT("FMessageRpcServer")))
{ }
/**
 * Constructs a server whose endpoint is built from an explicit name and bus.
 *
 * Delegates to the builder-based constructor.
 *
 * @param InDebugName Name handed to the endpoint builder.
 * @param InMessageBus Message bus handed to the endpoint builder.
 */
FMessageRpcServer::FMessageRpcServer(const FString& InDebugName, const TSharedRef<IMessageBus, ESPMode::ThreadSafe>& InMessageBus)
	: FMessageRpcServer(
		FMessageEndpoint::Builder(*InDebugName, InMessageBus))
{ }
/**
 * Constructs a server from a pre-configured endpoint builder.
 *
 * Registers HandleMessage on the builder via WithCatchall, stores the
 * resulting endpoint, and adds HandleTicker to the core ticker with a delay
 * of MESSAGE_RPC_TICK_DELAY.
 *
 * @param InEndpointBuilder Builder used to create the server's message endpoint.
 */
FMessageRpcServer::FMessageRpcServer(FMessageEndpointBuilder&& InEndpointBuilder)
{
	MessageEndpoint = InEndpointBuilder.WithCatchall(this, &FMessageRpcServer::HandleMessage);

	// The raw delegate points back at this instance; the destructor removes the ticker again.
	TickerHandle = FTicker::GetCoreTicker().AddTicker(
		FTickerDelegate::CreateRaw(this, &FMessageRpcServer::HandleTicker),
		MESSAGE_RPC_TICK_DELAY
	);
}
/** Destructor: removes the ticker that was added during construction. */
FMessageRpcServer::~FMessageRpcServer()
{
	// The ticker delegate was created with a raw `this` pointer, so it must not outlive us.
	FTicker& CoreTicker = FTicker::GetCoreTicker();
	CoreTicker.RemoveTicker(TickerHandle);
}
/* IMessageRpcServer interface
*****************************************************************************/
/**
 * Gets the message endpoint held by this server.
 *
 * @return A shared pointer copy of the stored endpoint.
 */
TSharedPtr<FMessageEndpoint, ESPMode::ThreadSafe> FMessageRpcServer::GetEndpoint() const
{
	return MessageEndpoint;
}
/**
 * Stores an RPC handler in the handler map under the given request message type.
 *
 * @param RequestMessageType Key the handler is stored under; ProcessRequest looks handlers up by message type.
 * @param Handler The handler to store.
 */
void FMessageRpcServer::AddHandler(const FName& RequestMessageType, const TSharedRef<IMessageRpcHandler>& Handler)
{
	Handlers.Add(RequestMessageType, Handler);
}
/**
 * Gets the address of the server's message endpoint.
 *
 * Dereferences the stored endpoint without checking it for validity.
 *
 * @return The endpoint's message address.
 */
const FMessageAddress& FMessageRpcServer::GetAddress() const
{
	return MessageEndpoint->GetAddress();
}
/**
 * Gets the delegate that ProcessRequest executes when a request arrives for a
 * message type that has no entry in the handler map.
 *
 * @return A mutable reference to the delegate, so callers can bind to it.
 */
FOnMessageRpcNoHandler& FMessageRpcServer::OnNoHandler()
{
	return NoHandlerDelegate;
}
/**
 * Sets the flag that HandleTicker checks before sending progress messages
 * for calls that are not ready yet.
 *
 * @param InSendProgress New value of the flag.
 */
void FMessageRpcServer::SetSendProgressUpdate(bool InSendProgress)
{
	bSendProgress = InSendProgress;
}
/* FMessageRpcServer implementation
*****************************************************************************/
void FMessageRpcServer::ProcessCancelation(const FMessageRpcCancel& Message, const TSharedRef<IMessageContext, ESPMode::ThreadSafe>& Context)
{
FReturnInfo ReturnInfo;
if (Returns.RemoveAndCopyValue(Message.CallId, ReturnInfo))
{
ReturnInfo.Return->Cancel();
}
}
void FMessageRpcServer::ProcessRequest(const TSharedRef<IMessageContext, ESPMode::ThreadSafe>& Context)
{
auto Message = (FRpcMessage*)Context->GetMessage();
const FName MessageType = Context->GetMessageType();
if (!Handlers.Contains(MessageType))
{
if (!NoHandlerDelegate.IsBound())
{
return;
}
NoHandlerDelegate.Execute(MessageType);
}
auto Handler = Handlers.FindRef(MessageType);
if (Handler.IsValid())
{
FReturnInfo ReturnInfo;
{
ReturnInfo.ClientAddress = Context->GetSender();
ReturnInfo.LastProgressSent = FDateTime::UtcNow();
ReturnInfo.Return = Handler->HandleRequest(Context);
}
Returns.Add(Message->CallId, ReturnInfo);
}
else
{
// notify caller that call was not handled
MessageEndpoint->Send(FMessageEndpoint::MakeMessage<FMessageRpcUnhandled>(Message->CallId), Context->GetSender());
}
}
void FMessageRpcServer::SendProgress(const FGuid& CallId, const FReturnInfo& ReturnInfo)
{
const TSharedPtr<IAsyncProgress>& Progress = ReturnInfo.Progress;
const TSharedPtr<IAsyncTask>& Task = ReturnInfo.Task;
MessageEndpoint->Send(
new FMessageRpcProgress(
CallId,
Progress.IsValid() ? Progress->GetCompletion().Get(-1.0f) : -1.0f,
Progress.IsValid() ? Progress->GetStatusText() : FText::GetEmpty()
),
ReturnInfo.ClientAddress
);
}
void FMessageRpcServer::SendResult(const FGuid& CallId, const FReturnInfo& ReturnInfo)
{
FRpcMessage* Message = ReturnInfo.Return->CreateResponseMessage();
Message->CallId = CallId;
MessageEndpoint->Send(
Message,
ReturnInfo.Return->GetResponseTypeInfo(),
EMessageFlags::None,
nullptr,
TArrayBuilder<FMessageAddress>().Add(ReturnInfo.ClientAddress),
FTimespan::Zero(),
FDateTime::MaxValue()
);
}
/* FMessageRpcServer event handlers
*****************************************************************************/
void FMessageRpcServer::HandleMessage(const TSharedRef<IMessageContext, ESPMode::ThreadSafe>& Context)
{
const TWeakObjectPtr<UScriptStruct>& MessageTypeInfo = Context->GetMessageTypeInfo();
if (!MessageTypeInfo.IsValid())
{
return;
}
if (MessageTypeInfo == FMessageRpcCancel::StaticStruct())
{
ProcessCancelation(*static_cast<const FMessageRpcCancel*>(Context->GetMessage()), Context);
}
else if (MessageTypeInfo->IsChildOf(FRpcMessage::StaticStruct()))
{
ProcessRequest(Context);
}
}
bool FMessageRpcServer::HandleTicker(float DeltaTime)
{
QUICK_SCOPE_CYCLE_COUNTER(STAT_FMessageRpcServer_HandleTicker);
const FDateTime UtcNow = FDateTime::UtcNow();
for (TMap<FGuid, FReturnInfo>::TIterator It(Returns); It; ++It)
{
FReturnInfo& ReturnInfo = It.Value();
if (ReturnInfo.Return->IsReady())
{
SendResult(It.Key(), ReturnInfo);
It.RemoveCurrent();
}
else if (bSendProgress &&
(UtcNow - ReturnInfo.LastProgressSent > FTimespan::FromSeconds(MESSAGE_RPC_PROGRESS_INTERVAL)))
{
SendProgress(It.Key(), ReturnInfo);
ReturnInfo.LastProgressSent = UtcNow;
}
}
return true;
}