# Copyright 2019 The TensorTrade Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
from deprecated import deprecated
import multiprocessing as mp
from multiprocessing.queues import Queue
[docs]@deprecated(version='1.0.4', reason="Builtin agents are being deprecated in favor of external implementations (ie: Ray)")
class SharedCounter(object):
""" A synchronized shared counter.
The locking done by multiprocessing.Value ensures that only a single
process or thread may read or write the in-memory ctypes object. However,
in order to do n += 1, Python performs a read followed by a write, so a
second process may read the old value before the new one is written by the
first process. The solution is to use a multiprocessing.Lock to guarantee
the atomicity of the modifications to Value.
Parameters
----------
n : int
The count to start at.
References
----------
.. [1] http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
"""
def __init__(self, n: int = 0) -> None:
self.count = mp.Value('i', n)
[docs] def increment(self, n: int = 1) -> None:
"""Increment the counter by n.
Parameters
----------
n : int
The amount to increment the counter by.
"""
with self.count.get_lock():
self.count.value += n
@property
def value(self) -> int:
"""The value of the counter. (int, read-only) """
return self.count.value
[docs]@deprecated(version='1.0.4', reason="Builtin agents are being deprecated in favor of external implementations (ie: Ray)")
class ParallelQueue(Queue):
"""A portable implementation of multiprocessing.Queue.
Because of multithreading / multiprocessing semantics, Queue.qsize() may
raise the NotImplementedError exception on Unix platforms like Mac OS X
where sem_getvalue() is not implemented. This subclass addresses this
problem by using a synchronized shared counter (initialized to zero) and
increasing / decreasing its value every time the put() and get() methods
are called, respectively. This not only prevents NotImplementedError from
being raised, but also allows us to implement a reliable version of both
qsize() and empty().
"""
def __init__(self):
super().__init__(ctx=mp.get_context())
self.size = SharedCounter(0)
[docs] def put(self, *args, **kwargs) -> None:
self.size.increment(1)
super().put(*args, **kwargs)
[docs] def get(self, *args, **kwargs) -> object:
self.size.increment(-1)
return super().get(*args, **kwargs)
[docs] def qsize(self) -> int:
"""Reliable implementation of multiprocessing.Queue.qsize()."""
return self.size.value
[docs] def empty(self) -> bool:
"""Reliable implementation of multiprocessing.Queue.empty()."""
return not self.qsize()