Module easyagents.backends.core

This module contains backend core classes like Backend and BackendAgent.

The concrete backends like tfagent or baselines are implemented in seprate modules.

View Source
"""This module contains backend core classes like Backend and BackendAgent.

    The concrete backends like tfagent or baselines are implemented in seprate modules.

"""

from abc import ABC, ABCMeta, abstractmethod

from typing import List, Optional, Tuple, Type, Dict

import datetime

import gym

import os

import shutil

import tempfile

import tensorflow

import numpy

import random

from easyagents import core

from easyagents.backends import monitor

from easyagents.callbacks import plot

_tf_eager_execution_active : Optional[bool] = None

def _get_temp_path():

    """Yields a path to a non-existent temporary directory inside the systems temp path."""

    result = os.path.join(tempfile.gettempdir(), tempfile.gettempprefix())

    n = datetime.datetime.now()

    result = result + f'-{n.year % 100:2}{n.month:02}{n.day:02}-{n.hour:02}{n.minute:02}{n.second:02}-' + \

             f'{n.microsecond:06}'

    return result

def _mkdir(directory: str):

    """Creates a directory with the given path. NoOps if the directory already exists.

        If a file exists at path, the file is removed.

        Returns:

            the absolute path to directory

    """

    directory = os.path.abspath(directory)

    if os.path.isfile(directory):

        _rmpath(directory)

    os.makedirs(directory, exist_ok=True)

    return directory

def _rmpath(path: str):

    """Removes the file or directory and its content. NoOps if the directory does not exist.

    Errors are ignored.

    """

    if path:

        if os.path.isdir(path):

            shutil.rmtree(path, ignore_errors=True)

        if os.path.isfile(path):

            os.remove(path)

class _BackendEvalCallback(core.AgentCallback):

    """Evaluates an agents current policy and updates its train_context accordingly."""

    def __init__(self, train_context: core.TrainContext):

        assert train_context, "train_context not set"

        assert train_context.num_episodes_per_eval > 0, "num_episodes_per_eval is 0."

        self._train_contex = train_context

    def on_play_episode_end(self, agent_context: core.AgentContext):

        pc = agent_context.play

        tc = self._train_contex

        sum_of_r = pc.sum_of_rewards.values()

        tc.eval_rewards[tc.episodes_done_in_training] = (

            min(sum_of_r), sum(sum_of_r) / len(sum_of_r), max(sum_of_r))

        steps = [len(episode_rewards) for episode_rewards in pc.rewards.values()]

        tc.eval_steps[tc.episodes_done_in_training] = (min(steps), sum(steps) / len(steps), max(steps))

class _BackendAgent(ABC):

    """Base class for all backend agent implementations.

        Implements the train loop and calls the Callbacks.

    """

    def __init__(self, model_config: core.ModelConfig, backend_name: str, tf_eager_execution: bool):

        """

        Args:

            model_config: defines the model and environment to be used

            backend_name: id of the backend to which this agent belongs to.

        """

        global _tf_eager_execution_active

        assert model_config is not None, "model_config not set."

        assert backend_name

        if _tf_eager_execution_active is None:

            _tf_eager_execution_active = tf_eager_execution

        assert _tf_eager_execution_active == tf_eager_execution, \

            "Due to an incompatibility between tensorforce and tfagents their agents can not be instantiated in the" +\

            "same python runtime instance (conflicting excpectations on tensorflows eager execution mode)."

        self._backend_name: str = backend_name

        self.model_config = model_config

        self._agent_context: core.AgentContext = core.AgentContext(self.model_config)

        self._agent_context.gym._totals = monitor._register_gym_monitor(self.model_config.original_env_name)

        self.model_config.gym_env_name = self._agent_context.gym._totals.gym_env_name

        self._preprocess_callbacks: List[core._PreProcessCallback] = [plot._PreProcess()]

        self._callbacks: List[core.AgentCallback] = []

        self._postprocess_callbacks: List[core._PostProcessCallback] = [plot._PostProcess()]

        self._train_total_episodes_on_iteration_begin: int = 0

    def _set_seed(self):

        """ sets the random seeds for all dependent packages """

        if not self.model_config.seed is None:

            seed = self.model_config.seed

            self.log_api(f'tf.random.set_seed', f'(seed={seed})')

            tensorflow.random.set_seed(seed=seed)

            self.log_api(f'numpy.random.seed', f'({seed})')

            numpy.random.seed(seed)

            self.log_api(f'random.seed', f'({seed})')

            random.seed(seed)

        return

    def _eval_current_policy(self):

        """Evaluates the current policy using play and updates the train_context

            If num_episodes_per_eval or num_iterations_per_eval is 0 no evaluation is performed.

        """

        tc = self._agent_context.train

        assert tc, "train_context not set"

        if tc.num_episodes_per_eval and tc.num_iterations_between_eval:

            callbacks = [_BackendEvalCallback(self._agent_context.train)] + self._callbacks

            self.play(play_context=core.PlayContext(self._agent_context.train), callbacks=callbacks)

    def log_api(self, api_target: str, log_msg: Optional[str] = None):

        """Logs a call to api_target with additional log_msg."""

        self._agent_context.gym._monitor_env = None

        if api_target is None:

            api_target = ''

        if log_msg is None:

            log_msg = ''

        for c in self._callbacks:

            c.on_api_log(self._agent_context, api_target, log_msg=log_msg)

    def log(self, log_msg: str):

        """Logs msg."""

        self._agent_context.gym._monitor_env = None

        if log_msg is None:

            log_msg = ''

        for c in self._callbacks:

            c.on_log(self._agent_context, log_msg=log_msg)

    def _on_gym_init_begin(self):

        """called when the monitored environment begins the instantiation of a new gym environment.

        Hint:

            the total instances count is not incremented yet."""

        self._agent_context.gym._monitor_env = None

        for c in self._callbacks:

            c.on_gym_init_begin(self._agent_context)

        self._agent_context.gym._monitor_env = None

    def _on_gym_init_end(self, env: monitor._MonitorEnv):

        """called when the monitored environment completed the instantiation of a new gym environment.

        Hint:

            o the total instances count is incremented by now

            o the new env (and its action space) is seeded with the api_context's seed

        """

        self._agent_context.gym._monitor_env = env

        if self._agent_context.model.seed is not None:

            env = self._agent_context.gym.gym_env

            seed = self._agent_context.model.seed

            env.seed(seed)

        for c in self._callbacks:

            c.on_gym_init_end(self._agent_context)

        self._agent_context.gym._monitor_env = None

    def _on_gym_reset_begin(self, env: monitor._MonitorEnv, **kwargs):

        """called when the monitored environment begins a reset.

        Hint:

            the total reset count is not incremented yet."""

        self._agent_context.gym._monitor_env = env

        for c in self._callbacks:

            c.on_gym_reset_begin(self._agent_context, **kwargs)

        self._agent_context.gym._monitor_env = None

    def _on_gym_reset_end(self, env: monitor._MonitorEnv, reset_result: Tuple, **kwargs):

        """called when the monitored environment completed a reset.

        Hint:

            the total episode count is incremented by now (if a step was performed before the last reset)."""

        self._agent_context.gym._monitor_env = env

        for c in self._callbacks:

            c.on_gym_reset_end(self._agent_context, reset_result, **kwargs)

        self._agent_context.gym._monitor_env = None

    def _on_gym_step_begin(self, env: monitor._MonitorEnv, action):

        """called when the monitored environment begins a step.

        Hint:

            o sets env.max_steps_per_episode if we are in train / play. Thus the episode is ended

              by the MonitorEnv if the step limit is exceeded

        """

        ac = self._agent_context

        ac.gym._monitor_env = env

        env.max_steps_per_episode = None

        if ac.is_play or ac.is_eval:

            env.max_steps_per_episode = ac.play.max_steps_per_episode

            self._on_play_step_begin(action)

        if ac.is_train:

            env.max_steps_per_episode = ac.train.max_steps_per_episode

            self._on_train_step_begin(action)

        for c in self._callbacks:

            c.on_gym_step_begin(self._agent_context, action)

        self._agent_context.gym._monitor_env = None

    def _on_gym_step_end(self, env: monitor._MonitorEnv, action, step_result: Tuple):

        """called when the monitored environment completed a step.

        Args:

            env: the gym_env the last step was done on

            step_result: the result (state, reward, done, info) of the last step call

        """

        ac = self._agent_context

        ac.gym._monitor_env = env

        if ac.is_play or ac.is_eval:

            self._on_play_step_end(action, step_result)

        if ac.is_train:

            self._on_train_step_end(action, step_result)

        for c in self._callbacks:

            c.on_gym_step_end(self._agent_context, action, step_result)

        self._agent_context.gym._monitor_env = None

        env.max_steps_per_episode = None

    def _on_play_begin(self):

        """Must NOT be called by play_implementation"""

        for c in self._callbacks:

            c.on_play_begin(self._agent_context)

    def _on_play_end(self):

        """Must NOT be called by play_implementation"""

        for c in self._callbacks:

            c.on_play_end(self._agent_context)

        self._agent_context.play.gym_env = None

    def on_play_episode_begin(self, env: gym.core.Env):

        """Must be called by play_implementation at the beginning of a new episode

        Args:

            env: the gym environment used to play the episode.

        """

        assert env, "env not set."

        assert isinstance(env, gym.core.Env), "env not an an instance of gym.Env."

        pc = self._agent_context.play

        pc.gym_env = env

        pc.steps_done_in_episode = 0

        pc.actions[pc.episodes_done + 1] = []

        pc.rewards[pc.episodes_done + 1] = []

        pc.sum_of_rewards[pc.episodes_done + 1] = 0

        for c in self._callbacks:

            c.on_play_episode_begin(self._agent_context)

    def on_play_episode_end(self):

        """Must be called by play_implementation at the end of an episode"""

        pc = self._agent_context.play

        pc.episodes_done += 1

        if pc.num_episodes and pc.episodes_done >= pc.num_episodes:

            pc.play_done = True

        for c in self._callbacks:

            c.on_play_episode_end(self._agent_context)

    def _on_play_step_begin(self, action):

        """Called before each call to gym.step on the current play env (agent_context.play.gym_env)

            Args:

                action: the action to be passed to the upcoming gym_env.step call

        """

        for c in self._callbacks:

            c.on_play_step_begin(self._agent_context, action)

    def _on_play_step_end(self, action, step_result: Tuple):

        """Called after each call to gym.step on the current play env (agent_context.play.gym_env)

        Args:

            step_result: the result (state, reward, done, info) of the last step call

        """

        (state, reward, done, info) = step_result

        pc = self._agent_context.play

        pc.steps_done_in_episode += 1

        pc.steps_done += 1

        pc.actions[pc.episodes_done + 1].append(action)

        pc.rewards[pc.episodes_done + 1].append(reward)

        pc.sum_of_rewards[pc.episodes_done + 1] += reward

        for c in self._callbacks:

            c.on_play_step_end(self._agent_context, action, step_result)

    def _on_train_begin(self):

        """Must NOT be called by train_implementation"""

        for c in self._callbacks:

            c.on_train_begin(self._agent_context)

    def _on_train_end(self):

        """Must NOT be called by train_implementation"""

        tc = self._agent_context.train

        if tc.episodes_done_in_training not in tc.eval_rewards:

            self._eval_current_policy()

        for c in self._callbacks:

            c.on_train_end(self._agent_context)

    def on_train_iteration_begin(self):

        """Must be called by train_implementation at the begining of a new iteration"""

        tc = self._agent_context.train

        tc.episodes_done_in_iteration = 0

        tc.steps_done_in_iteration = 0

        if tc.iterations_done_in_training == 0:

            self._eval_current_policy()

        self._train_total_episodes_on_iteration_begin = self._agent_context.gym._totals.episodes_done

        for c in self._callbacks:

            c.on_train_iteration_begin(self._agent_context)

    def on_train_iteration_end(self, loss: float, **kwargs):

        """Must be called by train_implementation at the end of an iteration

        Evaluates the current policy. Use kwargs to set additional dict values in train context.

        Eg for an ActorCriticTrainContext the losses may be set like this:

            on_train_iteration(loss=123,actor_loss=456,critic_loss=789)

        Args:

            loss: loss after the training of the model in this iteration or math.nan if the loss is not available

            **kwargs: if a keyword matches a dict property of the TrainContext instance, then

                        the dict[episodes_done_in_training] is set to the arg.

        """

        tc = self._agent_context.train

        totals = self._agent_context.gym._totals

        tc.episodes_done_in_iteration = (totals.episodes_done - self._train_total_episodes_on_iteration_begin)

        tc.episodes_done_in_training += tc.episodes_done_in_iteration

        tc.loss[tc.episodes_done_in_training] = loss

        # set traincontext dict from kwargs:

        for prop_name in kwargs:

            prop_instance = getattr(tc, prop_name, None)

            prop_value = kwargs[prop_name]

            if prop_instance is not None and isinstance(prop_instance, dict):

                prop_instance[tc.episodes_done_in_training] = prop_value

        tc.iterations_done_in_training += 1

        if tc.num_iterations is not None:

            tc.training_done = tc.iterations_done_in_training >= tc.num_iterations

        self._train_total_episodes_on_iteration_begin = 0

        if tc.num_iterations_between_eval and (tc.iterations_done_in_training % tc.num_iterations_between_eval == 0):

            self._eval_current_policy()

        for c in self._callbacks:

            c.on_train_iteration_end(self._agent_context)

    def _on_train_step_begin(self, action):

        """Called before each call to gym.step on the current train env (agent_context.train.gym_env)

            Args:

                action: the action to be passed to the upcoming gym_env.step call

        """

        pass

    def load(self, directory: str, callbacks: List[core.AgentCallback]):

        """Loads a previously trained and saved actor policy from directory.

        The loaded policy may afterwards be used by calling play().

        Args:

            directory: the directory containing the trained policy

            callbacks: list of callbacks called during the load.

        """

        assert callbacks is not None, "callbacks not set"

        assert directory

        assert os.path.isdir(directory)

        self._callbacks = callbacks

        self.load_implementation(directory)

        self._agent_context._is_policy_trained = True

        self._callbacks = None

    @abstractmethod

    def load_implementation(self, directory: str):

        """Loads a previously trained and saved actor policy from directory.

        The loaded policy may afterwards be used by calling play().

        Args:

            directory: the directory containing the trained policy.

        """

    # noinspection PyUnusedLocal

    def _on_train_step_end(self, action: object, step_result: Tuple):

        """Called after each call to gym.step on the current train env (agent_context.train.gym_env)

        Args:

            step_result: the result (state, reward, done, info) of the last step call

        """

        tc = self._agent_context.train

        tc.steps_done_in_iteration += 1

        tc.steps_done_in_training += 1

    def play(self, play_context: core.PlayContext, callbacks: List[core.AgentCallback]):

        """Forwarding to play_implementation overriden by the subclass.

            Args:

                play_context: play configuration to be used

                callbacks: list of callbacks called during play.

        """

        assert callbacks is not None, "callbacks not set"

        assert play_context, "play_context not set"

        assert self._agent_context.play is None, "play_context already set in agent_context"

        play_context._reset()

        play_context._validate()

        self._agent_context.play = play_context

        old_callbacks = self._callbacks

        self._callbacks = callbacks

        try:

            monitor._MonitorEnv._register_backend_agent(self)

            self._on_play_begin()

            self.play_implementation(self._agent_context.play)

            self._on_play_end()

        finally:

            monitor._MonitorEnv._register_backend_agent(None)

            self._callbacks = old_callbacks

            self._agent_context.play = None

    @abstractmethod

    def play_implementation(self, play_context: core.PlayContext):

        """Agent specific implementation of playing a single episode with the current policy.

            For implementation details see BackendBaseAgent.

        """

    def train(self, train_context: core.TrainContext, callbacks: List[core.AgentCallback]):

        """Forwarding to train_implementation overriden by the subclass

            Args:

                train_context: training configuration to be used

                callbacks: list of callbacks called during the training and evaluation.

        """

        assert callbacks is not None, "callbacks not set"

        assert train_context, "train_context not set"

        train_context._reset()

        train_context._validate()

        self._agent_context.train = train_context

        self._agent_context.play = None

        self._callbacks = callbacks

        try:

            self.log_api(f'backend_name', f'{self._backend_name}')

            self._set_seed()

            monitor._MonitorEnv._register_backend_agent(self)

            self._on_train_begin()

            self._agent_context._is_policy_trained = True

            self.train_implementation(self._agent_context.train)

            self._on_train_end()

        finally:

            monitor._MonitorEnv._register_backend_agent(None)

            self._callbacks = None

            self._agent_context.play = None

            self._agent_context.train = None

    @abstractmethod

    def train_implementation(self, train_context: core.TrainContext):

        """Agent specific implementation of the train loop.

            For implementational details see BackendBaseAgent.

        """

    def save(self, directory: str, callbacks: List[core.AgentCallback]):

        """Saves the currently trained actor policy in directory.

        Only the actor policy is guaranteed to be saved.

        Thus after a call to load resuming training is not supported.

        Args:

            directory: the directory to save the policy weights to. the directory must exist.

            callbacks: list of callbacks called during policy load.

        """

        assert callbacks is not None, "callbacks not set"

        assert directory

        assert os.path.isdir(directory)

        assert self._agent_context._is_policy_trained, "No trained policy available."

        self._callbacks = callbacks

        self.save_implementation(directory)

        self._callbacks = None

    @abstractmethod

    def save_implementation(self, directory: str):

        """Agent speecific implementation of saving the weights for the actor policy.

        Save must only guarantee to persist the weights of the actor policy.

        The implementation may write multiple files with fixed filenames.

        Args:

             directory: the (existing) directory to save the policy weights to.

        """

class BackendAgent(_BackendAgent, metaclass=ABCMeta):

    """Base class for all BackendAgent implementation.

        Explicitely exhibits all methods that should be overriden by an implementing agent.

    """

    @abstractmethod

    def load_implementation(self, directory: str):

        """Loads a previously trained and saved actor policy from directory.

        The loaded policy may afterwards be used by calling play().

        Args:

            directory: the directory containing the trained policy.

        """

    @abstractmethod

    def play_implementation(self, play_context: core.PlayContext):

        """Agent specific implementation of playing a number of episodes with the current policy.

            The implementation should have the form:

            while True:

                on_play_episode_begin(env)

                state = env.reset()

                while True:

                    action = _trained_policy.action(state)

                    (state, reward, done, info) = env.step(action)

                    if done:

                        break

                on_play_episode_end()

                if play_context.play_done:

                    break

            Args:

                play_context: play configuration to be used

        """

    @abstractmethod

    def save_implementation(self, directory: str):

        """Agent speecific implementation of saving the weights for the actor policy.

        Save must only guarantee to persist the weights of the actor policy.

        The implementation may write multiple files with fixed filenames.

        Args:

             directory: the directory to save the policy weights to.

        """

    @abstractmethod

    def train_implementation(self, train_context: core.TrainContext):

        """Agent specific implementation of the train loop.

            The implementation should have the form:

            while True:

                on_iteration_begin

                for e in num_episodes_per_iterations

                    play episode and record steps (while steps_in_episode < max_steps_per_episode and)

                train policy for num_epochs_per_iteration epochs

                on_iteration_end( loss )

                if training_done

                    break

            Args:

                train_context: context configuring the train loop

            Hints:

            o the subclasses training loss is passed through to BackendAgent by on_iteration_end.

              Thus the subclass must not add the experienced loss to the TrainContext.

        """

class BackendAgentFactory(ABC):

    """Backend agent factory defining the currently available agents (algorithms).

    """

    backend_name: str = 'abstract_BackendAgentFactory'

    def create_agent(self, easyagent_type: Type, model_config: core.ModelConfig) \

            -> Optional[_BackendAgent]:

        """Creates a backend agent instance implementing the algorithm given by agent_type.

        Args:

            easyagent_type: the EasyAgent derived type for which an implementing backend instance will be created

            model_config: the model_config passed to the constructor of the backend instance.

        Returns:

            instance of the agent or None if not implemented by this backend.

        """

        result: Optional[_BackendAgent] = None

        algorithms = self.get_algorithms()

        if easyagent_type in algorithms:

            result = algorithms[easyagent_type](model_config=model_config)

        return result

    def get_algorithms(self) -> Dict[Type, Type[_BackendAgent]]:

        """Yields a mapping of EasyAgent types to the implementations provided by this backend."""

        return {}

Classes

BackendAgent

class BackendAgent(
    model_config: easyagents.core.ModelConfig,
    backend_name: str,
    tf_eager_execution: bool
)

Base class for all BackendAgent implementation.

Explicitely exhibits all methods that should be overriden by an implementing agent.

View Source
class BackendAgent(_BackendAgent, metaclass=ABCMeta):

    """Base class for all BackendAgent implementation.

        Explicitely exhibits all methods that should be overriden by an implementing agent.

    """

    @abstractmethod

    def load_implementation(self, directory: str):

        """Loads a previously trained and saved actor policy from directory.

        The loaded policy may afterwards be used by calling play().

        Args:

            directory: the directory containing the trained policy.

        """

    @abstractmethod

    def play_implementation(self, play_context: core.PlayContext):

        """Agent specific implementation of playing a number of episodes with the current policy.

            The implementation should have the form:

            while True:

                on_play_episode_begin(env)

                state = env.reset()

                while True:

                    action = _trained_policy.action(state)

                    (state, reward, done, info) = env.step(action)

                    if done:

                        break

                on_play_episode_end()

                if play_context.play_done:

                    break

            Args:

                play_context: play configuration to be used

        """

    @abstractmethod

    def save_implementation(self, directory: str):

        """Agent speecific implementation of saving the weights for the actor policy.

        Save must only guarantee to persist the weights of the actor policy.

        The implementation may write multiple files with fixed filenames.

        Args:

             directory: the directory to save the policy weights to.

        """

    @abstractmethod

    def train_implementation(self, train_context: core.TrainContext):

        """Agent specific implementation of the train loop.

            The implementation should have the form:

            while True:

                on_iteration_begin

                for e in num_episodes_per_iterations

                    play episode and record steps (while steps_in_episode < max_steps_per_episode and)

                train policy for num_epochs_per_iteration epochs

                on_iteration_end( loss )

                if training_done

                    break

            Args:

                train_context: context configuring the train loop

            Hints:

            o the subclasses training loss is passed through to BackendAgent by on_iteration_end.

              Thus the subclass must not add the experienced loss to the TrainContext.

        """

Ancestors (in MRO)

  • easyagents.backends.core._BackendAgent
  • abc.ABC

Descendants

  • easyagents.backends.default.TensorforceNotActiveAgent
  • easyagents.backends.default.TfAgentsNotActiveAgent
  • easyagents.backends.default.SetTensorforceBackendAgent
  • easyagents.backends.default.NotImplementedYetAgent
  • easyagents.backends.tfagents.TfAgent

Methods

load
def load(
    self,
    directory: str,
    callbacks: List[easyagents.core.AgentCallback]
)

Loads a previously trained and saved actor policy from directory.

The loaded policy may afterwards be used by calling play().

Args: directory: the directory containing the trained policy callbacks: list of callbacks called during the load.

View Source
    def load(self, directory: str, callbacks: List[core.AgentCallback]):

        """Loads a previously trained and saved actor policy from directory.

        The loaded policy may afterwards be used by calling play().

        Args:

            directory: the directory containing the trained policy

            callbacks: list of callbacks called during the load.

        """

        assert callbacks is not None, "callbacks not set"

        assert directory

        assert os.path.isdir(directory)

        self._callbacks = callbacks

        self.load_implementation(directory)

        self._agent_context._is_policy_trained = True

        self._callbacks = None
load_implementation
def load_implementation(
    self,
    directory: str
)

Loads a previously trained and saved actor policy from directory.

The loaded policy may afterwards be used by calling play().

Args: directory: the directory containing the trained policy.

View Source
    @abstractmethod

    def load_implementation(self, directory: str):

        """Loads a previously trained and saved actor policy from directory.

        The loaded policy may afterwards be used by calling play().

        Args:

            directory: the directory containing the trained policy.

        """
log
def log(
    self,
    log_msg: str
)

Logs msg.

View Source
    def log(self, log_msg: str):

        """Logs msg."""

        self._agent_context.gym._monitor_env = None

        if log_msg is None:

            log_msg = ''

        for c in self._callbacks:

            c.on_log(self._agent_context, log_msg=log_msg)
log_api
def log_api(
    self,
    api_target: str,
    log_msg: Union[str, NoneType] = None
)

Logs a call to api_target with additional log_msg.

View Source
    def log_api(self, api_target: str, log_msg: Optional[str] = None):

        """Logs a call to api_target with additional log_msg."""

        self._agent_context.gym._monitor_env = None

        if api_target is None:

            api_target = ''

        if log_msg is None:

            log_msg = ''

        for c in self._callbacks:

            c.on_api_log(self._agent_context, api_target, log_msg=log_msg)
on_play_episode_begin
def on_play_episode_begin(
    self,
    env: gym.core.Env
)

Must be called by play_implementation at the beginning of a new episode

Args: env: the gym environment used to play the episode.

View Source
    def on_play_episode_begin(self, env: gym.core.Env):

        """Must be called by play_implementation at the beginning of a new episode

        Args:

            env: the gym environment used to play the episode.

        """

        assert env, "env not set."

        assert isinstance(env, gym.core.Env), "env not an an instance of gym.Env."

        pc = self._agent_context.play

        pc.gym_env = env

        pc.steps_done_in_episode = 0

        pc.actions[pc.episodes_done + 1] = []

        pc.rewards[pc.episodes_done + 1] = []

        pc.sum_of_rewards[pc.episodes_done + 1] = 0

        for c in self._callbacks:

            c.on_play_episode_begin(self._agent_context)
on_play_episode_end
def on_play_episode_end(
    self
)

Must be called by play_implementation at the end of an episode

View Source
    def on_play_episode_end(self):

        """Must be called by play_implementation at the end of an episode"""

        pc = self._agent_context.play

        pc.episodes_done += 1

        if pc.num_episodes and pc.episodes_done >= pc.num_episodes:

            pc.play_done = True

        for c in self._callbacks:

            c.on_play_episode_end(self._agent_context)
on_train_iteration_begin
def on_train_iteration_begin(
    self
)

Must be called by train_implementation at the begining of a new iteration

View Source
    def on_train_iteration_begin(self):

        """Must be called by train_implementation at the begining of a new iteration"""

        tc = self._agent_context.train

        tc.episodes_done_in_iteration = 0

        tc.steps_done_in_iteration = 0

        if tc.iterations_done_in_training == 0:

            self._eval_current_policy()

        self._train_total_episodes_on_iteration_begin = self._agent_context.gym._totals.episodes_done

        for c in self._callbacks:

            c.on_train_iteration_begin(self._agent_context)
on_train_iteration_end
def on_train_iteration_end(
    self,
    loss: float,
    **kwargs
)

Must be called by train_implementation at the end of an iteration

Evaluates the current policy. Use kwargs to set additional dict values in train context. Eg for an ActorCriticTrainContext the losses may be set like this: on_train_iteration(loss=123,actor_loss=456,critic_loss=789)

Args: loss: loss after the training of the model in this iteration or math.nan if the loss is not available **kwargs: if a keyword matches a dict property of the TrainContext instance, then the dict[episodes_done_in_training] is set to the arg.

View Source
    def on_train_iteration_end(self, loss: float, **kwargs):

        """Must be called by train_implementation at the end of an iteration

        Evaluates the current policy. Use kwargs to set additional dict values in train context.

        Eg for an ActorCriticTrainContext the losses may be set like this:

            on_train_iteration(loss=123,actor_loss=456,critic_loss=789)

        Args:

            loss: loss after the training of the model in this iteration or math.nan if the loss is not available

            **kwargs: if a keyword matches a dict property of the TrainContext instance, then

                        the dict[episodes_done_in_training] is set to the arg.

        """

        tc = self._agent_context.train

        totals = self._agent_context.gym._totals

        tc.episodes_done_in_iteration = (totals.episodes_done - self._train_total_episodes_on_iteration_begin)

        tc.episodes_done_in_training += tc.episodes_done_in_iteration

        tc.loss[tc.episodes_done_in_training] = loss

        # set traincontext dict from kwargs:

        for prop_name in kwargs:

            prop_instance = getattr(tc, prop_name, None)

            prop_value = kwargs[prop_name]

            if prop_instance is not None and isinstance(prop_instance, dict):

                prop_instance[tc.episodes_done_in_training] = prop_value

        tc.iterations_done_in_training += 1

        if tc.num_iterations is not None:

            tc.training_done = tc.iterations_done_in_training >= tc.num_iterations

        self._train_total_episodes_on_iteration_begin = 0

        if tc.num_iterations_between_eval and (tc.iterations_done_in_training % tc.num_iterations_between_eval == 0):

            self._eval_current_policy()

        for c in self._callbacks:

            c.on_train_iteration_end(self._agent_context)
play
def play(
    self,
    play_context: easyagents.core.PlayContext,
    callbacks: List[easyagents.core.AgentCallback]
)

Forwarding to play_implementation overriden by the subclass.

Args: play_context: play configuration to be used callbacks: list of callbacks called during play.

View Source
    def play(self, play_context: core.PlayContext, callbacks: List[core.AgentCallback]):

        """Forwarding to play_implementation overriden by the subclass.

            Args:

                play_context: play configuration to be used

                callbacks: list of callbacks called during play.

        """

        assert callbacks is not None, "callbacks not set"

        assert play_context, "play_context not set"

        assert self._agent_context.play is None, "play_context already set in agent_context"

        play_context._reset()

        play_context._validate()

        self._agent_context.play = play_context

        old_callbacks = self._callbacks

        self._callbacks = callbacks

        try:

            monitor._MonitorEnv._register_backend_agent(self)

            self._on_play_begin()

            self.play_implementation(self._agent_context.play)

            self._on_play_end()

        finally:

            monitor._MonitorEnv._register_backend_agent(None)

            self._callbacks = old_callbacks

            self._agent_context.play = None
play_implementation
def play_implementation(
    self,
    play_context: easyagents.core.PlayContext
)

Agent specific implementation of playing a number of episodes with the current policy.

The implementation should have the form:

while True: on_play_episode_begin(env) state = env.reset() while True: action = _trained_policy.action(state) (state, reward, done, info) = env.step(action) if done: break on_play_episode_end() if play_context.play_done: break

Args: play_context: play configuration to be used

View Source
    @abstractmethod

    def play_implementation(self, play_context: core.PlayContext):

        """Agent specific implementation of playing a number of episodes with the current policy.

            The implementation should have the form:

            while True:

                on_play_episode_begin(env)

                state = env.reset()

                while True:

                    action = _trained_policy.action(state)

                    (state, reward, done, info) = env.step(action)

                    if done:

                        break

                on_play_episode_end()

                if play_context.play_done:

                    break

            Args:

                play_context: play configuration to be used

        """
save
def save(
    self,
    directory: str,
    callbacks: List[easyagents.core.AgentCallback]
)

Saves the currently trained actor policy in directory.

Only the actor policy is guaranteed to be saved. Thus after a call to load resuming training is not supported.

Args: directory: the directory to save the policy weights to. the directory must exist. callbacks: list of callbacks called during policy load.

View Source
    def save(self, directory: str, callbacks: List[core.AgentCallback]):

        """Saves the currently trained actor policy in directory.

        Only the actor policy is guaranteed to be saved.

        Thus after a call to load resuming training is not supported.

        Args:

            directory: the directory to save the policy weights to. the directory must exist.

            callbacks: list of callbacks called during policy load.

        """

        assert callbacks is not None, "callbacks not set"

        assert directory

        assert os.path.isdir(directory)

        assert self._agent_context._is_policy_trained, "No trained policy available."

        self._callbacks = callbacks

        self.save_implementation(directory)

        self._callbacks = None
save_implementation
def save_implementation(
    self,
    directory: str
)

Agent speecific implementation of saving the weights for the actor policy.

Save must only guarantee to persist the weights of the actor policy. The implementation may write multiple files with fixed filenames.

Args: directory: the directory to save the policy weights to.

View Source
    @abstractmethod

    def save_implementation(self, directory: str):

        """Agent speecific implementation of saving the weights for the actor policy.

        Save must only guarantee to persist the weights of the actor policy.

        The implementation may write multiple files with fixed filenames.

        Args:

             directory: the directory to save the policy weights to.

        """
train
def train(
    self,
    train_context: easyagents.core.TrainContext,
    callbacks: List[easyagents.core.AgentCallback]
)

Forwarding to train_implementation overriden by the subclass

Args: train_context: training configuration to be used callbacks: list of callbacks called during the training and evaluation.

View Source
    def train(self, train_context: core.TrainContext, callbacks: List[core.AgentCallback]):

        """Forwarding to train_implementation overriden by the subclass

            Args:

                train_context: training configuration to be used

                callbacks: list of callbacks called during the training and evaluation.

        """

        assert callbacks is not None, "callbacks not set"

        assert train_context, "train_context not set"

        train_context._reset()

        train_context._validate()

        self._agent_context.train = train_context

        self._agent_context.play = None

        self._callbacks = callbacks

        try:

            self.log_api(f'backend_name', f'{self._backend_name}')

            self._set_seed()

            monitor._MonitorEnv._register_backend_agent(self)

            self._on_train_begin()

            self._agent_context._is_policy_trained = True

            self.train_implementation(self._agent_context.train)

            self._on_train_end()

        finally:

            monitor._MonitorEnv._register_backend_agent(None)

            self._callbacks = None

            self._agent_context.play = None

            self._agent_context.train = None
train_implementation
def train_implementation(
    self,
    train_context: easyagents.core.TrainContext
)

Agent specific implementation of the train loop.

The implementation should have the form:

while True: on_iteration_begin for e in num_episodes_per_iterations play episode and record steps (while steps_in_episode < max_steps_per_episode and) train policy for num_epochs_per_iteration epochs on_iteration_end( loss ) if training_done break

Args: train_context: context configuring the train loop

Hints: o the subclasses training loss is passed through to BackendAgent by on_iteration_end. Thus the subclass must not add the experienced loss to the TrainContext.

View Source
    @abstractmethod

    def train_implementation(self, train_context: core.TrainContext):

        """Agent specific implementation of the train loop.

            The implementation should have the form:

            while True:

                on_iteration_begin

                for e in num_episodes_per_iterations

                    play episode and record steps (while steps_in_episode < max_steps_per_episode and)

                train policy for num_epochs_per_iteration epochs

                on_iteration_end( loss )

                if training_done

                    break

            Args:

                train_context: context configuring the train loop

            Hints:

            o the subclasses training loss is passed through to BackendAgent by on_iteration_end.

              Thus the subclass must not add the experienced loss to the TrainContext.

        """

BackendAgentFactory

class BackendAgentFactory(
    /,
    *args,
    **kwargs
)

Backend agent factory defining the currently available agents (algorithms).

View Source
class BackendAgentFactory(ABC):

    """Backend agent factory defining the currently available agents (algorithms).

    """

    backend_name: str = 'abstract_BackendAgentFactory'

    def create_agent(self, easyagent_type: Type, model_config: core.ModelConfig) \

            -> Optional[_BackendAgent]:

        """Creates a backend agent instance implementing the algorithm given by agent_type.

        Args:

            easyagent_type: the EasyAgent derived type for which an implementing backend instance will be created

            model_config: the model_config passed to the constructor of the backend instance.

        Returns:

            instance of the agent or None if not implemented by this backend.

        """

        result: Optional[_BackendAgent] = None

        algorithms = self.get_algorithms()

        if easyagent_type in algorithms:

            result = algorithms[easyagent_type](model_config=model_config)

        return result

    def get_algorithms(self) -> Dict[Type, Type[_BackendAgent]]:

        """Yields a mapping of EasyAgent types to the implementations provided by this backend."""

        return {}

Ancestors (in MRO)

  • abc.ABC

Descendants

  • easyagents.backends.default.DefaultAgentFactory
  • easyagents.backends.tfagents.TfAgentAgentFactory

Class variables

backend_name

Methods

create_agent
def create_agent(
    self,
    easyagent_type: Type,
    model_config: easyagents.core.ModelConfig
) -> Union[easyagents.backends.core._BackendAgent, NoneType]

Creates a backend agent instance implementing the algorithm given by agent_type.

Args: easyagent_type: the EasyAgent derived type for which an implementing backend instance will be created model_config: the model_config passed to the constructor of the backend instance.

Returns: instance of the agent or None if not implemented by this backend.

View Source
    def create_agent(self, easyagent_type: Type, model_config: core.ModelConfig) \

            -> Optional[_BackendAgent]:

        """Creates a backend agent instance implementing the algorithm given by agent_type.

        Args:

            easyagent_type: the EasyAgent derived type for which an implementing backend instance will be created

            model_config: the model_config passed to the constructor of the backend instance.

        Returns:

            instance of the agent or None if not implemented by this backend.

        """

        result: Optional[_BackendAgent] = None

        algorithms = self.get_algorithms()

        if easyagent_type in algorithms:

            result = algorithms[easyagent_type](model_config=model_config)

        return result
get_algorithms
def get_algorithms(
    self
) -> Dict[Type, Type[easyagents.backends.core._BackendAgent]]

Yields a mapping of EasyAgent types to the implementations provided by this backend.

View Source
    def get_algorithms(self) -> Dict[Type, Type[_BackendAgent]]:

        """Yields a mapping of EasyAgent types to the implementations provided by this backend."""

        return {}