This module provides the AmpDecorator class.
As a user, it is very unlikely that you’ll have to deal with it directly. Its main purpose is to add functionality to the low-level amplifier drivers, such as saving data to a file or receiving markers via network (TCP/IP and UDP).
By using the libmushu.__init__.get_amp() method, you’ll automatically receive decorated amplifiers.
Bases: libmushu.amplifier.Amplifier
This class ‘decorates’ the Low-Level Amplifier classes with Network-Marker and Save-To-File functionality.
You use it by decorating (not as in Python-Decorator, but in the GoF sense) the low level amplifier class you want to use:
import libmushu
from libmushu.ampdecorator import AmpDecorator
from libmushu.driver.randomamp import RandomAmp
amp = AmpDecorator(RandomAmp)
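To illustrate what decorating in the GoF sense means here, the following is a minimal sketch and not libmushu's actual implementation. `FakeAmp` and `LoggingDecorator` are hypothetical names. The only detail taken from the text above is that the wrapper receives the driver class rather than an instance.

```python
class FakeAmp(object):
    """Hypothetical low-level driver used only for this sketch."""

    def start(self):
        self.running = True

    def stop(self):
        self.running = False

    def get_data(self):
        # A real driver would return samples read from the hardware.
        return [[1, 2, 3]]


class LoggingDecorator(object):
    """Hypothetical wrapper: forwards calls and adds extra behaviour."""

    def __init__(self, ampcls):
        # The wrapper receives the driver class and instantiates it.
        self.amp = ampcls()
        self.blocks_seen = 0

    def start(self):
        self.amp.start()

    def stop(self):
        self.amp.stop()

    def get_data(self):
        data = self.amp.get_data()
        # Added functionality would live here, e.g. saving data to a file.
        self.blocks_seen += 1
        return data


amp = LoggingDecorator(FakeAmp)
amp.start()
block = amp.get_data()
amp.stop()
```

The calling code only sees the wrapper, so extra features can be added without touching the driver.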
Warning: The network marker timings on Windows have a resolution of 10ms-15ms. On Linux the resolution is 1us. This is due to limitations of Python’s time.time method, or rather a Windows-specific issue.
There is currently no precise timer on Windows that provides times which are comparable between two running processes. The performance counter provided on Windows has a much better resolution, but it is relative to the process’s start time and it drifts (1s per 100s), so it is only precise for a relatively short amount of time.
If a higher precision is needed one has to replace the time.time calls with something which provides a better precision. For example one could create a third process which provides times or regularly synchronize both processes with the clock synchronization algorithm as described here:
Alternatively one could use timeGetTime from Windows’ multimedia library (winmm), which is tunable via timeBeginPeriod and provides a precision of 1-2ms. Apparently this is the way Chrome and many others do it:
from __future__ import division
from ctypes import windll
import time
timeBeginPeriod = windll.winmm.timeBeginPeriod
timeEndPeriod = windll.winmm.timeEndPeriod
timeGetTime = windll.winmm.timeGetTime
if __name__ == '__main__':
    # wrap the code that needs high precision in timeBegin- and
    # timeEndPeriod with the same parameter. The parameter is
    # the interval in ms you want as precision. Usually the
    # minimum value allowed is 1 (best).
    timeBeginPeriod(1)
    times = []
    t_start = time.time()
    # collect timer values for one second
    while time.time() < (t_start + 1):
        times.append(timeGetTime())
    times = sorted(list(set(times)))
    # average interval in ms between distinct timer values
    print(1000 / len(times))
    timeEndPeriod(1)
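The same measurement can be made without the Windows-only calls. The sketch below is an illustration, not part of libmushu: `estimate_resolution` is a hypothetical helper that counts how often a clock's value changes within a short window and reports the mean gap between distinct readings. On Windows one could pass a wrapper around timeGetTime instead of time.time.

```python
import time


def estimate_resolution(clock, duration=0.05):
    """Return the mean gap in seconds between distinct clock readings.

    Returns None if the clock never advanced within the window.
    """
    first = last = clock()
    changes = 0
    t_end = time.time() + duration
    while time.time() < t_end:
        now = clock()
        if now != last:
            # The clock ticked: count it and remember the new value.
            changes += 1
            last = now
    if changes == 0:
        return None
    return (last - first) / changes


resolution = estimate_resolution(time.time)
```

The result is only a rough estimate, since the loop itself takes time and the operating system may preempt the process.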
Methods
configure(**kwargs) | |
get_channels() | |
get_data() | Get data from the amplifier. |
get_sampling_frequency() | |
is_available() | Is the amplifier connected to the computer? |
start([filename]) | |
stop() |
Get data from the amplifier.
This method is supposed to be called as fast as possible (e.g. hundreds of times per second) and returns the data and the markers.
Returns:
    data : 2darray
    markers : list of (float, str)
Bases: asynchat.async_chat
Handler for incoming data streams.
This handler processes incoming data from TCP or UDP sockets. Each packet ends with a terminator character sequence. The handler takes care of incomplete packets and puts complete packets in the queue.
Attributes
addr |
Methods
accept() | |
add_channel([map]) | |
bind(addr) | |
close() | |
close_when_done() | automatically close this channel once the outgoing queue is empty |
collect_incoming_data(data) | Got potentially partial data packet. |
connect(address) | |
create_socket(family, type) | |
del_channel([map]) | |
discard_buffers() | |
found_terminator() | Found a complete packet. |
get_terminator() | |
handle_accept() | |
handle_close() | |
handle_connect() | |
handle_connect_event() | |
handle_error() | An error occurred. |
handle_expt() | |
handle_expt_event() | |
handle_read() | |
handle_read_event() | |
handle_write() | |
handle_write_event() | |
initiate_send() | |
listen(num) | |
log(message) | |
log_info(message[, type]) | |
push(data) | |
push_with_producer(producer) | |
readable() | predicate for inclusion in the readable for select() |
recv(buffer_size) | |
send(data) | |
set_reuse_addr() | |
set_socket(sock[, map]) | |
set_terminator(term) | Set the input delimiter. |
writable() | Signal whether the socket is ready to send data. |
Got potentially partial data packet.
This method collects potentially incomplete data packets and records the timestamp when the first part of the incomplete data packet arrived.
Parameters:
    data : str
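The handling of partial packets can be sketched without asynchat. The code below is a simplified illustration under stated assumptions: `PacketAssembler` is a hypothetical class, the terminator is assumed to be a newline, and a plain list stands in for the queue. It buffers incoming chunks, records the arrival time of the first fragment, and emits a packet once the terminator is seen.

```python
import time


class PacketAssembler(object):
    """Hypothetical helper that mimics terminator-based framing."""

    def __init__(self, terminator=b'\n'):
        self.terminator = terminator  # assumed delimiter
        self.buffer = b''
        self.timestamp = None
        self.packets = []  # stands in for the queue

    def feed(self, data):
        """Add a possibly partial chunk and extract complete packets."""
        if not data:
            return
        if self.timestamp is None:
            # Remember when the first fragment of this packet arrived.
            self.timestamp = time.time()
        self.buffer += data
        while self.terminator in self.buffer:
            packet, self.buffer = self.buffer.split(self.terminator, 1)
            self.packets.append((self.timestamp, packet.decode('utf-8')))
            # Leftover bytes belong to the next packet, which starts now.
            self.timestamp = time.time() if self.buffer else None


assembler = PacketAssembler()
assembler.feed(b'fo')       # incomplete packet, nothing emitted yet
assembler.feed(b'o\nba')    # completes 'foo', starts 'bar'
assembler.feed(b'r\n')      # completes 'bar'
markers = [text for _, text in assembler.packets]
```

Timestamping the first fragment rather than the last keeps the marker time close to the moment the sender started transmitting.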
Bases: asyncore.dispatcher
The marker server.
It opens a TCP or UDP socket and assigns a MarkerHandler to the opened socket.
Attributes
addr |
Methods
accept() | |
add_channel([map]) | |
bind(addr) | |
close() | |
connect(address) | |
create_socket(family, type) | |
del_channel([map]) | |
handle_accept() | Accept an incoming TCP connection. |
handle_close() | |
handle_connect() | |
handle_connect_event() | |
handle_error() | |
handle_expt() | |
handle_expt_event() | |
handle_read() | |
handle_read_event() | |
handle_write() | |
handle_write_event() | |
listen(num) | |
log(message) | |
log_info(message[, type]) | |
readable() | |
recv(buffer_size) | |
send(data) | |
set_reuse_addr() | |
set_socket(sock[, map]) | |
writable() |
Start the TCP and UDP MarkerServers and start the receiving loop.
This method runs in a separate process and receives UDP and TCP markers. Whenever a marker is received, it is put together with a timestamp into a queue.
After the TCP and UDP servers are set up the ready event is set and the method enters the loop that runs forever until the running Event is cleared. Received markers are put in the queue.
Parameters:
    queue : Queue
    running : Event
    ready : Event
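The interplay of the queue and the two events can be sketched as follows. This is an illustration under assumptions, not the library's code: it uses a thread instead of a separate process to stay self-contained, and `marker_loop` and `fake_poll` are hypothetical names, with `fake_poll` standing in for the TCP and UDP servers.

```python
import queue
import threading
import time


def marker_loop(out_queue, running, ready, poll):
    """Forward markers from `poll` to `out_queue` until `running` is cleared."""
    # Setting up the TCP and UDP servers would happen here.
    ready.set()
    while running.is_set():
        marker = poll()
        if marker is None:
            time.sleep(0.001)  # nothing received, avoid busy waiting
            continue
        out_queue.put((time.time(), marker))


pending = ['foo', 'bar']


def fake_poll():
    """Hypothetical marker source standing in for the network."""
    return pending.pop(0) if pending else None


markers = queue.Queue()
running = threading.Event()
ready = threading.Event()
running.set()

worker = threading.Thread(target=marker_loop,
                          args=(markers, running, ready, fake_poll))
worker.start()
ready.wait()                    # block until the loop has been set up
first = markers.get(timeout=5)
second = markers.get(timeout=5)
running.clear()                 # ask the loop to exit
worker.join()
```

Waiting on the ready event before sending markers avoids losing markers that arrive while the servers are still starting.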
This module provides the Amplifier class, which is the base class of all low level amplifier drivers. If you want to write a driver for a specific amplifier your driver must derive from the Amplifier class and implement its methods.
Users will not use your driver directly but a decorated version of it, which provides additional features like writing data to a file and receiving markers via TCP.
Bases: object
Amplifier base class.
The base class is very generic on purpose. Amplifiers from different vendors vary in so many ways that it is difficult to find a common set of methods that all support.
In the spirit of “encapsulating what varies”, I decided to encapsulate the configuration. So the main configuration of the amplifier, like setting the mode (e.g. data, impedance, etc.), sampling frequency, etc. happens in configure() and is very specific for the amplifier at hand.
start(), stop(), and get_data() are very generic and must be supported by all derived amplifier classes.
How an amplifier should be used:
# Amp, config and enough are placeholders for your driver,
# its settings and your stop condition.
amp = Amp()
# measure impedance
amp.configure(**config)
amp.start()
while True:
    data = amp.get_data()
    if enough:
        break
amp.stop()
# measure data
amp.configure(**config)
channels = amp.get_channels()
amp.start()
while True:
    data = amp.get_data()
    if enough:
        break
amp.stop()
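A driver following this protocol could look roughly like the sketch below. It is self-contained for illustration: the `Amplifier` class here is a stand-in for libmushu.amplifier.Amplifier, and `CounterAmp` is a hypothetical driver that returns plain lists instead of arrays.

```python
class Amplifier(object):
    """Stand-in for libmushu.amplifier.Amplifier (assumption)."""

    def configure(self, **kwargs):
        raise NotImplementedError

    def start(self):
        raise NotImplementedError

    def stop(self):
        raise NotImplementedError

    def get_data(self):
        raise NotImplementedError


class CounterAmp(Amplifier):
    """Hypothetical driver that emits consecutive integers."""

    def configure(self, channels=2, **kwargs):
        self.channels = channels

    def start(self):
        self.counter = 0

    def stop(self):
        pass

    def get_data(self):
        # One sample per call, one column per channel, no markers.
        row = [self.counter] * self.channels
        self.counter += 1
        return [row], []


amp = CounterAmp()
amp.configure(channels=3)
amp.start()
samples = []
while len(samples) < 4:
    data, markers = amp.get_data()
    samples.extend(data)
amp.stop()
```

Keeping all device-specific options inside configure() means the acquisition loop stays the same for every driver.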
Methods
configure(**kwargs) | Configure the amplifier. |
get_channels() | Return the list of channel names. |
get_data() | Get data from the amplifier. |
get_sampling_frequency() | Return the sampling frequency. |
is_available() | Is the amplifier connected to the computer? |
start() | Make the amplifier ready for delivering data. |
stop() | Stop the amplifier. |
Configure the amplifier.
Use this method to set the mode (e.g. impedance, data, ...), sampling frequency, filter, etc.
This depends strongly on the amplifier.
Parameters:
    kwargs : dict
Return the list of channel names.
The list has the same order as the data, i.e. the second name in the list represents the second column of the data returned by get_data().
Returns:
    channels : list of strings
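Because names and columns share the same order, a block of data can be split per channel by transposing it. The sketch below uses plain lists instead of an array to stay dependency-free, and the channel names are hypothetical.

```python
channels = ['Fz', 'Cz', 'Pz']   # hypothetical channel names
data = [[590, 938, 72],         # rows are samples,
        [167, 168, 40]]         # columns are channels

# Transpose the block so each channel name maps to its own column.
by_channel = dict(zip(channels, (list(col) for col in zip(*data))))
cz = by_channel['Cz']
```

Here `cz` holds the second column, because 'Cz' is the second name in the list.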
Get data from the amplifier.
This method is called as fast as possible (e.g. hundreds of times per second) and returns the data and the markers (if supported).
Returns:
    data : ndarray
    markers : list of (float, str)
Examples
Create a very slow amplifier with 1Hz and 3 channels and send some markers:
>>> import time
>>> import libmushu
>>> amp = libmushu.get_amp('randomamp')
>>> amp.configure(fs=1, channels=3)
>>> amp.start()
>>> while True:
...     time.sleep(.5)
...     data, marker = amp.get_data()
...     print('---')
...     print(data)
...     print(marker)
...
---
[[590 938 72]]
[[98.75297546386719, 'foo'], [553.4558296203613, 'bar']]
---
[[167 168 40]]
[]
---
[[727 705 934]]
[[16.066789627075195, 'baz']]
Return the sampling frequency.
This method returns the sampling frequency which is currently enabled in the amplifier.
Returns:
    fs : float
Is the amplifier connected to the computer?
This method should be overridden by derived classes and return True if the amplifier is connected to the computer or False if not.
Returns:
    available : boolean
This module provides two functions: get_available_amps() which will tell you which amplifiers are currently available on your computer, and get_amp() which you can use to get an amplifier instance to work with.
How to use libmushu with the decorated drivers (recommended):
import libmushu
# you know what amp you want to use:
amp = libmushu.get_amp('epoc')
...
# Or: you select one of the available amps:
amps = libmushu.get_available_amps()
amp = libmushu.get_amp(amps[0])
...
How to use libmushu’s low-level drivers:
from libmushu.driver.randomamp import RandomAmp
amp = RandomAmp()
You will most likely want to use the decorated drivers and only deal with the low-level drivers if you’re a developer or find that the libmushu.ampdecorator.AmpDecorator does not provide the features you need.
Retrieves all available (e.g. connected) amplifiers.
This method tests whether each supported amplifier is connected to the system. More precisely: whether the amplifier’s is_available method returns True.
Returns:
    available_amps : list of strings
Examples
>>> import libmushu as lm
>>> lm.get_available_amps()
['gusbamp', 'randomamp']
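The availability check described above can be sketched as a loop over a registry of driver classes. Everything in this block is hypothetical (the two driver classes, the `DRIVERS` registry and `scan_available`). It only illustrates the idea of asking each driver's is_available method.

```python
class ConnectedAmp(object):
    """Hypothetical driver whose hardware is present."""

    def is_available(self):
        return True


class MissingAmp(object):
    """Hypothetical driver whose hardware is absent."""

    def is_available(self):
        return False


# Hypothetical registry mapping amplifier names to driver classes.
DRIVERS = {'connectedamp': ConnectedAmp, 'missingamp': MissingAmp}


def scan_available(drivers):
    """Return the sorted names of all drivers reporting availability."""
    available = []
    for name, cls in drivers.items():
        try:
            if cls().is_available():
                available.append(name)
        except Exception:
            # A driver that fails to initialise is treated as unavailable.
            continue
    return sorted(available)


found = scan_available(DRIVERS)
```

Treating a failing driver as unavailable is a design choice of this sketch, so that one broken driver does not hide the others.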
Get an amplifier instance.
This factory method takes the name of a low-level amplifier driver, wraps the driver in an AmpDecorator and returns an instance.
Parameters:
    ampname : str
Returns:
    amp : Amplifier
Examples
>>> import libmushu as lm
>>> amps = lm.get_available_amps()
>>> amp = lm.get_amp(amps[0])
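A factory of this kind can be sketched as a lookup followed by wrapping. The names below (`FakeDriver`, `FakeDecorator`, `DRIVERS`, `make_amp`) are hypothetical and only illustrate the idea. How libmushu resolves names internally is not shown in this documentation.

```python
class FakeDriver(object):
    """Hypothetical low-level driver."""

    def get_data(self):
        return [[0]], []


class FakeDecorator(object):
    """Hypothetical wrapper that forwards unknown attributes to the driver."""

    def __init__(self, ampcls):
        self.amp = ampcls()

    def __getattr__(self, name):
        # Only called for attributes not found on the wrapper itself.
        return getattr(self.amp, name)


DRIVERS = {'fakeamp': FakeDriver}  # hypothetical name-to-class registry


def make_amp(ampname):
    """Look up the driver class by name and return a decorated instance."""
    try:
        ampcls = DRIVERS[ampname]
    except KeyError:
        raise ValueError('unknown amplifier: %s' % ampname)
    return FakeDecorator(ampcls)


amp = make_amp('fakeamp')
data, markers = amp.get_data()
```

Callers never name the wrapper class themselves, so the wrapping can change without affecting user code.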