import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
import pickle
import bz2
import _pickle as cPickle
import dill
from rlberry.manager import ExperimentManager
import rlberry
logger = rlberry.logger
def evaluate_agents(
    experiment_manager_list,
    n_simulations=5,
    choose_random_agents=True,
    fignum=None,
    show=True,
    plot=True,
):
    """
    Evaluate and compare each of the agents in experiment_manager_list.

    Parameters
    ----------
    experiment_manager_list : list of ExperimentManager objects.
    n_simulations: int
        Number of calls to the eval() method of each ExperimentManager instance.
    choose_random_agents: bool
        If true and n_fit>1, use a random fitted agent from each ExperimentManager at each evaluation.
        Otherwise, each fitted agent of each ExperimentManager is evaluated n_simulations times.
    fignum: string or int
        Identifier of plot figure.
    show: bool
        If true, calls plt.show().
    plot: bool
        If false, do not plot.

    Returns
    -------
    pandas.DataFrame or None
        One column per manager that produced evaluation outputs. Managers
        sharing a name are told apart by appended ``*`` characters.
        None is returned (after logging an error) when no manager produced
        any output.

    Examples
    --------
    >>> from rlberry.agents.torch import A2CAgent, DQNAgent
    >>> from rlberry.manager import ExperimentManager, evaluate_agents
    >>> from rlberry.envs import gym_make
    >>>
    >>> if __name__=="__main__":
    ...     managers = [ ExperimentManager(
    ...         agent_class,
    ...         (gym_make, dict(id="CartPole-v1")),
    ...         fit_budget=1e4,
    ...         eval_kwargs=dict(eval_horizon=500),
    ...         n_fit=1,
    ...         parallelization="process",
    ...         mp_context="spawn",
    ...         seed=42,
    ...     ) for agent_class in [A2CAgent, DQNAgent]]
    ...     for manager in managers:
    ...         manager.fit()
    ...     data = evaluate_agents(managers, n_simulations=50, plot=False)
    """
    #
    # evaluation
    #
    eval_outputs = []
    evaluated_names = []
    for experiment_manager in experiment_manager_list:
        # Read the name a single time and reuse it for logging and labelling.
        agent_name = experiment_manager.agent_name
        logger.info(f"Evaluating {agent_name}...")
        if choose_random_agents:
            outputs = experiment_manager.eval_agents(n_simulations)
        else:
            outputs = []
            for idx in range(len(experiment_manager.agent_handlers)):
                outputs += list(
                    experiment_manager.eval_agents(n_simulations, agent_id=idx)
                )

        # len() rather than truthiness: outputs may be an array-like.
        if len(outputs) > 0:
            # Keep the name next to its outputs so that a manager without
            # data cannot shift the labels of the managers that follow it.
            eval_outputs.append(outputs)
            evaluated_names.append(agent_name)

    if len(eval_outputs) == 0:
        logger.error(
            "[evaluate_agents]: No evaluation data. Make sure ExperimentManager.fit() has been called."
        )
        return

    #
    # plot
    #

    # build unique agent IDs (in case there are two agents with the same ID)
    unique_ids = []
    id_count = {}
    for name in evaluated_names:
        id_count[name] = id_count.get(name, 0) + 1
        unique_ids.append(name + "*" * (id_count[name] - 1))

    # convert output to DataFrame
    output = pd.DataFrame(dict(zip(unique_ids, eval_outputs)))

    # plot
    if plot:
        plt.figure(fignum)
        plt.boxplot(output.values, labels=output.columns)
        plt.xlabel("agent")
        plt.ylabel("evaluation output")
        if show:
            plt.show()

    return output
def read_writer_data(data_source, tag=None, preprocess_func=None, id_agent=None):
    """
    Given a list of ExperimentManager or a folder, read data (corresponding to info) obtained in each episode.
    The dictionary returned by agents' .fit() method must contain a key equal to `info`.

    Parameters
    ----------
    data_source : :class:`~rlberry.manager.ExperimentManager`, or list of :class:`~rlberry.manager.ExperimentManager` or str or list of str
        - If ExperimentManager or list of ExperimentManager, load data from it (the agents must be fitted).

        - If str, the string must be the string path of a directory, each
          subdirectory of this directory must contain pickle files.
          Load the data from the directory of the latest experiment in date.
          This str should be equal to the value of the `output_dir` parameter in
          :class:`~rlberry.manager.ExperimentManager`.

        - If list of str, each string must be a directory containing pickle files.
          Load the data from these pickle files.

        Note: the agent's save function must save its writer at the key `_writer`.
        This is the default for rlberry agents.
    tag : str or list of str or None
        Tag of data that we want to preprocess.
        If None, every tag found in each writer is returned, unprocessed.
    preprocess_func: Callable or None or list of Callable or None
        Function to apply to 'tag' column before returning data.
        For instance, if tag=episode_rewards, setting preprocess_func=np.cumsum
        will return cumulative rewards.
        If None, do not preprocess.
        If tag is a list, preprocess_func must be None or a list (of Callable
        or None) that matches the length of tag; a None entry leaves the
        corresponding tag unprocessed.
    id_agent: int or None, default=None
        If not None, returns the data only for agent 'id_agent'.
        Only used when the data is loaded from directories.

    Returns
    -------
    Pandas DataFrame with data from writers.

    Examples
    --------
    >>> from rlberry.agents.torch import A2CAgent, DQNAgent
    >>> from rlberry.manager import ExperimentManager, read_writer_data
    >>> from rlberry.envs import gym_make
    >>>
    >>> if __name__=="__main__":
    ...     managers = [ ExperimentManager(
    ...         agent_class,
    ...         (gym_make, dict(id="CartPole-v1")),
    ...         fit_budget=1e4,
    ...         eval_kwargs=dict(eval_horizon=500),
    ...         n_fit=1,
    ...         parallelization="process",
    ...         mp_context="spawn",
    ...         seed=42,
    ...     ) for agent_class in [A2CAgent, DQNAgent]]
    ...     for manager in managers:
    ...         manager.fit()
    ...     data = read_writer_data(managers)
    """

    def _identity(values):
        return values

    #
    # Work out what kind of source we were given.
    #
    input_dir = None
    take_last_date = False
    experiment_manager_list = None

    if isinstance(data_source, ExperimentManager):
        experiment_manager_list = [data_source]
    elif isinstance(data_source, list):
        if isinstance(data_source[0], ExperimentManager):
            experiment_manager_list = data_source
        else:
            for source_dir in data_source:
                if not any(Path(source_dir).iterdir()):
                    raise RuntimeError(
                        "One of the files in data_source does not contain pickle files"
                    )
            input_dir = data_source
    else:
        # A single path: the output directory of an experiment.
        input_dir = data_source
        take_last_date = True

    #
    # Normalize tag / preprocess_func. `requested_tags` stays None when the
    # caller wants every tag; the list is then derived per writer below.
    #
    requested_tags = None
    requested_funcs = None
    if isinstance(tag, str):
        requested_tags = [tag]
        requested_funcs = [preprocess_func or _identity]
    elif tag is not None:
        requested_tags = list(tag)
        if preprocess_func is None:
            requested_funcs = [_identity] * len(requested_tags)
        else:
            assert len(preprocess_func) == len(
                requested_tags
            ), "preprocess_func must have the same length as tag"
            requested_funcs = [
                _identity if func is None else func for func in preprocess_func
            ]

    #
    # Load the raw writer data, one entry per agent name.
    #
    writer_datas = []
    if input_dir is not None:
        if take_last_date:
            subdirs = (Path(input_dir) / "manager_data").iterdir()
            # Several runs of the same agent share a name. Keep each name
            # once, otherwise its latest run would be loaded repeatedly and
            # its rows duplicated.
            agent_name_list = list(
                dict.fromkeys(str(p.stem).split("_")[0] for p in subdirs)
            )
            for name in agent_name_list:
                filename, dir_name = _get_last_xp(input_dir, name)
                writer_datas.append(_load_data(filename, dir_name, id_agent))
        else:
            agent_name_list = []
            for source_dir in input_dir:
                agent_path = Path(source_dir)
                agent_name_list.append(str(agent_path.stem).split("_")[0])
                # Split into (parent, name) so that joining them gives back
                # the given path, whether it is relative or absolute and
                # whatever characters the parent contains.
                writer_datas.append(
                    _load_data(agent_path.name, agent_path.parent, id_agent)
                )
    else:
        for manager in experiment_manager_list:
            # Important: since manager can be a RemoteExperimentManager,
            # it is important to avoid repeated accesses to its methods and properties.
            # That is why writer_data is taken from the manager instance only in
            # the line below.
            writer_datas.append(manager.get_writer_data())
        agent_name_list = [manager.agent_name for manager in experiment_manager_list]

    #
    # Preprocess agent stats.
    #
    data_list = []
    for agent_name, writer_data in zip(agent_name_list, writer_datas):
        if writer_data is None:
            continue
        for idx in writer_data:
            df = writer_data[idx]
            if requested_tags is None:
                # Derive the tags from this writer: different agents (or
                # fits) may log different tags.
                current_tags = list(df["tag"].unique())
                current_funcs = [_identity] * len(current_tags)
            else:
                current_tags = requested_tags
                current_funcs = requested_funcs

            for current_tag, func in zip(current_tags, current_funcs):
                processed_df = pd.DataFrame(df[df["tag"] == current_tag])
                processed_df["value"] = func(processed_df["value"].values)
                # update name according to ExperimentManager name and
                # n_simulation
                processed_df["name"] = agent_name
                processed_df["n_simu"] = idx
                # NOTE(review): rows carrying other tags are appended as-is
                # (unprocessed, and without the name/n_simu set above).
                # Kept from the existing behavior; confirm this is intended.
                other_rows = df[df["tag"] != current_tag]
                if len(other_rows) > 0:
                    processed_df = pd.concat(
                        [processed_df, other_rows], ignore_index=True
                    )
                data_list.append(processed_df)

    all_writer_data = pd.concat(data_list, ignore_index=True)
    return all_writer_data
def _get_last_xp(input_dir, name):
    """
    Find the most recent experiment folder of agent `name`.

    Entries of ``<input_dir>/manager_data`` are expected to be named
    ``<name>_<YYYY-MM-DD>_<HH-MM-SS>_<hash>``. Entries that belong to another
    agent (including agents whose name merely starts with `name`) or that do
    not follow this pattern are ignored.

    Parameters
    ----------
    input_dir : str or Path
        Directory that contains the ``manager_data`` folder.
    name : str
        Agent name, i.e. the part of the folder name before the timestamp.

    Returns
    -------
    agent_folder : str
        Name of the folder with the latest timestamp.
    dir_name : Path
        The ``manager_data`` directory that contains `agent_folder`.

    Raises
    ------
    ValueError
        If no timestamped folder is found for `name`.
    """
    dir_name = Path(input_dir) / "manager_data"
    timestamp_format = "%Y-%m-%d_%H-%M-%S"

    # list all of the experiments for this particular agent
    candidates = []
    for entry in dir_name.glob("*"):
        parts = entry.name.rsplit("_", 3)
        # Require an exact name match: a prefix match would mix up agents
        # such as "DQN" and "DQNTorch".
        if len(parts) != 4 or parts[0] != name:
            continue
        _, day, time_of_day, _ = parts
        try:
            stamp = datetime.strptime(day + "_" + time_of_day, timestamp_format)
        except ValueError:
            # not a timestamped experiment folder
            continue
        candidates.append((stamp, entry.name))

    if not candidates:
        raise ValueError(
            "input dir not found, verify that the agent are trained "
            'and that ExperimentManager.outdir_id_style="timestamp"'
        )

    # get the folder of the last experiment (first one wins on ties)
    _, agent_folder = max(candidates, key=lambda candidate: candidate[0])
    return agent_folder, dir_name
def is_bz_file(filepath):
    """
    Tell whether the file at `filepath` starts with the two bytes ``BZ``.

    Parameters
    ----------
    filepath : str or path-like
        File to inspect; it is opened in binary mode.

    Returns
    -------
    bool
        True if the first two bytes of the file are ``b"BZ"``.
    """
    magic = b"BZ"
    with open(filepath, "rb") as stream:
        header = stream.read(2)
    return header == magic
def _load_data(agent_folder, dir_name, id_agent):
    """
    Load the writer data saved in the agent handlers of one experiment.

    Parameters
    ----------
    agent_folder : str or Path
        Experiment folder, joined onto `dir_name`.
    dir_name : str or Path
        Directory that contains `agent_folder`.
    id_agent : int or None
        If not None, only ``agent_handlers/idx_<id_agent>.pickle`` is read.
        Otherwise handlers ``idx_0`` to ``idx_<nfit - 1>`` are read, where
        nfit is the number of entries ending in ``.pickle``.

    Returns
    -------
    dict
        Maps the fit index (as a str) to the ``data`` attribute of the object
        stored under the ``"_writer"`` key of the loaded handler.

    Raises
    ------
    ValueError
        If the ``agent_handlers`` folder has no entry ending in ``.pickle``.
    """
    experiment_dir = Path(dir_name) / agent_folder
    handlers_dir = experiment_dir / Path("agent_handlers")

    # count the fits of this experiment (one pickle file per fit)
    nfit = sum(
        1 for entry in handlers_dir.iterdir() if str(entry).split(".")[-1] == "pickle"
    )
    if nfit == 0:
        raise ValueError("Folders do not contain pickle files")

    fit_ids = range(nfit) if id_agent is None else [id_agent]

    loaded = {}
    for fit_id in fit_ids:
        handler_path = experiment_dir / Path(f"agent_handlers/idx_{fit_id}.pickle")
        # NOTE: pickle/dill deserialization can execute arbitrary code;
        # only point this at experiment folders you trust.
        try:
            # first attempt: standard pickle, bz2-compressed or plain
            if is_bz_file(handler_path):
                with bz2.BZ2File(handler_path, "rb") as stream:
                    handler_state = cPickle.load(stream)
            else:
                with handler_path.open("rb") as stream:
                    handler_state = pickle.load(stream)
        except Exception:
            # second attempt: same file, read with dill instead
            if is_bz_file(handler_path):
                with bz2.BZ2File(handler_path, "rb") as stream:
                    handler_state = dill.load(stream)
            else:
                with handler_path.open("rb") as stream:
                    handler_state = dill.load(stream)
        loaded[str(fit_id)] = handler_state.get("_writer").data

    return loaded