почему методы итерации политики и итерации значений дают разные результаты для оптимальных значений и оптимальной политики?

В настоящее время я изучаю динамическое программирование в обучении с подкреплением, в котором я столкнулся с двумя концепциями Value-Iteration и Policy-Iteration. Чтобы понять то же самое, я реализую пример gridworld из Sutton, который говорит:

Нетерминальными состояниями являются S = {1, 2, . . . , 14}. В каждом состоянии возможны четыре действия, A = {вверх, вниз, вправо, влево}, которые детерминистически вызывают соответствующие переходы между состояниями, за исключением тех действий, которые уводят агента из сетки, фактически оставляя состояние неизменным. Так, например, p(6, 1|5, справа) = 1, p(7, 1|7, справа) = 1 и p(10, r |5, справа) = 0 для всех r, принадлежащих R , Это недисконтированное, эпизодическое задание. Вознаграждение равно -1 на все переходы, пока не будет достигнуто конечное состояние. Терминальное состояние на рисунке заштриховано (хотя и показано в двух местах, формально это одно состояние). Таким образом, функция ожидаемого вознаграждения равна r(s, a, s') = 1 для всех состояний s, s' и действий a. Предположим, что агент следует равновероятной случайной политике (все действия равновероятны).

Ниже приведена реализация двух методов

class GridWorld:
    def __init__(self,grid_size = 5, gamma = 0.9, penalty = -1.0, theta = 1e-2):
        self.grid_size = grid_size
        self.discount = gamma
        self.actions = [np.array([0, -1]),
                   np.array([-1, 0]),
                   np.array([0, 1]),
                   np.array([1, 0])]
        self.action_prob = 1/len(self.actions)
        self.theta = theta
        print('action prob : ',self.action_prob)
        self.penalty_reward = penalty
        self.re_init()

    def re_init(self):
        self.values = np.zeros((self.grid_size, self.grid_size))
        self.policy = np.zeros(self.values.shape, dtype=np.int)

    def checkTerminal(self,state):
        x, y = state
        if x == 0 and y == 0:
            return 1
        elif (x == self.grid_size - 1 and y == self.grid_size - 1):
            return 1
        else : 
            return 0


    def step(self, state, action):
        #print(state)
        if self.checkTerminal(state):
            next_state = state
            reward = 0
        else:
            next_state = (np.array(state) + action).tolist()
            x, y = next_state
            if x < 0 or x >= self.grid_size or y < 0 or y >= self.grid_size:
                next_state = state
            reward = self.penalty_reward     

        return next_state, reward


    def compValueIteration(self):
        new_state_values = np.zeros((self.grid_size, self.grid_size))
        policy = np.zeros((self.grid_size, self.grid_size))
        iter_cnt = 0
        while True:
            #delta = 0
            state_values = new_state_values.copy()
            old_state_values = state_values.copy()

            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    values = []
                    for action in self.actions:
                        (next_i, next_j), reward = self.step([i, j], action)
                        values.append(reward + self.discount*state_values[next_i, next_j])
                    new_state_values[i, j] = np.max(values)
                    policy[i, j] = np.argmax(values)
                    #delta = max(delta, np.abs(old_state_values[i, j] - new_state_values[i, j]))
            delta = np.abs(old_state_values - new_state_values).max()        
            print(f'Difference: {delta}')
            if delta < self.theta:
                break
            iter_cnt += 1

        return new_state_values, policy, iter_cnt



    def policyEvaluation(self,policy,new_state_values):
        #new_state_values = np.zeros((self.grid_size, self.grid_size))
        iter_cnt = 0
        while True:
            delta = 0 
            state_values = new_state_values.copy()
            old_state_values = state_values.copy()
            for i in range(self.grid_size):
                for j in range(self.grid_size):
                    action = policy[i, j]
                    (next_i, next_j), reward = self.step([i, j], action)
                    value = self.action_prob * (reward + self.discount * state_values[next_i, next_j])
                    new_state_values[i, j] = value
                    delta = max(delta, np.abs(old_state_values[i, j] - new_state_values[i, j]))

            print(f'Difference: {delta}')
            if delta < self.theta:
                break
            iter_cnt += 1

        return new_state_values


    def policyImprovement(self, policy, values, actions):

        #expected_action_returns = np.zeros((self.grid_size, self.grid_size, np.size(actions)))
        policy_stable = True

        for i in range(self.grid_size):
            for j in range(self.grid_size):
                old_action = policy[i, j]
                act_cnt = 0
                expected_rewards = []
                for action in self.actions:
                    (next_i, next_j), reward = self.step([i, j], action)
                    expected_rewards.append(self.action_prob * (reward + self.discount * values[next_i, next_j]))
                #max_reward = np.max(expected_rewards)
                #new_action = np.random.choice(np.where(expected_rewards == max_reward)[0])
                new_action = np.argmax(expected_rewards)
                #print('new_action : ',new_action)
                #print('old_action : ',old_action)
                if old_action != new_action:
                    policy_stable = False
                policy[i, j] = new_action

        return policy, policy_stable



    def compPolicyIteration(self):
        iterations = 0
        total_start_time = time.time()

        while True:
            start_time = time.time()
            self.values = self.policyEvaluation(self.policy, self.values)
            elapsed_time = time.time() - start_time
            print(f'PE => Elapsed time {elapsed_time} seconds')
            start_time = time.time()

            self.policy, policy_stable = self.policyImprovement(self.policy,self.values, self.actions)
            elapsed_time = time.time() - start_time
            print(f'PI => Elapsed time {elapsed_time} seconds')

            if policy_stable:
                break

            iterations += 1

        total_elapsed_time = time.time() - total_start_time
        print(f'Optimal policy is reached after {iterations} iterations in {total_elapsed_time} seconds')
        return self.values, self.policy

Но обе мои реализации дают разные оптимальные политики и оптимальные значения. Я следовал точно алгоритмам, данным в книге.

Результат повторения политики:

values :
[[ 0.         -0.33300781 -0.33300781 -0.33300781]
 [-0.33300781 -0.33300781 -0.33300781 -0.33300781]
 [-0.33300781 -0.33300781 -0.33300781 -0.33300781]
 [-0.33300781 -0.33300781 -0.33300781  0.        ]]
****************************************************************************************************
****************************************************************************************************
policy :
[[0 0 0 0]
 [1 0 0 0]
 [0 0 0 3]
 [0 0 2 0]]

Результат с итерацией значения:

values :
[[ 0.0 -1.0 -2.0 -3.0]
 [-1.0 -2.0 -3.0 -2.0]
 [-2.0 -3.0 -2.0 -1.0]
 [-3.0 -2.0 -1.0  0.0]]
****************************************************************************************************
****************************************************************************************************
[[0. 0. 0. 0.]
 [1. 0. 0. 3.]
 [1. 0. 2. 3.]
 [1. 2. 2. 0.]]

Кроме того, итерация значения сходится после 4 итераций, тогда как итерация политики после 2 итераций.

Где я ошибся? Могут ли они дать разные оптимальные политики? Но то, что я считаю написанным в книге, после 3-й итерации значения, которые мы получаем, являются оптимальными. Тогда должны быть некоторые проблемы с итерацией политики моего кода, которые я не вижу. В принципе, как мне исправить политику?


person POOJA GUPTA    schedule 08.09.2019    source источник
comment
пожалуйста, посмотрите, у меня фиксированная политика с действием 0 всегда для итерации политики, это ошибка?   -  person POOJA GUPTA    schedule 09.09.2019
comment
Обратите внимание, что здесь мы предпочитаем технический стиль письма. Мы мягко препятствуем приветствию, надежде на помощь, благодарности, предварительной благодарности, благодарственным письмам, приветствиям, добрым пожеланиям, подписям, пожалуйста, не могли бы вы помочь, болтовне и сокращенным txtspk, мольбам, как долго вы застряли, советы по голосованию, мета-комментарии и т. д. Просто объясните свою проблему и покажите, что вы пробовали, чего ожидали и что на самом деле произошло.   -  person halfer    schedule 14.09.2020


Ответы (1)


Я предполагаю, что проблема в этих строках:

(1) value = self.action_prob * (вознаграждение + self.discount * state_values[next_i, next_j])
(2) new_state_values[i, j] = ценность

Здесь вы напрямую назначаете ценность, которую получаете от только одного действия. Если вы посмотрите на уравнение ожидания Беллмана, то увидите, что в начале есть сумма для всех действий. Вы должны рассмотреть все действия из состояния, выполнить расчет (1) для всех возможных действий, просуммировать их и присвоить эту сумму в (2) новому значению. для (i,j).

person Sarsa    schedule 02.10.2019
comment
спасибо за ответ, но если вы внимательно посмотрите, это детерминистическая политика, что означает, что для каждого состояния есть детерминированное действие. Здесь изначально я зафиксировал политику с действием 0 в качестве действия для всех состояний, и, следовательно, вероятность должна быть равна 1, т. е. pi(a/s) = 0, поэтому нам не нужно суммировать. На самом деле я принимал неверную вероятность pi (a/s) за 0,25, теперь, когда я изменил это значение на 1, оно не сходится. Вы можете заглянуть в него? - person POOJA GUPTA; 06.10.2019