"""Data type definitions.
This module provides the basic data types for Wyrm, like the
:class:`Data` and :class:`RingBuffer` classes.
"""
from __future__ import division
import copy
import logging
import numpy as np
from wyrm.processing import append_cnt
logging.basicConfig(level=logging.NOTSET)
logger = logging.getLogger(__name__)
class Data(object):
    """Generic, self-describing data container.

    This data structure is very generic on purpose. The goal here was to
    provide something which can fit the various different known and yet
    unknown requirements for BCI algorithms.

    At the core of ``Data`` is its n-dimensional ``.data`` attribute
    which holds the actual data. Along with the data, there is meta
    information about each axis of the data, contained in ``.axes``,
    ``.names``, and ``.units``.

    Most toolbox methods rely on a *convention* how specific data should
    be structured (i.e. they assume that the channels are always in the
    last dimension). You don't have to follow this convention (or
    sometimes it might not even be possible when trying out new things),
    and all methods provide an optional parameter to tell them on which
    axis they should work on.

    Continuous Data:
        Continuous Data is usually EEG data and consists of a 2d array
        ``[time, channel]``. Whenever you have continuous data, time and
        channel should be the last two dimensions.

    Epoched Data:
        Epoched data can be seen as an array of (non-epoched) data. The
        epoch should always be the first dimension. Most commonly used is
        epoched continuous EEG data which looks like this: ``[class,
        time, channel]``.

    Feature Vector:
        Similar to Epoched Data, with classes in the first dimension.

    :meth:`Data.__eq__` and :meth:`Data.__ne__` are provided to test for
    equality of two Data objects (via ``==`` and ``!=``). They only
    check the known attributes and do not guarantee correct results if
    the Data object contains custom attributes. They are mainly used in
    unittests.

    Parameters
    ----------
    data : ndarray
    axes : nlist of 1darrays
    names : nlist of strings
    units : nlist of strings

    Attributes
    ----------
    data : ndarray
        n-dimensional data array. If the array is empty (i.e.
        ``data.size == 0``), the ``Data`` object is assumed to be empty.
    axes : nlist of 1darrays
        each element corresponds to a dimension of ``.data`` (i.e. the
        first one in ``.axes`` to the first dimension in ``.data`` and
        so on). The 1-dimensional arrays contain the description of the
        data along the appropriate axis in ``.data``. For example if
        ``.data`` contains Continuous Data, then ``.axes[0]`` should be
        an array of timesteps and ``.axes[1]`` an array of channel names.
    names : nlist of strings
        the human readable description of each axis, like 'time' or
        'channel'
    units : nlist of strings
        the human readable description of the unit used for the data in
        ``.axes``

    """

    def __init__(self, data, axes, names, units):
        """Initialize a new ``Data`` object.

        Upon initialization we check if ``axes``, ``names``, and
        ``units`` have the same length and if their respective length
        matches the shape of ``data``. An empty ``data`` array is a
        valid "empty" Data object and skips those checks.

        Raises
        ------
        AssertionError
            if the lengths of the parameters are not correct.

        """
        # an empty array denotes an empty Data object; only verify the
        # metadata lengths when there is actual data
        if data.size != 0:
            assert data.ndim == len(axes) == len(names) == len(units)
            assert [len(a) for a in axes] == list(data.shape)
        self.data = data
        # normalize the axis descriptions to numpy arrays
        self.axes = [np.array(i) for i in axes]
        self.names = names
        self.units = units

    def __eq__(self, other):
        """Test for equality.

        Don't trust this method, it only checks for known attributes and
        assumes equality if those are equal. This method is heavily used
        in unittests.

        Parameters
        ----------
        other : Data

        Returns
        -------
        equal : Boolean
            True if ``self`` and ``other`` are equal, False if not.

        """
        # check if both have the same attributes
        if sorted(self.__dict__.keys()) != sorted(other.__dict__.keys()):
            return False
        # .data
        if not np.array_equal(self.data, other.data):
            return False
        # .axes -- compare shapes first, element-wise comparison of
        # differently shaped arrays would not yield a usable result
        if len(self.axes) != len(other.axes):
            return False
        for i in range(len(self.axes)):
            if self.axes[i].shape != other.axes[i].shape:
                return False
            if not (self.axes[i] == other.axes[i]).all():
                return False
        # .names
        if self.names != other.names:
            return False
        # .units
        if self.units != other.units:
            return False
        # optional extra attributes
        if hasattr(self, 'markers') and self.markers != other.markers:
            return False
        if hasattr(self, 'fs') and self.fs != other.fs:
            return False
        # the stuff we care about seems to be equal, this does not mean
        # the rest we didn't check is, but anyways...
        return True

    def __ne__(self, other):
        """Test for inequality.

        If :func:`__eq__` is implemented and :func:`__ne__` is not,
        strange comparisons evaluate to True like:

        >>> d1 == d2 and d1 != d2

        This method just returns the negation of :meth:`__eq__`. So the
        same restrictions of :meth:`__eq__` about its reliability apply.

        Parameters
        ----------
        other : Data

        Returns
        -------
        equal : Boolean
            True if ``self`` and ``other`` are not equal, False
            otherwise.

        """
        return not self.__eq__(other)

    def __nonzero__(self):
        """Return the truth value for the object instance.

        Similar to Python's built in types we return ``False`` if the
        data instance is empty and ``True`` otherwise. Please note that
        we only check for the size of ``.data`` and ignore other
        attributes like ``.markers`` which might not be empty.

        Examples
        --------
        Easy checking if a data object contains data or not:

        >>> if not cnt:
        ...     continue

        is equivalent to:

        >>> if cnt.data.size == 0:
        ...     continue

        Returns
        -------
        nonzero : int
            ``self.data.size``

        """
        return self.data.size

    # This method was added for Python3 compatibility
    def __bool__(self):
        """Return truth value of the object instance.

        This method returns ``False`` if the :func:`__nonzero__` value
        is 0, else ``True``.

        Returns
        -------
        truth : Bool
            ``False`` if :func:`__nonzero__` was ``0``, else ``True``.

        See Also
        --------
        :func:`__nonzero__`

        """
        return False if self.__nonzero__() == 0 else True

    def __str__(self):
        """Human readable representation for a data object.

        Returns
        -------
        str : str
            a human readable representation of the data object

        """
        data = 'Data: \n%s' % self.data
        axes = 'Axes: \n%s' % self.axes
        names = 'Names: \n%s' % self.names
        units = 'Units: \n%s' % self.units
        return '\n'.join([data, axes, names, units])

    def copy(self, **kwargs):
        """Return a memory efficient deep copy of ``self``.

        It first creates a shallow copy of ``self``, sets the attributes
        in ``kwargs`` if necessary and returns a deep copy of the
        resulting object.

        Parameters
        ----------
        kwargs : dict, optional
            if provided ``copy`` will try to overwrite the name, value
            pairs after the shallow- and before the deep copy. If no
            ``kwargs`` are provided, it will just return the deep copy.

        Returns
        -------
        dat : Data
            a deep copy of ``self``.

        Examples
        --------
        >>> # perform an ordinary deep copy of dat
        >>> dat2 = dat.copy()
        >>> # perform a deep copy but overwrite .names first
        >>> dat.names
        ['time', 'channel']
        >>> dat3 = dat.copy(names=['foo', 'bar'])
        >>> dat3.names
        ['foo', 'bar']
        >>> dat.names
        ['time', 'channel']

        """
        obj = copy.copy(self)
        # overwrite on the shallow copy so ``self`` stays untouched,
        # then deep copy so the result shares no data with ``self``
        for name, value in list(kwargs.items()):
            setattr(obj, name, value)
        return copy.deepcopy(obj)
class RingBuffer(object):
    """Circular Buffer implementation.

    This implementation has a guaranteed upper bound for read and write
    operations as well as a constant memory usage, which is the size of
    the maximum length of the buffer in memory.

    Reading and writing will take at most the time it takes to copy a
    continuous chunk of length ``MAXLEN`` in memory. E.g. for the
    extreme case of storing the last 60 seconds of 64bit data, sampled
    with 1kHz and 128 channels (~60MB), reading a full buffer will take
    ~25ms, as well as writing when storing more than 60 seconds at
    once. Writing will be usually much faster, as one stores usually
    only a few milliseconds of data per run. In that case writing will
    be a fraction of a millisecond.

    Parameters
    ----------
    length_ms : int
        the length of the ring buffer in milliseconds

    Attributes
    ----------
    length_ms : int
        the length of the ring buffer in milliseconds
    length : int
        the length of the ring buffer in samples
    data : ndarray
        the contents of the ring buffer, you should not read or write
        this attribute directly but via the :meth:`RingBuffer.get` and
        :meth:`RingBuffer.append` methods
    markers : array of [int, str]
        the markers belonging to the data currently in the ring buffer
    full : boolean
        indicates if the buffer has at least ``length`` elements stored
    idx : int
        the starting position of the oldest data in the ring buffer

    Examples
    --------
    >>> rb = RingBuffer(length)
    >>> while True:
    ...     rb.append(amp.get_data())
    ...     buffered = rb.get()
    ...     # do something with buffered

    """

    def __init__(self, length_ms):
        """Initialize the Ringbuffer.

        Parameters
        ----------
        length_ms : int
            the length of the ring buffer in milliseconds

        """
        # the maximum length of the ring buffer in ms
        self.length_ms = length_ms
        # the length of the buffer in samples; unknown until the first
        # append tells us the sampling frequency
        self.length = None
        self.data = None
        self.markers = []
        self.axes = []
        self.units = []
        self.names = []
        self.fs = None
        # indicate if the buffer write was wrapped around at least once
        self.full = False
        # the index where to insert new data (= the start of the oldest
        # data)
        self.idx = 0

    def _move_markers(self, markers, steps):
        """Move markers `steps` samples to the left or right.

        This method respects the sampling frequency of the data: marker
        positions are in milliseconds, so the shift in samples is
        converted via ``1000 / fs``.

        Parameters
        ----------
        markers : list of (float, str)
        steps : int
            the number of samples to move the markers (a negative value
            moves the positions to the left)

        Returns
        -------
        markers : list of (float, str)

        """
        shift_ms = 1000 / self.fs * steps
        return [[x[0] + shift_ms, x[1]] for x in markers]

    def append(self, dat):
        """Append data to the Ringbuffer, overwriting old data if necessary.

        Parameters
        ----------
        dat : Data
            a continuous data object

        Raises
        ------
        ValueError
            if the [1:]-dimensions (all but the first one) of ``data``
            does not match the ring buffer dimensions

        """
        assert hasattr(dat, 'markers')
        assert hasattr(dat, 'fs')
        data = dat.data.copy()
        markers = dat.markers[:]
        # we have nothing to append
        if len(data) == 0:
            if markers:
                logger.warning('Received Empty Data with markers. Discarding markers.')
                logger.warning(markers)
            return
        # we append the first time, initialize .data with the correct
        # shape
        if self.data is None:
            self.fs = dat.fs
            self.length = self.length_ms / 1000 * self.fs
            if not self.length.is_integer():
                logger.error('Length is not an integer, please check length_ms and fs. Rounding errors will lead to loss of samples.')
            self.length = int(self.length)
            buffershape = list(data.shape)
            buffershape[0] = self.length
            self.data = np.empty(buffershape)
            self.axes = dat.axes[:]
            # time axis spans the whole buffer in milliseconds
            self.axes[0] = np.linspace(0, 1000 * self.length / self.fs, self.length, endpoint=False)
            self.names = dat.names[:]
            self.units = dat.units[:]
        # incoming data is bigger than the buffer's capacity: keep only
        # the newest ``self.length`` samples
        if len(data) > self.length:
            logger.warning('Discarding data that was longer than the ring buffer.')
            surplus = len(data) - self.length
            data = data[surplus:]
            markers = self._move_markers(markers, -surplus)
        # the markers, please be careful when changing it, this is quite
        # tricky:
        # size of the buffer (0..self.length-1)
        size = self.length if self.full else self.idx
        # append the new markers to the end of the existing ones,
        # shifting the new positions by 'size'
        markers = self._move_markers(markers, size)
        self.markers.extend(markers)
        # if we wrapped around, move all elements to the left by the
        # size of the surplus elements and drop markers that fell off
        # the front of the buffer
        if size + len(data) > self.length:
            move = self.length - (size + len(data))
            self.markers = self._move_markers(self.markers, move)
            self.markers = [x for x in self.markers if x[0] >= 0]
        # /end of markers
        # we can write without wrapping around the buffer's end
        # (strictly '<': an exact fit takes the wrap branch below so
        # that ``full`` gets set and ``idx`` resets to 0)
        if self.idx + len(data) < self.length:
            self.data[self.idx:self.idx+len(data)] = data
            self.idx += len(data)
        # we will wrap around the buffer's end
        else:
            self.full = True
            l1 = self.length - self.idx
            l2 = len(data) - l1
            self.data[-l1:] = data[:l1]
            self.data[:l2] = data[l1:]
            self.idx = l2

    def get(self):
        """Get all buffered data.

        The returned data will have *at most* the length of ``length``.

        Returns
        -------
        data : Data
            the full contents of the ring buffer

        """
        # no data has ever been appended to this ring buffer
        if self.data is None:
            data = np.array([])
            axes = []
        # the ringbuffer wrapped around at least once: unroll so the
        # oldest sample comes first
        elif self.full:
            data = np.concatenate([self.data[self.idx:], self.data[:self.idx]], axis=0)
            axes = self.axes[:]
        # the ring buffer hasn't been filled completely yet
        else:
            data = self.data[:self.idx].copy()
            axes = self.axes[:]
            axes[0] = axes[0][:self.idx]
        d = Data(data=data, axes=axes, names=self.names[:], units=self.units[:])
        d.markers = self.markers[:]
        d.fs = self.fs
        return d
class BlockBuffer(object):
    """A buffer that returns data chunks in multiples of a block length.

    This buffer is a first-in-first-out (FIFO) buffer that returns data
    in multiples of a desired block length. The block length is defined
    in samples.

    Parameters
    ----------
    samples : int, optional
        the desired block length in samples

    Examples
    --------
    >>> bbuffer = BlockBuffer(10)
    >>> ...
    >>> while 1:
    ...     cnt = some_aquisition_method()
    ...     # How to use the BlockBuffer
    ...     bbuffer.append(cnt)
    ...     cnt = bbuffer.get()
    ...     if not cnt:
    ...         continue
    ...     # after here cnt is guaranteed to be in multiples of 10 samples

    """

    def __init__(self, samples=50):
        """Initialize the Block Buffer.

        Parameters
        ----------
        samples : int, optional
            the desired block length in samples

        """
        self.samples = samples
        # accumulated continuous data; None until the first append
        self.dat = None

    def append(self, dat):
        """Append data to the Block Buffer.

        This method accumulates the incoming data.

        Parameters
        ----------
        dat : Data
            continuous Data object

        """
        if self.dat is None:
            self.dat = dat.copy()
        elif not dat:
            # empty incoming data, nothing to accumulate
            pass
        else:
            self.dat = append_cnt(self.dat, dat)

    def get(self):
        """Pop the contents of the Block Buffer.

        The data returned has a length of multiples of ``samples``. If
        there is a fraction of ``samples`` data more in the buffer, that
        data is kept and future :meth:`append` operations will append
        new data to it.

        Returns
        -------
        dat : Data
            continuous Data object

        """
        # not even one full block buffered yet: return an empty object
        if self.dat is None or self.dat.data.shape[0] < self.samples:
            return Data(np.array([]), [], [], [])
        # the buffer holds an exact multiple of blocks: return it whole
        if self.dat.data.shape[0] % self.samples == 0:
            ret = self.dat.copy()
            self.dat = None
            return ret
        else:
            marker_orig = self.dat.markers[:]
            # number of samples to return
            n = (self.dat.data.shape[0] // self.samples) * self.samples
            # first part (complete blocks, to be returned)
            dat1 = self.dat.copy()
            dat1.data = dat1.data[:n]
            dat1.axes[0] = dat1.axes[0][:n]
            # remaining (incomplete) part, kept for future appends
            dat2 = self.dat.copy()
            dat2.data = dat2.data[n:]
            dat2.axes[0] = dat2.axes[0][n:]
            # split the markers at the start time of the remainder
            t0 = dat2.axes[0][0]
            dat1.markers = [x for x in self.dat.markers if x[0] < t0]
            dat2.markers = [x for x in self.dat.markers if x[0] >= t0]
            # align the second part to t0 so its time axis starts at 0
            dat2.axes[0] -= t0
            dat2.markers = [[x[0] - t0, x[1]] for x in dat2.markers]
            self.dat = dat2
            return dat1