Source code for streamkinect2.client

"""
Client
======

"""
from collections import namedtuple, deque
from logging import getLogger
import functools

from blinker import Signal
import tornado.ioloop
import zmq
from zmq.eventloop.zmqstream import ZMQStream

from .common import EndpointType, ProtocolError, MessageType
from .common import make_msg, parse_msg

# Global logging object
log = getLogger(__name__)

[docs]class Client(object): """Client for a streaming kinect2 server. Usually the client will be used with a ``with`` statement:: with Client(endpoint) as c: # c is connected here pass # c is disconnected here *control_endpoint* is the zeromq control endpoint for the server which should be connected to. If not *None*, *zmq_ctx* is the zeromq context to create sockets in. If *zmq_ctx* is *None*, the global context returned by :py:meth:`zmq.Context.instance` is used. If not *None*, *io_loop* is the event loop to pass to :py:class:`zmq.eventloop.zmqstream.ZMQStream` used to listen to responses from the server. If *None* then global IO loop is used. If *connect_immediately* is *True* then the client attempts to connect when constructed. If *False* then :py:meth:`connect` must be used explicitly. .. py:attribute:: server_name A string giving a human-readable name for the server or *None* if the server has not yet replied to our initial query. .. py:attribute:: endpoints A :py:class:`dict` of endpoint addresses keyed by :py:class:`streamkinect2common.EndpointType`. .. py:attribute:: is_connected *True* if the client is connected. *False* otherwise. The following attributes are mostly of use to the unit tests and advanced users. .. py:attribute:: heartbeat_period The delay, in milliseconds, between "heartbeat" requests to the server. These are used to ensure the server is still alive. Changes to this attribute are ignored once :py:meth:`connect` has been called. .. py:attribute:: response_timeout The maximum wait time, in milliseconds, the client waits for the server to reply before giving up. """ on_connect = Signal() """A signal which is emitted when the client connects to a server.""" on_disconnect = Signal() """A signal which is emitted when the client disconnects from a server.""" on_add_kinect = Signal() """A signal which is emitted when a new kinect device is available. Handlers should accept a single keyword argument *kinect_id* which is the unique id associated with the new device.""" on_remove_kinect = Signal() """A signal which is emitted when a kinect device is removed. Handlers should accept a single keyword argument *kinect_id* which is the unique id associated with the new device.""" on_depth_frame = Signal() """A signal which is emitted when a new depth frame is available. Handlers should accept two keyword arguments: *depth_frame* which will be an instance of an object with the same interface as :py:class:`DepthFrame` and *kinect_id* which will be the unique id of the kinect device producing the depth frame.""" def __init__(self, control_endpoint, connect_immediately=False, zmq_ctx=None, io_loop=None): self.is_connected = False self.server_name = None self.endpoints = { EndpointType.control: control_endpoint } # Default values for timeouts, periods, etc self.heartbeat_period = 10000 self.response_timeout = 5000 if zmq_ctx is None: zmq_ctx = zmq.Context.instance() self._zmq_ctx = zmq_ctx self._io_loop = io_loop or tornado.ioloop.IOLoop.instance() self._response_handlers = deque() # Heartbeat callback self._heartbeat_callback = None # Dictionary of device records keyed by id self._kinect_records = {} # ZMQStream for control socket self._control_stream = None # Handle to timeout when waiting for a response self._response_timeout_handle = None if connect_immediately: self.connect() @property def kinect_ids(self): return list(self._kinect_records.keys())
[docs] def ping(self, pong_cb=None): """Send a 'ping' request to the server. If *pong_cb* is not *None*, it is a callable which is called with no arguments when the pong response has been received. """ self._ensure_connected() def pong(type, payload, pong_cb=pong_cb): if pong_cb is not None: pong_cb() self._control_send(MessageType.ping, recv_cb=pong)
[docs] def enable_depth_frames(self, kinect_id): """Enable streaming of depth frames. *kinect_id* is the id of the device which should have streaming enabled. :raises ValueError: if *kinect_id* does not correspond to a connected device """ try: record = self._kinect_records[kinect_id] except KeyError: raise ValueError('Kinect id "{0}" does not correspond to a connected device'.format( kinect_id)) # Create subscriber stream socket = self._zmq_ctx.socket(zmq.SUB) socket.connect(record.endpoints[EndpointType.depth]) socket.setsockopt_string(zmq.SUBSCRIBE, u'') stream = ZMQStream(socket, self._io_loop) record.streams[EndpointType.depth] = stream # Fire signal on incoming depth frame def on_recv(msg, kinect_id=kinect_id): # TODO: decompress frame self.on_depth_frame.send(self, kinect_id=kinect_id, depth_frame=None) # Wire up callback stream.on_recv(on_recv)
[docs] def connect(self): """Explicitly connect the client.""" if self.is_connected: log.warn('Client already connected') return # Create, connect and wire up control socket listener self._connect_control_endpoint() # We should not have any pending response timeouts assert self._response_timeout_handle is None self.is_connected = True # Kick off an initial "who-me" request self._who_me() # Create and start the heartbeat callbacl self._heartbeat_callback = tornado.ioloop.PeriodicCallback( self._who_me, self.heartbeat_period, self._io_loop) self._heartbeat_callback.start() # Finally, signal connection self.on_connect.send(self)
[docs] def disconnect(self): """Explicitly disconnect the client.""" if not self.is_connected: log.warn('Client not connected') return # Cancel any pending response timeout if self._response_timeout_handle is not None: self._io_loop.remove_timeout(self._response_timeout_handle) # Stop heartbeat callback if self._heartbeat_callback is not None: self._heartbeat_callback.stop() self._heartbeat_callback = None # TODO: check if disconnect() on the sockets is necessary self._control_stream = None self.is_connected = False # Finally, signal disconnection self.on_disconnect.send(self)
_KinectRecord = namedtuple('_KinectRecord', ['endpoints', 'streams']) def _who_me(self): """Request the list of endpoints from the server. """ # Handler function def got_me(type, payload): if type != MessageType.me: raise ProtocolError('Expected me list but got "{0}" instead'.format(type)) log.info('Received "me" from server') if payload is None or 'version' not in payload or payload['version'] != 1: log.error('me had wrong or missing version') raise ProtocolError('unknown server protocol') # Fill in server information self.server_name = payload['name'] log.info('Server identifies itself as "{0}"'.format(self.server_name)) # Remember the old kinect ids old_kinect_ids = set(self._kinect_records.keys()) # Extract kinects devices = payload['devices'] new_records = {} for device in devices: # Fetch or create the record for this device try: record = self._kinect_records[device['id']] except KeyError: record = Client._KinectRecord(endpoints={}, streams={}) new_records[device['id']] = record # Fill in endpoint and stream dictionaries for device for ep_type in EndpointType: # See if this endpoint is in the payload ep = None try: ep = device['endpoints'][ep_type.name] except KeyError: pass if ep is None and ep_type in record.endpoints: # Endpoint has gone away but was there del record.endpoints[ep_type] del record.streams[ep_type] elif ep is not None: # Is this a new or changed endpoint endpoint? if ep_type not in record.endpoints or record.endpoints[ep_type] != ep: # Record new/changed endpoint record.endpoints[ep_type] = ep # Initially there are no streams for any endpoint to avoid # subscribing to services we do not need. record.streams[ep_type] = None # Update kinect records self._kinect_records = new_records # Fill in out server endpoint list from payload endpoints = payload['endpoints'] for endpoint_type in EndpointType: try: self.endpoints[endpoint_type] = endpoints[endpoint_type.name] log.info('Server added "{0.name}" endpoint at "{1}"'.format( endpoint_type, endpoints[endpoint_type.name])) except KeyError: # Skip endpoints we don't know about pass # Send {add,remove}_kinect events... new_kinect_ids = set(self._kinect_records.keys()) # ... for devices in new list and not in old for k_id in new_kinect_ids.difference(old_kinect_ids): self.on_add_kinect.send(self, kinect_id=k_id) # ... for devices in old list and not in new for k_id in old_kinect_ids.difference(new_kinect_ids): self.on_remove_kinect.send(self, kinect_id=k_id) # Send packet log.info('Requesting server identity') self._control_send(MessageType.who, recv_cb=got_me) def _ensure_connected(self): if not self.is_connected: raise RuntimeError('Client is not connected') def __enter__(self): self.connect() return self def __exit__(self, type, value, traceback): self.disconnect() def _connect_control_endpoint(self): control_endpoint = self.endpoints[EndpointType.control] # Disconnect any existing socket (or, rather, let GC do it) if self._control_stream is not None: self._control_stream = None # Create, connect and wire up control socket listener control_socket = self._zmq_ctx.socket(zmq.REQ) control_socket.connect(control_endpoint) self._control_stream = ZMQStream(control_socket, self._io_loop) self._control_stream.on_recv(self._control_recv) def _control_send(self, type, payload=None, recv_cb=None): """Send *payload* formatted as a JSON object along the control socket. If *recv_cb* is not *None*, it is a callable which is called with the type and Python object representing the response payload from the server. If there is no payload, None is passed. """ # (Re-)set response timeout if self._response_timeout_handle is not None: self._io_loop.remove_timeout(self._response_timeout_handle) self._io_loop.call_later(self.response_timeout * 1e-3, self._response_timed_out) # Add the response handler and send the message self._response_handlers.append(recv_cb) self._control_stream.send_multipart(make_msg(type, payload)) def _control_recv(self, msg): """Called when there is something to be received on the control socket.""" # If we're disconnected, then just drop the incoming packet. if not self.is_connected: return # Parse message type, payload = parse_msg(msg) # Do we have a recv handler? handler = self._response_handlers.popleft() if handler is not None: handler(type, payload) def _response_timed_out(self): """Called when the response timeout fires.""" # Do nothing if already disconnected or if there are no pending requests if not self.is_connected or len(self._response_handlers) == 0: return log.error('Client timed out while waiting for server response') self.disconnect()