# Copyright 2019 The TensorTrade Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
"""
References:
- http://inoryy.com/post/tensorflow2-deep-reinforcement-learning/#agent-interface
"""
import random
from collections import namedtuple
from datetime import datetime

import numpy as np
import tensorflow as tf
from deprecated import deprecated

from tensortrade.agents import Agent, ReplayMemory
A2CTransition = namedtuple('A2CTransition', ['state', 'action', 'reward', 'done', 'value'])
@deprecated(version='1.0.4', reason="Builtin agents are being deprecated in favor of external implementations (ie: Ray)")
class A2CAgent(Agent):
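    """Advantage Actor-Critic (A2C) agent.

    A shared convolutional feature extractor feeds two heads: an actor network
    that outputs action logits and a critic network that estimates the state
    value used to form advantages.
    """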
def __init__(self,
env: 'TradingEnvironment',
shared_network: tf.keras.Model = None,
actor_network: tf.keras.Model = None,
critic_network: tf.keras.Model = None):
self.env = env
self.n_actions = env.action_space.n
self.observation_shape = env.observation_space.shape
self.shared_network = shared_network or self._build_shared_network()
self.actor_network = actor_network or self._build_actor_network()
self.critic_network = critic_network or self._build_critic_network()
self.env.agent_id = self.id
def _build_shared_network(self):
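        """Build the convolutional feature extractor shared by the actor and critic heads."""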
network = tf.keras.Sequential([
tf.keras.layers.InputLayer(input_shape=self.observation_shape),
tf.keras.layers.Conv1D(filters=64, kernel_size=6, padding="same", activation="tanh"),
tf.keras.layers.MaxPooling1D(pool_size=2),
tf.keras.layers.Conv1D(filters=32, kernel_size=3, padding="same", activation="tanh"),
tf.keras.layers.MaxPooling1D(pool_size=2),
tf.keras.layers.Flatten()
])
return network
def _build_actor_network(self):
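        """Stack a small dense head on the shared network to produce action logits."""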
actor_head = tf.keras.Sequential([
tf.keras.layers.Dense(50, activation='relu'),
            tf.keras.layers.Dense(self.n_actions, activation='linear')  # unbounded logits for categorical sampling and from_logits losses
])
return tf.keras.Sequential([self.shared_network, actor_head])
def _build_critic_network(self):
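        """Stack a small dense head on the shared network to produce a scalar state-value estimate."""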
critic_head = tf.keras.Sequential([
tf.keras.layers.Dense(50, activation='relu'),
tf.keras.layers.Dense(25, activation='relu'),
            tf.keras.layers.Dense(1, activation='linear')  # state values can be negative, so no ReLU on the output
])
return tf.keras.Sequential([self.shared_network, critic_head])
    def restore(self, path: str, **kwargs):
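        """Restore previously saved actor and critic networks.

        Requires `actor_filename` and `critic_filename` keyword arguments naming
        files inside the directory `path`.
        """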
actor_filename: str = kwargs.get('actor_filename', None)
critic_filename: str = kwargs.get('critic_filename', None)
if not actor_filename or not critic_filename:
raise ValueError(
'The `restore` method requires a directory `path`, a `critic_filename`, and an `actor_filename`.')
self.actor_network = tf.keras.models.load_model(path + actor_filename)
self.critic_network = tf.keras.models.load_model(path + critic_filename)
    def save(self, path: str, **kwargs):
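        """Save the actor and critic networks as HDF5 files under `path`.

        If an `episode` keyword argument is given, it is embedded in the
        checkpoint filenames.
        """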
episode: int = kwargs.get('episode', None)
        if episode is not None:
            # Embed the episode number so successive checkpoint saves don't collide.
            suffix = self.id[:7] + "__" + str(episode).zfill(4) + "__" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".hdf5"
        else:
            suffix = self.id[:7] + "__" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".hdf5"
        actor_filename = "actor_network__" + suffix
        critic_filename = "critic_network__" + suffix
self.actor_network.save(path + actor_filename)
self.critic_network.save(path + critic_filename)
    def get_action(self, state: np.ndarray, **kwargs) -> int:
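        """Choose an action for `state`: act randomly with probability `threshold`,
        otherwise sample from the actor's categorical policy.
        """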
threshold: float = kwargs.get('threshold', 0)
rand = random.random()
if rand < threshold:
return np.random.choice(self.n_actions)
else:
logits = self.actor_network(state[None, :], training=False)
            return int(tf.squeeze(tf.random.categorical(logits, 1)))  # convert the sampled tensor to a plain int
def _apply_gradient_descent(self,
memory: ReplayMemory,
batch_size: int,
learning_rate: float,
discount_factor: float,
entropy_c: float,):
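        """Run one update: fit the critic to discounted returns, then update the
        actor with an advantage-weighted cross-entropy loss plus an entropy bonus.
        """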
huber_loss = tf.keras.losses.Huber()
wsce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)  # the `lr` alias is deprecated in TF2
transitions = memory.tail(batch_size)
batch = A2CTransition(*zip(*transitions))
states = tf.convert_to_tensor(batch.state)
actions = tf.convert_to_tensor(batch.action)
rewards = tf.convert_to_tensor(batch.reward, dtype=tf.float32)
dones = tf.convert_to_tensor(batch.done)
values = tf.convert_to_tensor(batch.value)
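        # Compute discounted returns by scanning the batch backwards, resetting the
        # running return at episode boundaries (done flags).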
returns = []
exp_weighted_return = 0
for reward, done in zip(rewards[::-1], dones[::-1]):
exp_weighted_return = reward + discount_factor * exp_weighted_return * (1 - int(done))
returns += [exp_weighted_return]
returns = returns[::-1]
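        # Critic update: regress predicted state values toward the discounted returns (Huber loss).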
with tf.GradientTape() as tape:
state_values = self.critic_network(states)
critic_loss_value = huber_loss(returns, state_values)
gradients = tape.gradient(critic_loss_value, self.critic_network.trainable_variables)
optimizer.apply_gradients(zip(gradients, self.critic_network.trainable_variables))
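        # Actor update: weight the cross-entropy of the taken actions by their advantages,
        # and subtract an entropy term to encourage exploration.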
with tf.GradientTape() as tape:
returns = tf.reshape(returns, [batch_size, 1])
advantages = returns - values
actions = tf.cast(actions, tf.int32)
logits = self.actor_network(states)
policy_loss_value = wsce_loss(actions, logits, sample_weight=advantages)
probs = tf.nn.softmax(logits)
            entropy_loss_value = tf.reduce_mean(tf.keras.losses.categorical_crossentropy(probs, probs))  # mean policy entropy
policy_total_loss_value = policy_loss_value - entropy_c * entropy_loss_value
gradients = tape.gradient(policy_total_loss_value,
self.actor_network.trainable_variables)
optimizer.apply_gradients(zip(gradients, self.actor_network.trainable_variables))
    def train(self,
n_steps: int = None,
n_episodes: int = None,
save_every: int = None,
save_path: str = None,
callback: callable = None,
**kwargs) -> float:
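        """Train the agent for `n_steps` and/or `n_episodes` and return the mean reward per step."""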
batch_size: int = kwargs.get('batch_size', 128)
discount_factor: float = kwargs.get('discount_factor', 0.9999)
learning_rate: float = kwargs.get('learning_rate', 0.0001)
eps_start: float = kwargs.get('eps_start', 0.9)
eps_end: float = kwargs.get('eps_end', 0.05)
eps_decay_steps: int = kwargs.get('eps_decay_steps', 200)
        entropy_c: float = kwargs.get('entropy_c', 0.0001)
memory_capacity: int = kwargs.get('memory_capacity', 1000)
memory = ReplayMemory(memory_capacity, transition_type=A2CTransition)
episode = 0
steps_done = 0
total_reward = 0
stop_training = False
if n_steps and not n_episodes:
n_episodes = np.iinfo(np.int32).max
print('==== AGENT ID: {} ===='.format(self.id))
while episode < n_episodes and not stop_training:
state = self.env.reset()
done = False
print('==== EPISODE ID ({}/{}): {} ===='.format(episode + 1,
n_episodes,
self.env.episode_id))
while not done:
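                # Exponentially anneal the exploration threshold from eps_start toward eps_end.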
threshold = eps_end + (eps_start - eps_end) * np.exp(-steps_done / eps_decay_steps)
action = self.get_action(state, threshold=threshold)
next_state, reward, done, _ = self.env.step(action)
value = self.critic_network(state[None, :], training=False)
value = tf.squeeze(value, axis=-1)
memory.push(state, action, reward, done, value)
state = next_state
total_reward += reward
steps_done += 1
if len(memory) < batch_size:
continue
self._apply_gradient_descent(memory,
batch_size,
learning_rate,
discount_factor,
entropy_c)
if n_steps and steps_done >= n_steps:
done = True
stop_training = True
is_checkpoint = save_every and episode % save_every == 0
            if save_path and (is_checkpoint or episode == n_episodes - 1):  # also save after the final episode
self.save(save_path, episode=episode)
episode += 1
mean_reward = total_reward / steps_done
return mean_reward
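
# A minimal usage sketch (hypothetical, not part of the library): it assumes `env` is an
# already-constructed TradingEnvironment with a discrete action space, which is what
# A2CAgent expects.
#
#     agent = A2CAgent(env)
#     mean_reward = agent.train(n_episodes=10, save_path="agents/")
#     agent.save("agents/")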