Files
UnrealEngineUWP/Engine/Plugins/AI/MLAdapter/Source/python/tests/mock_server.py
mieszko zielinski 7848b68be0 Renamed UE4ML plugin to MLAdapter and updated all its contents accordingly
#lockdown nick.whiting
[at]Nick.Whiting, [at]Mikko.Mononen
#rb Nick.Whiting, Mikko.Mononen
#jira UE-111731
#jira UE-111119
#jira UE-111117
#jira UE-111114
#jira UE-111091
#jira UE-111075
#jira UE-111060
#jira UE-111058
#jira UE-111049
#jira UE-111044
#jira UE-111040
#jira UE-111039
#jira UE-111038
#jira UE-111036
#jira UE-111033
#jira UE-111032
#jira UE-111029
#jira UE-111026
#jira UE-111024
#jira UE-111021
#jira UE-111020
#jira UE-111018
#jira UE-111017
#jira UE-111016
#jira UE-111015
#jira UE-111014
#jira UE-111013
#jira UE-111012
#jira UE-110977
#jira UE-110975
#jira UE-110974
#jira UE-110973
#jira UE-110971
#jira UE-110969
#jira UE-110965
#jira UE-110949
#preflight 606ebe61db0bbb00016e242e

#ROBOMERGE-OWNER: mieszko.zielinski
#ROBOMERGE-AUTHOR: mieszko.zielinski
#ROBOMERGE-SOURCE: CL 15955453 in //UE5/Release-5.0-EarlyAccess/...
#ROBOMERGE-BOT: STARSHIP (Release-5.0-EarlyAccess -> Main) (v787-15839533)
#ROBOMERGE-CONFLICT from-shelf

[CL 15975769 by mieszko zielinski in ue5-main branch]
2021-04-12 03:41:10 -04:00

47 lines
1.4 KiB
Python

# Copyright Epic Games, Inc. All Rights Reserved.
import msgpackrpc
from unreal.mladapter.client import Client
import unreal.mladapter.utils as utils
class MockFunctions(object):
    """Dispatcher object exposing a handful of trivial callables for tests.

    Besides the regular ``sum`` and ``foo`` methods, every instance carries two
    extra callables stored under the names held in
    ``Client.FUNCNAME_LIST_FUNCTIONS`` and ``Client.FUNCNAME_PING``.
    """

    def __init__(self):
        # The required handlers are registered under the names the Client
        # class defines, so they are written straight into the instance
        # dictionary instead of being spelled out as method names here.
        required_handlers = {
            Client.FUNCNAME_LIST_FUNCTIONS: self._list_functions,
            Client.FUNCNAME_PING: self._ping,
        }
        self.__dict__.update(required_handlers)

    def sum(self, x, y):
        """Return ``x + y``."""
        total = x + y
        return total

    def foo(self):
        """Return True unconditionally."""
        return True

    def _ping(self):
        """Handler stored under ``Client.FUNCNAME_PING``; always returns True."""
        return True

    def _list_functions(self):
        """Return the list of function names this object advertises.

        'non_implemented' is listed even though this class defines no method
        of that name (presumably so tests can call an advertised-but-missing
        function -- confirm against the tests).
        """
        advertised = [Client.FUNCNAME_LIST_FUNCTIONS, Client.FUNCNAME_PING]
        advertised.extend(['foo', 'sum', 'non_implemented'])
        return advertised
class MockServer(msgpackrpc.Server):
    """msgpack-rpc server for tests that serves from a background thread.

    The server starts listening and serving as soon as it is constructed.
    It can be used as a context manager: leaving the ``with`` block shuts the
    server down exactly like an explicit ``close()`` call.
    """

    def __init__(self, port, dispatcher=None):
        """Listen on ``utils.LOCALHOST:port`` and start serving on a new thread.

        Args:
            port: port number to listen on.
            dispatcher: object passed to ``msgpackrpc.Server`` as the
                dispatcher. When None (the default), a fresh
                ``MockFunctions`` instance is created for this server.
        """
        # Build the default per instance. A ``MockFunctions()`` default in the
        # signature is evaluated once, at definition time, and would then be
        # shared by every MockServer created without an explicit dispatcher.
        if dispatcher is None:
            dispatcher = MockFunctions()
        super(MockServer, self).__init__(dispatcher, pack_encoding=None)
        self.listen(msgpackrpc.Address(utils.LOCALHOST, port))
        # not calling self.start() directly since it's a blocking call.
        # Running it on a separate thread instead.
        import threading
        self._thread = threading.Thread(target=self.start)
        self._thread.start()

    def close(self):
        """Stop the server and briefly wait for the serving thread.

        NOTE(review): this does not call the base class's ``close()`` -- if
        ``msgpackrpc.Server`` defines one, confirm whether it should also run
        here.
        """
        self.stop()
        # Bounded wait: never block the caller for more than 10 ms, even if
        # the serving thread has not finished yet.
        self._thread.join(timeout=0.01)

    def __enter__(self):
        """Return the server itself as the ``with`` target."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the server down; exceptions from the ``with`` body propagate."""
        # Single shutdown path shared with close() so the two cannot drift.
        self.close()