tensortrade.agents.parallel.parallel_queue module

class tensortrade.agents.parallel.parallel_queue.ParallelQueue

Bases: multiprocessing.queues.Queue

A portable implementation of multiprocessing.Queue.

Because of multithreading / multiprocessing semantics, Queue.qsize() may raise NotImplementedError on Unix platforms such as Mac OS X, where sem_getvalue() is not implemented. This subclass works around the problem with a synchronized shared counter, initialized to zero, that is incremented every time put() is called and decremented every time get() is called. This prevents NotImplementedError from being raised and also makes reliable implementations of both qsize() and empty() possible.
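The counting approach described above can be sketched as follows. This is an illustrative stand-in, not the library's source: the class name CountingQueue and the attribute _count are hypothetical, and the counter here is a plain multiprocessing.Value guarded by its built-in lock rather than the SharedCounter class documented below.

```python
import multiprocessing
import multiprocessing.queues


class CountingQueue(multiprocessing.queues.Queue):
    """Hypothetical stand-in for ParallelQueue (assumed design, not the real code)."""

    def __init__(self, maxsize: int = 0):
        # multiprocessing.queues.Queue requires an explicit context in Python 3.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        # Integer in shared memory, initialized to zero.
        self._count = multiprocessing.Value("i", 0)

    def put(self, *args, **kwargs) -> None:
        super().put(*args, **kwargs)
        # Count the item only after the underlying put() has succeeded.
        with self._count.get_lock():
            self._count.value += 1

    def get(self, *args, **kwargs) -> object:
        item = super().get(*args, **kwargs)
        # Decrement only after an item has actually been retrieved.
        with self._count.get_lock():
            self._count.value -= 1
        return item

    def qsize(self) -> int:
        # Reads the shared counter instead of calling sem_getvalue().
        return self._count.value

    def empty(self) -> bool:
        return self.qsize() == 0
```

In this sketch the counter is updated after the underlying call returns, so a get() that times out leaves the count unchanged. The base class defines its own __getstate__ and __setstate__, so a subclass that is handed to a spawned (rather than forked) process would also need to carry the counter through pickling; that part is omitted here.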

empty() → bool

Reliable implementation of multiprocessing.Queue.empty().

get(*args, **kwargs) → object

put(*args, **kwargs) → None

qsize() → int

Reliable implementation of multiprocessing.Queue.qsize().

class tensortrade.agents.parallel.parallel_queue.SharedCounter(n: int = 0)

Bases: object

A synchronized shared counter.

The locking done by multiprocessing.Value ensures that only a single process or thread may read or write the in-memory ctypes object. However, in order to do n += 1, Python performs a read followed by a write, so a second process may read the old value before the new one is written by the first process. The solution is to use a multiprocessing.Lock to guarantee the atomicity of the modifications to Value.
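A minimal sketch of the idea, assuming a multiprocessing.Value paired with an explicit multiprocessing.Lock as described above. The names CounterSketch, _count, _lock and _work are hypothetical and chosen for illustration; the actual SharedCounter may be structured differently.

```python
import multiprocessing


class CounterSketch:
    """Hypothetical counter following the description above (not the real class)."""

    def __init__(self, n: int = 0):
        self._count = multiprocessing.Value("i", n)
        self._lock = multiprocessing.Lock()

    def increment(self, n: int = 1) -> None:
        # Hold the lock across the whole read-modify-write so that no other
        # process can read the old value between the read and the write.
        with self._lock:
            self._count.value += n

    @property
    def value(self) -> int:
        return self._count.value


def _work(counter: CounterSketch, steps: int) -> None:
    # Each worker process increments the shared counter `steps` times.
    for _ in range(steps):
        counter.increment()


if __name__ == "__main__":
    counter = CounterSketch(0)
    workers = [
        multiprocessing.Process(target=_work, args=(counter, 1000))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    # Four workers times 1000 increments each.
    print(counter.value)
```

Without the lock around `+=`, concurrent workers could overwrite each other's updates and the final count could come out lower than the number of increments performed.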

Parameters: n (int) – The count to start at.

References

[1] http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

increment(n: int = 1) → None

Increment the counter by n.

Parameters: n (int) – The amount to increment the counter by.

value

The value of the counter. (int, read-only)