Огляд прикладу застосування навчання з підкріпленням з використанням TensorFlow

КПДВ. У Karpathy game грає нейронна мережа

Всім привіт!
Я думаю, що багато чули про Google DeepMind. Про те як вони навчають програми грати в ігри Atari краще людини. Сьогодні я хочу представити вам статтю про те, як зробити щось подібне. Дана стаття — це огляд ідеї та код приклад застосування Q-learning, що є приватним випадком навчання з підкріпленням. Приклад заснований на статті співробітників Google DeepMind.

Гра

У прикладі розглядається гра Karpathy game. Вона зображена на КПДВ. Суть її полягає в наступному: необхідно управляти жовтою кулькою таким чином, щоб «є» зелені кульки і не є червоні і помаранчеві. За помаранчеві дається більший штраф, ніж за червоні. У жовтого кульки є радіально розходяться відрізки відповідальні за зір (в програмі вони називаються eye). З допомогою такого відрізка програма відчуває тип найближчого об'єкта в напрямку відрізка, його швидкість видалення. Тип об'єкта може бути колір кульки або стіна. Набір вхідних даних виходить наступним: дані з кожного ока і власна швидкість. Вихідні дані — це команди управління жовтою кулькою. По суті — це прискорення по чотирьох напрямах: вгору, ліворуч, вниз, вправо.

Ідея



Багатошаровий перцептрон. Картинка з вікіпедії

В цю гру грає багатошаровий перцептрон. На вході у нього описані вище вхідні дані. На виході корисність кожного з можливих дій. Перцептрон навчається методом зворотного поширення помилки. А конкретно використовується метод RMSProp. Особливість його в тому, що він використовує для оптимізації відразу пачку прикладів, але це не єдина його особливість. Щоб дізнатися більше про метод можете подивитися ці слайди. Вони розповідають не тільки про RMSProp. Нічого краще я поки не знайшов. Помилка виходу нейронної мережі обчислюється за допомогою того самого Q-learning.

TensorFlow

Майже все це можна більш менш легко закодить не заглиблюючись у написання власних реалізацій алгоритмів, завдяки нещодавно побачила світ бібліотеці TensorFlow. Програмування з використанням бібліотеки зводиться до опису графа обчислень, необхідних для отримання результату. Потім цей граф відправляється в сесію TensorFlow, де й виробляються самі обчислення. RMSProp взятий цілком з TensorFlow. Нейронна мережа реалізована на матрицях звідти ж. Q-learning реалізований також на звичайних операціях TensorFlow.

Код

Тепер давайте подивимося на найцікавіші місця коду прикладу.

models.py — багатошаровий перцептрон
import math
import tensorflow as tf

from .utils import base_name

# Для початку подивіться нижче клас MLP
# Один шар перцептрона
class Layer(object):
# input_sizes - масив кількостей входів, чому масив см конструктор MLP нижче
# output_size - кількість виходів
# scope - рядок, змінні TensorFlow можна організовувати в скоупы
# для зручного перевикористання (см https://www.tensorflow.org/versions/master/how_tos/variable_scope/index.html)
def __init__(self, input_sizes, output_size, scope):
"""Cretes a neural network layer."""
if type(input_sizes) != list:
input_sizes = [input_sizes]

self.input_sizes = input_sizes
self.output_size = output_size
self.scope = scope or "Layer"

# входимо в скоуп
with tf.variable_scope(self.scope):
# масив нейронів
self.Ws = []
for input_idx, input_size in enumerate(input_sizes):
# ідентифікатор нейрона
W_name = "W_%d" % (input_idx,)
# инициализатор ваг нейрона - рівномірний розподіл
W_initializer = tf.random_uniform_initializer(
-1.0 / math.sqrt(input_size), 1.0 / math.sqrt(input_size))
# створення нейрона - як матриці input_size x output_size
W_var = tf.get_variable(W_name, (input_size, output_size), initializer=W_initializer)
self.Ws.append(W_var)
# створення вектора вільних членів шару
# цей вектор буде додано до виходів нейронів шару
self.b = tf.get_variable("b", (output_size,), initializer=tf.constant_initializer(0))

# використання шару нейронів
# xs - вектор вхідних значень
# повертає вектор вихідних значень
def __call__(self, xs):
if type(xs) != list:
xs = [xs]
assert len(xs) == len(self.Ws), \
"Expected %d input vectors, got %d" % (len(self.Ws), len(xs))
with tf.variable_scope(self.scope):
# розрахунок вихідних значень
# так як кожен нейрон - матриця
# то вектор вихідних значень - це сума
# множень матриць-нейронів на вхідний вектор + вектор вільних членів 
return sum([tf.matmul(x, W) for x, W in zip(xs, self.Ws)]) + self.b

# повертає список параметрів шару
# це потрібно для роботи алгоритму зворотного поширення помилки
def variables(self):
return [self.b] + self.Ws

def copy(self, scope=None):
scope = scope or self.scope + "_copy"

with tf.variable_scope(scope) as sc:
for v in self.variables():
tf.get_variable(base_name(v), v.get_shape(),
initializer=lambda x,dtype=tf.float32: v.initialized_value())
sc.reuse_variables()
return Layer(self.input_sizes, self.output_size, scope=sc)

# Багатошаровий перцептрон
class MLP(object):

# input_sizes - масив розмірів вхідних шарів, не знаю навіщо,
# але тут реалізована підтримка декількох вхідних шарів,
# виглядає це як один вхідний шар розділений на частини,
# за фактом ця можливість не використовується, тобто вхідний шар один
# hiddens - масив розмірів прихованих шарів, за фактом
# використовується 2 прихованих шару по 100 нейронів
# і вихідний шар - 4 нейрона, що цікаво, не робити нічого
# нейромережа не може, такого варіанту у неї немає
# nonlinearities - масив передатних функцій нейронів шарів, про передавальні функції см <a href="https://ru.wikipedia.org/wiki/%D0%98%D1%81%D0%BA%D1%83%D1%81%D1%81%D1%82%D0%B2%D0%B5%D0%BD%D0%BD%D1%8B%D0%B9_%D0%BD%D0%B5%D0%B9%D1%80%D0%BE%D0%BD">Штучний нейрон</a>
# scope - рядок, змінні TensorFlow можна організовувати в скоупы
# для зручного перевикористання
# given_layers - можна передати вже створені шари
def __init__(self, input_sizes, hiddens, nonlinearities, scope=None, given_layers=None):
self.input_sizes = input_sizes
self.hiddens = hiddens
self.input_nonlinearity, self.layer_nonlinearities = nonlinearities[0], nonlinearities[1:]
self.scope = scope or "MLP"

assert len(hiddens) == len(nonlinearities), \
"Number of hiddens must be equal to number of nonlinearities"

with tf.variable_scope(self.scope):
if given_layers is not None:
# використовувати передані шари
self.input_layer = given_layers[0]
self.layers = given_layers[1:]
else:
# створити шари
# створення вхідного шару
self.input_layer = Layer(input_sizes, hiddens[0], scope="input_layer")
self.layers = []
# створити приховані шари
for l_idx, (h_from, h_to) in enumerate(zip(hiddens[:-1], hiddens[1:])):
self.layers.append(Layer(h_from, h_to, scope="hidden_layer_%d" % (l_idx,)))

# використання нейромережі
# xs - вектор вхідних значень
# повертається вихід вихідного шару
def __call__(self, xs):
if type(xs) != list:
xs = [xs]
with tf.variable_scope(self.scope):
# застосування вхідного шару до вектора вхідних значень
hidden = self.input_nonlinearity(self.input_layer(xs))
for layer, nonlinearity in zip(self.layers, self.layer_nonlinearities):
# застосування прихованих шарів в виходів попередніх шарів
hidden = nonlinearity(layer(hidden))
return hidden

# список параметрів всій нейронної мережі від вхідного до вихідного шару
def variables(self):
res = self.input_layer.variables()
for layer in self.layers:
res.extend(layer.variables())
return res

def copy(self, scope=None):
scope = scope or self.scope + "_copy"
nonlinearities = [self.input_nonlinearity] + self.layer_nonlinearities
given_layers = [self.input_layer.copy()] + [layer.copy() for layer in self.layers]
return MLP(self.input_sizes, self.hiddens, nonlinearities, scope=scope,
given_layers=given_layers)


discrete_deepq.py — реалізація Q-learning
import numpy as np
import random
import tensorflow as tf

from collections import deque

class DiscreteDeepQ(object):
# Опис параметрів нижче
def __init__(self, observation_size,
num_actions,
observation_to_actions,
optimizer,
session,
random_action_probability=0.05,
exploration_period=1000,
store_every_nth=5,
train_every_nth=5,
minibatch_size=32,
discount_rate=0.95,
max_experience=30000,
target_network_update_rate=0.01,
summary_writer=None):
# Цей великий коментар я просто переведу нижче
"""Initialized the Deepq object.

Based on:
https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf

Parameters
-------
observation_size : int
length of the vector passed as observation
num_actions : int
number of actions that the model can execute
observation_to_actions: dali model
model that implements activate function
that can take in observation vector or a batch
and returns scores (of life values) for each
action for each observation.
input shape: [batch_size, observation_size]
output shape: [batch_size, num_actions]
optimizer: tf.solver.*
optimizer prediction for error
session: tf.Session
session on which to execute the computation
random_action_probability: float (0 to 1)
exploration_period: int
probability of choosing a random
action (epsilon form paper) annealed linearly
from 1 to over random_action_probability
exploration_period
store_every_nth: int
to further decorrelate samples do not all
transitions, but rather every nth transition.
For example, if store_every_nth is 5, then
only 20% of all the transitions is stored.
train_every_nth: int
normally training_step is invoked every
time action is executed. Depending on the
setup that might be too often. When this
variable is set set to n, then only every
n-th time training_step is called will
the training procedure actually be executed.
minibatch_size: int
number of state,action,reward,newstate
tuples considered during experience reply
dicount_rate: float (0 to 1)
how much we care about future rewards.
max_experience: int
maximum size of the reply buffer
target_network_update_rate: float
how much to update target network after each
iteration. Let's call target_network_update_rate
alpha, target network T, and network N. Every
time N gets updated we execute:
T = (1-alpha)*T + alpha*N
summary_writer: tf.train.SummaryWriter
writer to log metrics
"""

"""Ініціалізація Deepq

Засноване на:
https://www.cs.toronto.edu/~vmnih/docs/dqn.pdf

Параметри
-------
observation_size : int
довжина вектора вхідних даних (цей вектор
будемо називати спостереженням або станом)

num_actions : int
кількість можливих дій або ж
довжина вектора вихідних даних нейромережі

observation_to_actions: dali model
модель (в нашому випадку нейромережа),
яка приймає спостереження або набір спостережень
і повертає оцінку очками кожної дії або
набір оцінок для кожної дії кожного зі спостережень
вхідний розмір: матриця [batch_size, observation_size]
вихідний розмір: матриця [batch_size, num_actions]

optimizer: tf.solver.*
алгоритм розрахунку обратого поширення помилки
в нашому випадку буде використовуватися RMSProp

session: tf.Session
сесія TensorFlow в якій проводиться обчислення

random_action_probability: float (0 to 1)
ймовірність випадкового дії,
для обогощения досвіду нейромережі та поліпшення качесва управління
з певною ймовірністю виконується випадкова дія, а не
дія видане нейромережею

exploration_period: int
період пошукового поведінки в ітераціях,
протягом якого ймовірність випадкового виконання
дії падає від 1 до random_action_probability

store_every_nth: int
параметр потрібен щоб зберігати не всі навчальні приклади
а тільки певну частину з них.
Збереження відбувається один раз на вказане в параметрі
кількість повчальних прикладів

train_every_nth: int
зазвичай training_step (крок навчання)
запускається після кожної дії.
Іноді виходить так, що це занадто часто.
Ця змінна вказує скільки кроків
пропустити перед тим як запускати крок навчання

minibatch_size: int
розмір набору навчальних прикладів який
використовується на одному кроці навчання
алгоритмом RMSProp.
Навчальний приклад включає в себе
стан, зроблене дію, нагороду і
новий стан

dicount_rate: float (0 to 1)
параметр Q-learning
наскільки сильно впливає майбутня нагорода за
розрахунку користі дії

max_experience: int
максимальна кількість збережених
навчальних прикладів

target_network_update_rate: float
параметр швидкості навчання нейромережі,
тут використовується 2 нейромережі
T - target_q_network
вона використовується для розрахунку вкладу майбутньої користі і
N - q_network
вона испольщуется для вибору дії,
також ця мережа піддається навчанню
методом зворотного поширення помилки.
Мережа T з певною швидкістю прагне до мережі N.
Кожен раз при навчанні N,
Т модифікується таким чином:
alpha = target_network_update_rate
T = (1-alpha)*T + alpha*N

summary_writer: tf.train.SummaryWriter
запис логів
"""


# memorize arguments
self.observation_size = observation_size
self.num_actions = num_actions

self.q_network = observation_to_actions
self.optimizer = optimizer
self.s = session

self.random_action_probability = random_action_probability
self.exploration_period = exploration_period
self.store_every_nth = store_every_nth
self.train_every_nth = train_every_nth
self.minibatch_size = minibatch_size
self.discount_rate = tf.constant(discount_rate)
self.max_experience = max_experience
self.target_network_update_rate = \
tf.constant(target_network_update_rate)

# deepq state
self.actions_executed_so_far = 0
self.experience = deque()

self.iteration = 0
self.summary_writer = summary_writer

self.number_of_times_store_called = 0
self.number_of_times_train_called = 0

self.create_variables()

# розрахунок ймовірності випадкового дії
# з урахуванням зменшення з ітераціями
# (лінійний відпал)
def linear_annealing(self, n, total, p_initial, p_final):
"""Linear annealing between p_initial and p_final
over total steps - computes value at step n"""
if n >= total:
return p_final
else:
return p_initial - (n * (p_initial - p_final)) / (total)

# створення графів для TensorFlow
# для розрахунку керуючої дії
# і реалізації Q-learning
def create_variables(self):
# створення нейромережі T копіюванням з вихідної нейромережі N
self.target_q_network = self.q_network.copy(scope="target_network")

# розрахунок керуючої дії
# FOR REGULAR ACTION SCORE COMPUTATION
with tf.name_scope("taking_action"):
# вхідні дані вектора стану
self.observation = tf.placeholder(tf.float32, (None, self.observation_size), name="observation")
# розрахувати окуляри оцінки корисності кожної дії
self.action_scores = tf.identity(self.q_network(self.observation), name="action_scores")
tf.histogram_summary("action_scores", self.action_scores)
# взяти дію з максимальною кількістю очок
self.predicted_actions = tf.argmax(self.action_scores, dimension=1, name="predicted_actions")

# розрахунок майбутньої користі
with tf.name_scope("estimating_future_rewards"):
# FOR PREDICTING TARGET FUTURE REWARDS
# вхідний параметр - майбутні стану
self.next_observation = tf.placeholder(tf.float32, (None, self.observation_size), name="next_observation")
# вхідний параметр - маски майбутніх станів
self.next_observation_mask = tf.placeholder(tf.float32, (None,), name="next_observation_mask")
# оцінки корисності
self.next_action_scores = tf.stop_gradient(self.target_q_network(self.next_observation))
tf.histogram_summary("target_action_scores", self.next_action_scores)
# вхідний параметр - нагороди
self.rewards = tf.placeholder(tf.float32, (None,), name="rewards")
# взяти максимальні оцінки корисностей дій
target_values = tf.reduce_max(self.next_action_scores, reduction_indices=[1,]) * self.next_observation_mask
# r + DF * MAX(Q,s) см статтю про Q-learning у вікіпедії
self.future_rewards = self.rewards + self.discount_rate * target_values

# обученте мережі N 
with tf.name_scope("q_value_precition"):
# PREDICTION FOR ERROR
# вхідний параметр маски дій в наборі навчальних прикладів
self.action_mask = tf.placeholder(tf.float32, (None, self.num_actions), name="action_mask")
# розрахунок корисностей дій набору навчальних прикладів
self.masked_action_scores = tf.reduce_sum(self.action_scores * self.action_mask, reduction_indices=[1,])
# різниці поточних корисностей і майбутніх
# - (r + DF * MAX(Q,s) — Q[s',a'])
temp_diff = self.masked_action_scores - self.future_rewards
# ключовий момент навчання мережі
# RMSProp мінімізує середнє від вищевказаних різниць
self.prediction_error = tf.reduce_mean(tf.square(temp_diff))
# робота RMSProp, перший крок - обчислення градієнтів
gradients = self.optimizer.compute_gradients(self.prediction_error)
for i, (grad, var) in enumerate(gradients):
if grad is not None:
gradients[i] = (tf.clip_by_norm(grad, 5), var)
# Add histograms for gradients.
for grad, var in gradients:
tf.histogram_summary(var.name, var)
if grad:
tf.histogram_summary(var.name + '/gradients', grad)
# другий крок - оптимізація параметрів нейромережі
self.train_op = self.optimizer.apply_gradients(gradients)

# те саме місце де настроюється мережа T
# T = (1-alpha)*T + alpha*N
# UPDATE TARGET NETWORK
with tf.name_scope("target_network_update"):
self.target_network_update = []
for v_source, v_target in zip(self.q_network.variables(), self.target_q_network.variables()):
# this is equivalent to target = (1-alpha) * target + alpha * source
update_op = v_target.assign_sub(self.target_network_update_rate * (v_target - v_source))
self.target_network_update.append(update_op)
self.target_network_update = tf.group(*self.target_network_update)

# summaries 
tf.scalar_summary("prediction_error", self.prediction_error)

self.summarize = tf.merge_all_summaries()
self.no_op1 = tf.no_op()

# управління
def action(self, observation):
"""Given observation returns the action that should be chosen using
DeepQ learning strategy. Does not backprop."""
assert len(observation.shape) == 1, \
"Action is performed based on single observation."

self.actions_executed_so_far += 1
# розрахунок ймовірності випадкового дії
exploration_p = self.linear_annealing(self.actions_executed_so_far,
self.exploration_period,
1.0,
self.random_action_probability)

if random.random() < exploration_p:
# випадкова дія
return random.randint(0, self.num_actions - 1)
else:
# дія вибране нейромережею
return self.s.run(self.predicted_actions, {self.observation: observation[np.newaxis,:]})[0]

# збереження навчального прикладу
# навчальний приклади беруться з дій нейромережі
# під час управління
def store(self, observation, action, reward, newobservation):
"""Store experience, where starting with observation and
execution action, we arrived at the newobservation and got thetarget_network_update
reward reward

If newstate is None the state/action pair is assumed to be terminal
"""
if self.number_of_times_store_called % self.store_every_nth == 0:
self.experience.append((observation, action, reward, newobservation))
if len(self.experience) > self.max_experience:
self.experience.popleft()
self.number_of_times_store_called += 1

# крок навчання
def training_step(self):
"""Pick a self.minibatch_size exeperiences reply from buffer
and backpropage the value function.
"""
if self.number_of_times_train_called % self.train_every_nth == 0:
if len(self.experience) < self.minibatch_size:
return

# усього збереженого досвіду випадково вибираємо
# пачку з minibatch_size навчальних прикладів
# sample experience.
samples = random.sample(range(len(self.experience)), self.minibatch_size)
samples = [self.experience[i] for i in samples]

# представляємо навчальні приклади
# в потрібному вигляді
# bach states
states = np.empty(len(samples), self.observation_size))
newstates = np.empty(len(samples), self.observation_size))
action_mask = np.zeros((len(samples), self.num_actions))

newstates_mask = np.empty(len(samples),))
rewards = np.empty(len(samples),))

for i, (state, action, reward, newstate) in enumerate(samples):
states[i] = state
action_mask[i] = 0
action_mask[i][action] = 1
rewards[i] = reward
if newstate is not None:
newstates[i] = newstate
newstates_mask[i] = 1
else:
newstates[i] = 0
newstates_mask[i] = 0


calculate_summaries = self.iteration % 100 == 0 and \
self.summary_writer is not None

# запускаємо обчислення
# спочатку вважаємо помилку мережі
# потім запускаємо оптимізацію мережі
# далі збираємо статистику (необов'язковий крок
# потрібний для побудови графіків навчання)
cost, _, summary_str = self.s.run([
self.prediction_error,
self.train_op,
self.summarize if calculate_summaries else self.no_op1,
], {
self.observation: states,
self.next_observation: newstates,
self.next_observation_mask: newstates_mask,
self.action_mask: action_mask,
self.rewards: rewards,
})

# підлаштовуємо нейромережа Т
self.s.run(self.target_network_update)

if calculate_summaries:
self.summary_writer.add_summary(summary_str, self.iteration)

self.iteration += 1

self.number_of_times_train_called += 1


karpathy_game.py — гра, в яку грає нейромережа
import math
import matplotlib.pyplot as plt
import numpy as np
import random
import time

from collections import defaultdict
from euclid import Circle, Point2, Vector2, LineSegment2

import tf_rl.utils.svg as svg

# Ігровий об'єкт
# це кульку певного кольору
# даний клас розраховує переміщення
# зіткнення і займається промальовкою
class GameObject(object):
def __init__(self, position, speed, obj_type, settings):
"""Esentially represents circles of different kinds, which have
position and speed."""
self.settings = settings
self.radius = self.settings["object_radius"]

self.obj_type = obj_type
self.position = position
self.speed = speed
self.bounciness = 1.0

def wall_collisions(self):
"""Update speed upon collision with the wall."""
world_size = self.settings["world_size"]

for dim in range(2):
if self.position[dim] - self.radius <= 0 and self.speed[dim] < 0:
self.speed[dim] = - self.speed[dim] * self.bounciness
elif self.position[dim] + self.radius + 1 >= world_size[dim] and self.speed[dim] > 0:
self.speed[dim] = - self.speed[dim] * self.bounciness

def move(self, dt):
"""Move as if dt seconds passed"""
self.position += dt * self.speed
self.position = Point2(*self.position)

def step(self, dt):
"""Move and bounce of walls."""
self.wall_collisions()
self.move(dt)

def as_circle(self):
return Circle(self.position, float(self.radius))

def draw(self):
"""Return svg object for this item."""
color = self.settings["colors"][self.obj_type]
return svg.Circle(self.position + Point2(10, 10), self.radius, color=color)

# Гра. Тут все досить просто
# Спочатку, у відповідності з налаштуваннями,
# створюються стінки і об'єкт, яким управляє
# алгоритм. Тут я не буду коментувати
# всі, так як тут^ в принципі, за кодом зрозуміло,
# що відбувається, нижче відкоментую функцію
# observe, так як вона має безпосереднє
# ставлення до вхідних даних алгоритму
class KarpathyGame(object):
def __init__(self, settings):
"""Initiallize game simulator with settings"""
self.settings = settings
self.size = self.settings["world_size"]
self.walls = [LineSegment2(Point2(0,0), Point2(0,self.size[1])),
LineSegment2(Point2(0,self.size[1]), Point2(self.size[0], self.size[1])),
LineSegment2(Point2(self.size[0], self.size[1]), Point2(self.size[0], 0)),
LineSegment2(Point2(self.size[0], 0), Point2(0,0))]

self.hero = GameObject(Point2(*self.settings["hero_initial_position"]),
Vector2(*self.settings["hero_initial_speed"]),
"hero",
self.settings)
if not self.settings["hero_bounces_off_walls"]:
self.hero.bounciness = 0.0

self.objects = []
for obj_type, number in settings["num_objects"].items():
for _ in range(number):
self.spawn_object(obj_type)

self.observation_lines = self.generate_observation_lines()

self.object_reward = 0
self.collected_rewards = []

# Кожний радіальний відрізок бачить об'єкт або стінку
# і два числа, що представляють собою швидкість об'єкта
# every observation_line sees one of objects or wall and
# two numbers representing speed of the object (if applicable)
self.eye_observation_size = len(self.settings["objects"]) + 3
# і, врешті, до стану додаються
# два числа - швидкість керованого об'єкта
# additionally there are two numbers representing agents own speed.
self.observation_size = self.eye_observation_size * len(self.observation_lines) + 2
self.last_observation = np.zeros(self.observation_size)

self.directions = [Vector2(*d) for in d [[1,0], [0,1], [-1,0],[0,-1]]]
self.num_actions = len(self.directions)

self.objects_eaten = defaultdict(lambda: 0)

def perform_action(self, action_id):
"""Change speed to one of hero vectors"""
assert 0 <= action_id < self.num_actions
self.hero.speed *= 0.8
self.hero.speed += self.directions[action_id] * self.settings["delta_v"]

def spawn_object(self, obj_type):
"""Spawn object of a given type and add it to the objects array"""
radius = self.settings["object_radius"]
position = np.random.uniform([radius, radius], np.array(self.size) - radius)
position = Point2(float(position[0]), float(position[1]))
max_speed = np.array(self.settings["maximum_speed"])
speed = np.random.uniform(-max_speed, max_speed).astype(float)
speed = Vector2(float(speed[0]), float(speed[1]))

self.objects.append(GameObject(position, speed, obj_type, self.settings))

def step(self, dt):
"""Simulate all the objects for a given ammount of time.

Also resolve collisions with the hero"""
for obj in self.objects + [self.hero] :
obj.step(dt)
self.resolve_collisions()

def squared_distance(self, p1, p2):
return (p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2

def resolve_collisions(self):
"""If hero touches, hero eats. Also reward gets updated."""
collision_distance = 2 * self.settings["object_radius"]
collision_distance2 = collision_distance ** 2
to_remove = []
for obj in self.objects:
if self.squared_distance(self.hero.position, obj.position) < collision_distance2:
to_remove.append(obj)
for obj in to_remove:
self.objects.remove(obj)
self.objects_eaten[obj.obj_type] += 1
self.object_reward += self.settings["object_reward"][obj.obj_type]
self.spawn_object(obj.obj_type)

def inside_walls(self, point):
"""Check if the point is inside the walls"""
EPS = 1e-4
return (EPS <= point[0] < self.size[0] - EPS and
EPS <= point[1] < self.size[1] - EPS)

# повертає вектор стану
def observe(self):
"""Return observation vector. For all the observation directions it returns representation
of the closest object to the hero - might be nothing, another object or a wall.
Representation of observation for all the directions will be concatenated.
"""
num_obj_types = len(self.settings["objects"]) + 1 # and wall
max_speed_x, max_speed_y = self.settings["maximum_speed"]

# відстань видимості
observable_distance = self.settings["observation_line_length"]

# одержання всіх об'єктів в зоні видимості
relevant_objects = [obj for obj in self.objects
if obj.position.distance(self.hero.position) < observable_distance]
# сортування об'єктів за відстанню
# спочатку ближні
# objects sorted from closest to furthest
relevant_objects.sort(key=lambda x: x.position.distance(self.hero.position))

observation = np.zeros(self.observation_size)
observation_offset = 0
# починаємо перебирати відрізки зору
for i, observation_line in enumerate(self.observation_lines):
# shift to position hero
observation_line = LineSegment2(self.hero.position + Vector2(*observation_line.p1),
self.hero.position + Vector2(*observation_line.p2))

observed_object = None
# перевіряємо чи бачимо ми стіну
# if end of observation line is outside of walls, we see the wall.
if not self.inside_walls(observation_line.p2):
observed_object = "**wall**"
# перебираємо об'єкти в зоні видимості
for obj in relevant_objects:
if observation_line.distance(obj.position) < self.settings["object_radius"]:
# знайшли об'єкт
observed_object = obj
break
# параметри знайденого об'єкта
# тип, швидкість і відстань до нього
object_type_id = None
speed_x, speed_y = 0, 0
proximity = 0
if observed_object == "**wall**": # wall seen
# бачимо стіну
object_type_id = num_obj_types - 1
# у прикладі стіна завжди має
# нульовою швидкістю, я подумав,
# що краще, все таки, використовувати
# її відносну швидкість
# в результаті
# якість управління покращився

# a wall has fairly low speed...
# speed_x, speed_y = 0, 0
# I think relative speed is better than absolute
speed_x, speed_y = tuple (-self.hero.speed)
# best candidate is between intersection
# observation_line and a wall, that's
# closest to the hero
best_candidate = None
for wall in self.walls:
candidate = observation_line.intersect(wall)
if candidate is not None:
if (best_candidate is None or
best_candidate.distance(self.hero.position) >
candidate.distance(self.hero.position)):
best_candidate = candidate
if best_candidate is None:
# assume it is due to rounding errors
# and wall is barely touching observation line
proximity = observable_distance
else:
proximity = best_candidate.distance(self.hero.position)
elif observed_object is not None: # agent seen
# бачимо об'єкт
# тип об'єкта
object_type_id = self.settings["objects"].index(observed_object.obj_type)
# тут я теж використовував швидкість щодо
# керованого об'єкта
speed_x, speed_y = tuple(observed_object.speed - self.hero.speed)
intersection_segment = obj.as_circle().intersect(observation_line)
assert intersection_segment is not None
# обчислення відстань до об'єкту
try:
proximity = min(intersection_segment.p1.distance(self.hero.position),
intersection_segment.p2.distance(self.hero.position))
except AttributeError:
proximity = observable_distance
for object_type_idx_loop in range(num_obj_types):
# тут 1.0 означає відсутність в полі видимості
# об'єкта заданого типу
observation[observation_offset + object_type_idx_loop] = 1.0
if object_type_id is not None:
# якщо об'єкт знайдений у комірці типу об'єкта
# задається відстань менше від 0.0 до 1.0
# відстань вимірюється відносно довжини відрізка
observation[observation_offset + object_type_id] = proximity / observable_distance
# швидкість знайденого об'єкта
observation[observation_offset + num_obj_types] = speed_x / max_speed_x
observation[observation_offset + num_obj_types + 1] = speed_y / max_speed_y
assert num_obj_types + 2 == self.eye_observation_size
observation_offset += self.eye_observation_size

# після заповнення даних зі всіх відрізків
# додається швидкість керованого об'єкта
observation[observation_offset] = self.hero.speed[0] / max_speed_x
observation[observation_offset + 1] = self.hero.speed[1] / max_speed_y
assert observation_offset + 2 == self.observation_size

self.last_observation = observation
return observation

def distance_to_walls(self):
"""Returns distance of a hero to walls"""
res = float('inf')
for wall in self.walls:
res = min(res, self.hero.position.distance(wall))
return res - self.settings["object_radius"]

def collect_reward(self):
"""Return accumulated object eating score + current distance to walls score"""
wall_reward = self.settings["wall_distance_penalty"] * \
np.exp(-self.distance_to_walls() / самоврядування.settings["tolerable_distance_to_wall"])
assert wall_reward < 1e-3, "You are rewarding hero for being close to the wall!"
total_reward = wall_reward + self.object_reward
self.object_reward = 0
self.collected_rewards.append(total_reward)
return total_reward

def plot_reward(self, smoothing = 30):
"""Plot evolution of reward over time."""
plottable = self.collected_rewards[:]
while len(plottable) > 1000:
for i in range(0, len(plottable) - 1, 2):
plottable[i//2] = (plottable[i] + plottable[i+1]) / 2
plottable = plottable[:(len(plottable) // 2)]
x = []
for i in range(smoothing, len(plottable)):
chunk = plottable[i-smoothing:i]
x.append(sum(chunk) / len(chunk))
plt.plot(list(range(len(x))), x)

def generate_observation_lines(self):
"""Generate observation segments in settings["num_observation_lines"] directions"""
result = []
start = Point2(0.0, 0.0)
end = Point2(self.settings["observation_line_length"],
self.settings["observation_line_length"])
for angle in np.linspace(0, 2*np.pi, self.settings["num_observation_lines"], endpoint=False):
rotation = Point2(math.cos(angle), math.sin(angle))
current_start = Point2(start[0] * rotation[0], start[1] * rotation[1])
current_end = Point2(end[0] * rotation[0], end[1] * rotation[1])
result.append( LineSegment2(current_start, current_end))
return result

def _repr_html_(self):
return self.to_html()

def to_html(self, stats=[]):
"""Return svg representation of the simulator"""

stats = stats[:]
recent_reward = self.collected_rewards[-100:] + [0]
objects_eaten_str = ', '.join(["%s: %s" % (o,c) for o,c in self.objects_eaten.items()])
stats.extend([
"nearest wall = %.1f" % (self.distance_to_walls(),),
"reward = %.1f" % (sum(recent_reward)/len(recent_reward),),
"objects eaten => %s" % (objects_eaten_str,),
])

scene = svg.Scene((self.size[0] + 20, self.size[1] + 20 + 20 * len(stats)))
scene.add(svg.Rectangle((10, 10), self.size))


num_obj_types = len(self.settings["objects"]) + 1 # and wall

observation_offset = 0;
for line in self.observation_lines:
# getting color of the line
linecolor = 'black';
linewidth = '1px';
for object_type_idx_loop in range(num_obj_types):
if self.last_observation[observation_offset + object_type_idx_loop] < 1.0:
if object_type_idx_loop < num_obj_types - 1:
linecolor = self.settings["colors"][self.settings["objects"][object_type_idx_loop]];
linewidth = '3px';

observation_offset += self.eye_observation_size 

scene.add(svg.Line(line.p1 + self.hero.position + Point2(10,10),
line.p2 + self.hero.position + Point2(10,10),
color = linecolor,
stroke = linecolor,
stroke_width = linewidth))

for obj in self.objects + [self.hero] :
scene.add(obj.draw())

offset = self.size[1] + 15
for in txt stats:
scene.add(svg.Text((10, offset + 20), txt, 15))
offset += 20

return scene


Якщо ви захочете подивитися, як все це працює вам буде необхідний IPython Notebook. Так як все це збирається воєдино в сценарії для нього. Сценарій знаходиться за адресою notebooks/karpathy_game.ipynb.

Результат

Поки писав статтю, запустив на кілька годин навчання. Нижче відео: як у мене в підсумку навчилася сітка за досить невеликий час.


Куди рухатися далі

Далі я планую спробувати впровадити цей метод у свій віртуальний квадрокоптер. Спочатку хочу спробувати зробити стабілізацію. Потім, якщо вийде, спробую зробити щоб воно літало, але там вже, швидше за все, знадобиться сверточная мережу замість багатошарового перцептрона.

Приклад дбайливо викладений на гитхаб користувачем nivwusquorum, за що хочеться висловити йому величезне людське спасибі.

Джерело: Хабрахабр

0 коментарів

Тільки зареєстровані та авторизовані користувачі можуть залишати коментарі.