mirror of
https://github.com/AdaCore/OpenUxAS.git
synced 2026-02-12 13:07:16 -08:00
This commit makes significant effort towards refactoring the repository so that it is in a cleaner and more consistent state going forward. Since we are now explicitly expecting multiple languages to be used for OpenUxAS, we have reorganized `src` accordingly. Likewise, we have reorganized `tests`. This is a candidate for the rebaseline of afrl-rq/OpenUxAS.
90 lines
3.0 KiB
Python
90 lines
3.0 KiB
Python
from pylmcp import Object
|
|
from pylmcp.util import Buffer
|
|
import json
|
|
|
|
|
|
class Message(object):
|
|
|
|
def __init__(self,
|
|
obj: Object,
|
|
source_entity_id: int,
|
|
source_service_id: int,
|
|
source_group: str = '',
|
|
content_type: str = 'lmcp',
|
|
address=None,
|
|
descriptor=None):
|
|
self.obj = obj
|
|
self.address = obj.object_class.full_name
|
|
self.descriptor = obj.object_class.full_name
|
|
self.content_type = content_type
|
|
self.source_group = source_group
|
|
self.source_entity_id = source_entity_id
|
|
self.source_service_id = source_service_id
|
|
if descriptor is not None:
|
|
self.descriptor = descriptor
|
|
if address is not None:
|
|
self.address = address
|
|
|
|
@classmethod
|
|
def unpack(self, raw_msg: bytes) -> "Message":
|
|
"""Unpack a received message.
|
|
|
|
:param raw_msg: the received message
|
|
:return: a Message object
|
|
"""
|
|
# Unpack headers and payload
|
|
address, attributes, payload = raw_msg.split(b'$', 2)
|
|
content_type, descriptor, source_group, \
|
|
source_entity_id, source_service_id = \
|
|
attributes.split(b'|', 4)
|
|
|
|
# Unpack the LMCP object
|
|
buf = Buffer(payload)
|
|
|
|
# Control character
|
|
buf.unpack("uint32")
|
|
|
|
# Size
|
|
buf.unpack("uint32")
|
|
|
|
obj = Object.unpack(data=buf)
|
|
assert obj is not None
|
|
|
|
return Message(
|
|
obj=obj,
|
|
source_entity_id=int(source_entity_id.decode('utf-8')),
|
|
source_service_id=int(source_service_id.decode('utf-8')),
|
|
source_group=source_group.decode('utf-8'),
|
|
content_type=content_type.decode('utf-8'),
|
|
address=address.decode('utf-8'),
|
|
descriptor=descriptor.decode('utf-8'))
|
|
|
|
def pack(self) -> bytes:
|
|
"""Create a message that can be sent through the network.
|
|
|
|
:return: a string
|
|
:rtype: str
|
|
"""
|
|
payload = self.obj.pack()
|
|
attributes = b"|".join([self.content_type.encode('utf-8'),
|
|
self.descriptor.encode('utf-8'),
|
|
self.source_group.encode('utf-8'),
|
|
str(self.source_entity_id).encode('utf-8'),
|
|
str(self.source_service_id).encode('utf-8')])
|
|
raw_msg = b"$".join([self.address.encode('utf-8'),
|
|
attributes,
|
|
payload])
|
|
return raw_msg
|
|
|
|
def as_dict(self):
|
|
return {'address': self.address,
|
|
'descriptor': self.descriptor,
|
|
'content_type': self.content_type,
|
|
'source_group': self.source_group,
|
|
'source_entitiy_id': self.source_entity_id,
|
|
'source_service_id': self.source_service_id,
|
|
'obj': self.obj.as_dict()}
|
|
|
|
def __str__(self):
|
|
return json.dumps(self.as_dict(), indent=2)
|