Source code for tensortrade.env.default.observers


from typing import List


import datetime as dt
import numpy as np
import pandas as pd

from gym.spaces import Box, Space
from random import randrange


from tensortrade.feed.core import Stream, NameSpace, DataFeed
from tensortrade.oms.wallets import Wallet
from tensortrade.env.generic import Observer
from collections import OrderedDict


def _create_wallet_source(wallet: 'Wallet', include_worth: bool = True) -> 'List[Stream[float]]':
    """Builds the streams that describe a `Wallet`.

    Parameters
    ----------
    wallet : `Wallet`
        The wallet to make streams for.
    include_worth : bool, default True
        Whether or not to add a "worth" stream, built by multiplying the
        stream selected from the wallet's exchange streams (the one whose
        name ends with the wallet's instrument symbol) by the "total"
        balance stream.

    Returns
    -------
    `List[Stream[float]]`
        The "free", "locked" and "total" balance streams, followed by the
        "worth" stream when `include_worth` is true.
    """
    symbol = wallet.instrument.symbol
    namespace = wallet.exchange.name + ":/" + symbol

    # All streams are created and renamed inside the namespace so that the
    # names they receive are scoped by "<exchange>:/<symbol>".
    with NameSpace(namespace):
        free = Stream.sensor(
            wallet, lambda w: w.balance.as_float(), dtype="float"
        ).rename("free")
        locked = Stream.sensor(
            wallet, lambda w: w.locked_balance.as_float(), dtype="float"
        ).rename("locked")
        total = Stream.sensor(
            wallet, lambda w: w.total_balance.as_float(), dtype="float"
        ).rename("total")

        streams = [free, locked, total]

        if include_worth:
            price = Stream.select(
                wallet.exchange.streams(),
                lambda node: node.name.endswith(symbol)
            )
            streams.append(price.mul(total).rename('worth'))

    return streams


def _create_internal_streams(portfolio: 'Portfolio') -> 'List[Stream[float]]':
    """Builds the streams that describe a `Portfolio`.

    For every wallet in the portfolio this collects the streams of the
    wallet's exchange together with the wallet's own balance streams, and
    finally appends a "net_worth" stream that sums the base-instrument
    "total" streams and every "worth" stream.

    Parameters
    ----------
    portfolio : `Portfolio`
        The portfolio to make the streams for.

    Returns
    -------
    `List[Stream[float]]`
        A list of streams to describe the `portfolio`, ending with the
        "net_worth" stream.
    """
    base_symbol = portfolio.base_instrument.symbol

    streams = []
    for wallet in portfolio.wallets:
        # The base instrument is already expressed in base units, so only
        # the other wallets get a separate "worth" stream.
        needs_worth = wallet.instrument.symbol != base_symbol
        streams.extend(wallet.exchange.streams())
        streams.extend(_create_wallet_source(wallet, include_worth=needs_worth))

    worth_suffixes = (base_symbol + ":/total", "worth")
    worth_parts = [s for s in streams if s.name.endswith(worth_suffixes)]

    streams.append(Stream.reduce(worth_parts).sum().rename("net_worth"))
    return streams


class ObservationHistory(object):
    """Stores observations from a given episode of the environment.

    Parameters
    ----------
    window_size : int
        The amount of observations to keep stored before discarding them.

    Attributes
    ----------
    window_size : int
        The amount of observations to keep stored before discarding them.
    rows : OrderedDict
        Maps a running insertion index to the observation row (a dict)
        pushed at that index. At most `window_size` rows are retained.
    index : int
        The insertion index that the next pushed row will receive.
    """

    def __init__(self, window_size: int) -> None:
        self.window_size = window_size
        self.rows = OrderedDict()
        self.index = 0

    def push(self, row: dict) -> None:
        """Stores an observation.

        When the history holds more than `window_size` rows after the
        insertion, the oldest row is discarded.

        Parameters
        ----------
        row : dict
            The new observation to store.
        """
        self.rows[self.index] = row
        self.index += 1

        if len(self.rows) > self.window_size:
            # Drop the oldest entry in O(1) instead of materialising the
            # whole key list just to find the first key.
            self.rows.popitem(last=False)

    def observe(self) -> 'np.array':
        """Gets the observation at a given step in an episode.

        Returns
        -------
        `np.array`
            The stored rows, oldest first, one array row per observation.
            When fewer than `window_size` rows are stored, rows of zeros
            are prepended so the result has `window_size` rows. NaN values
            are replaced through `np.nan_to_num`. If nothing has been
            pushed yet and `window_size` is positive, an all-padding array
            of shape `(window_size, 0)` is returned, since the number of
            features is not known at that point.
        """
        stored = list(self.rows.values())

        if not stored and self.window_size > 0:
            # No row to infer the feature count from: return pure padding
            # rather than failing with an IndexError.
            return np.zeros((self.window_size, 0))

        window = np.array([list(row.values()) for row in stored])

        missing = self.window_size - len(stored)
        if missing > 0:
            # The width of the padding follows the oldest stored row.
            padding = np.zeros((missing, len(stored[0])))
            window = np.concatenate((padding, window))

        return np.nan_to_num(window)

    def reset(self) -> None:
        """Resets the observation history"""
        self.rows = OrderedDict()
        self.index = 0
class TensorTradeObserver(Observer):
    """The TensorTrade observer that is compatible with the other `default`
    components.

    Parameters
    ----------
    portfolio : `Portfolio`
        The portfolio to be used to create the internal data feed mechanism.
    feed : `DataFeed`
        The feed to be used to collect observations to the observation window.
    renderer_feed : `DataFeed`
        The feed to be used for giving information to the renderer.
    window_size : int
        The size of the observation window.
    min_periods : int
        The amount of steps needed to warmup the `feed`.
    **kwargs : keyword arguments
        Additional keyword arguments for observer creation. The keys
        'dtype', 'observation_lows' and 'observation_highs' are read and
        default to `np.float32`, `-np.inf` and `np.inf` respectively.

    Attributes
    ----------
    feed : `DataFeed`
        The master feed in charge of streaming the internal, external, and
        renderer data feeds.
    window_size : int
        The size of the observation window.
    min_periods : int
        The amount of steps needed to warmup the `feed`.
    history : `ObservationHistory`
        The observation history.
    renderer_history : `List[dict]`
        The history of the renderer data feed.
    """

    def __init__(self,
                 portfolio: 'Portfolio',
                 feed: 'DataFeed' = None,
                 renderer_feed: 'DataFeed' = None,
                 window_size: int = 1,
                 min_periods: int = None,
                 **kwargs) -> None:
        # The master feed always carries the "internal" and "external"
        # groups; the "renderer" group is only added when a renderer feed
        # was supplied.
        groups = [
            Stream.group(_create_internal_streams(portfolio)).rename("internal"),
            Stream.group(feed.inputs).rename("external"),
        ]
        if renderer_feed:
            groups.append(Stream.group(renderer_feed.inputs).rename("renderer"))
        self.feed = DataFeed(groups)

        self.window_size = window_size
        self.min_periods = min_periods

        self._observation_dtype = kwargs.get('dtype', np.float32)
        self._observation_lows = kwargs.get('observation_lows', -np.inf)
        self._observation_highs = kwargs.get('observation_highs', np.inf)

        self.history = ObservationHistory(window_size=window_size)

        # One row is pulled from the feed only to count the external
        # features; the feed is reset further down before warming up.
        first_row = self.feed.next()["external"]
        self._observation_space = Box(
            low=self._observation_lows,
            high=self._observation_highs,
            shape=(self.window_size, len(first_row)),
            dtype=self._observation_dtype
        )

        self.feed = self.feed.attach(portfolio)

        self.renderer_history = []

        self.feed.reset()
        self.warmup()

    @property
    def observation_space(self) -> Space:
        return self._observation_space

    def warmup(self) -> None:
        """Warms up the data feed.

        Pushes up to `min_periods` external rows into the history, skipping
        any iteration for which the feed has nothing left. Does nothing
        when `min_periods` is None.
        """
        if self.min_periods is None:
            return

        for _ in range(self.min_periods):
            if not self.has_next():
                continue
            self.history.push(self.feed.next()["external"])

    def observe(self, env: 'TradingEnv') -> np.array:
        """Observes the environment.

        As a consequence of observing the `env`, a new observation is
        generated from the `feed` and stored in the observation history.

        Returns
        -------
        `np.array`
            The current observation of the environment.
        """
        data = self.feed.next()

        # Save renderer information to history
        if "renderer" in data:
            self.renderer_history.append(data["renderer"])

        # Push new observation to observation history
        self.history.push(data["external"])

        return self.history.observe().astype(self._observation_dtype)

    def has_next(self) -> bool:
        """Checks if there is another observation to be generated.

        Returns
        -------
        bool
            Whether there is another observation to be generated.
        """
        return self.feed.has_next()

    def reset(self, random_start=0) -> None:
        """Resets the observer.

        Clears the renderer and observation histories, resets the feed with
        `random_start` and warms the feed up again.
        """
        self.renderer_history = []
        self.history.reset()
        self.feed.reset(random_start)
        self.warmup()
class IntradayObserver(Observer):
    """The IntradayObserver observer that is compatible with the other
    `default` components.

    Parameters
    ----------
    portfolio : `Portfolio`
        The portfolio to be used to create the internal data feed mechanism.
    feed : `DataFeed`
        The feed to be used to collect observations to the observation
        window. `observe` requires it to provide a stream named 'timestamp'.
    renderer_feed : `DataFeed`
        The feed to be used for giving information to the renderer.
    stop_time : datetime.time
        The time at which the episode will stop.
    window_size : int
        The size of the observation window.
    min_periods : int
        The amount of steps needed to warmup the `feed`.
    randomize : bool
        Whether or not to select a random episode when reset.
    **kwargs : keyword arguments
        Additional keyword arguments for observer creation. The keys
        'dtype', 'observation_lows' and 'observation_highs' are read and
        default to `np.float32`, `-np.inf` and `np.inf` respectively.

    Attributes
    ----------
    feed : `DataFeed`
        The master feed in charge of streaming the internal, external, and
        renderer data feeds.
    stop_time : datetime.time
        The time at which the episode will stop.
    window_size : int
        The size of the observation window.
    min_periods : int
        The amount of steps needed to warmup the `feed`.
    randomize : bool
        Whether or not a random episode is selected when reset.
    history : `ObservationHistory`
        The observation history.
    renderer_history : `List[dict]`
        The history of the renderer data feed.
    num_episodes : int
        Only set when `randomize` is true: the number of rows in the feed
        whose timestamp time equals `stop_time`.
    stop : bool
        Set once an observed row's timestamp time equals `stop_time`;
        cleared on reset.
    """

    def __init__(self,
                 portfolio: 'Portfolio',
                 feed: 'DataFeed' = None,
                 renderer_feed: 'DataFeed' = None,
                 stop_time: 'datetime.time' = dt.time(16, 0, 0),
                 window_size: int = 1,
                 min_periods: int = None,
                 randomize: bool = False,
                 **kwargs) -> None:
        internal_group = Stream.group(_create_internal_streams(portfolio)).rename("internal")
        external_group = Stream.group(feed.inputs).rename("external")

        if renderer_feed:
            renderer_group = Stream.group(renderer_feed.inputs).rename("renderer")

            self.feed = DataFeed([
                internal_group,
                external_group,
                renderer_group
            ])
        else:
            self.feed = DataFeed([
                internal_group,
                external_group
            ])

        self.stop_time = stop_time
        self.window_size = window_size
        self.min_periods = min_periods
        self.randomize = randomize

        self._observation_dtype = kwargs.get('dtype', np.float32)
        self._observation_lows = kwargs.get('observation_lows', -np.inf)
        self._observation_highs = kwargs.get('observation_highs', np.inf)

        self.history = ObservationHistory(window_size=window_size)

        # The timestamp is not part of the observation, so it is excluded
        # from the feature count.
        initial_obs = self.feed.next()["external"]
        initial_obs.pop('timestamp', None)
        n_features = len(initial_obs)

        self._observation_space = Box(
            low=self._observation_lows,
            high=self._observation_highs,
            shape=(self.window_size, n_features),
            dtype=self._observation_dtype
        )

        self.feed = self.feed.attach(portfolio)

        self.renderer_history = []

        if self.randomize:
            # Walk the whole feed once to count the rows that end an
            # episode, i.e. whose time of day equals `stop_time`.
            self.num_episodes = 0
            while self.feed.has_next():
                ts = self.feed.next()["external"]["timestamp"]
                if ts.time() == self.stop_time:
                    self.num_episodes += 1

        self.feed.reset()

        # `warmup` relies on `has_next`, which reads `self.stop`, so the
        # flag has to exist before warming up.
        self.stop = False
        self.warmup()

    @property
    def observation_space(self) -> Space:
        return self._observation_space

    def warmup(self) -> None:
        """Warms up the data feed.

        Pushes up to `min_periods` external rows (without their
        'timestamp' entry) into the history. Does nothing when
        `min_periods` is None.
        """
        if self.min_periods is None:
            return

        for _ in range(self.min_periods):
            if self.has_next():
                obs_row = self.feed.next()["external"]
                obs_row.pop('timestamp', None)
                self.history.push(obs_row)

    def observe(self, env: 'TradingEnv') -> np.array:
        """Observes the environment.

        As a consequence of observing the `env`, a new observation is
        generated from the `feed` and stored in the observation history.
        When the row's timestamp time equals `stop_time`, the observer is
        flagged as stopped.

        Returns
        -------
        `np.array`
            The current observation of the environment.

        Raises
        ------
        KeyError
            If the external row has no 'timestamp' entry.
        """
        data = self.feed.next()

        # Save renderer information to history
        if "renderer" in data:
            self.renderer_history.append(data["renderer"])

        # Push new observation to observation history
        obs_row = data["external"]
        try:
            obs_ts = obs_row.pop('timestamp')
        except KeyError:
            raise KeyError("Include Stream of Timestamps named 'timestamp' in feed")
        self.history.push(obs_row)

        # Check if episode should be stopped
        if obs_ts.time() == self.stop_time:
            self.stop = True

        obs = self.history.observe()
        obs = obs.astype(self._observation_dtype)
        return obs

    def has_next(self) -> bool:
        """Checks if there is another observation to be generated.

        Returns
        -------
        bool
            Whether there is another observation to be generated, i.e. the
            feed has more data and the episode has not been stopped.
        """
        return self.feed.has_next() and not self.stop

    def reset(self) -> None:
        """Resets the observer.

        Clears the histories and the stop flag. The feed is rewound when
        `randomize` is set or when it is exhausted; otherwise it continues
        from its current position. With `randomize`, a uniformly drawn
        number of complete episodes is skipped before warming up.
        """
        self.renderer_history = []
        self.history.reset()

        # Clear the flag before warming up: a stale True left by the
        # previous episode would make `has_next` false and skip the warmup.
        self.stop = False

        if self.randomize or not self.feed.has_next():
            self.feed.reset()
            # randrange(0) raises ValueError, so with no counted episodes
            # the feed simply starts from the beginning.
            if self.randomize and self.num_episodes > 0:
                # Draw the target once. Re-drawing it in the loop condition
                # would end the skip at an arbitrary row, possibly in the
                # middle of an episode.
                target_episode = randrange(self.num_episodes)
                episode_num = 0
                while episode_num < target_episode:
                    ts = self.feed.next()["external"]["timestamp"]
                    if ts.time() == self.stop_time:
                        episode_num += 1

        self.warmup()