# Source code for libmushu.driver.replayamp
from __future__ import division
import time
import numpy as np
from libmushu.amplifier import Amplifier
class ReplayAmp(Amplifier):
[docs] def configure(self, data, marker, channels, fs, realtime=True, blocksize_ms=None, blocksize_samples=None):
"""
[docs] self.pos = 0
def stop(self):
    """Stop the replay amplifier.

    This is a no-op: the method touches no state and returns ``None``.
    """
    return None
[docs]
def get_data(self):
    """Return the next chunk of replayed data and the markers inside it.

    In realtime mode the chunk holds as many whole blocks of
    ``self.samples`` samples as fit into the wall-clock time that has
    passed since ``self.last_sample_time``. Otherwise exactly one block
    is returned per call.

    As a side effect ``self.pos`` and ``self.last_sample_time`` are
    advanced by the delivered samples, the returned markers are removed
    from ``self.marker_ts`` / ``self.marker_s`` and the timestamps of the
    remaining markers are shifted so they stay relative to the new
    position.

    Returns
    -------
    chunk, markers : chunk is ``self.data[pos:pos + samples]``; markers
        is a list of ``(timestamp, marker)`` pairs where the timestamp
        is the time in ms relative to the first sample of that block.

    """
    if self.realtime:
        wall_elapsed = time.time() - self.last_sample_time
        n_blocks = (self.fs * wall_elapsed) // self.samples
        # The floor division yields a float; slice bounds must be ints.
        samples = int(n_blocks * self.samples)
    else:
        samples = int(self.samples)
    # Only account for the time covered by the samples actually
    # delivered, so a partially elapsed block is not lost.
    elapsed = samples / self.fs
    elapsed_ms = elapsed * 1000
    self.last_sample_time += elapsed

    # data
    chunk = self.data[self.pos:self.pos + samples]

    # markers: split into those inside this chunk and those still pending
    in_chunk = self.marker_ts < elapsed_ms
    # Materialize the pairs: on Python 3 a bare zip() is a one-shot
    # iterator, whereas callers written for Python 2 received a list.
    markers = list(zip(self.marker_ts[in_chunk], self.marker_s[in_chunk]))
    pending = ~in_chunk
    # Out-of-place subtraction: an in-place ``-=`` with a float operand
    # fails when the timestamps are stored in an integer array.
    self.marker_ts = self.marker_ts[pending] - elapsed_ms
    self.marker_s = self.marker_s[pending]

    self.pos += samples
    return chunk, markers
def get_channels(self):
    """Return the ``channels`` attribute stored on this amplifier."""
    channel_names = self.channels
    return channel_names
[docs]
def get_sampling_frequency(self):
    """Return the ``fs`` attribute (sampling frequency) of this amplifier."""
    sampling_frequency = self.fs
    return sampling_frequency
[docs]
@staticmethod
def is_available():
    """Report whether this amplifier can be used; always ``True``."""
    available = True
    return available