Задать вопрос
@mrlkk

Как мне заставить нейросеть типа обучение с подкреплением работать для моего подводного робота?

Пишу нейросеть для управления подводным роботом в MurIDE, может кто знает такое. Использую библиотеку PyTorch. Уже накидал кое-какую нейросеть, робот даже начал двигаться случайным образом (для меня это уже достижение). Задача такая - у робота есть Yaw, Pitch, Roll и Depth. Я хочу задавать ему целевые YPRD, а он должен в них приходить под управлением нейронки. У робота 5 моторов. Соответственно, среда возвращает мне 4 значения YRPD на вход агента, а агент выдает 5 значений мощностей моторов.
Но моя нейронка постоянно просто упирается в угол и ничего не делает. Я пробовал подкрутить функцию вознаграждений, но выходит какая-то фигня, если честно.
Очень надеюсь, что тут найдутся знающие люди, для которых такая задачка - раз плюнуть. Помогите!

#main.py
spoiler
import pymurapi as mur
from agent import Agent
import numpy as np
import time

auv = mur.mur_init()
start_time = None
desired_state = [50, 0, 0, 1]

# Функция для получения текущего состояния подводного аппарата
def get_current_state():
    yaw = auv.get_yaw()
    pitch = auv.get_pitch()
    roll = auv.get_roll()
    depth = auv.get_depth()
    return [yaw, pitch, roll, depth]


def calculate_reward(desired_state, current_state, previous_state):
    # Вычисление квадратичной ошибки между текущим и желаемым состоянием
    current_error = abs(np.array(desired_state) - np.array(current_state))
    previous_error = abs(np.array(desired_state) - np.array(previous_state))
    error = np.array(current_error) - np.array(previous_error)

    # print("Errors: ")
    # print("current ", current_error)
    # print("previous ", previous_error)
    # print("разница ", error)

    # Устанавливаем весовые коэффициенты для ошибки
    error[0] *= 2
    error[1] *= 1
    error[2] *= 1
    error[3] *= 100

    global_error = np.sum(abs(current_error))
    if global_error < 1:
        reward = 5000
    else:
        reward = -np.sum(error) * 10

    print(reward)

    return reward




def check_if_done(total_reward):
    global start_time

    # Если время начала эпизода не задано, установите его
    if start_time is None:
        start_time = time.time()

    # Получите текущее время
    current_time = time.time()

    if total_reward >= 10000:
        return True

    # Проверьте, прошло ли уже 30 секунд
    if current_time - start_time >= 5:
        return True
    else:
        return False


# Функция для применения действия на подводный аппарат
def take_action(action, output_dim, previous_state, total_reward):
    for i, power in enumerate(action):
        auv.set_motor_power(i, power*15)


    time.sleep(0.1)
    next_state = get_current_state()
    reward = calculate_reward(desired_state, next_state, previous_state)
    done = check_if_done(total_reward)
    return next_state, reward, done


if __name__ == '__main__':
    agent = Agent(input_dim=4, output_dim=5)
    num_episodes = 50000

    for episode in range(num_episodes):
        # Сброс симуляции и получение начального состояния
        auv.yaw = 0
        auv.pitch = 0
        auv.roll = 0
        auv.depth = 0
        current_state = get_current_state()


        done = False
        total_reward = 0.0
        num_steps = 0

        while not done:
            # Генерация действия нейросетью
            action = agent.generate_action(current_state)
            # print(action)

            # Применение действия и получение следующего состояния, вознаграждения и флага завершения
            next_state, reward, done = take_action(action, agent.output_dim, current_state, total_reward)

            # Обновление нейросети на основе полученных данных (пример)
            agent.update(current_state, action, reward, next_state, done)

            # Обновление текущего состояния
            current_state = next_state

            total_reward += reward
            num_steps += 1

        start_time = time.time()
        print("Episode:", episode, "Total Reward:", total_reward)
        total_reward = 0


#agent.py
spoiler
import torch
import torch.nn as nn
import torch.optim as optim
import random

class Agent:
    def __init__(self, input_dim, output_dim):
        self.input_dim = input_dim
        self.output_dim = output_dim

        self.model = None
        self.optimizer = None

        # Вызывается методом initialize_model
        self.initialize_model()

    def initialize_model(self):
        # Define the architecture of the neural network
        self.model = nn.Sequential(
            nn.Linear(self.input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, self.output_dim)  # Adjusted output dimension
        )

        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)


    def generate_action(self, state, epsilon=0.01):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            action_values = self.model(state_tensor)

        if random.random() < epsilon:
            # Random action
            action = [random.randint(-10, 10) for _ in range(self.output_dim)]
        else:
            # Best action based on the model's predictions
            action = action_values.squeeze(0).tolist()

        return action

    def update(self, state, action, reward, next_state, done):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        action_tensor = torch.zeros((1, self.output_dim), dtype=torch.float32)  # Изменение размерности action_tensor
        reward_tensor = torch.tensor(reward, dtype=torch.float32).unsqueeze(0)
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)


        # Compute the target Q-values
        with torch.no_grad():
            next_state_values = self.model(next_state_tensor).max(1)[0].unsqueeze(0)
            target_q_values = reward_tensor + (1 - done) * next_state_values

        # Compute the current Q-values for the chosen actions
        current_q_values = self.model(state_tensor)  # Нет необходимости в gather

        # Select the Q-values for the chosen action
        current_q_values = torch.sum(current_q_values * action_tensor, dim=1, keepdim=True)

        # Compute the loss and update the neural network
        loss = nn.functional.smooth_l1_loss(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def save_model(self, filepath):
        torch.save(self.model.state_dict(), filepath)

    def load_model(self, filepath):
        self.model.load_state_dict(torch.load(filepath))
        self.model.eval()
  • Вопрос задан
  • 92 просмотра
Подписаться 1 Сложный 3 комментария
Пригласить эксперта
Ответы на вопрос 1
@AlexSku
не буду отвечать из-за модератора
А модель точно учиться? А то в двух местах no_grad() написано. (я-то в других средах обучал, поэтому тут не всё понимаю)
Ещё я не совсем понял, для чего initialize_model(). Обычно у модели прописывается forward(self, observation).
Ответ написан
Ваш ответ на вопрос

Войдите, чтобы написать ответ

Похожие вопросы