import numpy as np
from rlberry.agents import AgentWithSimplePolicy
from gymnasium.spaces import Discrete
import rlberry
logger = rlberry.logger
def run_lsvi(
dim,
horizon,
bonus_factor,
lambda_mat_inv,
reward_hist,
gamma,
feat_hist,
n_actions,
feat_ns_all_actions,
v_max,
total_time_steps,
):
"""
Least-Squares Value Iteration.
Parameters
----------
dim : int
Dimension of the features
horizon : int
Horizon (maximum length of each episode)
bonus_factor : double
Constant by which to multiply the exploration bonus
lambda_mat_inv : numpy array (dim, dim)
Inverse of the design matrix
reward_hist : numpy array (time,)
History of observed rewards
gamma : double
Discount factor
feat_hist : numpy array (time, dim)
n_actions : int
feat_ns_all_actions : numpy array (time, n_actions, dim)
History of next state features for all actions
v_max : double
Maximum value of the value function
total_time_steps : int
Current step count
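Notes
-----
A sketch of what the backward recursion below computes: at each stage ``hh``,
the weights are the regularized least-squares solution

.. math::
    w_{hh} = \Lambda^{-1} \sum_t \phi(s_t, a_t)
    \Big[ r_t + \gamma \max_a \min\big(\phi(s'_t, a)^\top w_{hh+1}
    + \mathrm{bonus}_t(a),\; v_{\max}\big) \Big],

where :math:`\Lambda^{-1}` is ``lambda_mat_inv`` and
:math:`\mathrm{bonus}_t(a) = \beta \sqrt{\phi(s'_t, a)^\top \Lambda^{-1} \phi(s'_t, a)}`
(plus a :math:`v_{\max}`-scaled correction term), with :math:`\beta` = ``bonus_factor``.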
"""
# run value iteration
q_w = np.zeros((horizon + 1, dim))
for hh in range(horizon - 1, -1, -1):
T = total_time_steps
b = np.zeros(dim)
for tt in range(T):
# compute q function at next state, q_ns
q_ns = np.zeros(n_actions)
for aa in range(n_actions):
#
feat_ns_aa = feat_ns_all_actions[tt, aa, :]
inverse_counts = feat_ns_aa.dot(lambda_mat_inv.T.dot(feat_ns_aa))
bonus = bonus_factor * np.sqrt(
inverse_counts
) + v_max * inverse_counts * (bonus_factor > 0.0)
#
q_ns[aa] = feat_ns_aa.dot(q_w[hh + 1, :]) + bonus
q_ns[aa] = min(q_ns[aa], v_max)
# compute regression targets
target = reward_hist[tt] + gamma * q_ns.max()
feat = feat_hist[tt, :]
b = b + target * feat
# solve M x = b, where x = q_w[hh, :] and M = lambda_mat
q_w[hh, :] = lambda_mat_inv.T @ b
return q_w
class LSVIUCBAgent(AgentWithSimplePolicy):
"""
A version of Least-Squares Value Iteration with UCB (LSVI-UCB),
proposed by Jin et al. (2020).
If bonus_scale_factor is 0.0, performs random exploration.
Notes
-----
The computation of exploration bonuses was adapted to match the "simplified Bernstein"
bonuses that work well empirically for UCBVI in the tabular case.
The transition probabilities are assumed to be *independent* of the timestep h.
Parameters
----------
env : Model
Online model of an environment.
horizon : int
Maximum length of each episode.
feature_map_fn : function(env, kwargs)
Function that returns a feature map instance
(rlberry.agents.features.FeatureMap class).
feature_map_kwargs : dict, optional
kwargs for feature_map_fn
gamma : double
Discount factor.
bonus_scale_factor : double
Constant by which to multiply the exploration bonus.
reg_factor : double
Linear regression regularization factor.
**kwargs : Keyword Arguments
Arguments to be passed to `AgentWithSimplePolicy.__init__(self, env, **kwargs)` (:class:`~rlberry.agents.AgentWithSimplePolicy`).
References
----------
Jin, C., Yang, Z., Wang, Z., & Jordan, M. I. (2020, July).
Provably efficient reinforcement learning with linear
function approximation. In Conference on Learning Theory (pp. 2137-2143).
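Examples
--------
A minimal usage sketch (illustration only); ``env``, ``my_feature_map_fn`` and
``observation`` are assumed here: a discrete-action environment, a user-defined
function returning a :class:`rlberry.agents.features.FeatureMap` instance, and
an observation from that environment::

    agent = LSVIUCBAgent(
        env,
        horizon=10,
        feature_map_fn=my_feature_map_fn,
        gamma=0.99,
        bonus_scale_factor=1.0,
        reg_factor=0.1,
    )
    agent.fit(budget=50)                # train for 50 episodes
    action = agent.policy(observation)  # greedy action for a given observation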
"""
name = "LSVI-UCB"
def __init__(
self,
env,
horizon,
feature_map_fn,
feature_map_kwargs=None,
gamma=0.99,
bonus_scale_factor=1.0,
reg_factor=0.1,
**kwargs
):
AgentWithSimplePolicy.__init__(self, env, **kwargs)
self.n_episodes = None
self.horizon = horizon
self.gamma = gamma
self.bonus_scale_factor = bonus_scale_factor
self.reg_factor = reg_factor
feature_map_kwargs = feature_map_kwargs or {}
self.feature_map = feature_map_fn(self.env, **feature_map_kwargs)
#
if self.bonus_scale_factor == 0.0:
self.name = "LSVI-Random-Expl"
# maximum value
r_range = self.env.reward_range[1] - self.env.reward_range[0]
if r_range == np.inf:
logger.warning(
"{}: Reward range is infinity. ".format(self.name) + "Clipping it to 1."
)
r_range = 1.0
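# v_max bounds the value function: sum_{h=0}^{H-1} gamma^h * r_range,
# i.e. r_range * (1 - gamma^H) / (1 - gamma), or H * r_range when gamma == 1.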
if self.gamma == 1.0:
self.v_max = r_range * horizon
else:
self.v_max = (
r_range
* (1.0 - np.power(self.gamma, self.horizon))
/ (1.0 - self.gamma)
)
#
assert isinstance(
self.env.action_space, Discrete
), "LSVI-UCB requires discrete actions."
#
assert len(self.feature_map.shape) == 1
self.dim = self.feature_map.shape[0]
# attributes initialized in reset()
self.episode = None
self.lambda_mat = None # lambda matrix
self.lambda_mat_inv = None # inverse of lambda matrix
self.w_vec = None # vector representation of Q
self.w_policy = None # representation of Q for final policy
self.reward_hist = None # reward history
self.state_hist = None # state history
self.action_hist = None # action history
self.nstate_hist = None # next state history
self.feat_hist = None # feature history
self.feat_ns_all_actions = None # next state features for all actions
#
# aux variables (init in reset() too)
self._rewards = None
def reset(self):
self.episode = 0
self.total_time_steps = 0
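# regularized design matrix Lambda = reg_factor * I; its inverse is kept
# up to date incrementally (Sherman-Morrison update in run_episode()).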
self.lambda_mat = self.reg_factor * np.eye(self.dim)
self.lambda_mat_inv = (1.0 / self.reg_factor) * np.eye(self.dim)
self.w_vec = np.zeros((self.horizon + 1, self.dim))
self.reward_hist = np.zeros(self.n_episodes * self.horizon)
self.state_hist = []
self.action_hist = []
self.nstate_hist = []
# episode rewards
self._rewards = np.zeros(self.n_episodes)
#
self.feat_hist = np.zeros((self.n_episodes * self.horizon, self.dim))
self.feat_ns_all_actions = np.zeros(
(self.n_episodes * self.horizon, self.env.action_space.n, self.dim)
)
#
self.w_policy = None
def fit(self, budget, **kwargs):
"""
Train the agent using the provided environment.
Parameters
----------
budget: int
Number of episodes. Each episode runs for self.horizon steps unless it
encounters a terminal state, in which case it stops early.
Warning: calling fit() more than once will reset the algorithm
(to reallocate memory according to the number of episodes).
**kwargs : Keyword Arguments
Extra arguments. Not used for this agent.
"""
del kwargs
# Allocate memory according to budget.
# TODO: avoid the need to reset() the algorithm if fit() is called again.
if self.n_episodes is not None:
logger.warning(
"[LSVI-UCB]: Calling fit() more than once will reset the algorithm"
+ " (to realocate memory according to the number of episodes)."
)
self.n_episodes = budget
self.reset()
for ep in range(self.n_episodes):
self.run_episode()
if self.bonus_scale_factor > 0.0 or ep == self.n_episodes - 1:
# update Q function representation
self.w_vec = self._run_lsvi(self.bonus_scale_factor)
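# greedy weights (no exploration bonus), used by policy() after training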
self.w_policy = self._run_lsvi(bonus_factor=0.0)[0, :]
def policy(self, observation):
q_w = self.w_policy
assert q_w is not None
#
q_vec = self._compute_q_vec(q_w, observation, 0.0)
return q_vec.argmax()
def _optimistic_policy(self, observation, hh):
q_w = self.w_vec[hh, :]
q_vec = self._compute_q_vec(q_w, observation, self.bonus_scale_factor)
return q_vec.argmax()
def run_episode(self):
observation, info = self.env.reset()
episode_rewards = 0
for hh in range(self.horizon):
if self.bonus_scale_factor == 0.0:
action = self.env.action_space.sample()
else:
action = self._optimistic_policy(observation, hh)
next_observation, reward, terminated, truncated, info = self.env.step(
action
)
done = terminated or truncated
feat = self.feature_map.map(observation, action)
outer_prod = np.outer(feat, feat)
inv = self.lambda_mat_inv
#
self.lambda_mat += outer_prod
# update inverse
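# Sherman-Morrison rank-one update:
# (Lambda + feat feat^T)^{-1} = inv - (inv feat feat^T inv) / (1 + feat^T inv feat)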
self.lambda_mat_inv -= (inv @ outer_prod @ inv) / (1 + feat @ inv.T @ feat)
# update history
self.reward_hist[self.total_time_steps] = reward
self.state_hist.append(observation)
self.action_hist.append(action)
self.nstate_hist.append(next_observation)
#
tt = self.total_time_steps
self.feat_hist[tt, :] = self.feature_map.map(observation, action)
for aa in range(self.env.action_space.n):
self.feat_ns_all_actions[tt, aa, :] = self.feature_map.map(
next_observation, aa
)
# increments
self.total_time_steps += 1
episode_rewards += reward
#
observation = next_observation
if done:
break
# store data
self._rewards[self.episode] = episode_rewards
# update ep
self.episode += 1
#
if self.writer is not None:
self.writer.add_scalar("episode_rewards", episode_rewards, self.episode)
return episode_rewards
def _compute_q(self, q_w, state, action, bonus_factor):
"""q_w is the vector representation of the Q function."""
feat = self.feature_map.map(state, action)
inverse_counts = feat @ (self.lambda_mat_inv.T @ feat)
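# inverse_counts = feat^T Lambda^{-1} feat plays roughly the role of 1/N(s, a)
# in the tabular case (see the "simplified Bernstein" note in the class docstring)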
bonus = bonus_factor * np.sqrt(inverse_counts) + self.v_max * inverse_counts * (
bonus_factor > 0.0
)
q = feat.dot(q_w) + bonus
return q
def _compute_q_vec(self, q_w, state, bonus_factor):
A = self.env.action_space.n
q_vec = np.zeros(A)
for aa in range(A):
# q_vec[aa] = self._compute_q(q_w, state, aa, bonus_factor)
feat = self.feature_map.map(state, aa)
inverse_counts = feat @ (self.lambda_mat_inv.T @ feat)
bonus = bonus_factor * np.sqrt(
inverse_counts
) + self.v_max * inverse_counts * (bonus_factor > 0.0)
q_vec[aa] = feat.dot(q_w) + bonus
# q_vec[aa] = min(q_vec[aa], self.v_max)  # note: clipping at v_max is left disabled here
return q_vec
def _run_lsvi(self, bonus_factor):
# run value iteration
q_w = run_lsvi(
self.dim,
self.horizon,
bonus_factor,
self.lambda_mat_inv,
self.reward_hist,
self.gamma,
self.feat_hist,
self.env.action_space.n,
self.feat_ns_all_actions,
self.v_max,
self.total_time_steps,
)
return q_w