{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"# Tutorial 3: Neural network modularity\n",
"\n",
"**Week 2, Day 1: Macrocircuits**\n",
"\n",
"**By Neuromatch Academy**\n",
"\n",
"__Content creators:__ Ruiyi Zhang\n",
"\n",
"__Content reviewers:__ Xaq Pitkow, Hlib Solodzhuk, Patrick Mineault\n",
"\n",
"__Production editors:__ Konstantine Tsafatinos, Ella Batty, Spiros Chavlis, Samuele Bolotta, Hlib Solodzhuk"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"___\n",
"\n",
"# Tutorial objectives\n",
"\n",
"*Estimated timing of tutorial: 1 hour*\n",
"\n",
"This tutorial will exemplify the importance of modularity in neural network architecture. We will train deep reinforcement learning (RL) agents with two types of neural network architectures, one modular and the other holistic, in a neuroscience navigation task.\n",
"\n",
"As you have learned in previous lectures, better learning and generalization are important benefits of an appropriate inductive bias. This applies to both biological and artificial learning systems. Indeed, it has been shown that macaques trained in this navigation task can master the training task well and generalize to novel tasks derived from it. Since the brain is quite modular, it will be interesting to see if artificial models with a more modular architecture also allow for better learning and generalization than those with a less modular, holistic architecture.\n",
"\n",
"Our learning objectives for today are:\n",
"\n",
"1. Build RL agents with different neural architectures for a spatial\n",
"navigation task.\n",
"2. Compare the differences in **learning** between RL agents with different architectures.\n",
"3. Compare the differences in **generalization** between RL agents with different architectures.\n",
"4. Use neural decoding to understand why the modular architecture for this specific task is advantageous.\n",
"5. Keep the No-Free-Lunch Theorem in mind: the benefits of a modular architecture for one task cannot apply to all possible tasks.\n",
"\n",
"This tutorial is based on this [paper](https://www.science.org/doi/10.1126/sciadv.adk1256)."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @markdown\n",
"from IPython.display import IFrame\n",
"from ipywidgets import widgets\n",
"out = widgets.Output()\n",
"with out:\n",
" print(f\"If you want to download the slides: https://osf.io/download/9n4fj/\")\n",
" display(IFrame(src=f\"https://mfr.ca-1.osf.io/render?url=https://osf.io/9n4fj/?direct%26mode=render%26action=download%26mode=render\", width=730, height=410))\n",
"display(out)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Setup\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Install and import feedback gadget\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Install and import feedback gadget\n",
"\n",
"!pip install vibecheck datatops --quiet\n",
"!pip install pandas~=2.0.0 --quiet\n",
"\n",
"from vibecheck import DatatopsContentReviewContainer\n",
"def content_review(notebook_section: str):\n",
" return DatatopsContentReviewContainer(\n",
" \"\", # No text prompt\n",
" notebook_section,\n",
" {\n",
" \"url\": \"https://pmyvdlilci.execute-api.us-east-1.amazonaws.com/klab\",\n",
" \"name\": \"neuromatch_neuroai\",\n",
" \"user_key\": \"wb2cxze8\",\n",
" },\n",
" ).render()\n",
"\n",
"feedback_prefix = \"W2D1_T3\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Imports\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Imports\n",
"\n",
"#plotting\n",
"import matplotlib.pyplot as plt\n",
"from matplotlib.ticker import FuncFormatter\n",
"from matplotlib.patches import Circle\n",
"\n",
"#modeling\n",
"from scipy.stats import sem\n",
"from sklearn.linear_model import LinearRegression, Ridge\n",
"import torch\n",
"import torch.nn as nn\n",
"import torch.nn.functional as F\n",
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"#utils\n",
"from tqdm.notebook import tqdm\n",
"import pickle\n",
"import logging\n",
"import os\n",
"from pathlib import Path\n",
"import requests\n",
"import hashlib\n",
"import zipfile"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Figure settings\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Figure settings\n",
"\n",
"logging.getLogger('matplotlib.font_manager').disabled = True\n",
"\n",
"%matplotlib inline\n",
"%config InlineBackend.figure_format = 'retina' # perfrom high definition rendering for images and plots\n",
"plt.style.use(\"https://raw.githubusercontent.com/NeuromatchAcademy/course-content/main/nma.mplstyle\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Helper functions\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Helper functions\n",
"\n",
"def my_tickformatter(value, pos):\n",
" if abs(value) > 0 and abs(value) < 1:\n",
" value = str(value).replace('0.', '.').replace('-', '\\u2212')\n",
" elif value == 0:\n",
" value = 0\n",
" elif int(value) == value:\n",
" value = int(value)\n",
" return value\n",
"\n",
"def cart2pol(x, y):\n",
" rho = np.sqrt(x**2 + y**2)\n",
" phi = np.arctan2(y, x)\n",
" return rho, phi\n",
"\n",
"def get_neural_response(agent, df):\n",
" responses = []\n",
" with torch.no_grad():\n",
" for _, trial in df.iterrows():\n",
" response = agent.actor.rnn(trial.state_input)[0]\n",
" responses.append(response.squeeze(1))\n",
" df['response'] = responses\n",
"\n",
"def fit_decoder(trajectory, variables=['pos_x', 'pos_y'], train_frac=0.7):\n",
" key = 'response'\n",
" train_trajectory = trajectory[:round(len(trajectory) * train_frac)]\n",
" train_X = np.vstack(train_trajectory[key])\n",
" test_trajectory = trajectory[round(len(trajectory) * train_frac):]\n",
" test_X = np.vstack(test_trajectory[key])\n",
"\n",
" y = train_trajectory[variables].values\n",
" train_y = np.vstack([np.hstack([v for v in y[:, i]]) for i in range(y.shape[1])]).T\n",
" y = test_trajectory[variables].values\n",
" test_y = np.vstack([np.hstack([v for v in y[:, i]]) for i in range(y.shape[1])]).T\n",
"\n",
" decoder = Ridge(alpha=0.1)\n",
" decoder.fit(train_X, train_y)\n",
"\n",
" return decoder, test_X, test_y\n",
"\n",
"def filter_fliers(data, whis=1.5, return_idx=False):\n",
" if not isinstance(data, list):\n",
" data = [data]\n",
" filtered_data = []; data_ides = []\n",
" for value in data:\n",
" Q1, Q2, Q3 = np.percentile(value, [25, 50, 75])\n",
" lb = Q1 - whis * (Q3 - Q1); ub = Q3 + whis * (Q3 - Q1)\n",
" filtered_data.append(value[(value > lb) & (value < ub)])\n",
" data_ides.append(np.where((value > lb) & (value < ub))[0])\n",
" if return_idx:\n",
" return filtered_data, data_ides\n",
" else:\n",
" return filtered_data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Plotting functions\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Plotting functions\n",
"\n",
"major_formatter = FuncFormatter(my_tickformatter)\n",
"modular_c = 'lightseagreen'; holistic_c = 'salmon'\n",
"reward_c = 'rosybrown'; unreward_c = 'dodgerblue'\n",
"\n",
"fontsize = 7\n",
"lw = 1\n",
"\n",
"def set_violin_plot(vp, facecolor, edgecolor, linewidth=1, alpha=1, ls='-', hatch=r''):\n",
" with plt.xkcd():\n",
" plt.setp(vp['bodies'], facecolor=facecolor, edgecolor=edgecolor,\n",
" linewidth=linewidth, alpha=alpha ,ls=ls, hatch=hatch)\n",
" plt.setp(vp['cmins'], facecolor=facecolor, edgecolor=edgecolor,\n",
" linewidth=linewidth, alpha=alpha)\n",
" plt.setp(vp['cmaxes'], facecolor=facecolor, edgecolor=edgecolor,\n",
" linewidth=linewidth, alpha=alpha)\n",
" plt.setp(vp['cbars'], facecolor='None', edgecolor='None',\n",
" linewidth=linewidth, alpha=alpha)\n",
"\n",
" linecolor = 'k' if facecolor == 'None' else 'snow'\n",
" if 'cmedians' in vp:\n",
" plt.setp(vp['cmedians'], facecolor=linecolor, edgecolor=linecolor,\n",
" linewidth=linewidth, alpha=alpha, ls=ls)\n",
" if 'cmeans' in vp:\n",
" plt.setp(vp['cmeans'], facecolor=linecolor, edgecolor=linecolor,\n",
" linewidth=linewidth, alpha=alpha, ls=ls)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set random seed\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Set random seed\n",
"\n",
"import random\n",
"import numpy as np\n",
"\n",
"def set_seed(seed=None, seed_torch=True):\n",
" if seed is None:\n",
" seed = np.random.choice(2 ** 32)\n",
" random.seed(seed)\n",
" np.random.seed(seed)\n",
" if seed_torch:\n",
" torch.manual_seed(seed)\n",
" torch.cuda.manual_seed_all(seed)\n",
" torch.cuda.manual_seed(seed)\n",
" torch.backends.cudnn.benchmark = False\n",
" torch.backends.cudnn.deterministic = True\n",
"\n",
"set_seed(seed = 42)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data retrieval\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Data retrieval\n",
"\n",
"def download_file(fname, url, expected_md5):\n",
" \"\"\"\n",
" Downloads a file from the given URL and saves it locally.\n",
" \"\"\"\n",
" if not os.path.isfile(fname):\n",
" try:\n",
" r = requests.get(url)\n",
" except requests.ConnectionError:\n",
" print(\"!!! Failed to download data !!!\")\n",
" return\n",
" if r.status_code != requests.codes.ok:\n",
" print(\"!!! Failed to download data !!!\")\n",
" return\n",
" if hashlib.md5(r.content).hexdigest() != expected_md5:\n",
" print(\"!!! Data download appears corrupted !!!\")\n",
" return\n",
" with open(fname, \"wb\") as fid:\n",
" fid.write(r.content)\n",
"\n",
"def extract_zip(zip_fname):\n",
" \"\"\"\n",
" Extracts a ZIP file to the current directory.\n",
" \"\"\"\n",
" with zipfile.ZipFile(zip_fname, 'r') as zip_ref:\n",
" zip_ref.extractall(\".\")\n",
"\n",
"# Details for the zip files to be downloaded and extracted\n",
"zip_files = [\n",
" {\n",
" \"fname\": \"agents.zip\",\n",
" \"url\": \"https://osf.io/v9xqp/download\",\n",
" \"expected_md5\": \"2cd35da7ea34e10e6ed2e7c983f0b908\"\n",
" },\n",
" {\n",
" \"fname\": \"training_curve.zip\",\n",
" \"url\": \"https://osf.io/9kjy4/download\",\n",
" \"expected_md5\": \"eb7e07398aa12bd0fd9cf507d9a142c6\"\n",
" }\n",
"]\n",
"\n",
"# New addition for other files to be downloaded, specifically non-zip files\n",
"model_files = [\n",
" {\n",
" \"fname\": \"holistic_dfs.pkl\",\n",
" \"url\": \"https://osf.io/9h7tq/download\",\n",
" \"expected_md5\": \"92d0adb175724641590a611c48d721cc\"\n",
" },\n",
" {\n",
" \"fname\": \"holistic_dfs_2x.pkl\",\n",
" \"url\": \"https://osf.io/ybdmp/download\",\n",
" \"expected_md5\": \"173e13bea9c2bbe8737a40e0d36063d4\"\n",
" },\n",
" {\n",
" \"fname\": \"modular_dfs.pkl\",\n",
" \"url\": \"https://osf.io/apkym/download\",\n",
" \"expected_md5\": \"38a4603464b3e4351bd56625d24d5e16\"\n",
" },\n",
" {\n",
" \"fname\": \"modular_dfs_2x.pkl\",\n",
" \"url\": \"https://osf.io/wqbe2/download\",\n",
" \"expected_md5\": \"5f19bafa3308f25ca163c627b7d9f63f\"\n",
" }\n",
"]\n",
"\n",
"# Process zip files: download and extract\n",
"for zip_file in zip_files:\n",
" download_file(zip_file[\"fname\"], zip_file[\"url\"], zip_file[\"expected_md5\"])\n",
" extract_zip(zip_file[\"fname\"])\n",
"\n",
"# Process model files: download only\n",
"for model_file in model_files:\n",
" download_file(model_file[\"fname\"], model_file[\"url\"], model_file[\"expected_md5\"])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Video 1: Introduction\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 1: Introduction\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', 'Q9Ja6rm8xqk'), ('Bilibili', 'BV1T4421Q7gH')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_introduction\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Section 1: RL agents in a spatial navigation task"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We will use a naturalistic virtual navigation task to train and test RL agents. This task was previously used to investigate the neural computations underlying macaques' flexible behaviors."
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"## Task setup"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Video 2: Task Setup\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 2: Task Setup\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', 'lGccgy-l3ck'), ('Bilibili', 'BV14D421u7Gz')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_task_setup\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"At the beginning of each trial, the subject is situated at the center of the ground plane facing forward; a target is presented at a random location within the field of view (distance: $100$ to $400$ cm, angle: $-35$ to $+35^{\\circ}$) on the ground plane and disappears after $300$ ms. The subject can freely control its linear and angular velocities with a joystick (maximum: $200$ cm/s and $90^{\\circ}$/s, referred to as the joystick gain) to move along its heading in the virtual environment. The objective is to navigate toward the memorized target location and then stop inside the reward zone, a circular region centered at the target location with a radius of $65$ cm. A reward is given only if the subject stops inside the reward zone (see figure below).\n",
"\n",
"The subject's self-location is not directly observable because there are no stable landmarks; instead, the subject needs to use optic flow cues on the ground plane to perceive self-motion and perform path integration. Each textural element of the optic flow, an isosceles triangle, appears at random locations and orientations, disappearing after only a short lifetime, making it impossible to use as a stable landmark. A new trial starts after the subject stops moving."
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"
"
]
},
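{
"cell_type": "markdown",
"metadata": {},
"source": [
"To make the reward criterion concrete, here is a minimal sketch (not part of the tutorial code) that checks whether a hypothetical stop location falls inside the reward zone; the positions below are made-up illustrative values.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"# illustrative positions in cm (hypothetical values, not taken from the task code)\n",
"target_xy = np.array([150.0, 250.0])   # remembered target location\n",
"stop_xy = np.array([190.0, 280.0])     # where the subject stopped\n",
"\n",
"reward_zone_radius = 65.0              # cm, as in the task description above\n",
"dist_to_target = np.linalg.norm(stop_xy - target_xy)  # Euclidean distance to the target\n",
"\n",
"rewarded = dist_to_target < reward_zone_radius\n",
"print(f\"distance = {dist_to_target:.1f} cm, rewarded = {rewarded}\")  # 50.0 cm, True\n",
"```"
]
},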
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"**Task modeling**"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Video 3: Task Modeling\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 3: Task Modeling\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', 'tVBo-h0Va3I'), ('Bilibili', 'BV1n4421Q7n3')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_task_modeling\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We formulate this task as a Partially Observable Markov Decision Process (POMDP) in discrete time, with continuous state and action spaces (see figure below). At each time step $t$, the environment is in the state $\\boldsymbol{s}_t$ (including the agent's position and velocity and the target's position). The agent takes an action $\\boldsymbol{a}_t$ (controlling its linear and angular velocities) to update $\\boldsymbol{s}_t$ to the next state $\\boldsymbol{s}_{t+1}$ following the environmental dynamics given by the transition probability $T(\\boldsymbol{s}_{t+1}|\\boldsymbol{s}_{t},\\boldsymbol{a}_{t})$, and receives a reward $r_t$ from the environment following the reward function $R(\\boldsymbol{s}_{t},\\boldsymbol{a}_{t})$ ($1$ if the agent stops inside the reward zone otherwise $0$).\n",
"\n",
"
\n",
"\n",
"We use a model-free **actor**-**critic** approach to learning, with the actor and critic implemented using distinct neural networks:\n",
"\n",
"* *The actor*: At each $t$, the actor receives two sources of inputs $\\boldsymbol{i}_{t}$ about the state: observation $\\boldsymbol{o}_t$ and last action $\\boldsymbol{a}_{t-1}$. It then outputs an action $\\boldsymbol{a}_{t}$, aiming to maximize the state-action value $Q_t$. This value is a function of the state and action, representing the expected discounted rewards when an action is taken at a state, and future rewards are then accumulated from $t$ until the trial's last step. \n",
"* *The critic*: Since the ground truth value is unknown, the critic is used to approximate the value. In addition to receiving the same inputs as the actor to infer the state, the critic also takes as inputs the action $\\boldsymbol{a}_t$ taken by the actor in this state. It then outputs the estimated $Q_t$ for this action, trained through the temporal-difference reward prediction error (TD error) after receiving the reward $r_t$ $(|r_t+\\gamma Q_{t+1}-Q_{t}|$, where $\\gamma$ denotes the temporal discount factor).\n",
"\n",
"The state $\\boldsymbol{s}_t$ is not fully observable, so the agent must maintain an internal state representation (belief $b_t$) for deciding $\\boldsymbol{a}_{t}$ and $Q_t$. Both actor and critic undergo end-to-end training through back-propagation without explicit objectives for shaping $b_t$. Consequently, networks are free to learn diverse forms of $b_t$ encoded in their neural activities that aid them in achieving their learning objectives. Ideally, networks may develop an effective belief update rule, using the two sources of evidence in the inputs $\\boldsymbol{i}_t=\\{\\boldsymbol{o}_t, \\boldsymbol{a}_{t-1}\\}$. They may predict the state $\\boldsymbol{s}_t$ based on its internal model of the dynamics, its previous belief $b_{t-1}$, and the last self-action $\\boldsymbol{a}_{t-1}$ (e.g., a motor efference copy). The second source is a partial and noisy observation $\\boldsymbol{o}_t$ of $\\boldsymbol{s}_t$ drawn from the observation probability $O(\\boldsymbol{o}_t|\\boldsymbol{s}_t)$. Note that the actual $O$ in the brain for this task is unknown. For simplicity, we model $\\boldsymbol{o}_t$ as a low-dimensional vector, including the target's location when visible (the first $300$ ms, $\\Delta t=0.1$ s), and the agent's observation of its velocities through optic flow, with velocities subject to Gaussian additive noise."
]
},
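{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a concrete illustration of the critic's training signal, the snippet below evaluates the TD error $|r_t+\\gamma Q_{t+1}-Q_{t}|$ for made-up numbers. It is a sketch of the formula above, not the training code used for the pretrained agents.\n",
"\n",
"```python\n",
"import torch\n",
"\n",
"gamma = 0.97                 # temporal discount factor (illustrative value)\n",
"r_t = torch.tensor(0.0)      # no reward at this time step\n",
"Q_t = torch.tensor(0.52)     # critic's value estimate at time t (made-up)\n",
"Q_next = torch.tensor(0.60)  # critic's value estimate at time t+1 (made-up)\n",
"\n",
"td_error = torch.abs(r_t + gamma * Q_next - Q_t)\n",
"print(td_error)              # approximately 0.062\n",
"```"
]
},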
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Video 4: Task Parameters\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 4: Task Parameters\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', '9fbpumGWm4A'), ('Bilibili', 'BV1Un4y197Xr')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_task_parameters\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Given this task modeling, we first specify some parameters."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Parameters definition\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Parameters definition\n",
"\n",
"class Config():\n",
" def __init__(self):\n",
" self.STATE_DIM = 5 # dimension of agent's state: x position, y position, heading, linear vel. v, angular vel. w\n",
" self.ACTION_DIM = 2 # dimension of agent's action: action_v, action_w\n",
" self.OBS_DIM = 2 # dimension of agent's observation of its movement: observation_v, observation_w\n",
" self.TARGET_DIM = 2 # dimension of target's position: target_x, target_y\n",
"\n",
" self.TERMINAL_ACTION = 0.1 # start/stop threshold\n",
" self.DT = 0.1 # discretization time step\n",
" self.EPISODE_TIME = 3.5 # max trial duration in seconds\n",
" self.EPISODE_LEN = int(self.EPISODE_TIME / self.DT) # max trial duration in steps\n",
"\n",
" self.LINEAR_SCALE = 400 # cm/unit\n",
" self.goal_radius = torch.tensor([65]) / self.LINEAR_SCALE # reward zone radius\n",
" self.initial_radius_range = np.array([100, 400]) / self.LINEAR_SCALE # range of target distance\n",
" self.relative_angle_range = np.deg2rad([-35, 35]) # range of target angle\n",
" self.process_gain_default = torch.tensor([200 / self.LINEAR_SCALE, torch.deg2rad(torch.tensor(90.))]) # joystick gain\n",
" self.target_offT = 3 # when target is invisible; by default target is invisible after the first 300 ms\n",
"\n",
" self.pro_noise_std_ = 0.2 # process noise std\n",
" self.obs_noise_std_ = 0.1 # observation noise std"
]
},
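{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (a minimal sketch, not required for the exercises), you can instantiate `Config` and inspect a few derived quantities:\n",
"\n",
"```python\n",
"arg = Config()\n",
"print(arg.EPISODE_LEN)           # 35 steps for a 3.5-s trial at dt = 0.1 s\n",
"print(arg.goal_radius)           # reward-zone radius in normalized units (65 / 400)\n",
"print(arg.process_gain_default)  # joystick gains for linear and angular velocity\n",
"```"
]
},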
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"## Coding Exercise 1: Task environment"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Video 5: Task Environment\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 5: Task Environment\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', 'v0vSzqgxl3U'), ('Bilibili', 'BV1Mw4m1v7La')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_task_environment_video\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We then define the task environment in the following code. You can see from the code how a target is sampled for each trial. You will need to fill in the missing code to define the task dynamics $T(\\boldsymbol{s}_{t+1}|\\boldsymbol{s}_{t},\\boldsymbol{a}_{t})$.\n",
"\n",
"The code for the dynamics implements the following mathematical equations:\n",
"\n",
"$$\n",
"\\begin{equation}\n",
"s_{x_{t+1}}=s_{x_{t}}+s_{v_{t}} \\cos (s_{\\theta_t})\\,\\Delta t\\\\\n",
"\\end{equation}\n",
"$$\n",
"$$\n",
"\\begin{equation}\n",
"s_{y_{t+1}}=s_{y_{t}}+s_{v_{t}} \\sin (s_{\\theta_t})\\,\\Delta t\\\\\n",
"\\end{equation}\n",
"$$\n",
"$$\n",
"\\begin{equation}\n",
"s_{\\theta_{t+1}}=s_{\\theta_{t}}+s_{\\omega_{t}}\\,\\Delta t \\\\\n",
"\\end{equation}\n",
"$$\n",
"$$\n",
"\\begin{equation}\n",
"s_{v_{t+1}}=G_{v}a_{v_t} + \\eta_{v_t} \\\\\n",
"\\end{equation}\n",
"$$\n",
"$$\n",
"\\begin{equation}\n",
"s_{\\omega_{t+1}}=G_{\\omega} a_{\\omega_t} + \\eta_{\\omega_t}\\\\\n",
"\\end{equation}\n",
"$$\n",
"\n",
"where the state elements include:\n",
"* x position $s_{x_{t}}$\n",
"* y position $s_{y_{t}}$\n",
"* heading $s_{\\theta_{t}}$\n",
"* linear velocity $s_{v_{t}}$\n",
"* angular velocity $s_{\\omega_{t}}$\n",
"\n",
"$G_{v}$ and $G_{\\omega}$ are the joystick gains mapping actions to linear and angular velocities. $\\eta_{v_t}$ and $\\eta_{\\omega_t}$ are process noises."
]
},
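{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick numerical check of these equations (with made-up values): suppose the agent starts at the origin facing straight ahead ($s_{\\theta_t}=\\pi/2$) with $s_{v_t}=0.5$ units/s, $s_{\\omega_t}=0$, no process noise, and $\\Delta t = 0.1$ s. Then\n",
"\n",
"$$\n",
"s_{x_{t+1}} = 0 + 0.5\\cos(\\pi/2)\\, 0.1 = 0, \\qquad\n",
"s_{y_{t+1}} = 0 + 0.5\\sin(\\pi/2)\\, 0.1 = 0.05, \\qquad\n",
"s_{\\theta_{t+1}} = \\pi/2,\n",
"$$\n",
"\n",
"so the agent moves straight ahead by $0.05$ units ($20$ cm at the $400$ cm/unit scale) in one time step."
]
},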
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"execution": {}
},
"source": [
"```python\n",
"class Env(nn.Module):\n",
" def __init__(self, arg):\n",
" \"\"\"\n",
" Initializes the environment with given arguments.\n",
"\n",
" Inputs:\n",
" - arg (object): An object containing initialization parameters.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.__dict__.update(arg.__dict__)\n",
"\n",
" def reset(self, target_position=None, gain=None):\n",
" \"\"\"\n",
" Resets the environment to start a new trial.\n",
"\n",
" Inputs:\n",
" - target_position (tensor, optional): The target position for the trial. If None, a position is sampled.\n",
" - gain (tensor, optional): The joystick gain. If None, the default gain is used.\n",
"\n",
" Outputs:\n",
" - initial_state (tensor): The initial state of the environment.\n",
" \"\"\"\n",
" # sample target position\n",
" self.target_position = target_position\n",
" if target_position is None:\n",
" target_rel_r = torch.sqrt(torch.zeros(1).uniform_(*self.initial_radius_range**2))\n",
" target_rel_ang = torch.zeros(1).uniform_(*self.relative_angle_range)\n",
" rel_phi = np.pi/2 - target_rel_ang\n",
" target_x = target_rel_r * torch.cos(rel_phi)\n",
" target_y = target_rel_r * torch.sin(rel_phi)\n",
" self.target_position = torch.tensor([target_x, target_y]).view([-1, 1])\n",
" self.target_position_obs = self.target_position.clone()\n",
"\n",
" # joystick gain\n",
" self.gain = gain\n",
" if gain is None:\n",
" self.gain = self.process_gain_default\n",
"\n",
" # process noise std\n",
" self.pro_noise_std = self.gain * self.pro_noise_std_\n",
"\n",
" return torch.tensor([0, 0, np.pi / 2, 0, 0]).view([-1, 1]) # return the initial state\n",
"\n",
" def forward(self, x, a, t):\n",
" \"\"\"\n",
" Updates the state based on the current state, action, and time.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current state of the environment.\n",
" - a (tensor): The action taken.\n",
" - t (int): The current time step.\n",
"\n",
" Outputs:\n",
" - next_x (tensor): The next state of the environment.\n",
" - reached_target (bool): Whether the target has been reached.\n",
" \"\"\"\n",
" if t == self.target_offT:\n",
" self.target_position_obs *= 0 # make target invisible\n",
"\n",
" relative_dist = torch.dist(x[:2], self.target_position)\n",
" reached_target = relative_dist < self.goal_radius\n",
" next_x = self.dynamics(x, a.view(-1)) # update state based on environment dynamics\n",
"\n",
" return next_x, reached_target\n",
"\n",
" def dynamics(self, x, a):\n",
" \"\"\"\n",
" Defines the environment dynamics.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current state of the environment.\n",
" - a (tensor): The action taken.\n",
"\n",
" Outputs:\n",
" - next_x (tensor): The next state of the environment.\n",
" \"\"\"\n",
" # sample process noise\n",
" eta = self.pro_noise_std * torch.randn(2)\n",
"\n",
" # there are five elements in the state\n",
" px, py, heading_angle, lin_vel, ang_vel = torch.split(x.view(-1), 1)\n",
"\n",
" # update state: s_{t+1} = f(s_{t}, a_{t})\n",
" ###################################################################\n",
" ## Fill out the following then remove\n",
" raise NotImplementedError(\"Student exercise: complete update for y position and angular velocity.\")\n",
" ###################################################################\n",
" px_ = px + lin_vel * torch.cos(heading_angle) * self.DT\n",
" # Hint: Mimic how the x position is updated. The y position update is similar,\n",
" # but it requires the sine of 'heading_angle' instead of the cosine.\n",
" py_ = ...\n",
" heading_angle_ = heading_angle + ang_vel * self.DT\n",
" lin_vel_ = self.gain[0] * a[0] + eta[0]\n",
" # Hint: The variables 'self.gain', 'a', and 'eta' are two-dimensional.\n",
" # The first dimension is for the linear component, and the second dimension is for the angular component.\n",
" ang_vel_ = ...\n",
"\n",
" next_x = torch.stack([px_, py_, heading_angle_,\n",
" lin_vel_.reshape(1), ang_vel_.reshape(1)]).view([-1, 1])\n",
" return next_x\n",
"\n",
" def is_stop(self, action):\n",
" \"\"\"\n",
" Determines if the given action is a stop action.\n",
"\n",
" Inputs:\n",
" - action (tensor): The action.\n",
"\n",
" Outputs:\n",
" - stop (bool): Whether the action is a stop action.\n",
" \"\"\"\n",
" stop = (action.abs() < self.TERMINAL_ACTION).all()\n",
" return stop\n",
"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {}
},
"outputs": [],
"source": [
"#to_remove solution\n",
"class Env(nn.Module):\n",
" def __init__(self, arg):\n",
" \"\"\"\n",
" Initializes the environment with given arguments.\n",
"\n",
" Inputs:\n",
" - arg (object): An object containing initialization parameters.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.__dict__.update(arg.__dict__)\n",
"\n",
" def reset(self, target_position=None, gain=None):\n",
" \"\"\"\n",
" Resets the environment to start a new trial.\n",
"\n",
" Inputs:\n",
" - target_position (tensor, optional): The target position for the trial. If None, a position is sampled.\n",
" - gain (tensor, optional): The joystick gain. If None, the default gain is used.\n",
"\n",
" Outputs:\n",
" - initial_state (tensor): The initial state of the environment.\n",
" \"\"\"\n",
" # sample target position\n",
" self.target_position = target_position\n",
" if target_position is None:\n",
" target_rel_r = torch.sqrt(torch.zeros(1).uniform_(*self.initial_radius_range**2))\n",
" target_rel_ang = torch.zeros(1).uniform_(*self.relative_angle_range)\n",
" rel_phi = np.pi/2 - target_rel_ang\n",
" target_x = target_rel_r * torch.cos(rel_phi)\n",
" target_y = target_rel_r * torch.sin(rel_phi)\n",
" self.target_position = torch.tensor([target_x, target_y]).view([-1, 1])\n",
" self.target_position_obs = self.target_position.clone()\n",
"\n",
" # joystick gain\n",
" self.gain = gain\n",
" if gain is None:\n",
" self.gain = self.process_gain_default\n",
"\n",
" # process noise std\n",
" self.pro_noise_std = self.gain * self.pro_noise_std_\n",
"\n",
" return torch.tensor([0, 0, np.pi / 2, 0, 0]).view([-1, 1]) # return the initial state\n",
"\n",
" def forward(self, x, a, t):\n",
" \"\"\"\n",
" Updates the state based on the current state, action, and time.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current state of the environment.\n",
" - a (tensor): The action taken.\n",
" - t (int): The current time step.\n",
"\n",
" Outputs:\n",
" - next_x (tensor): The next state of the environment.\n",
" - reached_target (bool): Whether the target has been reached.\n",
" \"\"\"\n",
" if t == self.target_offT:\n",
" self.target_position_obs *= 0 # make target invisible\n",
"\n",
" relative_dist = torch.dist(x[:2], self.target_position)\n",
" reached_target = relative_dist < self.goal_radius\n",
" next_x = self.dynamics(x, a.view(-1)) # update state based on environment dynamics\n",
"\n",
" return next_x, reached_target\n",
"\n",
" def dynamics(self, x, a):\n",
" \"\"\"\n",
" Defines the environment dynamics.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current state of the environment.\n",
" - a (tensor): The action taken.\n",
"\n",
" Outputs:\n",
" - next_x (tensor): The next state of the environment.\n",
" \"\"\"\n",
" # sample process noise\n",
" eta = self.pro_noise_std * torch.randn(2)\n",
"\n",
" # there are five elements in the state\n",
" px, py, heading_angle, lin_vel, ang_vel = torch.split(x.view(-1), 1)\n",
"\n",
" # update state: s_{t+1} = f(s_{t}, a_{t})\n",
" px_ = px + lin_vel * torch.cos(heading_angle) * self.DT\n",
" # Hint: Mimic how the x position is updated. The y position update is similar,\n",
" # but it requires the sine of 'heading_angle' instead of the cosine.\n",
" py_ = py + lin_vel * torch.sin(heading_angle) * self.DT\n",
" heading_angle_ = heading_angle + ang_vel * self.DT\n",
" lin_vel_ = self.gain[0] * a[0] + eta[0]\n",
" # Hint: The variables 'self.gain', 'a', and 'eta' are two-dimensional.\n",
" # The first dimension is for the linear component, and the second dimension is for the angular component.\n",
" ang_vel_ = self.gain[1] * a[1] + eta[1]\n",
"\n",
" next_x = torch.stack([px_, py_, heading_angle_,\n",
" lin_vel_.reshape(1), ang_vel_.reshape(1)]).view([-1, 1])\n",
" return next_x\n",
"\n",
" def is_stop(self, action):\n",
" \"\"\"\n",
" Determines if the given action is a stop action.\n",
"\n",
" Inputs:\n",
" - action (tensor): The action.\n",
"\n",
" Outputs:\n",
" - stop (bool): Whether the action is a stop action.\n",
" \"\"\"\n",
" stop = (action.abs() < self.TERMINAL_ACTION).all()\n",
" return stop"
]
},
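{
"cell_type": "markdown",
"metadata": {},
"source": [
"Once `Env` is complete, a single simulated step can be run as in the sketch below. This is just a quick illustration of the interface defined above; the action values are arbitrary.\n",
"\n",
"```python\n",
"arg = Config()\n",
"env = Env(arg)\n",
"\n",
"x = env.reset()                    # initial state: origin, heading pi/2, zero velocity\n",
"a = torch.tensor([[0.8], [0.0]])   # arbitrary action: mostly forward, no turning\n",
"x_next, reached_target = env(x, a, t=0)\n",
"\n",
"print(x_next.view(-1))             # updated state: position, heading, velocities\n",
"print(env.is_stop(a))              # tensor(False): the action exceeds the stop threshold\n",
"```"
]
},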
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_task_environment\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"## Coding exercise 2: RL agent"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Agent observation"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Next, we define the observation probability $O(\\boldsymbol{o}_t|\\boldsymbol{s}_t)$ from which the observation $\\boldsymbol{o}_t$ is drawn.\n",
"\n",
"The observation for self-movement is defined as\n",
"\n",
"$$\\boldsymbol{o}_t=H_t \\boldsymbol{s}_t+\\boldsymbol{\\zeta}_t$$\n",
"\n",
"where $\\boldsymbol{\\zeta}_t$ is a zero-mean Gaussian observation noise, and the observation model $H_{t}$ is a matrix that only takes the velocity elements (linear and angular velocities) from the true state, filtering out positional elements as they are unobservable. Essentially, the observation for self-movement is a noisy version of the agent's linear and angular velocities."
]
},
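{
"cell_type": "markdown",
"metadata": {},
"source": [
"For the dimensions used here (state dimension $5$, observation dimension $2$), $H_t$ is a constant selection matrix that reads out the last two state elements, i.e., the linear and angular velocities:\n",
"\n",
"$$\n",
"H = \\begin{pmatrix} 0 & 0 & 0 & 1 & 0 \\\\ 0 & 0 & 0 & 0 & 1 \\end{pmatrix}, \\qquad\n",
"\\boldsymbol{o}_t = H \\boldsymbol{s}_t + \\boldsymbol{\\zeta}_t = \\begin{pmatrix} s_{v_t} \\\\ s_{\\omega_t} \\end{pmatrix} + \\boldsymbol{\\zeta}_t.\n",
"$$"
]
},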
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Observation dynamics\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Observation dynamics\n",
"\n",
"class ObsStep(nn.Module):\n",
" def __init__(self, arg):\n",
" \"\"\"\n",
" Initializes the observation step with given arguments.\n",
"\n",
" Inputs:\n",
" - arg (object): An object containing the parameters for state dimension, observation dimension, and observation noise standard deviation.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.STATE_DIM = arg.STATE_DIM\n",
" self.OBS_DIM = arg.OBS_DIM\n",
" self.obs_noise_std_ = arg.obs_noise_std_\n",
"\n",
" # observation matrix\n",
" self.H = torch.zeros(self.OBS_DIM, self.STATE_DIM)\n",
" self.H[0, -2] = 1\n",
" self.H[1, -1] = 1\n",
"\n",
" def reset(self, gain):\n",
" \"\"\"\n",
" Resets the observation noise standard deviation based on the given gain.\n",
"\n",
" Inputs:\n",
" - gain (tensor): The gain used to scale the observation noise.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" self.obs_noise_std = self.obs_noise_std_ * gain\n",
"\n",
" def forward(self, x):\n",
" \"\"\"\n",
" Computes the observation based on the current state and observation noise.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current state of the environment.\n",
"\n",
" Outputs:\n",
" - o_t (tensor): The observed state with added noise.\n",
" \"\"\"\n",
" zeta = (self.obs_noise_std * torch.randn(self.OBS_DIM)).view([-1, 1])\n",
" o_t = self.H @ x + zeta\n",
" return o_t"
]
},
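{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of how `ObsStep` can be used on its own (the `Agent` class defined below holds an `ObsStep` instance internally); the state values here are arbitrary:\n",
"\n",
"```python\n",
"arg = Config()\n",
"obs_step = ObsStep(arg)\n",
"obs_step.reset(gain=arg.process_gain_default)   # observation noise scales with the joystick gain\n",
"\n",
"x = torch.tensor([0.0, 0.0, np.pi / 2, 0.5, 0.1]).view([-1, 1])  # an example state\n",
"o = obs_step(x)        # noisy readout of the two velocity components only\n",
"print(o.view(-1))\n",
"```"
]
},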
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Actor-critic RL agent"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Video 6: RL Agent\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 6: RL Agent\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', '1E-AjvSM2kw'), ('Bilibili', 'BV1ub421v7JG')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_rl_agent_video\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Each RL agent requires an actor and a critic network. Actor and critic networks can have a variety of architectures. Our goal here is to investigate whether functionally specialized modules provide advantages for our task. Therefore, we designed architectures incorporating modules with distinct levels of specialization for comparison. The first architecture is a **holistic** actor/critic, comprising a single module where all neurons jointly compute the belief $b_t$ and the action $\\boldsymbol{a}_t$/value $Q_t$. In contrast, the second architecture is a **modular** actor/critic, featuring modules specialized in computing different variables (see the figure below).\n",
"\n",
"The specialization of each module is determined as follows.\n",
"\n",
"First, we can confine the computation of beliefs. Since computing beliefs about the evolving state requires integrating evidence over time, a network capable of computing belief must possess some form of memory. Recurrent neural networks (RNNs) satisfy this requirement by using a hidden state that evolves over time. In contrast, computations of value and action do not need additional memory when the belief is provided, making memoryless multi-layer perceptrons (MLPs) sufficient. Consequently, adopting an architecture with an RNN followed by a memoryless MLP (modular actor/critic) ensures that the computation of belief is exclusively confined to the RNN.\n",
"\n",
"Second, we can confine the computation of the state-action value $Q_t$ for the critic. Since a critic is trained end-to-end to compute $Q_t$, stacking two modules between all inputs and outputs does not limit the computation of $Q_t$ to a specific module. However, since $Q_t$ is a function of the action $\\boldsymbol{a}_t$, we can confine the computation of $Q_t$ to the second module of the modular critic by supplying $\\boldsymbol{a}_t$ only to the second module. This ensures that the first module, lacking access to the action, cannot accurately compute $Q_t$. Therefore, the modular critic's RNN is dedicated to computing $b_t$ and sends it to the MLP dedicated to computing $Q_t$. This architecture enforces modularity.\n",
"\n",
"Besides the critic, the modular actor has higher specialization than the holistic actor, which lacks confined $b_t$ computation. Thought bubbles in the figure below denote the variables that can be **computed** within each module enforced through architecture rather than indicating they are **encoded** in each module. For example, $b_t$ in modular architectures is passed to the second module, but an accurate $b_t$ computation can only be completed in the first RNN module."
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"
"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We will compare two agents: the modular agent, which uses modular actor and modular critic networks, and the holistic agent, which uses holistic actor and holistic critic networks.\n",
"\n",
"For simplicity, we will only present the code for actors here."
]
},
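{
"cell_type": "markdown",
"metadata": {},
"source": [
"The critic code is not shown in this tutorial, but based on the description above, a modular critic could be wired roughly as in the sketch below. This is an illustrative sketch only, not the implementation used to train the pretrained agents; the layer sizes are arbitrary assumptions.\n",
"\n",
"```python\n",
"class ModularCriticSketch(nn.Module):\n",
"    \"\"\"Illustrative only: the RNN infers the belief from (o_t, a_{t-1}, target);\n",
"    the action a_t enters only at the MLP that outputs Q_t.\"\"\"\n",
"\n",
"    def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM, RNN_SIZE=128, MLP_SIZE=300):\n",
"        super().__init__()\n",
"        # first module: belief computation (has memory)\n",
"        self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM,\n",
"                           hidden_size=RNN_SIZE)\n",
"        # second module: value computation (memoryless); receives the belief AND a_t\n",
"        self.l1 = nn.Linear(RNN_SIZE + ACTION_DIM, MLP_SIZE)\n",
"        self.l2 = nn.Linear(MLP_SIZE, 1)\n",
"\n",
"    def forward(self, x, action, hidden_in):\n",
"        b, hidden_out = self.rnn(x, hidden_in)   # belief, computed only here\n",
"        q = self.l2(F.relu(self.l1(torch.cat([b, action], dim=-1))))  # Q_t\n",
"        return q, hidden_out\n",
"```"
]
},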
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"class Agent():\n",
" def __init__(self, arg, Actor):\n",
" \"\"\"\n",
" Initializes the agent with given arguments and an actor model.\n",
"\n",
" Inputs:\n",
" - arg (object): An object containing initialization parameters such as observation dimension, action dimension, and target dimension.\n",
" - Actor (class): The actor model class to be used by the agent.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" self.__dict__.update(arg.__dict__)\n",
"\n",
" self.actor = Actor(self.OBS_DIM, self.ACTION_DIM, self.TARGET_DIM)\n",
" self.obs_step = ObsStep(arg)\n",
"\n",
" def select_action(self, state_input, hidden_in):\n",
" \"\"\"\n",
" Selects an action based on the current state input and hidden state.\n",
"\n",
" Inputs:\n",
" - state_input (tensor): The current state input to the actor model.\n",
" - hidden_in (tensor): The hidden state input to the actor model.\n",
"\n",
" Outputs:\n",
" - action (tensor): The action selected by the actor model.\n",
" - hidden_out (tensor): The updated hidden state from the actor model.\n",
" \"\"\"\n",
" with torch.no_grad():\n",
" action, hidden_out = self.actor(state_input, hidden_in)\n",
" return action, hidden_out\n",
"\n",
" def load(self, data_path, filename):\n",
" \"\"\"\n",
" Loads the actor model parameters from a file.\n",
"\n",
" Inputs:\n",
" - data_path (Path): The path to the directory containing the file.\n",
" - filename (str): The name of the file to load the parameters from.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" self.filename = filename\n",
" file = data_path / f'{self.filename}.tar'\n",
" params = torch.load(file)\n",
" self.actor.load_state_dict(params['actor_dict'])"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Holistic actor"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We define the holistic actor as follows. Fill in the missing code to define the holistic architecture."
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"execution": {}
},
"source": [
"```python\n",
"class HolisticActor(nn.Module):\n",
" def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM):\n",
" \"\"\"\n",
" Initializes the holistic actor model with given dimensions.\n",
"\n",
" Inputs:\n",
" - OBS_DIM (int): The dimension of the observation input.\n",
" - ACTION_DIM (int): The dimension of the action output.\n",
" - TARGET_DIM (int): The dimension of the target input.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.OBS_DIM = OBS_DIM\n",
" self.ACTION_DIM = ACTION_DIM\n",
" self.RNN_SIZE = 220 # RNN hidden size\n",
"\n",
" self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM, hidden_size=self.RNN_SIZE)\n",
" self.l1 = nn.Linear(self.RNN_SIZE, ACTION_DIM)\n",
"\n",
" def forward(self, x, hidden_in):\n",
" \"\"\"\n",
" Computes the action based on the current input and hidden state.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current input to the model, which includes observation, action, and target information.\n",
" - hidden_in (tuple): The initial hidden state for the LSTM.\n",
"\n",
" Outputs:\n",
" - a (tensor): The action output from the model.\n",
" - hidden_out (tuple): The updated hidden state from the LSTM.\n",
" \"\"\"\n",
" ###################################################################\n",
" ## Fill out the following then remove\n",
" raise NotImplementedError(\"Student exercise: complete forward propagation through holistic actor.\")\n",
" ###################################################################\n",
" #######################################################\n",
" # TODO: Pass the input 'x' and the previous hidden state 'hidden_in' to the RNN module 'self.rnn'.\n",
" # Get the output 'x' and the hidden state 'hidden_out' from the RNN module.\n",
" # Refer to https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html.\n",
" # Hint: 'self.rnn' takes two arguments as inputs and outputs two things.\n",
" # The first position corresponds to 'x', and the second position corresponds to the hidden state.\n",
" #######################################################\n",
" x, hidden_out = ...\n",
"\n",
" a = torch.tanh(self.l1(x))\n",
" return a, hidden_out\n",
"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"#to_remove solution\n",
"class HolisticActor(nn.Module):\n",
" def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM):\n",
" \"\"\"\n",
" Initializes the holistic actor model with given dimensions.\n",
"\n",
" Inputs:\n",
" - OBS_DIM (int): The dimension of the observation input.\n",
" - ACTION_DIM (int): The dimension of the action output.\n",
" - TARGET_DIM (int): The dimension of the target input.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.OBS_DIM = OBS_DIM\n",
" self.ACTION_DIM = ACTION_DIM\n",
" self.RNN_SIZE = 220 # RNN hidden size\n",
"\n",
" self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM, hidden_size=self.RNN_SIZE)\n",
" self.l1 = nn.Linear(self.RNN_SIZE, ACTION_DIM)\n",
"\n",
" def forward(self, x, hidden_in):\n",
" \"\"\"\n",
" Computes the action based on the current input and hidden state.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current input to the model, which includes observation, action, and target information.\n",
" - hidden_in (tuple): The initial hidden state for the LSTM.\n",
"\n",
" Outputs:\n",
" - a (tensor): The action output from the model.\n",
" - hidden_out (tuple): The updated hidden state from the LSTM.\n",
" \"\"\"\n",
" #######################################################\n",
" # TODO: Pass the input 'x' and the previous hidden state 'hidden_in' to the RNN module 'self.rnn'.\n",
" # Get the output 'x' and the hidden state 'hidden_out' from the RNN module.\n",
" # Refer to https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html.\n",
" # Hint: 'self.rnn' takes two arguments as inputs and outputs two things.\n",
" # The first position corresponds to 'x', and the second position corresponds to the hidden state.\n",
" #######################################################\n",
" x, hidden_out = self.rnn(x, hidden_in)\n",
"\n",
" a = torch.tanh(self.l1(x))\n",
" return a, hidden_out"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Test your implementation of `HolisticActor'!\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Test your implementation of `HolisticActor'!\n",
"set_seed(0)\n",
"\n",
"arg = Config()\n",
"actor_test = HolisticActor(arg.OBS_DIM, arg.ACTION_DIM, arg.TARGET_DIM)\n",
"\n",
"x_test = torch.randn(1, 1, 6); hidden_in_test = (torch.randn(1, 1, 220), torch.randn(1, 1, 220))\n",
"a, hidden_out = actor_test(x_test, hidden_in_test)\n",
"\n",
"if torch.norm(a.reshape(-1) - torch.tensor([0.1145, -0.1694])) < 1e-2:\n",
" print('Your function is correct!')\n",
"else:\n",
" print('Your function is incorrect!')"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Modular actor"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We define the modular actor as follows. Fill in the missing code to define the modular architecture."
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"execution": {}
},
"source": [
"```python\n",
"class ModularActor(nn.Module):\n",
" def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM):\n",
" \"\"\"\n",
" Initializes the modular actor model with given dimensions.\n",
"\n",
" Inputs:\n",
" - OBS_DIM (int): The dimension of the observation input.\n",
" - ACTION_DIM (int): The dimension of the action output.\n",
" - TARGET_DIM (int): The dimension of the target input.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.OBS_DIM = OBS_DIM\n",
" self.ACTION_DIM = ACTION_DIM\n",
" self.RNN_SIZE = 128 # RNN hidden size\n",
" MLP_SIZE = 300 # number of neurons in one MLP layer\n",
"\n",
" self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM, hidden_size=self.RNN_SIZE)\n",
" self.l1 = nn.Linear(self.RNN_SIZE, MLP_SIZE)\n",
" self.l2 = nn.Linear(MLP_SIZE, MLP_SIZE)\n",
" self.l3 = nn.Linear(MLP_SIZE, ACTION_DIM)\n",
"\n",
" def forward(self, x, hidden_in):\n",
" \"\"\"\n",
" Computes the action based on the current input and hidden state.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current input to the model, which includes observation, action, and target information.\n",
" - hidden_in (tuple): The initial hidden state for the LSTM.\n",
"\n",
" Outputs:\n",
" - a (tensor): The action output from the model.\n",
" - hidden_out (tuple): The updated hidden state from the LSTM.\n",
" \"\"\"\n",
" ###################################################################\n",
" ## Fill out the following then remove\n",
" raise NotImplementedError(\"Student exercise: complete forward propagation through modular actor.\")\n",
" ###################################################################\n",
" #######################################################\n",
" # TODO: Pass 'x' to the MLP module, which consists of two linear layers with ReLU nonlinearity.\n",
" # First, pass 'x' to the first linear layer, 'self.l1', followed by 'F.relu'.\n",
" # Second, pass 'x' again to the second linear layer, 'self.l2', followed by 'F.relu'.\n",
" #######################################################\n",
" x, hidden_out = self.rnn(x, hidden_in)\n",
" x = ...\n",
" x = ...\n",
"\n",
" a = torch.tanh(self.l3(x))\n",
"\n",
" return a, hidden_out\n",
"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"#to_remove solution\n",
"class ModularActor(nn.Module):\n",
" def __init__(self, OBS_DIM, ACTION_DIM, TARGET_DIM):\n",
" \"\"\"\n",
" Initializes the modular actor model with given dimensions.\n",
"\n",
" Inputs:\n",
" - OBS_DIM (int): The dimension of the observation input.\n",
" - ACTION_DIM (int): The dimension of the action output.\n",
" - TARGET_DIM (int): The dimension of the target input.\n",
"\n",
" Outputs:\n",
" - None\n",
" \"\"\"\n",
" super().__init__()\n",
" self.OBS_DIM = OBS_DIM\n",
" self.ACTION_DIM = ACTION_DIM\n",
" self.RNN_SIZE = 128 # RNN hidden size\n",
" MLP_SIZE = 300 # number of neurons in one MLP layer\n",
"\n",
" self.rnn = nn.LSTM(input_size=OBS_DIM + ACTION_DIM + TARGET_DIM, hidden_size=self.RNN_SIZE)\n",
" self.l1 = nn.Linear(self.RNN_SIZE, MLP_SIZE)\n",
" self.l2 = nn.Linear(MLP_SIZE, MLP_SIZE)\n",
" self.l3 = nn.Linear(MLP_SIZE, ACTION_DIM)\n",
"\n",
" def forward(self, x, hidden_in):\n",
" \"\"\"\n",
" Computes the action based on the current input and hidden state.\n",
"\n",
" Inputs:\n",
" - x (tensor): The current input to the model, which includes observation, action, and target information.\n",
" - hidden_in (tuple): The initial hidden state for the LSTM.\n",
"\n",
" Outputs:\n",
" - a (tensor): The action output from the model.\n",
" - hidden_out (tuple): The updated hidden state from the LSTM.\n",
" \"\"\"\n",
" #######################################################\n",
" # TODO: Pass 'x' to the MLP module, which consists of two linear layers with ReLU nonlinearity.\n",
" # First, pass 'x' to the first linear layer, 'self.l1', followed by 'F.relu'.\n",
" # Second, pass 'x' again to the second linear layer, 'self.l2', followed by 'F.relu'.\n",
" #######################################################\n",
" x, hidden_out = self.rnn(x, hidden_in)\n",
" x = F.relu(self.l1(x))\n",
" x = F.relu(self.l2(x))\n",
"\n",
" a = torch.tanh(self.l3(x))\n",
"\n",
" return a, hidden_out"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Test your implementation of `ModularActor'!\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Test your implementation of `ModularActor'!\n",
"set_seed(0)\n",
"\n",
"arg = Config()\n",
"actor_test = ModularActor(arg.OBS_DIM, arg.ACTION_DIM, arg.TARGET_DIM)\n",
"\n",
"x_test = torch.randn(1, 1, 6); hidden_in_test = (torch.randn(1, 1, 128), torch.randn(1, 1, 128))\n",
"a, hidden_out = actor_test(x_test, hidden_in_test)\n",
"\n",
"if torch.norm(a.reshape(-1) - torch.tensor([0.0068, 0.0307])) < 1e-2:\n",
" print('Your function is correct!')\n",
"else:\n",
" print('Your function is incorrect!')"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Coding Exercise 2 Discussion\n",
"\n",
"1. To ensure a fair comparison, the total number of trainable parameters is designed to be similar between the two architectures. How many trainable parameters are there in each architecture?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"#to_remove explanation\n",
"\n",
"\"\"\"\n",
"Discussion: To ensure a fair comparison, the total number of trainable parameters is designed to be similar between the two architectures. How many trainable parameters are there in each architecture?\n",
"\n",
"Taking into consideration that OBS_DIM = ACTION_DIM = TARGET_DIM = 2, and that for `LSTM` layer, the total number of parameters is 4(nm + n^2 + n) where m is the input dimension and n is the output dimension (as we have self-recurrence, thus n^2, projection from input to output, thus nm, and, finally, bias, thus n), we have:\n",
"\n",
"- for holistic actor (LSTM + Linear projection): 4 * (6 * 220 + 220*220 + 220) + (220 * 2 + 220) = 200420.\n",
"- for modular actor (LSTM + Linear projections): 4 * (6 * 128 + 128*128 + 128) + (128 * 300 + 128) + (300 * 300 + 300) + (300 * 2 + 300) = 198848.\n",
"\"\"\""
]
},
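{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Rather than counting by hand, you can let PyTorch do the bookkeeping. The snippet below is a minimal sketch (assuming the `Config`, `HolisticActor`, and `ModularActor` classes defined above are available) that sums the number of elements across all trainable parameter tensors; both totals should come out close to $2\\times 10^5$.\n",
"\n",
"```python\n",
"def count_trainable_params(model):\n",
"    # sum the number of elements over all parameters that require gradients\n",
"    return sum(p.numel() for p in model.parameters() if p.requires_grad)\n",
"\n",
"arg = Config()\n",
"print('Holistic:', count_trainable_params(HolisticActor(arg.OBS_DIM, arg.ACTION_DIM, arg.TARGET_DIM)))\n",
"print('Modular: ', count_trainable_params(ModularActor(arg.OBS_DIM, arg.ACTION_DIM, arg.TARGET_DIM)))\n",
"```"
]
},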
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_rl_agent\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Section 2: Evaluate agents in the training task\n",
"\n",
"Estimated timing to here from start of tutorial: 25 minutes"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"With the code for the environment and agents done, we will now write an evaluation function allowing the agent to interact with the environment."
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"## Coding Exercise 3: Evaluation function"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We first sample 1000 targets for agents to steer to."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Sample 1000 targets\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Sample 1000 targets\n",
"arg = Config()\n",
"\n",
"set_seed(0)\n",
"env = Env(arg)\n",
"target_positions = []\n",
"\n",
"for _ in range(1000):\n",
" __ = env.reset()\n",
" target_positions.append(env.target_position)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Then, we define the evaluation function. For each target, an agent takes multiple steps to steer to it. In each step, the agent selects an action based on the input information about the state, and the environment updates according to the agent's action.\n",
"\n",
"Fill in the missing code that calculates the reward."
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"execution": {}
},
"source": [
"```python\n",
"def evaluation(agent, gain_factor=1):\n",
" \"\"\"\n",
" Evaluates the agent's performance in the environment.\n",
"\n",
" Inputs:\n",
" - agent (Agent): The agent to be evaluated.\n",
" - gain_factor (float): A factor to scale the process gain. Default is 1.\n",
"\n",
" Outputs:\n",
" - results (DataFrame): A DataFrame containing evaluation results with columns:\n",
" - 'pos_x': List of x positions over episodes.\n",
" - 'pos_y': List of y positions over episodes.\n",
" - 'pos_r_end': List of final radial distances from origin over episodes.\n",
" - 'target_x': List of target x positions.\n",
" - 'target_y': List of target y positions.\n",
" - 'target_r': List of target radial distances.\n",
" - 'rewarded': List of binary rewards indicating if the target was reached and stopped.\n",
" - 'state_input': List of state inputs recorded during the episodes.\n",
" \"\"\"\n",
" set_seed(0)\n",
" env = Env(arg)\n",
"\n",
" pos_x = []; pos_y = []; pos_r_end = []\n",
" target_x = []; target_y = []; target_r = []\n",
" rewarded = []; state_input_ = []\n",
"\n",
" for target_position in tqdm(target_positions):\n",
" state = env.reset(target_position=target_position, gain=arg.process_gain_default * gain_factor)\n",
" agent.obs_step.reset(env.gain)\n",
"\n",
" state_input = torch.cat([torch.zeros([1, 1, arg.OBS_DIM]), torch.zeros([1, 1, arg.ACTION_DIM]),\n",
" env.target_position_obs.view(1, 1, -1)], dim=2)\n",
" hidden_in = (torch.zeros(1, 1, agent.actor.RNN_SIZE), torch.zeros(1, 1, agent.actor.RNN_SIZE))\n",
"\n",
" state_inputs = []\n",
" states = []\n",
"\n",
" for t in range(arg.EPISODE_LEN):\n",
" # 1. Agent takes an action given the state-related input\n",
" action, hidden_out = agent.select_action(state_input, hidden_in)\n",
"\n",
" # 2. Environment updates to the next state given state and action,\n",
" # as well as checking if the agent has reached the reward zone,\n",
" # and if the agent has stopped.\n",
" next_state, reached_target = env(state, action, t)\n",
" is_stop = env.is_stop(action)\n",
"\n",
" # 3. Receive reward\n",
" ###################################################################\n",
" ## Fill out the following then remove\n",
" raise NotImplementedError(\"Student exercise: compute the reward.\")\n",
" ###################################################################\n",
" # TODO: Compute the reward. The reward is '1' when the target is reached and the agent stops on it,\n",
" # otherwise, the reward is '0'.\n",
" # Hint: Use variables 'reached_target' and 'is_stop'.\n",
" reward = ...\n",
"\n",
" # 4. Agent observes the next state and constructs the next state-related input\n",
" next_observation = agent.obs_step(next_state)\n",
" next_state_input = torch.cat([next_observation.view(1, 1, -1), action,\n",
" env.target_position_obs.view(1, 1, -1)], dim=2)\n",
"\n",
" states.append(state)\n",
" state_inputs.append(state_input)\n",
"\n",
" state_input = next_state_input\n",
" state = next_state\n",
" hidden_in = hidden_out\n",
"\n",
" # trial is done when the agent stops\n",
" if is_stop:\n",
" break\n",
"\n",
" # store data for each trial\n",
" pos_x_, pos_y_, _, _, _ = torch.chunk(torch.cat(states, dim=1), state.shape[0], dim=0)\n",
" pos_x.append(pos_x_.view(-1).numpy() * arg.LINEAR_SCALE)\n",
" pos_y.append(pos_y_.view(-1).numpy() * arg.LINEAR_SCALE)\n",
"\n",
" pos_r, _ = cart2pol(pos_x[-1], pos_y[-1])\n",
" pos_r_end.append(pos_r[-1])\n",
"\n",
" target_x.append(target_position[0].item() * arg.LINEAR_SCALE)\n",
" target_y.append(target_position[1].item() * arg.LINEAR_SCALE)\n",
" target_r_, _ = cart2pol(target_x[-1], target_y[-1])\n",
" target_r.append(target_r_)\n",
"\n",
" state_input_.append(torch.cat(state_inputs))\n",
" rewarded.append(reward.item())\n",
"\n",
" return(pd.DataFrame().assign(pos_x=pos_x, pos_y=pos_y,\n",
" pos_r_end=pos_r_end, target_x=target_x, target_y=target_y,\n",
" target_r=target_r, rewarded=rewarded,\n",
" state_input=state_input_))\n",
"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {}
},
"outputs": [],
"source": [
"#to_remove solution\n",
"def evaluation(agent, gain_factor=1):\n",
" \"\"\"\n",
" Evaluates the agent's performance in the environment.\n",
"\n",
" Inputs:\n",
" - agent (Agent): The agent to be evaluated.\n",
" - gain_factor (float): A factor to scale the process gain. Default is 1.\n",
"\n",
" Outputs:\n",
" - results (DataFrame): A DataFrame containing evaluation results with columns:\n",
" - 'pos_x': List of x positions over episodes.\n",
" - 'pos_y': List of y positions over episodes.\n",
" - 'pos_r_end': List of final radial distances from origin over episodes.\n",
" - 'target_x': List of target x positions.\n",
" - 'target_y': List of target y positions.\n",
" - 'target_r': List of target radial distances.\n",
" - 'rewarded': List of binary rewards indicating if the target was reached and stopped.\n",
" - 'state_input': List of state inputs recorded during the episodes.\n",
" \"\"\"\n",
" set_seed(0)\n",
" env = Env(arg)\n",
"\n",
" pos_x = []; pos_y = []; pos_r_end = []\n",
" target_x = []; target_y = []; target_r = []\n",
" rewarded = []; state_input_ = []\n",
"\n",
" for target_position in tqdm(target_positions):\n",
" state = env.reset(target_position=target_position, gain=arg.process_gain_default * gain_factor)\n",
" agent.obs_step.reset(env.gain)\n",
"\n",
" state_input = torch.cat([torch.zeros([1, 1, arg.OBS_DIM]), torch.zeros([1, 1, arg.ACTION_DIM]),\n",
" env.target_position_obs.view(1, 1, -1)], dim=2)\n",
" hidden_in = (torch.zeros(1, 1, agent.actor.RNN_SIZE), torch.zeros(1, 1, agent.actor.RNN_SIZE))\n",
"\n",
" state_inputs = []\n",
" states = []\n",
"\n",
" for t in range(arg.EPISODE_LEN):\n",
" # 1. Agent takes an action given the state-related input\n",
" action, hidden_out = agent.select_action(state_input, hidden_in)\n",
"\n",
" # 2. Environment updates to the next state given state and action,\n",
" # as well as checking if the agent has reached the reward zone,\n",
" # and if the agent has stopped.\n",
" next_state, reached_target = env(state, action, t)\n",
" is_stop = env.is_stop(action)\n",
"\n",
" # 3. Receive reward\n",
" # TODO: Compute the reward. The reward is '1' when the target is reached and the agent stops on it,\n",
" # otherwise, the reward is '0'.\n",
" # Hint: Use variables 'reached_target' and 'is_stop'.\n",
" reward = reached_target & is_stop\n",
"\n",
" # 4. Agent observes the next state and constructs the next state-related input\n",
" next_observation = agent.obs_step(next_state)\n",
" next_state_input = torch.cat([next_observation.view(1, 1, -1), action,\n",
" env.target_position_obs.view(1, 1, -1)], dim=2)\n",
"\n",
" states.append(state)\n",
" state_inputs.append(state_input)\n",
"\n",
" state_input = next_state_input\n",
" state = next_state\n",
" hidden_in = hidden_out\n",
"\n",
" # trial is done when the agent stops\n",
" if is_stop:\n",
" break\n",
"\n",
" # store data for each trial\n",
" pos_x_, pos_y_, _, _, _ = torch.chunk(torch.cat(states, dim=1), state.shape[0], dim=0)\n",
" pos_x.append(pos_x_.view(-1).numpy() * arg.LINEAR_SCALE)\n",
" pos_y.append(pos_y_.view(-1).numpy() * arg.LINEAR_SCALE)\n",
"\n",
" pos_r, _ = cart2pol(pos_x[-1], pos_y[-1])\n",
" pos_r_end.append(pos_r[-1])\n",
"\n",
" target_x.append(target_position[0].item() * arg.LINEAR_SCALE)\n",
" target_y.append(target_position[1].item() * arg.LINEAR_SCALE)\n",
" target_r_, _ = cart2pol(target_x[-1], target_y[-1])\n",
" target_r.append(target_r_)\n",
"\n",
" state_input_.append(torch.cat(state_inputs))\n",
" rewarded.append(reward.item())\n",
"\n",
" return(pd.DataFrame().assign(pos_x=pos_x, pos_y=pos_y,\n",
" pos_r_end=pos_r_end, target_x=target_x, target_y=target_y,\n",
" target_r=target_r, rewarded=rewarded,\n",
" state_input=state_input_))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Since training RL agents takes a lot of time, here we load the pre-trained modular and holistic agents and evaluate these two agents on the same sampled 1000 targets. We will then store the evaluation data in pandas dataframes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"modular_agent = Agent(arg, ModularActor)\n",
"modular_agent.load(Path('agents/modular'), 0)\n",
"modular_df = evaluation(modular_agent)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"holistic_agent = Agent(arg, HolisticActor)\n",
"holistic_agent.load(Path('agents/holistic'), 0)\n",
"holistic_df = evaluation(holistic_agent)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_evaluation_function\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"## Coding Exercise 4: Agent trajectory in a single trial"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"After evaluation, we want to visualize the agents' steering trajectories. Fill in the missing code to compute the distance between the stop location and the target location. Given the reward boundary is $65$ cm, are these trials rewarded?"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Modular agent trajectory"
]
},
{
"cell_type": "markdown",
"metadata": {
"colab_type": "text",
"execution": {}
},
"source": [
"```python\n",
"with plt.xkcd():\n",
" trial_idx = 21\n",
" trial = modular_df.iloc[trial_idx]\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(111)\n",
"\n",
" ax.set_aspect('equal')\n",
" ax.spines['top'].set_visible(False)\n",
" ax.spines['right'].set_visible(False)\n",
" ax.spines['bottom'].set_visible(False)\n",
" ax.spines['left'].set_visible(False)\n",
" ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"\n",
" # plot trajectory\n",
" px = trial.pos_x; py = trial.pos_y\n",
" ax.plot(px, py, lw=lw, c=modular_c)\n",
"\n",
" # plot target\n",
" target_x = trial.target_x; target_y = trial.target_y\n",
" print(f'Target distance from the start location: {np.around(trial.target_r, 1)} cm')\n",
"\n",
" ###################################################################\n",
" ## Fill out the following then remove\n",
" raise NotImplementedError(\"Student exercise: calculate the distance between target location and stop position.\")\n",
" ###################################################################\n",
" # Given target locations as trial.target_x and trial.target_y,\n",
" # and stop locations as trial.pos_x[-1] and trial.pos_y[-1],\n",
" # compute the Euclidean distance between the target and stop locations.\n",
" distance_stoploc_to_target = ...\n",
" print(f'Target distance from the stop location: {np.around(distance_stoploc_to_target, 1)} cm')\n",
"\n",
" print(f'Steps taken: {px.size - 1}')\n",
"\n",
" reward_boundary_radius = arg.goal_radius * arg.LINEAR_SCALE\n",
" target_color = reward_c if distance_stoploc_to_target < reward_boundary_radius else unreward_c\n",
"\n",
" cir1 = Circle(xy=[target_x, target_y], radius=reward_boundary_radius, alpha=0.4, color=target_color, lw=0)\n",
" ax.add_patch(cir1)\n",
" ax.scatter(target_x, target_y, c=target_color, s=5)\n",
"\n",
" # plot initial position\n",
" ax.scatter(0, 0, c='k', s=20, marker='*')\n",
" ax.text(10, -10, s='Start', fontsize=fontsize)\n",
"\n",
" fig.tight_layout(pad=0)\n",
"\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"#to_remove solution\n",
"with plt.xkcd():\n",
" trial_idx = 21\n",
" trial = modular_df.iloc[trial_idx]\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(111)\n",
"\n",
" ax.set_aspect('equal')\n",
" ax.spines['top'].set_visible(False)\n",
" ax.spines['right'].set_visible(False)\n",
" ax.spines['bottom'].set_visible(False)\n",
" ax.spines['left'].set_visible(False)\n",
" ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"\n",
" # plot trajectory\n",
" px = trial.pos_x; py = trial.pos_y\n",
" ax.plot(px, py, lw=lw, c=modular_c)\n",
"\n",
" # plot target\n",
" target_x = trial.target_x; target_y = trial.target_y\n",
" print(f'Target distance from the start location: {np.around(trial.target_r, 1)} cm')\n",
"\n",
" # Given target locations as trial.target_x and trial.target_y,\n",
" # and stop locations as trial.pos_x[-1] and trial.pos_y[-1],\n",
" # compute the Euclidean distance between the target and stop locations.\n",
" distance_stoploc_to_target = np.sqrt((trial.target_x - trial.pos_x[-1])**2 + (trial.target_y - trial.pos_y[-1])**2)\n",
" print(f'Target distance from the stop location: {np.around(distance_stoploc_to_target, 1)} cm')\n",
"\n",
" print(f'Steps taken: {px.size - 1}')\n",
"\n",
" reward_boundary_radius = arg.goal_radius * arg.LINEAR_SCALE\n",
" target_color = reward_c if distance_stoploc_to_target < reward_boundary_radius else unreward_c\n",
"\n",
" cir1 = Circle(xy=[target_x, target_y], radius=reward_boundary_radius, alpha=0.4, color=target_color, lw=0)\n",
" ax.add_patch(cir1)\n",
" ax.scatter(target_x, target_y, c=target_color, s=5)\n",
"\n",
" # plot initial position\n",
" ax.scatter(0, 0, c='k', s=20, marker='*')\n",
" ax.text(10, -10, s='Start', fontsize=fontsize)\n",
"\n",
" fig.tight_layout(pad=0)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Holistic agent trajectory"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Holistic agent trajectory\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Holistic agent trajectory\n",
"\n",
"with plt.xkcd():\n",
" trial_idx = 21\n",
" trial = holistic_df.iloc[trial_idx]\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(111)\n",
"\n",
" ax.set_aspect('equal')\n",
" ax.spines['top'].set_visible(False)\n",
" ax.spines['right'].set_visible(False)\n",
" ax.spines['bottom'].set_visible(False)\n",
" ax.spines['left'].set_visible(False)\n",
" ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"\n",
" # plot trajectory\n",
" px = trial.pos_x; py = trial.pos_y\n",
" ax.plot(px, py, lw=lw, c=holistic_c)\n",
"\n",
" # plot target\n",
" target_x = trial.target_x; target_y = trial.target_y\n",
" print(f'Target distance from the start location: {np.around(trial.target_r, 1)} cm')\n",
"\n",
" distance_stoploc_to_target = np.sqrt((trial.target_x - trial.pos_x[-1])**2\n",
" + (trial.target_y - trial.pos_y[-1])**2)\n",
" print(f'Target distance from the stop location: {np.around(distance_stoploc_to_target, 1)} cm')\n",
"\n",
" print(f'Steps taken: {px.size - 1}')\n",
"\n",
" reward_boundary_radius = arg.goal_radius * arg.LINEAR_SCALE\n",
" target_color = reward_c if distance_stoploc_to_target < reward_boundary_radius else unreward_c\n",
"\n",
" cir1 = Circle(xy=[target_x, target_y], radius=reward_boundary_radius, alpha=0.4, color=target_color, lw=0)\n",
" ax.add_patch(cir1)\n",
" ax.scatter(target_x, target_y, c=target_color, s=5)\n",
"\n",
" # plot initial position\n",
" ax.scatter(0, 0, c='k', s=20, marker='*')\n",
" ax.text(10, -10, s='Start', fontsize=fontsize)\n",
"\n",
" fig.tight_layout(pad=0)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Coding Exercise 4 Discussion\n",
"\n",
"1. Is there any difference between the trajectories for the modular and holistic agents? If so, what does it imply?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"#to_remove explanation\n",
"\n",
"\"\"\"\n",
"Discussion: Is there any difference between the trajectories for the modular and holistic agents? If so, what does it imply?\n",
"\n",
"The holistic agent's trajectory has a higher curvature and length than that of the modular agent, suggesting that the modular agent's trajectory is more optimal. This is because, based on the RL objective with a discount factor smaller than 1, the trajectory should be as efficient (involving fewer steps) as possible.\n",
"\"\"\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_agent_trajectory_in_a_single_trial\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"## Activity: Comparing performance of agents"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Agent trajectories across trials"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We can also visualize multiple trials together. Here, we visualize trajectories steering towards $500$ targets."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Modular agent's trajectories\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Modular agent's trajectories\n",
"\n",
"target_idexes = np.arange(0, 500)\n",
"df = modular_df\n",
"\n",
"fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
"ax = fig.add_subplot(111)\n",
"ax.set_title(\"Modular agent's trajectories\", fontsize=fontsize, fontweight='bold')\n",
"ax.set_aspect('equal')\n",
"ax.spines['top'].set_visible(False)\n",
"ax.spines['right'].set_visible(False)\n",
"ax.spines['bottom'].set_visible(False)\n",
"ax.spines['left'].set_visible(False)\n",
"ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])\n",
"\n",
"ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,\n",
" c='k', ls=(0, (1, 1)), lw=lw)\n",
"for _, trial in df.iloc[target_idexes].iterrows():\n",
" ax.plot(trial.pos_x, trial.pos_y, c=modular_c, lw=0.3, ls='-', alpha=0.2)\n",
"\n",
"reward_idexes = df.rewarded.iloc[target_idexes].values\n",
"for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],\n",
" [reward_c, unreward_c]):\n",
" ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,\n",
" c=c, marker='o', s=1, lw=1.5, label=label)\n",
"\n",
"x_temp = np.linspace(-235, 235)\n",
"ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)\n",
"ax.text(-10, 425, s=r'70$\\degree$', fontsize=fontsize)\n",
"ax.text(130, 150, s=r'400 cm', fontsize=fontsize)\n",
"\n",
"ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)\n",
"ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)\n",
"ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)\n",
"ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)\n",
"\n",
"ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],\n",
" handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)\n",
"\n",
"fig.tight_layout(pad=0)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Holistic agent's trajectories\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Holistic agent's trajectories\n",
"\n",
"target_idexes = np.arange(0, 500)\n",
"df = holistic_df\n",
"\n",
"fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
"ax = fig.add_subplot(111)\n",
"ax.set_title(\"Holistic agent's trajectories\", fontsize=fontsize, fontweight='bold')\n",
"ax.set_aspect('equal')\n",
"ax.spines['top'].set_visible(False)\n",
"ax.spines['right'].set_visible(False)\n",
"ax.spines['bottom'].set_visible(False)\n",
"ax.spines['left'].set_visible(False)\n",
"ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])\n",
"\n",
"ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,\n",
" c='k', ls=(0, (1, 1)), lw=lw)\n",
"for _, trial in df.iloc[target_idexes].iterrows():\n",
" ax.plot(trial.pos_x, trial.pos_y, c=holistic_c, lw=0.3, ls='-', alpha=0.2)\n",
"\n",
"reward_idexes = df.rewarded.iloc[target_idexes].values\n",
"for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],\n",
" [reward_c, unreward_c]):\n",
" ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,\n",
" c=c, marker='o', s=1, lw=1.5, label=label)\n",
"\n",
"x_temp = np.linspace(-235, 235)\n",
"ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)\n",
"ax.text(-10, 425, s=r'70$\\degree$', fontsize=fontsize)\n",
"ax.text(130, 150, s=r'400 cm', fontsize=fontsize)\n",
"\n",
"ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)\n",
"ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)\n",
"ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)\n",
"ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)\n",
"\n",
"ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],\n",
" handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)\n",
"\n",
"fig.tight_layout(pad=0)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Discussion\n",
"\n",
"1. Is there any difference between the two agents' trajectories?"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Agents trained with multiple random seeds"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"It is well known that an RL agent's performance can vary significantly with different random seeds. Therefore, no conclusions can be drawn based on one training run with a single random seed.\n",
"\n",
"Both agents were trained with eight random seeds, and all of them were evaluated using the same sample of $1000$ targets. Let's load this saved trajectory data."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Load agents\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Load agents\n",
"with open('modular_dfs.pkl', 'rb') as file:\n",
" modular_dfs = pickle.load(file)\n",
"\n",
"with open('holistic_dfs.pkl', 'rb') as file:\n",
" holistic_dfs = pickle.load(file)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Reward fraction comparison"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We first compute the fraction of rewarded trials in the total $1000$ trials for all training runs with different random seeds for the modular and holistic agents. We visualize this using a bar plot, with each red dot denoting the performance of a random seed."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reward function comparison\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Reward function comparison\n",
"\n",
"with plt.xkcd():\n",
" xticks = [0, 1]; xticklabels = ['Modular', 'Holistic']\n",
" yticks = [0.9, 0.95, 1]\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(1, 1, 1)\n",
" ax.set_title('Reward fraction comparison', fontsize=fontsize, fontweight='bold')\n",
" ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)\n",
" plt.xticks(xticks, xticklabels, fontsize=fontsize)\n",
" plt.yticks(yticks, fontsize=fontsize)\n",
" ax.set_xlabel(r'', fontsize=fontsize + 1)\n",
" ax.set_ylabel('Reward fraction', fontsize=fontsize + 1)\n",
" ax.set_xlim(xticks[0] - 0.3, xticks[-1] + 0.3)\n",
" ax.set_ylim(yticks[0], yticks[-1])\n",
" ax.xaxis.set_label_coords(0.5, -0.15)\n",
" ax.yaxis.set_label_coords(-0.13, 0.5)\n",
" ax.yaxis.set_major_formatter(major_formatter)\n",
"\n",
" for (idx, dfs), c in zip(enumerate([modular_dfs, holistic_dfs]), [modular_c, holistic_c]):\n",
" ydata = [df.rewarded.sum() / len(df) for df in dfs]\n",
" ax.bar(idx, np.mean(ydata), width=0.7, color=c, alpha=1, zorder=0)\n",
" ax.scatter([idx] * len(ydata), ydata, c='crimson', marker='.',\n",
" s=10, lw=0.5, zorder=1, clip_on=False)\n",
"\n",
" fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Time spent comparison"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Despite similar performance measured by a rewarded fraction, we dis observe qualitative differences in the trajectories of the two agents in the previous sections. It is possible that the holistic agent's more curved trajectories, although reaching the target, are less efficient, i.e., they waste more time.\n",
"\n",
"Therefore, we also plot the time spent by both agents for the same 1000 targets."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Time spent comparison\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Time spent comparison\n",
"\n",
"with plt.xkcd():\n",
" xticks = [0, 1]; xticklabels = ['Modular', 'Holistic']\n",
" yticks = [0.35, 0.45, 0.55]\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(1, 1, 1)\n",
" ax.set_title('Time spent comparison', fontsize=fontsize, fontweight='bold')\n",
" ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)\n",
" plt.xticks(xticks, xticklabels, fontsize=fontsize)\n",
" plt.yticks(yticks, fontsize=fontsize)\n",
" ax.set_xlabel(r'', fontsize=fontsize + 1)\n",
" ax.set_ylabel('Time spent (h)', fontsize=fontsize + 1)\n",
" ax.set_xlim(xticks[0] - 0.3, xticks[-1] + 0.3)\n",
" ax.set_ylim(yticks[0], yticks[-1])\n",
" ax.xaxis.set_label_coords(0.5, -0.15)\n",
" ax.yaxis.set_label_coords(-0.16, 0.5)\n",
" ax.yaxis.set_major_formatter(major_formatter)\n",
"\n",
" for (idx, dfs), c in zip(enumerate([modular_dfs, holistic_dfs]), [modular_c, holistic_c]):\n",
" ydata = [(np.hstack(df.pos_x).size - len(df)) * arg.DT / 3600 for df in dfs]\n",
" ax.bar(idx, np.mean(ydata), width=0.7, color=c, alpha=1, zorder=0)\n",
" ax.scatter([idx] * len(ydata), ydata, c='crimson', marker='.',\n",
" s=10, lw=0.5, zorder=1, clip_on=False)\n",
"\n",
" fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Discussion\n",
"1. Which agent's behavior is more desired in the RL framework? Why?\n",
"\n",
"*Hint*: Consider the objective functions for the critic and actor. The discount factor $\\gamma$ used in training is smaller than $1$."
]
},
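{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"To build intuition for the hint, here is a minimal sketch of how discounting favors faster trajectories. The discount factor below is a hypothetical value chosen only for illustration (not necessarily the one used in training): a unit reward obtained $T$ steps in the future contributes $\\gamma^T$ to the discounted return, so a trajectory that takes twice as many steps earns a noticeably smaller return even if it eventually reaches the target.\n",
"\n",
"```python\n",
"gamma = 0.97  # hypothetical discount factor, for illustration only\n",
"for T in (20, 40):\n",
"    # discounted value of a unit reward obtained after T steps\n",
"    print(f'A reward collected after {T} steps is worth {gamma**T:.2f} today')\n",
"```"
]
},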
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Training curve"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"So far, we have only tested the agents after training. We can also visualize the performance curve for both agents during the training course.\n",
"\n",
"During training, for every $500$ training trials, the agent's performance (fraction of rewarded trials) was evaluated. These data were saved; we'll load them back in for visualization."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Load training curves\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Load training curves\n",
"\n",
"training_curve_path = Path('training_curve')\n",
"holistic_curves = [pd.read_csv(file) for file in training_curve_path.glob(f'holistic*')]\n",
"modular_curves = [pd.read_csv(file) for file in training_curve_path.glob(f'modular*')]"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Let us plot the training curves for both agents. The shaded area denotes the standard error of the mean across all random seeds."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Visualize training curves\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Visualize training curves\n",
"\n",
"mean_holistic_curves = np.vstack([v.reward_fraction for v in holistic_curves]).mean(axis=0)\n",
"sem_holistic_curves = sem(np.vstack([v.reward_fraction for v in holistic_curves]), axis=0)\n",
"mean_modular_curves = np.vstack([v.reward_fraction for v in modular_curves]).mean(axis=0)\n",
"sem_modular_curves = sem(np.vstack([v.reward_fraction for v in holistic_curves]), axis=0)\n",
"\n",
"xaxis_scale = int(1e4)\n",
"yticks = np.around(np.linspace(0, 1, 6), 1)\n",
"xticks = np.around(np.linspace(0, 1.5e5, 4), 1)\n",
"xticklabels = [my_tickformatter(i, None) for i in xticks / xaxis_scale]\n",
"\n",
"with plt.xkcd():\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(1, 1, 1)\n",
" ax.set_title('Reward fraction during training', fontsize=fontsize, fontweight='bold')\n",
" ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)\n",
" plt.xticks(xticks, xticklabels, fontsize=fontsize)\n",
" plt.yticks(yticks, fontsize=fontsize)\n",
" ax.set_xlabel(r'Training trial ($\\times$10$^4$)', fontsize=fontsize + 1)\n",
" ax.set_ylabel('Reward fraction', fontsize=fontsize + 1)\n",
" ax.set_xlim(xticks[0], xticks[-1])\n",
" ax.set_ylim(yticks[0], yticks[-1])\n",
" ax.xaxis.set_label_coords(0.5, -0.3)\n",
" ax.yaxis.set_label_coords(-0.13, 0.5)\n",
" ax.yaxis.set_major_formatter(major_formatter)\n",
"\n",
" xdata = holistic_curves[0].episode.values\n",
" for ymean, ysem, color, label in zip([mean_holistic_curves, mean_modular_curves],\n",
" [sem_holistic_curves, sem_modular_curves],\n",
" [holistic_c, modular_c],\n",
" ['Holistic agent', 'Modular agent']):\n",
" ax.plot(xdata, ymean, lw=lw, clip_on=False, c=color, label=label)\n",
" ax.fill_between(xdata, ymean - ysem, ymean + ysem, edgecolor='None',\n",
" facecolor=color, alpha=0.4, clip_on=True)\n",
"\n",
" ax.legend(fontsize=fontsize, frameon=False, loc=[0.26, 0.1],\n",
" handletextpad=0.5, labelspacing=0.2, ncol=1, columnspacing=1)\n",
"\n",
" fig.tight_layout(pad=0.3, w_pad=0.5, rect=(0, 0, 1, 1))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"**Conclusion for the training task:** The modular agent learned faster and developed more efficient behaviors than the holistic agent."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_comparing_performance_of_agents\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Section 3: A novel gain task\n",
"\n",
"Estimated timing to here from start of tutorial: 50 minutes"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Video 7: Novel Task\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"remove-input"
]
},
"outputs": [],
"source": [
"# @title Video 7: Novel Task\n",
"\n",
"from ipywidgets import widgets\n",
"from IPython.display import YouTubeVideo\n",
"from IPython.display import IFrame\n",
"from IPython.display import display\n",
"\n",
"class PlayVideo(IFrame):\n",
" def __init__(self, id, source, page=1, width=400, height=300, **kwargs):\n",
" self.id = id\n",
" if source == 'Bilibili':\n",
" src = f'https://player.bilibili.com/player.html?bvid={id}&page={page}'\n",
" elif source == 'Osf':\n",
" src = f'https://mfr.ca-1.osf.io/render?url=https://osf.io/download/{id}/?direct%26mode=render'\n",
" super(PlayVideo, self).__init__(src, width, height, **kwargs)\n",
"\n",
"def display_videos(video_ids, W=400, H=300, fs=1):\n",
" tab_contents = []\n",
" for i, video_id in enumerate(video_ids):\n",
" out = widgets.Output()\n",
" with out:\n",
" if video_ids[i][0] == 'Youtube':\n",
" video = YouTubeVideo(id=video_ids[i][1], width=W,\n",
" height=H, fs=fs, rel=0)\n",
" print(f'Video available at https://youtube.com/watch?v={video.id}')\n",
" else:\n",
" video = PlayVideo(id=video_ids[i][1], source=video_ids[i][0], width=W,\n",
" height=H, fs=fs, autoplay=False)\n",
" if video_ids[i][0] == 'Bilibili':\n",
" print(f'Video available at https://www.bilibili.com/video/{video.id}')\n",
" elif video_ids[i][0] == 'Osf':\n",
" print(f'Video available at https://osf.io/{video.id}')\n",
" display(video)\n",
" tab_contents.append(out)\n",
" return tab_contents\n",
"\n",
"video_ids = [('Youtube', 'wpfvgl1q9zg'), ('Bilibili', 'BV1Wb421v7Gi')]\n",
"tab_contents = display_videos(video_ids, W=730, H=410)\n",
"tabs = widgets.Tab()\n",
"tabs.children = tab_contents\n",
"for i in range(len(tab_contents)):\n",
" tabs.set_title(i, video_ids[i][0])\n",
"display(tabs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_novel_task_video\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"A good architecture should not only learn fast but also generalize better.\n",
"\n",
"One crucial parameter in our task is the joystick gain, which linearly maps motor actions (dimensionless, bounded in $[-1,1]$) to corresponding velocities in the environment, i.e., velocity=gain $\\cdot$ action (see the environment dynamics above). During training, the gain remains fixed at $200$ cm/s and $90^{\\circ}$/s for linear and angular components, referred to as the $1\\times$ gain.\n",
"\n",
"To test agents' generalization abilities, here, we increase the gain to $2\\times$. This means the maximum velocities in the environment are doubled to $400$ cm/s and $180^{\\circ}$/s. Note that agents were only exposed to the $1\\times$ gain during training. Therefore, if they use the same sequence of actions as in training with the $2\\times$ gain, they will overshoot targets.\n",
"\n",
"Let us evaluate modular and holistic agents with the same $1000$ sampled targets again, but now with the $2\\times$ gain."
]
},
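{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"As a concrete check of the mapping velocity $=$ gain $\\cdot$ action, the sketch below (using a made-up action purely for illustration) shows that the same action produces twice the velocity under the $2\\times$ gain, which is why an agent that simply replays its trained action sequence would overshoot.\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"gain_1x = np.array([200.0, 90.0])  # linear (cm/s) and angular (deg/s) gain during training\n",
"gain_2x = 2 * gain_1x              # 400 cm/s and 180 deg/s in the novel task\n",
"action = np.array([0.8, 0.5])      # example joystick action, bounded in [-1, 1]\n",
"\n",
"print('Velocity with 1x gain:', gain_1x * action)  # [160.  45.]\n",
"print('Velocity with 2x gain:', gain_2x * action)  # [320.  90.]\n",
"```"
]
},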
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"gain_factor = 2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load holistic & modular agents\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Load holistic & modular agents\n",
"modular_agent = Agent(arg, ModularActor)\n",
"modular_agent.load(Path('agents/modular'), 0)\n",
"modular_dfs_2x = evaluation(modular_agent, gain_factor=gain_factor)\n",
"\n",
"holistic_agent = Agent(arg, HolisticActor)\n",
"holistic_agent.load(Path('agents/holistic'), 0)\n",
"holistic_dfs_2x = evaluation(holistic_agent, gain_factor=gain_factor)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Let us first visualize the modular agents' trajectories with $2\\times$ gain."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Modular agent's trajectories\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Modular agent's trajectories\n",
"\n",
"target_idexes = np.arange(0, 500)\n",
"df = modular_dfs_2x\n",
"\n",
"fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
"ax = fig.add_subplot(111)\n",
"ax.set_title(\"Modular agent's trajectories\\nwith \" +\n",
" r'$2\\times$gain', fontsize=fontsize, fontweight='bold')\n",
"ax.set_aspect('equal')\n",
"ax.spines['top'].set_visible(False)\n",
"ax.spines['right'].set_visible(False)\n",
"ax.spines['bottom'].set_visible(False)\n",
"ax.spines['left'].set_visible(False)\n",
"ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])\n",
"\n",
"ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,\n",
" c='k', ls=(0, (1, 1)), lw=lw)\n",
"for _, trial in df.iloc[target_idexes].iterrows():\n",
" ax.plot(trial.pos_x, trial.pos_y, c=modular_c, lw=0.3, ls='-', alpha=0.2)\n",
"\n",
"reward_idexes = df.rewarded.iloc[target_idexes].values\n",
"for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],\n",
" [reward_c, unreward_c]):\n",
" ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,\n",
" c=c, marker='o', s=1, lw=1.5, label=label)\n",
"\n",
"x_temp = np.linspace(-235, 235)\n",
"ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)\n",
"ax.text(-10, 425, s=r'70$\\degree$', fontsize=fontsize)\n",
"ax.text(130, 150, s=r'400 cm', fontsize=fontsize)\n",
"\n",
"ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)\n",
"ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)\n",
"ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)\n",
"ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)\n",
"\n",
"ax.text(-210, 30, s=\"Modular agent's\\ntrajectories\", fontsize=fontsize - 0.5, c=modular_c)\n",
"\n",
"ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],\n",
" handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)\n",
"\n",
"fig.tight_layout(pad=0)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Let us now visualize the holistic agents' trajectories with $2\\times$ gain."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Holistic agent's trajectories\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Holistic agent's trajectories\n",
"\n",
"target_idexes = np.arange(0, 500)\n",
"df = holistic_dfs_2x\n",
"\n",
"fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
"ax = fig.add_subplot(111)\n",
"ax.set_title(\"Holistic agent's trajectories\\nwith \" +\n",
" r'$2\\times$gain', fontsize=fontsize, fontweight='bold')\n",
"ax.set_aspect('equal')\n",
"ax.spines['top'].set_visible(False)\n",
"ax.spines['right'].set_visible(False)\n",
"ax.spines['bottom'].set_visible(False)\n",
"ax.spines['left'].set_visible(False)\n",
"ax.axes.xaxis.set_ticks([]); ax.axes.yaxis.set_ticks([])\n",
"ax.set_xlim([-235, 235]); ax.set_ylim([-2, 430])\n",
"\n",
"ax.plot(np.linspace(0, 230 + 7), np.tan(np.deg2rad(55)) * np.linspace(0, 230 + 7) - 10,\n",
" c='k', ls=(0, (1, 1)), lw=lw)\n",
"for _, trial in df.iloc[target_idexes].iterrows():\n",
" ax.plot(trial.pos_x, trial.pos_y, c=holistic_c, lw=0.3, ls='-', alpha=0.2)\n",
"\n",
"reward_idexes = df.rewarded.iloc[target_idexes].values\n",
"for label, mask, c in zip(['Rewarded', 'Unrewarded'], [reward_idexes, ~reward_idexes],\n",
" [reward_c, unreward_c]):\n",
" ax.scatter(*df.iloc[target_idexes].loc[mask, ['target_x', 'target_y']].values.T,\n",
" c=c, marker='o', s=1, lw=1.5, label=label)\n",
"\n",
"\n",
"x_temp = np.linspace(-235, 235)\n",
"ax.plot(x_temp, np.sqrt(420**2 - x_temp**2), c='k', ls=(0, (1, 1)), lw=lw)\n",
"ax.text(-10, 425, s=r'70$\\degree$', fontsize=fontsize)\n",
"ax.text(130, 150, s=r'400 cm', fontsize=fontsize)\n",
"\n",
"ax.plot(np.linspace(-230, -130), np.linspace(0, 0), c='k', lw=lw)\n",
"ax.plot(np.linspace(-230, -230), np.linspace(0, 100), c='k', lw=lw)\n",
"ax.text(-230, 100, s=r'100 cm', fontsize=fontsize)\n",
"ax.text(-130, 0, s=r'100 cm', fontsize=fontsize)\n",
"\n",
"ax.text(-210, 30, s=\"Holistic agent's\\ntrajectories\", fontsize=fontsize - 0.5, c=holistic_c)\n",
"\n",
"ax.legend(fontsize=fontsize, frameon=False, loc=[0.56, 0.0],\n",
" handletextpad=-0.5, labelspacing=0, ncol=1, columnspacing=1)\n",
"\n",
"fig.tight_layout(pad=0)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Discussion\n",
"\n",
"1. Do the two agents have the same generalization abilities?"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Again, let us load the saved evaluation data with the $2\\times$ gain for agents with all $8$ random seeds."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Load data\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Load data\n",
"with open('modular_dfs_2x.pkl', 'rb') as file:\n",
" modular_dfs_2x = pickle.load(file)\n",
"\n",
"with open('holistic_dfs_2x.pkl', 'rb') as file:\n",
" holistic_dfs_2x = pickle.load(file)"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Now, let's compare reward fraction with $2\\times$ gain."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Reward function comparison\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Reward function comparison\n",
"\n",
"with plt.xkcd():\n",
" xticks = [0, 1]; xticklabels = ['Modular', 'Holistic']\n",
" yticks = [0, 0.5, 1]\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(111)\n",
" ax.set_title(\"Reward fraction comparison\\nwith \" +\n",
" r'$2\\times$gain', fontsize=fontsize, fontweight='bold')\n",
" ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)\n",
" plt.xticks(xticks, xticklabels, fontsize=fontsize)\n",
" plt.yticks(yticks, fontsize=fontsize)\n",
" ax.set_xlabel(r'', fontsize=fontsize + 1)\n",
" ax.set_ylabel('Reward fraction', fontsize=fontsize + 1)\n",
" ax.set_xlim(xticks[0] - 0.3, xticks[-1] + 0.3)\n",
" ax.set_ylim(yticks[0], yticks[-1])\n",
" ax.xaxis.set_label_coords(0.5, -0.15)\n",
" ax.yaxis.set_label_coords(-0.13, 0.5)\n",
" ax.yaxis.set_major_formatter(major_formatter)\n",
"\n",
" for (idx, dfs), c in zip(enumerate([modular_dfs_2x, holistic_dfs_2x]), [modular_c, holistic_c]):\n",
" ydata = [df.rewarded.sum() / len(df) for df in dfs]\n",
" ax.bar(idx, np.mean(ydata), width=0.7, color=c, alpha=1, zorder=0)\n",
" ax.scatter([idx] * len(ydata), ydata, c='crimson', marker='.',\n",
" s=10, lw=0.5, zorder=1, clip_on=False)\n",
"\n",
" fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Let us also compare traveled distance vs target distance with $2\\times$ gain for different agents."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Traveled distance comparison\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Traveled distance comparison\n",
"with plt.xkcd():\n",
" xticks = np.array([0, 250, 500])\n",
" yticks = xticks\n",
"\n",
" fig = plt.figure(figsize=(2, 2), dpi=200)\n",
" ax = fig.add_subplot(111)\n",
" ax.set_title(\"Traveled distance comparison\\nwith \" +\n",
" r'$2\\times$gain', fontsize=fontsize, fontweight='bold')\n",
" ax.set_aspect('equal')\n",
" ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)\n",
" plt.xticks(xticks, fontsize=fontsize)\n",
" plt.yticks(yticks, fontsize=fontsize)\n",
" ax.set_xlabel('Target distance (cm)', fontsize=fontsize + 1)\n",
" ax.set_ylabel('Traveled distance (cm)', fontsize=fontsize + 1)\n",
" ax.set_xlim(xticks[0], xticks[-1])\n",
" ax.set_ylim(yticks[0], yticks[-1])\n",
" ax.xaxis.set_label_coords(0.5, -0.25)\n",
" ax.yaxis.set_label_coords(-0.25, 0.5)\n",
" ax.yaxis.set_major_formatter(major_formatter)\n",
"\n",
" xdata_m = np.hstack([df.target_r for df in modular_dfs_2x])\n",
" ydata_m = np.hstack([df.pos_r_end for df in modular_dfs_2x])\n",
" xdata_h = np.hstack([df.target_r for df in holistic_dfs_2x])\n",
" ydata_h = np.hstack([df.pos_r_end for df in holistic_dfs_2x])\n",
"\n",
" sampled_trial_idx = np.random.choice(np.arange(ydata_m.size), size=1000, replace=False)\n",
"\n",
" ax.scatter(xdata_m[sampled_trial_idx], ydata_m[sampled_trial_idx], c=modular_c, s=1, alpha=0.1)\n",
" ax.scatter(xdata_h[sampled_trial_idx], ydata_h[sampled_trial_idx], c=holistic_c, s=1, alpha=0.1)\n",
"\n",
" model = LinearRegression(fit_intercept=False)\n",
" model.fit(xdata_m.reshape(-1, 1), ydata_m)\n",
" ax.plot(xticks, model.predict(xticks.reshape(-1, 1)), c=modular_c,\n",
" ls='-', lw=lw*2, label='Modular')\n",
"\n",
" model = LinearRegression(fit_intercept=False)\n",
" model.fit(xdata_h.reshape(-1, 1), ydata_h)\n",
" ax.plot(xticks, model.predict(xticks.reshape(-1, 1)), c=holistic_c,\n",
" ls='-', lw=lw*2, label='Holistic')\n",
"\n",
" ax.plot(xticks, yticks, c='k', ls='--', lw=lw)\n",
"\n",
" ax.legend(fontsize=fontsize, frameon=False, loc=[0.45, 0.1],\n",
" handletextpad=0.5, labelspacing=0.2, ncol=1, columnspacing=1)\n",
"\n",
" fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"### Discussion:\n",
"1. What have you found? Which agent generalized better?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_novel_gain_task\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Summary\n",
"\n",
"*Estimated timing of tutorial: 1 hour*\n",
"\n",
"In this tutorial, we explored the difference in agents' performance based on their architecture. We revealed that modular architecture, with separate modules for learning different aspects of behavior, is superior to a holistic architecture with a single module. The modular architecture with stronger inductive bias achieves good performance faster and has the capability to generalize to other tasks as well. Intriguingly, this modularity is a property we also observe in the brains, which could be important for generalization in the brain as well."
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Bonus Section 1: Decoding analysis"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Although we have confirmed that the modular agent exhibited better generalization performance in the gain task, the underlying mechanism remains unclear. One reasonable hypothesis is that the modular agent has a robust belief that remains accurate with the $2\\times$ gain. Therefore, this robust belief can support good actions. Ultimately, agents should be aware of its location to avoid overshooting.\n",
"\n",
"To test this hypothesis, we recorded the activities of RNN neurons in the agents' actors, as these neurons are responsible for computing the beliefs underlying actions. Since beliefs should represent the agents' locations in the environment, we used linear regression (with $\\ell_2$ regularization) to decode agents' locations from the recorded RNN activities.\n",
"\n",
"We defined the decoding error, representing the Euclidean distance between the true and decoded locations, as an indicator of belief accuracy."
]
},
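{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"Below is a minimal, self-contained sketch of this decoding pipeline. It is **not** the tutorial's `fit_decoder` helper: the data are synthetic placeholders, and we assume an $\\ell_2$-regularized linear decoder (`sklearn.linear_model.Ridge`) mapping RNN activity to 2D locations, with the decoding error computed as the per-step Euclidean distance between true and decoded locations."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"# Illustrative sketch only: synthetic stand-ins, not the tutorial's recorded data.\n",
"import numpy as np\n",
"from sklearn.linear_model import Ridge\n",
"\n",
"rng = np.random.default_rng(0)\n",
"n_steps, n_neurons = 500, 128\n",
"\n",
"# Placeholder RNN activity (n_steps x n_neurons) and true 2D locations (n_steps x 2).\n",
"rnn_activity = rng.normal(size=(n_steps, n_neurons))\n",
"true_location = rnn_activity @ rng.normal(size=(n_neurons, 2)) + rng.normal(scale=0.5, size=(n_steps, 2))\n",
"\n",
"# Linear decoder with l2 regularization (ridge regression).\n",
"decoder = Ridge(alpha=1.0)\n",
"decoder.fit(rnn_activity, true_location)\n",
"\n",
"# Decoding error: Euclidean distance between true and decoded locations at each step.\n",
"decoded_location = decoder.predict(rnn_activity)\n",
"decoding_error = np.linalg.norm(true_location - decoded_location, axis=1)\n",
"print(f\"Mean decoding error: {decoding_error.mean():.3f}\")"
]
},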
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load data\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Load data\n",
"modular_decoder_data = [fit_decoder(df) for df in modular_dfs]\n",
"modular_decoder_data_2x = [fit_decoder(df) for df in modular_dfs_2x]\n",
"modular_decoding_error = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v\n",
" in modular_decoder_data])\n",
"modular_decoding_error_2x = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v\n",
" in modular_decoder_data_2x])\n",
"\n",
"holistic_decoder_data = [fit_decoder(df) for df in holistic_dfs]\n",
"holistic_decoder_data_2x = [fit_decoder(df) for df in holistic_dfs_2x]\n",
"holistic_decoding_error = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v\n",
" in holistic_decoder_data])\n",
"holistic_decoding_error_2x = np.hstack([np.linalg.norm(v[2] - v[0].predict(v[1]), axis=1) for v\n",
" in holistic_decoder_data_2x])"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"We plot the distribution of the decoding error for every step in every trial across random seeds for each agent, under $1×$ or $2×$ gain."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Decoding error comparison\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Decoding error comparison\n",
"\n",
"with plt.xkcd():\n",
" xticks = np.array([0, 1]); xticklabels = ['1x (train)', '2x (test)']\n",
" yticks = np.linspace(0, 120, 5)\n",
"\n",
" fig = plt.figure(figsize=(2.2, 1.7), dpi=200)\n",
" ax = fig.add_subplot(111)\n",
" ax.set_title('Decoding error comparison', fontsize=fontsize, fontweight='bold')\n",
" ax.spines['top'].set_visible(False); ax.spines['right'].set_visible(False)\n",
" plt.xticks(xticks, xticklabels, fontsize=fontsize)\n",
" plt.yticks(yticks, fontsize=fontsize)\n",
" ax.set_xlabel(r'Gain', fontsize=fontsize + 1)\n",
" ax.set_ylabel('Decoding error (cm)', fontsize=fontsize + 1)\n",
" ax.set_xlim(xticks[0] - 0.2, xticks[-1] + 0.2)\n",
" ax.set_ylim(yticks[0], yticks[-1])\n",
" ax.xaxis.set_label_coords(0.5, -0.15)\n",
" ax.yaxis.set_label_coords(-0.2, 0.5)\n",
" ax.yaxis.set_major_formatter(major_formatter)\n",
"\n",
" violin1 = ax.violinplot(filter_fliers([modular_decoding_error, modular_decoding_error_2x]),\n",
" positions=xticks - 0.1, showmeans=True, widths=0.1)\n",
" set_violin_plot(violin1, facecolor=modular_c, edgecolor=modular_c)\n",
"\n",
" violin2 = ax.violinplot(filter_fliers([holistic_decoding_error, holistic_decoding_error_2x]),\n",
" positions=xticks + 0.1, showmeans=True, widths=0.1)\n",
" set_violin_plot(violin2, facecolor=holistic_c, edgecolor=holistic_c)\n",
"\n",
" ax.legend([violin1['bodies'][0], violin2['bodies'][0]], ['Modular', 'Holistic'],\n",
" fontsize=fontsize, frameon=False, loc=[0.3, 0.7],\n",
" handletextpad=0.5, labelspacing=0.2, ncol=1, columnspacing=1)\n",
"\n",
" fig.tight_layout(pad=0.1, rect=(0, 0, 1, 1))"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"While both agents demonstrated small decoding errors with the training gain, we observed that the holistic agent, which struggled with generalization at the $2×$ gain, also exhibited reduced accuracy in determining its own location. This helps explain why the modular agent can generalize better at higher gains."
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"**Conclusion for the gain task:** In the gain task, the modular agent exhibited better generalization than the holistic agent, supported by a more accurate internal belief when faced with larger gains."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_decoding_analysis\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"---\n",
"# Bonus Section 2: Generalization, but no free lunch\n",
"\n",
"The No Free Lunch theorems proved that no inductive bias can excel across all tasks. It has been studied in the [paper](https://www.science.org/doi/10.1126/sciadv.adk1256) that agents with a modular architecture can acquire the underlying structure of the training task. In contrast, holistic agents tend to acquire different knowledge than modular agents during training, such as forming beliefs based on unreliable information sources or exhibiting less efficient control actions. The novel gain task has a structure similar to the training task, consequently, a modular agent that accurately learns the training task's structure can leverage its knowledge in these novel tasks.\n",
"\n",
"However, it is worth noting that an infinite number of new tasks can be constructed, diverging from the training task's structure but aligning with the 'inferior' beliefs and control acquired by holistic agents.\n",
"\n",
"For example, although we have seen that the holistic agent developed inefficient trajectories with higher curvature, which is detrimental for the training task, if we consider a new task with a different reward function, such as preferring more curved trajectories, the holistic agent should perform better in this particular task than the modular agent. *There is no free lunch*."
]
},
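{
"cell_type": "markdown",
"metadata": {
"execution": {}
},
"source": [
"The toy sketch below (not from the tutorial or the paper) makes this point concrete: the same two hypothetical trajectories rank differently under two different reward functions, one preferring short, efficient paths and one preferring curved paths. All names and values here are illustrative assumptions."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"execution": {}
},
"outputs": [],
"source": [
"# Toy no-free-lunch illustration: which trajectory is 'better' depends on the reward.\n",
"import numpy as np\n",
"\n",
"def path_curvature(xy):\n",
"    # Mean absolute heading change along a 2D trajectory (a simple curvature proxy).\n",
"    headings = np.arctan2(np.diff(xy[:, 1]), np.diff(xy[:, 0]))\n",
"    return np.abs(np.diff(headings)).mean()\n",
"\n",
"def reward_efficiency(xy):\n",
"    # Training-task-like preference: shorter paths are better (negative path length).\n",
"    return -np.linalg.norm(np.diff(xy, axis=0), axis=1).sum()\n",
"\n",
"def reward_curvature(xy):\n",
"    # Hypothetical new task: more curved paths are better.\n",
"    return path_curvature(xy)\n",
"\n",
"t = np.linspace(0, 1, 50)\n",
"straight_path = np.c_[t, t]                           # short, straight trajectory\n",
"curved_path = np.c_[t, t + 0.3 * np.sin(np.pi * t)]   # longer, more curved trajectory\n",
"\n",
"# The straight path wins under the efficiency reward; the curved path wins\n",
"# under the curvature reward.\n",
"for name, path in [('straight', straight_path), ('curved', curved_path)]:\n",
"    print(f\"{name}: efficiency reward = {reward_efficiency(path):.3f}, \"\n",
"          f\"curvature reward = {reward_curvature(path):.3f}\")"
]
},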
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit your feedback\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"cellView": "form",
"execution": {},
"tags": [
"hide-input"
]
},
"outputs": [],
"source": [
"# @title Submit your feedback\n",
"content_review(f\"{feedback_prefix}_generalization_but_no_free_lunch\")"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [],
"include_colab_link": true,
"name": "W2D1_Tutorial3",
"provenance": [],
"toc_visible": true
},
"kernel": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.19"
}
},
"nbformat": 4,
"nbformat_minor": 4
}