2019-10-29 12:51:14 +01:00
|
|
|
"""Implement a server that can send/receive UxAS messages."""
|
2020-01-30 17:13:51 +01:00
|
|
|
from queue import Queue
|
2019-10-29 12:51:14 +01:00
|
|
|
from pylmcp.message import Message
|
2019-11-15 11:33:51 +01:00
|
|
|
from pylmcp import Object
|
2020-01-30 17:13:51 +01:00
|
|
|
from pylmcp.uxas import UxAS, PublishPullBridge, SubscribePushBridge, AutomationRequestValidator, UxASConfig
|
2019-10-29 12:51:14 +01:00
|
|
|
import threading
|
2019-11-15 11:33:51 +01:00
|
|
|
import time
|
2020-01-30 17:13:51 +01:00
|
|
|
import typing
|
2019-10-29 12:51:14 +01:00
|
|
|
import zmq
|
2020-02-06 10:08:47 +01:00
|
|
|
import os
|
2019-10-29 12:51:14 +01:00
|
|
|
|
2020-02-06 10:08:47 +01:00
|
|
|
# Default ZeroMQ endpoints. Each one can be overridden through the
# environment variable named in the corresponding lookup.
DEFAULT_IN_URL = os.getenv('IN_SERVER_URL', 'tcp://127.0.0.1:5560')
DEFAULT_OUT_URL = os.getenv('OUT_SERVER_URL', 'tcp://127.0.0.1:5561')
|
2019-10-29 12:51:14 +01:00
|
|
|
|
2019-11-15 11:33:51 +01:00
|
|
|
class ServerTimeout(Exception):
    """Raised when an expected message is not received before a timeout."""
|
|
|
|
|
|
|
|
|
|
|
2019-10-29 12:51:14 +01:00
|
|
|
class Server(object):
    """Handle messages from UxAS bus.

    Incoming messages are read from a zeromq SUB socket by a background
    thread and stored in an internal queue; outgoing messages are sent
    through a zeromq PUSH socket.

    Example::

        with Server() as s:
            m = s.read_msg()
            s.send_msg(m)
    """

    # NOTE(review): the default of ``out_url`` is DEFAULT_IN_URL and the
    # default of ``in_url`` is DEFAULT_OUT_URL. Presumably the constants are
    # named from the point of view of the bridge -- confirm before changing.
    def __init__(self,
                 out_url: str = DEFAULT_IN_URL,
                 in_url: str = DEFAULT_OUT_URL,
                 bridge_service: bool = True,
                 bridge_cfg: typing.Union[None, UxASConfig] = None,
                 entity_id: int = 100):
        """Start a server that listen for UxAS messages.

        :param out_url: url on which messages can be sent
        :type out_url: str
        :param in_url: url on which the server listen
        :type in_url: str
        :param bridge_service: if True start an uxas with the
            PubPull bridge set
        :type bridge_service: bool
        :param bridge_cfg: if not None, use that as base
            configuration for the bridge
        :type bridge_cfg: pylmcp.uxas.UxASConfig
        :param entity_id: entity id
        :type entity_id: int
        """
        # Initialise the state read by stop() first: __del__ calls stop(),
        # so these attributes must exist even if a later step of __init__
        # raises.

        # Setting stop_listening to True will cause the thread in
        # charge of listening for incoming messages to stop
        self.stop_listening = False

        # UxAS instance running the bridge (None when bridge_service is
        # False)
        self.bridge = None  # type: typing.Union[UxAS, None]

        # Thread that read UxAS message in background
        self.listen_task = None

        # Internal queue holding all zeromq received messages
        self.in_msg_queue = Queue()  # type: Queue

        self.out_url = out_url
        self.in_url = in_url
        self.entity_id = entity_id
        self.ctx = zmq.Context()

        # Set incoming messages socket (subscribe to all messages)
        self.in_socket = self.ctx.socket(zmq.SUB)
        self.in_socket.connect(self.in_url)
        self.in_socket.setsockopt(zmq.SUBSCRIBE, b"")

        # Set socket to send messages
        self.out_socket = self.ctx.socket(zmq.PUSH)
        self.out_socket.connect(self.out_url)

        if bridge_service:
            self.bridge = UxAS(self.entity_id)
            if bridge_cfg is not None:
                self.bridge.cfg = bridge_cfg
            # The bridge publishes on the url this server subscribes to and
            # pulls from the url this server pushes to.
            self.bridge.cfg += PublishPullBridge(
                pub_address=self.in_url,
                pull_address=self.out_url)
            # Currently disabled:
            # self.bridge.cfg += AutomationRequestValidator()
            self.bridge.run()
            # Needed ???
            time.sleep(0.2)

        # Start the listening thread
        self.start_listening()

    def stop(self):
        """Stop listening for new messages.

        Ask the listening thread to terminate and, when a bridge was
        started by this server, interrupt it.
        """
        self.stop_listening = True
        if self.bridge is not None:
            self.bridge.interrupt()

    def get_uxas(self, **kwargs):
        """Get an uxas instance.

        The instance has at least the SubscribePushBridge service enabled.

        :param kwargs: arguments are passed to the UxAS initialize function.
        :type kwargs: dict
        :return: an UxAS instance using this server entity id
        :rtype: pylmcp.uxas.UxAS
        """
        result = UxAS(entity_id=self.entity_id, **kwargs)
        result.cfg += SubscribePushBridge(
            sub_address=self.in_url,
            push_address=self.out_url)
        return result

    def __del__(self):
        # Ensure that we don't block on termination
        self.stop()

    def has_msg(self):
        """Check for new messages.

        :return: True if they are messages in the queue.
        :rtype: bool
        """
        return not self.in_msg_queue.empty()

    def read_msg(self):
        """Read a message from the queue.

        This call is blocking so ensure to call has_msg before to check
        for message availability.

        :return: a decoded UxAS message
        :rtype: pylmcp.message.Message
        """
        return Message.unpack(self.in_msg_queue.get())

    def send_msg(self, msg):
        """Send an UxAS message.

        :param msg: a UxAS message or an LMCP Object. When an object is
            given, a message is create automatically with the bridge
            entity_id (or the server entity_id when no bridge was
            started) and a service_id set to 0
        :type msg: pylmcp.message.Message | pylmcp.Object
        :return: the value returned by the zeromq socket ``send`` call
        """
        if isinstance(msg, Object):
            # Without a bridge (bridge_service=False) there is no bridge
            # configuration to read the entity id from, so fall back to the
            # entity id given to the server.
            if self.bridge is not None:
                source_entity_id = self.bridge.cfg.entity_id
            else:
                source_entity_id = self.entity_id
            msg = Message(obj=msg,
                          source_entity_id=source_entity_id,
                          source_service_id=0)

        return self.out_socket.send(msg.pack())

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _tb):
        self.stop()

    def wait_for_msg(self, descriptor=None, timeout=10.0):
        """Wait for a message.

        Messages that do not match the descriptor are consumed and
        discarded.

        :param descriptor: if not None wait for a specific message that match
            the descriptor name
        :type descriptor: str
        :param timeout: timeout in s
        :type timeout: float | int
        :return: the first message matching the descriptor
        :rtype: pylmcp.message.Message
        :raise ServerTimeout: if no matching message is received before
            the timeout expires
        """
        # Use a monotonic clock so that system clock adjustments cannot
        # shorten or extend the timeout.
        deadline = time.monotonic() + timeout

        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise ServerTimeout

            if self.has_msg():
                msg = self.read_msg()
                if descriptor is None or msg.descriptor == descriptor:
                    return msg
                # Not the expected message: look at the next queued one
                # right away instead of sleeping between each discard.
                continue

            # Queue is empty: wait a bit for the listening thread to
            # receive something, without sleeping past the deadline.
            time.sleep(min(0.2, remaining))

    def start_listening(self):
        """Starts a thread listening for incoming messages.

        This function is called on Server initialization
        """
        def listen():
            poller = zmq.Poller()
            poller.register(self.in_socket, zmq.POLLIN)

            while not self.stop_listening:
                # The poll timeout (in ms) bounds the time needed by the
                # thread to notice that stop_listening has been set.
                for ready_socket in poller.poll(timeout=1000):
                    # poll returns (socket, event) pairs. recv arguments
                    # are flags, copy and track.
                    raw_msg = ready_socket[0].recv(0, True, False)
                    self.in_msg_queue.put(raw_msg)

        self.listen_task = threading.Thread(target=listen,
                                            name='uxas-listen')
        # Daemon thread: it does not prevent the interpreter from exiting
        self.listen_task.daemon = True
        self.listen_task.start()
|