Source code for streamkinect2.mock

"""
Mock kinect
===========

.. note::

    This module requires :py:mod:`numpy` to be installed.

Support for a mock kinect when testing.

"""
from collections import namedtuple
import threading
import time
import uuid

from blinker import Signal
import numpy as np

def _make_mock(frame_shape):
    xs, ys = np.meshgrid(np.arange(frame_shape[1]), np.arange(frame_shape[0]))
    wall = np.abs(ys>>1) + 1000
    sphere = np.sqrt((xs-(frame_shape[1]>>1))*(xs-(frame_shape[1]>>1)) + \
            (ys-(frame_shape[0]>>1))*(ys-(frame_shape[0]>>1))) + 500
    return wall.astype(np.uint16), sphere.astype(np.uint16)

[docs]class DepthFrame(namedtuple('DepthFrame', ('data', 'shape'))): """A single frame of depth data. .. py:attribute:: data Python buffer-like object pointing to raw frame data as a C-ordered array of uint16. .. py:attribute:: shape Pair giving the width and height of the depth frame. """
[docs]class MockKinect(threading.Thread): """A mock Kinect device. This class implements a "virtual" Kinect which generates some mock data. It can be used for testing or benchmarking. Use :py:meth:`start` and :py:meth:`stop` to start and stop the device or wrap it in a ``with`` statement:: with MockKinect() as kinect: # kinect is running here pass # kinect has stopped running .. note:: Listener callbacks are called in a separate thread. If using something like :py:class:`tornado.ioloop.IOLoop`, then you will need to make sure that server messages are sent on the right thread. The :py:class:`streamkinect2.server.Server` class should take care of that in most cases you will encounter. .. py:attribute:: unique_kinect_id A string with an opaque, unique id for this Kinect. """ on_depth_frame = Signal() """A signal which is emitted when a new depth frame is available. Handlers should accept a single keyword argument *depth_frame* which will be an instance of :py:class:`DepthFrame`.""" def __init__(self): super(MockKinect, self).__init__() # Invent unique id self.unique_kinect_id = uuid.uuid4().hex self._frame_shape = (424, 512) self._wall, self._sphere = _make_mock(self._frame_shape) self._should_stop = False def __enter__(self): self.start() return self def __exit__(self, type, value, traceback): self.stop()
[docs] def start(self): """Start the mock device running. Mock data is generated on a separate thread. """ super(MockKinect, self).start()
[docs] def stop(self): """Stop the mock device running. Blocks until the thread shuts down gracefully with a one second timeout. """ self._should_stop = True self.join(1)
def run(self): while not self._should_stop: then = time.time() dx = int(np.sin(then) * 100) df = np.minimum(self._wall, np.roll(self._sphere, dx, 1)) df = np.asarray(df, order='C', dtype=np.uint16) depth_frame = DepthFrame(data=bytes(df.data), shape=df.shape[::-1]) self.on_depth_frame.send(self, depth_frame=depth_frame) now = time.time() # HACK: aim for just above 30FPS time.sleep(max(0, (1.0/35.0) - (now-then)))