An example of differentiable programming using Taichi-Lang
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Test the interplay between Taichi and PyTorch: wrap a simulator written in Taichi inside `torch.autograd.Function`, write the controller in PyTorch and let PyTorch manage the computation graph, enabling backpropagation through a differentiable physics simulation.\n",
"\n",
"- Goal: given the ball's initial state (position and initial velocity), drive the point mass to the target position with zero final velocity\n",
"- Controller\n",
"  - Inputs: current position (x, y), current velocity (x, y), Euclidean distance from the current position to the target, offset of the current position from the target (x, y)\n",
"  - Output: the ball's acceleration at the current step (x, y)\n",
"- Optimization objectives (see the loss formula in the next cell)\n",
"  - Minimize the Euclidean distance between the final position and the target position\n",
"  - Minimize the final velocity\n",
"  - Minimize the sum of the output accelerations (so the external force does the least work)"
]
},
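{
"cell_type": "markdown",
"metadata": {},
"source": [
"The loss used below (a restatement of the objectives above, matching the code in `train_controller`), with final position $x_T$, final velocity $v_T$, goal state $x_{goal}$, $v_{goal}$, per-decision acceleration $a_k$, and $w_v$ = `vloss_weight`:\n",
"\n",
"$$L = \\|x_T - x_{goal}\\| + w_v \\, \\|v_T - v_{goal}\\| + 0.2 \\sum_k \\|a_k\\|$$"
]
},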
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the parameters"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Taichi] version 1.6.0, llvm 15.0.1, commit f1c6fbbd, win, python 3.10.12\n",
"[Taichi] Starting on arch=cuda\n"
]
}
],
"source": [
"from tqdm import tqdm\n",
"import taichi as ti\n",
"import numpy as np\n",
"import torch\n",
"from torch import nn\n",
"import time\n",
"\n",
"ti.init(arch=ti.cuda, debug=True)\n",
"dt = 0.2  # simulation time step\n",
"cto_opt_iter = 50  # optimizer iterations per training sample\n",
"sim_step = 50  # controller decisions per episode\n",
"act_step = 5  # simulation sub-steps per controller decision\n",
"vloss_weight = 1.0  # weight of the terminal-velocity term in the loss\n",
"# device = 'cpu'\n",
"device = \"cuda:0\""
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the control network (MLP)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"class NeuralNetwork(nn.Module):\n",
" def __init__(self):\n",
" super().__init__()\n",
" # Input: position (2) + velocity (2) + distance to goal (1) + offset to goal (2) = 7\n",
" self.linear_relu_stack = nn.Sequential(\n",
" nn.Linear(7, 16),\n",
" nn.ReLU(),\n",
" nn.Linear(16, 2),\n",
" )\n",
"\n",
" def forward(self, x, v, d, g):\n",
" inp = torch.cat((x, v, d, g))\n",
" out = self.linear_relu_stack(inp)\n",
" # Bound the predicted acceleration to [-1, 1]\n",
" out = torch.clamp(nn.LeakyReLU(0.1)(out), -1, 1)\n",
" return out"
]
},
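{
"cell_type": "markdown",
"metadata": {},
"source": [
"Quick shape check (illustrative only, not part of the training pipeline): the controller maps a 7-dimensional state (position, velocity, distance, offset) to a 2-dimensional acceleration."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: run a throwaway controller on dummy CPU tensors\n",
"_tmp = NeuralNetwork()\n",
"_x, _v = torch.zeros(2), torch.zeros(2)\n",
"_d, _g = torch.zeros(1), torch.zeros(2)\n",
"print(_tmp(_x, _v, _d, _g).shape)  # torch.Size([2])"
]
},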
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define the simulator's per-time-step update\n",
"- The kernel performs one semi-implicit Euler step: $v_{out} = v_{in} + a \\cdot dt$, then $x_{out} = x_{in} + v_{out} \\cdot dt$\n",
"- In the forward pass, load the ball's velocity and position\n",
"- In the backward pass, zero the gradients of all fields, then load the gradients of the output variables (from torch)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"@ti.data_oriented\n",
"class Simulator():\n",
" def __init__(self):\n",
" self.x_in = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" self.v_in = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" self.x_out = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" self.v_out = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" self.goal_x = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" self.goal_v = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" self.act = ti.Vector.field(2, ti.f32, shape=(), needs_grad=True)\n",
" \n",
" @ti.kernel\n",
" def _step(self):\n",
" self.v_out[None] = self.v_in[None] + self.act[None] * dt\n",
" self.x_out[None] = self.x_in[None] + self.v_out[None] * dt\n",
" \n",
" def update(self, x_in, v_in, act):\n",
" self.x_in.from_torch(x_in)\n",
" self.v_in.from_torch(v_in)\n",
" self.act.from_torch(act)\n",
" self._step()\n",
" return self.x_out.to_torch(device=device),\\\n",
" self.v_out.to_torch(device=device)\n",
"\n",
" def backward(self, x_in, v_in, act, x_grad_in, v_grad_in):\n",
" self.x_out.grad.fill(0.0)\n",
" self.v_out.grad.fill(0.0)\n",
" self.goal_x.grad.fill(0.0)\n",
" self.goal_v.grad.fill(0.0)\n",
" self.x_in.grad.fill(0.0)\n",
" self.v_in.grad.fill(0.0)\n",
" self.act.grad.fill(0.0)\n",
"\n",
" self.x_out.grad.from_torch(x_grad_in)\n",
" self.v_out.grad.from_torch(v_grad_in)\n",
" self.x_in.from_torch(x_in)\n",
" self.v_in.from_torch(v_in)\n",
" self.act.from_torch(act)\n",
" \n",
" self._step.grad()\n",
" return self.x_in.grad.to_torch(device=device),\\\n",
" self.v_in.grad.to_torch(device=device), \\\n",
" self.act.grad.to_torch(device=device)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"将仿真器封装在'torch.autograd.Function'内"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"class Sim_module(torch.autograd.Function):\n",
"\n",
" @staticmethod\n",
" def forward(ctx, x_in, v_in, act):\n",
" # Save the inputs so the Taichi backward pass can reload the same state\n",
" ctx.save_for_backward(x_in, v_in, act)\n",
" return sim.update(x_in, v_in, act)\n",
"\n",
" @staticmethod\n",
" @torch.autograd.function.once_differentiable\n",
" def backward(ctx, x_grad_in, v_grad_in):\n",
" x_in, v_in, act = ctx.saved_tensors\n",
" # Must return one gradient per forward input: x_in, v_in, act\n",
" return sim.backward(x_in, v_in, act, x_grad_in, v_grad_in)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"定义训练方法"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def train_controller(init, goal):\n",
" controller.train()\n",
" pbar = tqdm(range(cto_opt_iter))\n",
" for _ in pbar:\n",
" x = torch.tensor(init['pos'], requires_grad=True).cuda()\n",
" v = torch.tensor(init['vel'], requires_grad=True).cuda()\n",
" goal_pos = torch.tensor(goal['pos'], requires_grad=True).cuda()\n",
" goal_vel = torch.tensor(goal['vel'], requires_grad=True).cuda()\n",
" a_sum = 0\n",
" # Roll out the episode: one controller decision per act_step simulation sub-steps\n",
" for _ in range(sim_step):\n",
" d = torch.unsqueeze((x - goal_pos).norm(), dim=0)\n",
" \n",
" a = controller(x, v, d, x-goal_pos)\n",
" for _ in range(act_step):\n",
" x, v = Sim_module.apply(x, v, a)\n",
" a_sum += a.norm()\n",
" \n",
" # Loss: terminal distance to goal + terminal speed + accumulated control effort\n",
" loss = (x - goal_pos).norm() + (v - goal_vel).norm()*vloss_weight + a_sum*0.2\n",
" pbar.set_description(f\"Loss: {loss.item():.4f}\")\n",
" loss.backward()\n",
" nn.utils.clip_grad_norm_(controller.parameters(), max_norm=1)\n",
" optimizer.step()\n",
" optimizer.zero_grad()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"定义在新任务上运行仿真并可视化的方法"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def visualize_trajectory(init, goal):\n",
" controller.eval()\n",
" gui = ti.GUI('view', res=(400, 400))\n",
" x = torch.tensor(init['pos'], requires_grad=True).cuda()\n",
" v = torch.tensor(init['vel'], requires_grad=True).cuda()\n",
" goal_vis = np.expand_dims(goal['pos']/50, axis=0) + [0.5, 0.5]\n",
" init_vis = np.expand_dims(init['pos']/50, axis=0) + [0.5, 0.5]\n",
"\n",
" goal_vel = torch.tensor(goal['vel'], requires_grad=True).cuda()\n",
" goal_pos = torch.tensor(goal['pos'], requires_grad=True).cuda()\n",
"\n",
" pbar = tqdm(range(sim_step))\n",
" a_sum = 0\n",
" for _ in pbar:\n",
" d = torch.unsqueeze((x - goal_pos).norm(), dim=0)\n",
" a = controller(x, v, d, x-goal_pos)\n",
" for _ in range(act_step):\n",
" x, v = Sim_module.apply(x, v, a)\n",
" a_sum += a.norm()\n",
"\n",
" loss = (x - goal_pos).norm() + (v - goal_vel).norm()*vloss_weight + a_sum*0.2\n",
" pbar.set_description(f\"Loss: {loss.item():.4f}\")\n",
"\n",
" gui.circles(goal_vis, radius=5, color=0xff0000)\n",
" gui.circles(init_vis, radius=5, color=0xFFFFFF)\n",
" gui.circles(x.detach().clone().cpu().unsqueeze(dim=0).numpy()/50 + [0.5, 0.5], \n",
" radius=5, color=0xFFFFFF)\n",
" gui.show()\n",
" time.sleep(0.02)\n",
" gui.running = False"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Define how training samples are generated, covering control tasks at three difficulty levels\n",
"- level 0: random initial position, zero initial velocity, fixed target position\n",
"- level 1: random initial position, zero initial velocity, random target position\n",
"- level 2: random initial position, random initial velocity, random target position"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def generate_train_sample(level=0):\n",
" if level == 0:\n",
" init = {\n",
" 'pos': np.random.uniform(-10, 10, size=(2)).astype(np.float32),\n",
" 'vel': np.random.uniform(0, 0, size=(2)).astype(np.float32)\n",
" }\n",
" goal = {\n",
" 'pos': np.random.uniform(0, 0, size=(2)).astype(np.float32),\n",
" 'vel': np.array([0.0, 0.0], dtype=np.float32)\n",
" }\n",
" if level == 1:\n",
" init = {\n",
" 'pos': np.random.uniform(-10, 10, size=(2)).astype(np.float32),\n",
" 'vel': np.random.uniform(0, 0, size=(2)).astype(np.float32)\n",
" }\n",
" goal = {\n",
" 'pos': np.random.uniform(-10, 10, size=(2)).astype(np.float32),\n",
" 'vel': np.array([0.0, 0.0], dtype=np.float32)\n",
" }\n",
" if level == 2:\n",
" init = {\n",
" 'pos': np.random.uniform(-10, 10, size=(2)).astype(np.float32),\n",
" 'vel': np.random.uniform(-5, 5, size=(2)).astype(np.float32)\n",
" }\n",
" goal = {\n",
" 'pos': np.random.uniform(-10, 10, size=(2)).astype(np.float32),\n",
" 'vel': np.array([0.0, 0.0], dtype=np.float32)\n",
" }\n",
"\n",
" return init, goal"
]
},
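{
"cell_type": "markdown",
"metadata": {},
"source": [
"Illustrative usage: draw one sample per difficulty level to inspect the task format."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative only: print one sampled task per level\n",
"for _lvl in range(3):\n",
"    _init, _goal = generate_train_sample(_lvl)\n",
"    print('level', _lvl, '| init:', _init, '| goal:', _goal)"
]
},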
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Instantiate the defined objects"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"sim = Simulator()\n",
"controller = NeuralNetwork().cuda()\n",
"optimizer = torch.optim.Adam(controller.parameters(), lr=0.005)"
]
},
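{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optional sanity check (not part of the original workflow; the test values and `eps` below are arbitrary): compare the gradient that `Sim_module` returns through the Taichi backward pass against a central finite difference. Because the step is linear in the action, the two should agree closely."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hedged sanity check with arbitrary test values: verify the analytic gradient\n",
"# from Sim_module (Taichi reverse-mode) against a central finite difference.\n",
"def check_sim_gradient(eps=1e-3):\n",
"    x0 = torch.tensor([1.0, -2.0], device=device, requires_grad=True)\n",
"    v0 = torch.tensor([0.5, 0.3], device=device, requires_grad=True)\n",
"    a0 = torch.tensor([0.2, -0.1], device=device, requires_grad=True)\n",
"\n",
"    # Analytic gradient of a scalar test loss w.r.t. the action\n",
"    x1, v1 = Sim_module.apply(x0, v0, a0)\n",
"    (x1.sum() + v1.sum()).backward()\n",
"    analytic = a0.grad.clone()\n",
"\n",
"    # Central finite difference on each action component\n",
"    fd = torch.zeros(2, device=device)\n",
"    for i in range(2):\n",
"        delta = torch.zeros(2, device=device)\n",
"        delta[i] = eps\n",
"        xp, vp = Sim_module.apply(x0.detach(), v0.detach(), (a0 + delta).detach())\n",
"        xm, vm = Sim_module.apply(x0.detach(), v0.detach(), (a0 - delta).detach())\n",
"        fd[i] = ((xp.sum() + vp.sum()) - (xm.sum() + vm.sum())) / (2 * eps)\n",
"\n",
"    print('analytic grad:', analytic.cpu().numpy())\n",
"    print('finite diff  :', fd.cpu().numpy())\n",
"\n",
"check_sim_gradient()"
]
},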
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Train the controller"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" 0%| | 0/50 [00:00<?, ?it/s]"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Loss: 1318.6252: 100%|██████████| 50/50 [01:29<00:00, 1.79s/it]\n",
"Loss: 6.5467: 100%|██████████| 50/50 [01:24<00:00, 1.69s/it] \n",
"Loss: 2.7758: 100%|██████████| 50/50 [01:28<00:00, 1.78s/it]\n",
"Loss: 0.4294: 100%|██████████| 50/50 [01:24<00:00, 1.68s/it] \n",
"Loss: 0.2740: 100%|██████████| 50/50 [01:11<00:00, 1.43s/it]\n",
"Loss: 0.2614: 100%|██████████| 50/50 [01:11<00:00, 1.42s/it]\n",
"Loss: 5.9343: 100%|██████████| 50/50 [01:16<00:00, 1.53s/it]\n",
"Loss: 7.3399: 10%|█ | 5/50 [00:10<01:28, 1.97s/it]"
]
}
],
"source": [
"# Curriculum: train on progressively harder tasks (level 0, then 1, then 2)\n",
"for _ in range(5):\n",
" init, goal = generate_train_sample(0)\n",
" train_controller(init, goal)\n",
" # visualize_trajectory(init, goal)\n",
"for _ in range(5):\n",
" init, goal = generate_train_sample(1)\n",
" train_controller(init, goal)\n",
"for _ in range(5):\n",
" init, goal = generate_train_sample(2)\n",
" train_controller(init, goal)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Test the controller"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" 0%| | 0/50 [00:00<?, ?it/s]"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"Loss: 36.1536: 100%|██████████| 50/50 [00:01<00:00, 30.02it/s] \n"
]
}
],
"source": [
"init = {'pos':np.array([10.0, 7.0], dtype=np.float32), \n",
" 'vel':np.array([-15.0, 14.0], dtype=np.float32)}\n",
"goal = {'pos':np.array([-10.0, -10.0], dtype=np.float32), \n",
" 'vel':np.array([0.0, 0.0], dtype=np.float32)}\n",
"visualize_trajectory(init, goal)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "taichi",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}