# Source code for streamkinect2.compress

"""
Depth frame compression
=======================

"""
from logging import getLogger
from io import BytesIO
from multiprocessing.pool import Pool
from multiprocessing import cpu_count

from blinker import Signal
import lz4
import numpy as np
import tornado.ioloop

log = getLogger(__name__)

def _compress_depth_frame(depth_frame):
    """Pack a depth frame into a compact byte layout and LZ4-compress it.

    *depth_frame* must expose a ``data`` buffer holding 16-bit samples and a
    ``shape`` attribute. The reversed ``shape`` is used as the C-order array
    shape (presumably ``shape`` is ``(width, height)`` -- TODO confirm against
    the kinect implementation).

    Only the low 12 bits of each sample survive compression:

    * bits 4-11 of every sample are written first, one byte per sample;
    * bits 0-3 follow, packed two samples per byte with the even column in
      the high nibble and the odd column in the low nibble.

    Because the low nibbles are packed by pairing adjacent columns, the frame
    is expected to have an even number of columns.

    Returns the result of ``lz4.dumps`` on the packed bytes, or ``None`` if
    any step fails. Failures are logged with a traceback rather than raised.

    """
    try:
        # View the raw buffer as a 2-D array of 16-bit samples (no copy).
        d = np.frombuffer(depth_frame.data, dtype=np.uint16).reshape(
                depth_frame.shape[::-1], order='C')

        # Split each sample into its upper 8 bits and lower 4 bits.
        high_bits = ((d >> 4) & 0xff).astype(np.uint8)
        low_bits = (d & 0xf).astype(np.uint8)

        # Two 4-bit values per byte: even columns high, odd columns low.
        packed_low_bits = (low_bits[:, 0::2] << 4) | low_bits[:, 1::2]

        bio = BytesIO()
        bio.write(np.asarray(high_bits, order='C').data)
        bio.write(np.asarray(packed_low_bits, order='C').data)
        return lz4.dumps(bio.getvalue())
    except Exception as e:
        # Use the module logger (not print) so the failure, including its
        # traceback, goes through the same channel as the rest of the module.
        log.exception('Error compressing depth frame: %s', e)
        return None

class DepthFrameCompressor(object):
    """
    Asynchronous compression pipeline for depth frames.

    *kinect* is a :py:class:`streamkinect2.mock.MockKinect`-like object. Depth
    frames emitted by :py:meth:`on_depth_frame` will be compressed with
    frame-drop if the compressor becomes overloaded.

    If *io_loop* is provided, it specifies the
    :py:class:`tornado.ioloop.IOLoop` which is used to co-ordinate the worker
    process. If not provided, the global instance is used.

    .. py:attribute:: kinect

        Kinect object associated with this compressor.

    """

    on_compressed_frame = Signal()
    """Signal emitted when a new compressed frame is available. Receivers take
    a single keyword argument, *compressed_frame*, which is a Python
    buffer-like object containing the compressed frame data. The signal is
    emitted on the IOLoop thread."""

    # The maximum number of frames we can be waiting for before we start
    # dropping them.
    _MAX_IN_FLIGHT = cpu_count() + 1

    def __init__(self, kinect, io_loop=None):
        # Public attributes
        self.kinect = kinect

        # Private attributes
        self._io_loop = io_loop or tornado.ioloop.IOLoop.instance()
        self._pool = Pool()     # worker process pool
        self._n_in_flight = 0   # How many frames are we waiting for?
        self._n_dropped = 0     # How many frames have been dropped so far?

        # Wire ourselves up for depth frame events
        kinect.on_depth_frame.connect(self._on_depth_frame, sender=kinect)

    def __del__(self):
        # As a courtesy, terminate the worker pool to avoid having a sea of
        # dangling processes. The pool may be missing if __init__ raised
        # before it was created, so look it up defensively.
        pool = getattr(self, '_pool', None)
        if pool is not None:
            pool.terminate()

    def _on_compressed_frame(self, compressed_frame):
        """Pool callback: hand a finished frame over to the IOLoop.

        NOTE(review): *compressed_frame* is ``None`` when the worker function
        failed; it is still forwarded to receivers unchanged.

        """
        # Record arrival of frame
        self._n_in_flight -= 1

        # Send signal
        try:
            self._io_loop.add_callback(
                self.on_compressed_frame.send,
                self, compressed_frame=compressed_frame
            )
        except Exception as e:
            # HACK: Since multiprocessing *might* call this handler after the
            # io loop has shut down (which will raise an Exception) and because
            # there's no documented way to determine if the io loop is still
            # alive ahead of time, we swallow exceptions here.
            #
            # This should happen rarely when one is rapidly starting and
            # stopping IOLoops (such as in the test-suite!) so log it as a
            # warning.
            log.warning('DepthFrameCompressor swallowed %s exception', e)

    def _on_depth_frame(self, kinect, depth_frame):
        """Signal handler: submit *depth_frame* for compression or drop it."""
        # If we aren't waiting on too many frames, submit
        if self._n_in_flight < DepthFrameCompressor._MAX_IN_FLIGHT:
            self._pool.apply_async(_compress_depth_frame,
                    args=(depth_frame,), callback=self._on_compressed_frame)
            self._n_in_flight += 1
        else:
            # Only log every 10 dropped frames to avoid being too spammy
            self._n_dropped += 1
            if self._n_dropped % 10 == 0:
                log.warning('Dropped %s depth frames', self._n_dropped)