Source code for streamkinect2.server

"""
Server
======
"""
from collections import namedtuple
from logging import getLogger
import platform
import socket
import uuid
import weakref

from blinker import Signal
from tornado.ioloop import IOLoop
import zeroconf
import zmq
from zmq.eventloop.zmqstream import ZMQStream

from .common import EndpointType, MessageType, make_msg, parse_msg
from .compress import DepthFrameCompressor

# Global zeroconf object pool keyed by bind address
_ZC_POOL = {}
def _get_zeroconf(bindaddress):
    try:
        return _ZC_POOL[bindaddress]
    except KeyError:
        _ZC_POOL[bindaddress] = zeroconf.Zeroconf(bindaddress)
        return _ZC_POOL[bindaddress]

# Our Zeroconf service type
_ZC_SERVICE_TYPE = '_kinect2._tcp.local.'

# Global logging object
log = getLogger(__name__)

[docs]class ServerInfo(namedtuple('ServerInfo', ['name', 'endpoint'])): """Kinect2 Stream server information. This is a subclass of the bultin :py:class:`tuple` class with named accessors for convenience. The tuple holds *name*, *endpoint* pairs. .. py:attribute:: name A server-provided human-readable name for the server. .. py:attribute:: endpoint Connection information for control channel which should be passed to :py:class:`streamkinect2.client.Client`. """
class _KinectRecord(namedtuple('_KinectRecord', ['kinect', 'endpoints', 'streams', 'depth_compresser'])): pass
[docs]class Server(object): """A server capable of streaming Kinect2 data to interested clients. Servers may have their lifetime managed by using them within a ``with`` statement:: with Server() as s: # server is running pass # server has stopped *address* and *port* are the bind address (as a decimal-dotted IP address) and port from which to start serving. If *port* is None, a random port is chosen. If *address* is *None* then attempt to infer a sensible default. *name* should be some human-readable string describing the server. If *None* then a sensible default name is used. *zmq_ctx* should be the zmq context to create servers in. If *None*, then :py:meth:`zmq.Context.instance` is used to get the global instance. If not *None*, *io_loop* is the event loop to pass to :py:class:`zmq.eventloop.zmqstream.ZMQStream` used to communicate with the cleint. If *None* then global IOLoop instance is used. If *announce* is True then the server will be announced over ZeroConf when it starts running. .. py:attribute:: address The address bound to as a decimal-dotted string. .. py:attribute:: endpoints The zeromq endpoints for this server. A *dict*-like object keyed by endpoint type. (See :py:class:`streamkinect2.common.EndpointType`.) .. py:attribute:: is_running *True* when the server is running, *False* otherwise. .. py:attribute:: kinects :py:class:`list` of kinect devices managed by this server. See :py:meth:`add_kinect`. """ def __init__(self, address=None, start_immediately=False, name=None, zmq_ctx=None, io_loop=None, announce=True): # Choose a sensible name if none is specified if name is None: import getpass import platform name = 'Kinect {0}'.format(uuid.uuid4()) # Get a zeroconf instance appropriate to the bind address if announce: self._zc = _get_zeroconf(address) else: self._zc = None if address is None: if announce: address = self._zc.intf else: address = '0.0.0.0' # By default bind to any interface # Set public attributes self.is_running = False self.name = name self.address = address self.endpoints = {} self._announce = announce # If we announce over zero conf then we can use '.local' addressing. # Otherwise, fall back to the specified address. if self._announce: self._server_address = '{0}.local'.format(platform.node()) else: self._server_address = socket.gethostbyaddr(self.address)[0] # zmq streams for each endpoint self._streams = {} self._io_loop = io_loop # kinects which we manage. Keyed by device id. self._kinects = { } if zmq_ctx is None: zmq_ctx = zmq.Context.instance() self._zmq_ctx = zmq_ctx if start_immediately: self.start() def __del__(self): if self.is_running: self.stop()
[docs] def add_kinect(self, kinect): """Add a Kinect device to this server. *kinect* should be a object implementing the same interface as :py:class:`streamkinect2.mock.MockKinect`. """ endpoints, streams = {}, {} # Create zeromq sockets endpoints_to_create = [ (zmq.PUB, EndpointType.depth), ] for type, key in endpoints_to_create: streams[key], endpoints[key] = self._create_and_bind_socket(type) depth_compresser = DepthFrameCompressor(kinect, io_loop=self._io_loop) self._kinects[kinect.unique_kinect_id] = _KinectRecord(kinect, endpoints, streams, depth_compresser) # Register our interest in compressed frames DepthFrameCompressor.on_compressed_frame.connect( self._on_compressed_frame, sender=depth_compresser)
[docs] def remove_kinect(self, kinect): """Remove a Kinect device previously added via :py:meth:`add_kinect`.""" # Find this kinect's record record = self._kinects[kinect.unique_kinect_id] # Remove it from the list del self._kinects[kinect.unique_kinect_id] # Disconnect signal handlers DepthFrameCompressor.on_compressed_frame.disconnect( self._on_compressed_frame, sender=record.depth_compresser)
@property
[docs] def kinects(self): # Return a list rather than exposing the fact that we store kinects in # a set. return list(k.kinect for k in self._kinects.values())
[docs] def start(self): """Explicitly start the server. If the server is already running, this has no effect beyond logging a warning. """ if self.is_running: log.warn('Server already running') return # Create zeromq sockets endpoints_to_create = [ (zmq.REP, EndpointType.control), ] for type, key in endpoints_to_create: self._streams[key], self.endpoints[key] = self._create_and_bind_socket(type) # Listen for incoming messages self._streams[EndpointType.control].on_recv_stream(self._control_recv) # Use the control endpoint's port as the port to advertise on zeroconf control_port = int(self.endpoints[EndpointType.control].split(':')[2]) if self._announce: # Create a Zeroconf service info for ourselves self._zc_info = zeroconf.ServiceInfo(_ZC_SERVICE_TYPE, '.'.join((self.name, _ZC_SERVICE_TYPE)), address=socket.inet_aton(self._zc.intf), port=control_port, properties={}, server=self._server_address) # register ourselves with zeroconf log.info('Registering server "{0}" with Zeroconf'.format(self.name)) self._zc.registerService(self._zc_info) self.is_running = True
[docs] def stop(self): """Explicitly stop the server. If the server is not running this has no effect beyond logging a warning. """ if not self.is_running: log.warn('Server already stopped') return if self._announce: # unregister ourselves with zeroconf log.info('Unregistering server "{0}" with Zeroconf'.format(self.name)) self._zc.unregisterService(self._zc_info) # close the sockets for s in self._streams.values(): s.socket.close() self._streams = {} self.is_running = False
def _current_me(self): devices = [] for device in self._kinects.values(): devices.append({ 'id': device.kinect.unique_kinect_id, 'endpoints': dict((k.name, v) for k, v in device.endpoints.items()), }) return { 'version': 1, 'name': self.name, 'endpoints': dict((k.name, v) for k, v in self.endpoints.items()), 'devices': devices, } def _handle_control(self, type, payload): """Handle a control message. Return a pair giving the type and payload of the response.""" if type == MessageType.ping: log.info('Got ping from client') return MessageType.pong, None elif type == MessageType.who: return MessageType.me, self._current_me() else: log.warn('Unknown message type from client: "{0}"'.format(type)) return MessageType.error, { 'code': 400, 'reason': 'Unknown message type "{0}"'.format(type) } def _create_and_bind_socket(self, type): """Create and bind a socket of the specified type. Returns the ZMQStream and endpoint address. """ socket = self._zmq_ctx.socket(type) port = socket.bind_to_random_port('tcp://{0}'.format(self.address)) return ZMQStream(socket, self._io_loop), 'tcp://{0}:{1}'.format(self._server_address, port) def __enter__(self): self.start() return self def __exit__(self, type, value, traceback): self.stop() def _control_recv(self, stream, msg): # Read message try: type, payload = parse_msg(msg) except ValueError as e: stream.send_multipart(make_msg(MessageType.error, { 'code': 400, 'reason': str(e), })) log.warn('Server received a bad message: {0}'.format(e)) return # Handle control packet and receive response type and payload r_type, r_payload = self._handle_control(type, payload) # Send response stream.send_multipart(make_msg(r_type, r_payload)) def _on_compressed_frame(self, depth_compresser, compressed_frame): kinect_id = depth_compresser.kinect.unique_kinect_id try: record = self._kinects[kinect_id] except KeyError: log.warn('Got depth from from unknown kinect "{0}"'.format(kinect_id)) # Send data to clients stream = record.streams[EndpointType.depth] stream.send(compressed_frame) stream.flush()
[docs]class ServerBrowser(object): """An object which listens for kinect2 streaming servers on the network. The object will keep listening as long as it is alive and so if you want to continue to receive notification of servers, you should keep it around. *io_loop* is an instance of :py:class:`tornado.ioloop.IOLoop` which should be used to schedule sending signals. If *None* then the global instance is used. This is needed because server discovery happens on a separate thread to the tornado event loop which is used for the rest of the network communication. Hence, when a server is discovered, the browser co-ordinates with the event loop to call the :py:meth:`add_server` and :py:meth:`remove_server` methods on the main IOLoop thread. *address* is an explicit bind IP address for an interface to listen on as a decimal-dotted string or *None* to use the default. """ on_add_server = Signal() """Signal emitted when a new server is discovered on the network. Receivers should take a single keyword argument, *server_info*, which will be an instance of :py:class:`ServerInfo` describing the server.""" on_remove_server = Signal() """Signal emitted when a server removes itself from the network. Receivers should take a single keyword argument, *server_info*, which will be an instance of :py:class:`ServerInfo` describing the server.""" def __init__(self, io_loop=None, address=None): self._io_loop = io_loop or IOLoop.instance() # A browser. Note the use of a weak reference to us. self._browser = zeroconf.ServiceBrowser(_get_zeroconf(address), _ZC_SERVICE_TYPE, ServerBrowser._Listener(weakref.ref(self))) class _Listener(object): """Listen for ZeroConf service announcements. The browser object is kept as a weak reference so that we don't end up with circular references. """ def __init__(self, browser_ref): self.browser_ref = browser_ref # List of ServerInfo records keyed by FQDN self._servers = { } def addService(self, zeroconf, type, name): browser = self.browser_ref() if browser is None: return # Skip types we don't know about if type != _ZC_SERVICE_TYPE: return # pragma: no cover assert name.endswith('.' + _ZC_SERVICE_TYPE) zc_info = zeroconf.getServiceInfo(type, name) if zc_info is None: # Service went away return log.info('Service discovered: {0}'.format(name)) short_name = name[:-(len(_ZC_SERVICE_TYPE)+1)] # Normalise FQDNs by stripping trailing period address = zc_info.getServer().rstrip('.') port = zc_info.getPort() # Form control endpoint address endpoint = 'tcp://{0}:{1}'.format(address, port) info = ServerInfo(name=short_name, endpoint=endpoint) self._servers[name] = info browser._io_loop.add_callback( browser.on_add_server.send, browser, server_info=info) def removeService(self, zeroconf, type, name): browser = self.browser_ref() if browser is None: return # Skip types we don't know about if type != _ZC_SERVICE_TYPE: return # pragma: no cover log.info('Service removed: {0}'.format(name)) try: info = self._servers[name] del self._servers[name] browser._io_loop.add_callback( browser.on_remove_server.send, browser, server_info=info) except KeyError: # pragma: no cover log.warn('Ignoring server which we know nothing about')