Source code for libmushu.ampdecorator

# ampdecorator.py
# Copyright (C) 2013  Bastian Venthur
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.


"""
This module provides the :class:`AmpDecorator` class.

As a user, it is very unlikely that you'll have to deal with it
directly. Its main purpose is to add additional functionality to the low
level amplifier drivers. This functionality includes features like:
saving data to a file, or being able to receive markers via network
(TCP/IP and UDP).

By using the :func:`libmushu.__init__.get_amp` method, you'll
automatically receive decorated amplifiers.

"""

from __future__ import division

import select
import socket
import time
from multiprocessing import Process, Queue, Event
import os
import struct
import json
import logging
import asyncore
import asynchat

from libmushu.amplifier import Amplifier


# module-level logger; note that an info record is emitted at import time
logger = logging.getLogger(__name__)
logger.info('Logger started')


# terminator sequence that ends a marker packet; MarkerHandler passes it
# to ``set_terminator``
END_MARKER = '\n'
# NOTE(review): not referenced in the visible part of this module --
# presumably a receive buffer size in bytes; confirm before relying on it
BUFSIZE = 2**16
# port the MarkerServer binds to, for the TCP as well as the UDP socket
PORT = 12344


class AmpDecorator(Amplifier):
    """Decorate a low level amplifier with marker and file features.

    This class 'decorates' the Low-Level Amplifier classes with
    Network-Marker and Save-To-File functionality.

    You use it by decorating (not as in Python-Decorator, but in the GoF
    sense) the low level amplifier class you want to use::

        import libmushu
        from libmushu.ampdecorator import AmpDecorator
        from libmushu.driver.randomamp import RandomAmp

        amp = AmpDecorator(RandomAmp)

    Warning: The network marker timings on Windows have a resolution of
    10ms-15ms. On Linux the resolution is 1us. This is due to
    limitations of Python's time.time method, or rather a Windows
    specific issue.

    There exists currently no precise timer, providing times which are
    comparable between two running processes on Windows. The performance
    counter provided on Windows, has a much better resolution but is
    relative to the processes start time and it drifts (1s per 100s), so
    it is only precise for a relatively short amount of time.

    If a higher precision is needed one has to replace the time.time
    calls with something which provides a better precision. For example
    one could create a third process which provides times or regularly
    synchronize both processes with the clock synchronization algorithm
    as described here:

        http://en.wikipedia.org/wiki/Network_Time_Protocol

    Alternatively one could use `timeGetTime` from Windows' Multi Media
    library, which is tunable via `timeBeginPeriod` and provides a
    precision of 1-2ms. Apparently this is the way Chrome and many
    others do it.::

        from __future__ import division

        from ctypes import windll
        import time

        timeBeginPeriod = windll.winmm.timeBeginPeriod
        timeEndPeriod = windll.winmm.timeEndPeriod
        timeGetTime = windll.winmm.timeGetTime

        if __name__ == '__main__':
            # wrap the code that needs high precision in timeBegin- and
            # timeEndPeriod with the same parameter. The parameter is
            # the interval in ms you want as precision. Usually the
            # minimum value allowed is 1 (best).
            timeBeginPeriod(1)
            times = []
            t_start = time.time()
            # collect timer values for one second
            while time.time() < (t_start + 1):
                times.append(timeGetTime())
            times = sorted(list(set(times)))
            print(1000 / len(times))
            timeEndPeriod(1)

    """

    def __init__(self, ampcls):
        """Instantiate the low level amplifier that gets decorated.

        Parameters
        ----------
        ampcls : class
            the low level amplifier class, it is instantiated without
            arguments

        """
        self.amp = ampcls()
        self.write_to_file = False

    @property
    def presets(self):
        """The presets of the underlying amplifier."""
        return self.amp.presets

    def start(self, filename=None):
        """Start the marker server and the amplifier.

        Parameters
        ----------
        filename : str, optional
            base name of the files to record to. If given, the files
            ``filename.eeg``, ``filename.marker`` and ``filename.meta``
            are created and the meta data is written immediately. If
            ``None`` nothing is written to disk.

        Raises
        ------
        Exception : if one of the three files already exists

        """
        # prepare files for writing
        self.write_to_file = False
        if filename is not None:
            filename_marker = filename + '.marker'
            filename_eeg = filename + '.eeg'
            filename_meta = filename + '.meta'
            # never overwrite the files of an existing recording
            for path in filename_marker, filename_eeg, filename_meta:
                if os.path.exists(path):
                    msg = 'A file "%s" already exists, aborting.' % path
                    logger.error(msg)
                    raise Exception(msg)
            opened = []
            try:
                self.fh_eeg = open(filename_eeg, 'wb')
                opened.append(self.fh_eeg)
                self.fh_marker = open(filename_marker, 'w')
                opened.append(self.fh_marker)
                self.fh_meta = open(filename_meta, 'w')
                opened.append(self.fh_meta)
                # write meta data
                meta = {'Channels': self.amp.get_channels(),
                        'Sampling Frequency': self.amp.get_sampling_frequency(),
                        'Amp': str(self.amp)
                        }
                json.dump(meta, self.fh_meta, indent=4)
            except Exception:
                # do not leave half of the files open if anything failed
                for fh in opened:
                    fh.close()
                raise
            # only record once all files are open and the meta data is
            # written
            self.write_to_file = True
        # start the marker server
        self.marker_queue = Queue()
        self.tcp_reader_running = Event()
        self.tcp_reader_running.set()
        tcp_reader_ready = Event()
        self.tcp_reader = Process(target=marker_reader,
                                  args=(self.marker_queue,
                                        self.tcp_reader_running,
                                        tcp_reader_ready
                                        )
                                  )
        self.tcp_reader.start()
        logger.debug('Waiting for marker server to become ready...')
        tcp_reader_ready.wait()
        logger.debug('Marker server is ready.')
        # zero the sample counter
        self.received_samples = 0
        # start the amp
        self.amp.start()

    def stop(self):
        """Stop the amplifier and the marker server and close the files.

        The files are only closed if the recording was started with a
        filename.

        """
        # stop the amp
        self.amp.stop()
        # stop the marker server
        self.tcp_reader_running.clear()
        logger.debug('Waiting for marker server process to stop...')
        self.tcp_reader.join()
        logger.debug('Marker server process stopped.')
        # close the files
        if self.write_to_file:
            logger.debug('Closing files.')
            for fh in self.fh_eeg, self.fh_marker, self.fh_meta:
                fh.close()

    def configure(self, **kwargs):
        """Forward the configuration to the underlying amplifier."""
        self.amp.configure(**kwargs)

    def get_data(self):
        """Get data from the amplifier.

        This method is supposed to get called as fast as possible (i.e
        hundreds of times per seconds) and returns the data and the
        markers.

        Returns
        -------
        data : 2darray
            a numpy array (time, channels) of the EEG data
        markers : list of (float, str)
            a list of markers. Each element is a tuple of timestamp and
            string. The timestamp is the time in ms relative to the
            onset of the block of data. Note that negative values are
            *allowed* as well as values bigger than the length of the
            block of data returned. That is to be interpreted as a
            marker from the last block and a marker for a future block
            respectively.

        """
        # get data and marker from underlying amp
        data, marker = self.amp.get_data()

        # take the timestamp right after the data arrived
        t = time.time()
        fs = self.amp.get_sampling_frequency()
        # length in sec of the new block according to #samples and fs
        block_duration = len(data) / fs
        # abs time of start of the block
        t0 = t - block_duration
        # duration of all blocks in ms except the current one
        duration = 1000 * self.received_samples / fs

        # merge markers: the network markers carry absolute timestamps
        # which are converted to ms relative to the onset of this block
        tcp_marker = []
        while not self.marker_queue.empty():
            m = self.marker_queue.get()
            m[0] = (m[0] - t0) * 1000
            tcp_marker.append(m)
        marker = sorted(marker + tcp_marker)
        # save data to files
        if self.write_to_file:
            # in the file the marker times are relative to the start of
            # the recording instead of the start of the block
            for m in marker:
                self.fh_marker.write("%f %s\n" % (duration + m[0], m[1]))
            # the samples are stored as native floats, a repeat count
            # avoids building a format string with one char per sample
            self.fh_eeg.write(struct.pack("%df" % data.size, *data.flatten()))
        self.received_samples += len(data)
        if len(data) == 0 and len(marker) > 0:
            logger.error('Received marker but no data. This is an error, the amp should block on get_data until data is available. Marker timestamps will be unreliable.')
        return data, marker

    def get_channels(self):
        """Return the channels of the underlying amplifier."""
        return self.amp.get_channels()

    def get_sampling_frequency(self):
        """Return the sampling frequency of the underlying amplifier."""
        return self.amp.get_sampling_frequency()
def marker_reader(queue, running, ready):
    """Run the UDP and TCP marker servers until told to stop.

    This function is meant to run in a separate process. It creates one
    :class:`MarkerServer` per protocol, sets the ``ready`` event and
    then keeps polling the sockets for as long as the ``running`` event
    is set. Every received marker is put together with a timestamp into
    the ``queue``.

    Parameters
    ----------
    queue : Queue
        this queue is used to send markers to a different process
    running : Event
        this event is used to signal this process to terminate its main
        loop
    ready : Event
        this signal is used to signal the "parent"-process that this
        process is ready to receive marker

    """
    # the UDP server is created first, then the TCP server
    for proto in ('udp', 'tcp'):
        MarkerServer(queue, proto)
    ready.set()
    # a single pass through the asyncore loop per iteration, so a
    # cleared ``running`` event is noticed after at most one timeout
    while running.is_set():
        asyncore.loop(timeout=5, count=1)
class MarkerServer(asyncore.dispatcher):
    """Server socket for incoming markers.

    Depending on the protocol a TCP or UDP socket is opened on ``PORT``
    and the incoming data is processed by a :class:`MarkerHandler`.

    """

    def __init__(self, queue, proto):
        """Open and bind the socket for the given protocol.

        Parameters
        ----------
        queue : multiprocessing.Queue instance
            handed over to the :class:`MarkerHandler` instances
        proto : string
            The protocol to use. Can be either 'tcp' or 'udp', the case
            is ignored.

        Raises
        ------
        ValueError : if the protocol is unsupported

        """
        asyncore.dispatcher.__init__(self)
        self.queue = queue
        protocol = proto.lower()
        if protocol not in ('tcp', 'udp'):
            raise ValueError('Unsupported protocol: {proto}'.format(proto=proto))
        is_tcp = protocol == 'tcp'
        if is_tcp:
            logger.debug('Opening TCP socket.')
            sock_type = socket.SOCK_STREAM
        else:
            logger.debug('Opening UDP socket.')
            sock_type = socket.SOCK_DGRAM
        self.create_socket(socket.AF_INET, sock_type)
        self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if is_tcp:
            self.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        self.setblocking(0)
        self.bind(('', PORT))
        if is_tcp:
            # handlers are created per connection in handle_accept
            self.listen(5)
        else:
            # UDP is connectionless, the bound socket can receive data
            # right away, so the handler is attached immediately
            MarkerHandler(self, self.queue)

    def handle_accept(self):
        """Accept an incoming TCP connection and attach a handler to it."""
        pair = self.accept()
        if pair is None:
            return
        sock, addr = pair
        logger.debug('Incoming connection from {addr}'.format(addr=addr))
        MarkerHandler(sock, self.queue)
class MarkerHandler(asynchat.async_chat):
    """Collect terminated marker packets from a socket.

    The handler reads the incoming data of a TCP or UDP socket. A packet
    is complete when the terminator sequence ``END_MARKER`` is found.
    Partial packets are buffered, complete packets are put together
    with a timestamp into the queue.

    """

    def __init__(self, socket, queue):
        """Set up the handler for the given socket.

        Parameters
        ----------
        socket : socket.socket
            the socket can be TCP or UDP. In case of UDP the socket must
            be bound already, the TCP socket must be an opened
            connection (i.e. after accept)
        queue : multiprocessing.Queue instance
            The queue to send the received markers to.

        """
        asynchat.async_chat.__init__(self, socket)
        self.set_terminator(END_MARKER)
        # buffer for the packet received so far and the arrival time of
        # its first chunk
        self.data = ''
        self.timestamp = None
        self.queue = queue

    def handle_close(self):
        """Close the handler after the peer closed the connection."""
        logger.debug('Connection closed by peer, closing connection.')
        self.close()

    def writable(self):
        """Signal whether the socket is ready to send data.

        Returns
        -------
        writable : bool
            always ``False``, the handler never sends data

        """
        # a UDP socket would otherwise report itself as writable on
        # every iteration of the asyncore loop and cause massive CPU
        # load. TCP sockets do not have that problem, but returning
        # False does not hurt there either.
        return False

    def collect_incoming_data(self, data):
        """Buffer a potentially incomplete data packet.

        The timestamp is taken when the first chunk of a packet arrives
        and kept until the packet is complete.

        Parameters
        ----------
        data : str
            the received chunk

        """
        if self.timestamp is None:
            self.timestamp = time.time()
        self.data += data

    def found_terminator(self):
        """Put the completed packet and its timestamp into the queue.

        Afterwards the buffer and the timestamp are reset for the next
        packet.

        """
        packet = [self.timestamp, self.data]
        self.queue.put(packet)
        self.data, self.timestamp = '', None

    def handle_error(self):
        """Log the error, close the handler and re-raise the exception."""
        logger.error('An error occurred.')
        self.close()
        # asyncore's default implementation only prints a condensed
        # traceback, re-raising keeps the original exception visible
        raise