# Source code for wyrm.io

# encoding: utf8

"""Various input/output methods.

This module provides methods for loading and saving data from- and into
various formats.

"""

from __future__ import division

from os import path
import logging
import re
import json
import socket

import numpy as np
from scipy.io import loadmat

from wyrm.types import Data


logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger(__name__)


def save(dat, filename):
    """Persist a ``Data`` object to disk as a NumPy ``.npy`` file.

    Parameters
    ----------
    dat : Data
        the ``Data`` object to write
    filename : str
        destination path; NumPy appends the ``.npy`` extension
        automatically if it is missing.

    See Also
    --------
    :func:`load`

    Examples
    --------
    >>> io.save(dat, 'foo.npy')
    >>> dat2 = io.load('foo.npy')

    """
    # np.save pickles arbitrary Python objects into the .npy container
    np.save(filename, dat)
def load(filename):
    """Load a ``Data`` object from a file.

    Parameters
    ----------
    filename : str
        the file to load the data from

    Returns
    -------
    dat : Data
        the data loaded from the file

    See Also
    --------
    :func:`save`

    Examples
    --------
    >>> io.save(dat, 'foo.npy')
    >>> dat2 = io.load('foo.npy')

    """
    # allow_pickle=True is required since NumPy 1.16.3 (it defaults to
    # False for security reasons); the ``Data`` object is stored as a
    # pickle inside the .npy container, so loading it needs pickling.
    dat = np.load(filename, allow_pickle=True)
    # np.load wraps the pickled object in a 0-d object array with no
    # usable shape; flattening and taking the first (i.e. only)
    # element unwraps the actual object.
    dat = dat.flatten()[0]
    return dat
def load_brain_vision_data(vhdr):
    """Load Brain Vision data from a file.

    This method loads the continuous EEG data, and returns a ``Data``
    object of continuous data ``[time, channel]``, along with the
    markers and the sampling frequency. The EEG data is returned in
    micro Volt.

    Parameters
    ----------
    vhdr : str
        Path to a VHDR file

    Returns
    -------
    dat : Data
        Continuous Data with the additional attributes ``.fs`` for the
        sampling frequency and ``.marker`` for a list of markers. Each
        marker is a tuple of ``(time in ms, marker)``.

    Raises
    ------
    AssertionError
        If one of the consistency checks fails

    Examples
    --------

    >>> dat = load_brain_vision_data('path/to/vhdr')
    >>> dat.fs
    1000
    >>> dat.data.shape
    (54628, 61)

    """
    logger.debug('Loading Brain Vision Data Exchange Header File')
    with open(vhdr) as fh:
        # materialize as lists: under Python 3 map/filter return
        # iterators, which would break the fdata[0] index below
        fdata = [line.strip() for line in fh.readlines()]
    fdata = [line for line in fdata if not line.startswith(';')]
    fdata = [line for line in fdata if len(line) > 0]
    # check for the correct file version:
    assert fdata[0].endswith('1.0')
    # read all data into a dict where the key is the stanza of the file
    file_dict = dict()
    for line in fdata[1:]:
        if line.startswith('[') and line.endswith(']'):
            current_stanza = line[1:-1]
            file_dict[current_stanza] = []
        else:
            file_dict[current_stanza].append(line)
    # translate known stanzas from simple list of strings to a dict
    for stanza in 'Common Infos', 'Binary Infos', 'Channel Infos':
        logger.debug(stanza)
        file_dict[stanza] = {line.split('=', 1)[0]: line.split('=', 1)[1]
                             for line in file_dict[stanza]}
    # now file_dict contains the parsed data from the vhdr file
    # load the rest
    data_f = file_dict['Common Infos']['DataFile']
    marker_f = file_dict['Common Infos']['MarkerFile']
    data_f = path.sep.join([path.dirname(vhdr), data_f])
    marker_f = path.sep.join([path.dirname(vhdr), marker_f])
    n_channels = int(file_dict['Common Infos']['NumberOfChannels'])
    sampling_interval_microseconds = float(file_dict['Common Infos']['SamplingInterval'])
    fs = 1 / (sampling_interval_microseconds / 10**6)
    # channel entries look like "name,,resolution,..." -- first field is
    # the channel name, third field the resolution in micro Volt
    channels = [file_dict['Channel Infos']['Ch%i' % (i + 1)]
                for i in range(n_channels)]
    channels = [x.split(',')[0] for x in channels]
    resolutions = [file_dict['Channel Infos']['Ch%i' % (i + 1)]
                   for i in range(n_channels)]
    resolutions = [float(x.split(',')[2]) for x in resolutions]
    # assert all channels have the same resolution of 0.1
    # FIXME: that is not always true, for example if we measure pulse or
    # emg
    #assert all([i == 0.1 for i in resolutions])
    # some assumptions about the data...
    assert file_dict['Common Infos']['DataFormat'] == 'BINARY'
    assert file_dict['Common Infos']['DataOrientation'] == 'MULTIPLEXED'
    assert file_dict['Binary Infos']['BinaryFormat'] == 'INT_16'
    # load EEG data
    logger.debug('Loading EEG Data.')
    data = np.fromfile(data_f, np.int16)
    data = data.reshape(-1, n_channels)
    # scale the raw int16 samples to micro Volt. NOT done in place:
    # `data *= resolutions[0]` would try to cast the float result back
    # into the int16 array, which fails on modern NumPy (and silently
    # truncated the values on old versions)
    data = data * resolutions[0]
    n_samples = data.shape[0]
    # duration in ms
    duration = 1000 * n_samples / fs
    time = np.linspace(0, duration, n_samples, endpoint=False)
    # load marker
    logger.debug('Loading Marker.')
    regexp = r'^Mk(?P<mrk_nr>[0-9]*)=.*,(?P<mrk_descr>.*),(?P<mrk_pos>[0-9]*),[0-9]*,[0-9]*$'
    mrk = []
    with open(marker_f) as fh:
        for line in fh:
            line = line.strip()
            match = re.match(regexp, line)
            if match is None:
                continue
            mrk_pos = match.group('mrk_pos')
            mrk_descr = match.group('mrk_descr')
            if len(mrk_descr) > 1:
                # marker := [samplenr, marker]
                #mrk.append([int(mrk_pos), mrk_descr])
                # marker := [time in ms, marker]
                mrk.append([time[int(mrk_pos)], mrk_descr])
    dat = Data(data, [time, channels], ['time', 'channel'], ['ms', '#'])
    dat.fs = fs
    dat.markers = mrk
    return dat
def load_mushu_data(meta):
    """Load saved EEG data in Mushu's format.

    This method loads saved data in Mushu's format and returns a
    continuous ``Data`` object.

    Parameters
    ----------
    meta : str
        Path to `.meta` file. A Mushu recording consists of three
        different files: `.eeg`, `.marker`, and `.meta`.

    Returns
    -------
    dat : Data
        Continuous Data object

    Examples
    --------

    >>> dat = load_mushu_data('testrecording.meta')

    """
    # swap only the *last* '.meta' suffix for the companion files'
    # extensions (rsplit with maxsplit=1 targets the last occurrence)
    datafile = '.eeg'.join(meta.rsplit('.meta', 1))
    markerfile = '.marker'.join(meta.rsplit('.meta', 1))
    assert path.exists(meta) and path.exists(datafile) and path.exists(markerfile)
    # load meta data
    with open(meta, 'r') as fh:
        metadata = json.load(fh)
    fs = metadata['Sampling Frequency']
    channels = np.array(metadata['Channels'])
    # load eeg data as (time, channel)
    samples = np.fromfile(datafile, np.float32)
    samples = samples.reshape((-1, len(channels)))
    # load markers: each line is "<timestamp> <marker text>"
    markers = []
    with open(markerfile, 'r') as fh:
        for line in fh:
            timestamp, text = line.split(' ', 1)
            markers.append([float(timestamp), str(text).strip()])
    # construct the continuous Data object
    duration = len(samples) * 1000 / fs
    timeaxis = np.linspace(0, duration, len(samples), endpoint=False)
    dat = Data(data=samples,
               axes=[timeaxis, channels],
               names=['time', 'channels'],
               units=['ms', '#'])
    dat.fs = fs
    dat.markers = markers
    return dat
def convert_mushu_data(data, markers, fs, channels):
    """Convert mushu data into wyrm's ``Data`` format.

    This convenience method creates a continuous ``Data`` object from
    the parameters given. The timeaxis always starts from zero and its
    values are calculated from the sampling frequency ``fs`` and the
    length of ``data``. The ``names`` and ``units`` attributes are
    filled with default vaules.

    Parameters
    ----------
    data : 2d array
        an 2 dimensional numpy array with the axes: (time, channel)
    markers : list of tuples: (float, str)
        a list of markers. Each element is a tuple of timestamp and
        string. The timestamp is the time in ms relative to the onset of
        the block of data. Note that negative values are *allowed* as
        well as values bigger than the length of the block of data
        returned. That is to be interpreted as a marker from the last
        block and a marker for a future block respectively.
    fs : float
        the sampling frequency, this number is used to calculate the
        timeaxis for the data
    channels : list or 1d array of strings
        the channel names

    Returns
    -------
    cnt : continuous ``Data`` object

    Examples
    --------

    Assuming that ``amp`` is an Amplifier instance from ``libmushu``,
    already configured but not started yet:

    >>> amp_fs = amp.get_sampling_frequency()
    >>> amp_channels = amp.get_channels()
    >>> amp.start()
    >>> while True:
    ...     data, markers = amp.get_data()
    ...     cnt = convert_mushu_data(data, markers, amp_fs, amp_channels)
    ...     # some more code
    >>> amp.stop()

    References
    ----------
    https://github.com/bbci/mushu

    """
    # time axis in ms, starting at zero, one entry per sample
    n_samples = data.shape[0]
    duration_ms = 1000 * n_samples / fs
    timeaxis = np.linspace(0, duration_ms, n_samples, endpoint=False)
    # copy all mutable inputs so the caller's buffers stay untouched
    cnt = Data(data=data.copy(),
               axes=[timeaxis, channels[:]],
               names=['time', 'channel'],
               units=['uV', '#'])
    cnt.fs = fs
    cnt.markers = markers[:]
    return cnt
def load_bcicomp3_ds1(dirname):
    """Load the BCI Competition III Data Set 1.

    This method loads the data set and converts it into Wyrm's ``Data``
    format. Before you use it, you have to download the training- and
    test data in Matlab format and unpack it into a directory.

    .. note::

        If you need the true labels of the test sets, you'll have to
        download them separately from
        http://bbci.de/competition/iii/results/index.html#labels

    Parameters
    ----------
    dirname : str
        the directory where the ``Competition_train.mat`` and
        ``Competition_test.mat`` are located

    Returns
    -------
    epo_train, epo_test : epoched ``Data`` objects

    Examples
    --------

    >>> epo_test, epo_train = load_bcicomp3_ds1('/home/foo/bcicomp3_dataset1/')

    """
    # construct the filenames from the dirname
    path_train = path.sep.join([dirname, 'Competition_train.mat'])
    path_test = path.sep.join([dirname, 'Competition_test.mat'])
    # --- training set ---
    mat = loadmat(path_train)
    # bring the signal into (epoch, time, channel) order
    signal = mat['X'].astype('double').swapaxes(-1, -2)
    labels = mat['Y'].astype('int').ravel()
    # convert into wyrm Data
    axes = [np.arange(i) for i in signal.shape]
    axes[0] = labels
    axes[2] = [str(i) for i in range(signal.shape[2])]
    dat_train = Data(data=signal,
                     axes=axes,
                     names=['Class', 'Time', 'Channel'],
                     units=['#', 'ms', '#'])
    dat_train.fs = 1000
    dat_train.class_names = ['pinky', 'tongue']
    # --- test set ---
    mat = loadmat(path_test)
    signal = mat['X'].astype('double').swapaxes(-1, -2)
    # convert into wyrm Data (no labels available here)
    axes = [np.arange(i) for i in signal.shape]
    axes[2] = [str(i) for i in range(signal.shape[2])]
    dat_test = Data(data=signal,
                    axes=axes,
                    names=['Epoch', 'Time', 'Channel'],
                    units=['#', 'ms', '#'])
    dat_test.fs = 1000
    # map labels -1 -> 0
    dat_test.axes[0][dat_test.axes[0] == -1] = 0
    dat_train.axes[0][dat_train.axes[0] == -1] = 0
    return dat_train, dat_test
def load_bcicomp3_ds2(filename):
    """Load the BCI Competition III Data Set 2.

    This method loads the data set and converts it into Wyrm's ``Data``
    format. Before you use it, you have to download the data set in
    Matlab format and unpack it. The directory with the extracted files
    must contain the ``Subject_*.mat``- and the ``eloc64.txt`` files.

    .. note::

        If you need the true labels of the test sets, you'll have to
        download them separately from
        http://bbci.de/competition/iii/results/index.html#labels

    Parameters
    ----------
    filename : str
        The path to the matlab file to load

    Returns
    -------
    cnt : continuous `Data` object

    Examples
    --------

    >>> dat = load_bcicomp3_ds2('/home/foo/data/Subject_A_Train.mat')

    """
    # maps the P300 speller's row/column intensification codes to the
    # characters contained in that row/column
    STIMULUS_CODE = {
        # cols from left to right
        1 : "agmsy5",
        2 : "bhntz6",
        3 : "ciou17",
        4 : "djpv28",
        5 : "ekqw39",
        6 : "flrx4_",
        # rows from top to bottom
        7 : "abcdef",
        8 : "ghijkl",
        9 : "mnopqr",
        10: "stuvwx",
        11: "yz1234",
        12: "56789_"
    }
    # load the matlab data
    data_mat = loadmat(filename)
    # load the channel names (the same for all datasets); eloc64.txt is
    # expected next to the .mat file, last whitespace-separated token of
    # each non-empty line is the channel name
    eloc_file = path.sep.join([path.dirname(filename), 'eloc64.txt'])
    with open(eloc_file) as fh:
        data = fh.read()
    channels = []
    for line in data.splitlines():
        if line:
            chan = line.split()[-1]
            chan = chan.replace('.', '')
            channels.append(chan)
    # fix the channel names, some letters have the wrong capitalization
    # (e.g. 'CZ' -> 'Cz', 'FP1' -> 'Fp1')
    for i, s in enumerate(channels):
        s2 = s.upper()
        s2 = s2.replace('Z', 'z')
        s2 = s2.replace('FP', 'Fp')
        channels[i] = s2
    # The signal is recorded with 64 channels, bandpass filtered
    # 0.1-60Hz and digitized at 240Hz. The format is
    # Character Epoch x Samples x Channels
    data = data_mat['Signal']
    data = data.astype('double')
    # For each sample: 1 if a row/colum was flashed, 0 otherwise
    flashing = data_mat['Flashing'].reshape(-1)
    #flashing = np.flatnonzero((np.diff(a) == 1)) + 1
    # keep only the rising edge of each flash: a run of consecutive 1s
    # is collapsed so that only its first sample stays 1
    tmp = []
    for i, _ in enumerate(flashing):
        if i == 0:
            tmp.append(flashing[i])
            continue
        if flashing[i] == flashing[i-1] == 1:
            tmp.append(0)
            continue
        tmp.append(flashing[i])
    flashing = np.array(tmp)
    # For each sample: 0 when no row/colum was intensified,
    # 1..6 for intensified columns, 7..12 for intensified rows
    stimulus_code = data_mat['StimulusCode'].reshape(-1)
    # keep only the codes at flash onsets
    stimulus_code = stimulus_code[flashing == 1]
    # 0 if no row/col was intensified or the intensified did not contain
    # the target character, 1 otherwise (only present in training data,
    # hence the .get with an empty-array default)
    stimulus_type = data_mat.get('StimulusType', np.array([])).reshape(-1)
    # The target characters (also only present in training data);
    # NOTE(review): currently unused below
    target_chars = data_mat.get('TargetChar', np.array([])).reshape(-1)
    fs = 240
    # flatten the epoched recording into one continuous (time, channel)
    # block of 64 channels
    data = data.reshape(-1, 64)
    timeaxis = np.linspace(0, data.shape[0] / fs * 1000, data.shape[0],
                           endpoint=False)
    dat = Data(data=data,
               axes=[timeaxis, channels],
               names=['time', 'channel'],
               units=['ms', '#'])
    dat.fs = fs
    # preparing the markers: target/nontarget masks exist only when
    # stimulus_type was in the file (i.e. training data)
    target_mask = np.logical_and((flashing == 1), (stimulus_type == 1)) if len(stimulus_type) > 0 else []
    nontarget_mask = np.logical_and((flashing == 1), (stimulus_type == 0)) if len(stimulus_type) > 0 else []
    flashing = (flashing == 1)
    # translate the boolean masks into [time in ms, label] marker lists
    flashing = [[i, 'flashing'] for i in timeaxis[flashing]]
    targets = [[i, 'target'] for i in timeaxis[target_mask]]
    nontargets = [[i, 'nontarget'] for i in timeaxis[nontarget_mask]]
    dat.stimulus_code = stimulus_code[:]
    # pair each flash onset time with the characters of the flashed
    # row/column
    stimulus_code = zip([t for t, _ in flashing],
                        [STIMULUS_CODE[i] for i in stimulus_code])
    # merge all marker kinds and sort them chronologically
    markers = flashing[:]
    markers.extend(targets)
    markers.extend(nontargets)
    markers.extend(stimulus_code)
    markers.sort()
    dat.markers = markers[:]
    return dat
class PyffComm(object):
    """Pyff communication object.

    This class allows for communication with a running Pyff [1]_
    instance. It uses the json protocol, so you have to start Pyff with
    the ``--protocol=json`` parameter.

    Receiving data from Pyff (i.e. the available feedbacks and
    variables) is not supported for now.

    Examples
    --------

    This is an example session, demonstrating how to load a feedback
    application, set a variable, start it, quit it and closing Pyff in
    the end.

    >>> pyff = PyffComm()
    >>> pyff.send_init('TrivialPong')
    >>> pyff.set_variables({'FPS': 30})
    >>> pyff.play()
    >>> pyff.quit()
    >>> pyff.quit_pyff()

    References
    ----------
    .. [1] Bastian Venthur, Simon Scholler, John Williamson, Sven
        Dähne, Matthias S Treder, Maria T Kramarek, Klaus-Robert Müller
        and Benjamin Blankertz. Pyff---A Pythonic Framework for Feedback
        Applications and Stimulus Presentation in Neuroscience.
        Frontiers in Neuroscience. 2010. doi: 10.3389/fnins.2010.00179.

    """

    def __init__(self, host='localhost', port=12345):
        """Initialize the Pyff Communicator object.

        Parameters
        ----------
        host : str, optional
            hostname or IP of the machine running Pyff
        port : int, optional
            port on which Pyff is listening on.

        """
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.host = host
        self.port = port

    def send_interaction_signal(self, cmd, data=None):
        """Send interaction signal to Pyff.

        .. warning:: This method is used internally to send low level
            JSON messages to Pyff. You should not use this method
            directly.

        Parameters
        ----------
        cmd : str
        data : dict

        """
        data = {'type': 'interaction-signal',
                'data': data,
                'commands': [[cmd, dict()]]}
        json_str = json.dumps(data)
        # socket.sendto requires bytes under Python 3, while json.dumps
        # returns str -- encode explicitly (harmless no-op for the
        # ASCII byte strings of Python 2)
        self.socket.sendto(json_str.encode('utf-8'), (self.host, self.port))

    def send_control_signal(self, variables):
        """Send a control signal to the running feedback.

        This method is used to send events to the feedback like a new
        classifier output.

        Parameters
        ----------
        variables : dict
            the keys are the variable names and the values the values.
            Those variables sent by the control signal are not set
            directly in the feedback. If you want this behave use
            :func:`set_variables`

        """
        data = {'type': 'control-signal',
                'data': variables,
                'commands': None}
        json_str = json.dumps(data)
        # see send_interaction_signal: sendto needs bytes on Python 3
        self.socket.sendto(json_str.encode('utf-8'), (self.host, self.port))

    def send_init(self, fb):
        """Load a Feedback.

        This method sends Pyff the ``send_init(feedback)`` command which
        loads a feedback.

        Parameters
        ----------
        fb : string
            The name of the feedback.

        """
        self.send_interaction_signal('sendinit', {'_feedback': fb})

    def play(self):
        """Start the feedback.

        """
        self.send_interaction_signal('play')

    def pause(self):
        """Pause the feedback.

        """
        self.send_interaction_signal('pause')

    def stop(self):
        """Stop the feedback.

        """
        self.send_interaction_signal('stop')

    def quit(self):
        """Quit the feedback.

        """
        self.send_interaction_signal('quit')

    def quit_pyff(self):
        """Quit Pyff.

        """
        self.send_interaction_signal('quitfeedbackcontroller')

    def set_variables(self, variables):
        """Set internal variables in the feedback.

        Use this method to create or modify instance variables of the
        currently running feedback.

        Parameters
        ----------
        variables : dict
            The variable names are the keys, the values are the values
            of the variables.

        """
        self.send_interaction_signal(None, variables)