import pymurapi as mur
from agent import Agent
import numpy as np
import time
auv = mur.mur_init()
start_time = None
# Target state the agent is trained to reach: [yaw, pitch, roll, depth]
desired_state = [50, 0, 0, 1]

# Return the AUV's current state as [yaw, pitch, roll, depth]
def get_current_state():
    yaw = auv.get_yaw()
    pitch = auv.get_pitch()
    roll = auv.get_roll()
    depth = auv.get_depth()
    return [yaw, pitch, roll, depth]

def calculate_reward(desired_state, current_state, previous_state):
    # Absolute error between the desired state and the current/previous states
    current_error = abs(np.array(desired_state) - np.array(current_state))
    previous_error = abs(np.array(desired_state) - np.array(previous_state))
    # Positive where the error grew since the last step, negative where it shrank
    error = current_error - previous_error
    # print("Errors:")
    # print("current ", current_error)
    # print("previous ", previous_error)
    # print("difference ", error)
    # Weights of the error components: yaw, pitch, roll, depth
    error[0] *= 2
    error[1] *= 1
    error[2] *= 1
    error[3] *= 100
    global_error = np.sum(abs(current_error))
    if global_error < 1:
        # Large bonus once the total error is small enough
        reward = 5000
    else:
        # Reward a decrease of the weighted error, penalise an increase
        reward = -np.sum(error) * 10
    print(reward)
    return reward

# Episode is done when the reward target is hit or the time limit expires
def check_if_done(total_reward):
    global start_time
    # If the episode start time is not set yet, record it now
    if start_time is None:
        start_time = time.time()
    # Current time
    current_time = time.time()
    if total_reward >= 10000:
        return True
    # Check whether the 5-second episode limit has elapsed
    if current_time - start_time >= 5:
        return True
    else:
        return False

# Apply an action to the AUV and observe the result
def take_action(action, output_dim, previous_state, total_reward):
    # Each element of the action vector is the power command for one motor
    for i, power in enumerate(action):
        auv.set_motor_power(i, power * 15)
    time.sleep(0.1)
    next_state = get_current_state()
    reward = calculate_reward(desired_state, next_state, previous_state)
    done = check_if_done(total_reward)
    return next_state, reward, done

if __name__ == '__main__':
    agent = Agent(input_dim=4, output_dim=5)
    num_episodes = 50000
    for episode in range(num_episodes):
        # Reset the simulation and read the initial state
        auv.yaw = 0
        auv.pitch = 0
        auv.roll = 0
        auv.depth = 0
        current_state = get_current_state()
        done = False
        total_reward = 0.0
        num_steps = 0
        while not done:
            # Let the neural network generate an action
            action = agent.generate_action(current_state)
            # print(action)
            # Apply the action and get the next state, reward and done flag
            next_state, reward, done = take_action(action, agent.output_dim, current_state, total_reward)
            # Update the neural network with the observed transition
            agent.update(current_state, action, reward, next_state, done)
            # Move on to the next state
            current_state = next_state
            total_reward += reward
            num_steps += 1
        # Restart the episode timer for the next episode
        start_time = time.time()
        print("Episode:", episode, "Total Reward:", total_reward)
        total_reward = 0
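
# ---------------------------------------------------------------------------
# agent.py: implementation of the Agent class imported above
# ---------------------------------------------------------------------------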
import torch
import torch.nn as nn
import torch.optim as optim
import random

class Agent:
    def __init__(self, input_dim, output_dim):
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.model = None
        self.optimizer = None
        # Both are created in initialize_model
        self.initialize_model()

    def initialize_model(self):
        # Define the architecture of the neural network
        self.model = nn.Sequential(
            nn.Linear(self.input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 64),
            nn.ReLU(),
            nn.Linear(64, self.output_dim)  # one output per action component (motor)
        )
        self.optimizer = optim.Adam(self.model.parameters(), lr=0.001)

    def generate_action(self, state, epsilon=0.01):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            action_values = self.model(state_tensor)
        if random.random() < epsilon:
            # Random action
            action = [random.randint(-10, 10) for _ in range(self.output_dim)]
        else:
            # Best action based on the model's predictions
            action = action_values.squeeze(0).tolist()
        return action

    def update(self, state, action, reward, next_state, done):
        state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
        # The action that was actually taken, used below to weight the outputs
        action_tensor = torch.tensor(action, dtype=torch.float32).unsqueeze(0)
        reward_tensor = torch.tensor(reward, dtype=torch.float32).unsqueeze(0)
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32).unsqueeze(0)
        # Compute the target Q-values
        with torch.no_grad():
            next_state_values = self.model(next_state_tensor).max(1)[0].unsqueeze(0)
            target_q_values = reward_tensor + (1 - done) * next_state_values
        # Compute the current Q-values for this state
        current_q_values = self.model(state_tensor)
        # The action is a continuous vector here, so instead of selecting a single
        # Q-value with gather, weight the predictions by the action that was taken
        current_q_values = torch.sum(current_q_values * action_tensor, dim=1, keepdim=True)
        # Compute the loss and update the neural network
        loss = nn.functional.smooth_l1_loss(current_q_values, target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def save_model(self, filepath):
        torch.save(self.model.state_dict(), filepath)

    def load_model(self, filepath):
        self.model.load_state_dict(torch.load(filepath))
        self.model.eval()
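
# A minimal usage sketch for the persistence helpers above (not part of the
# original training script; the checkpoint name "auv_agent.pt" is arbitrary):
if __name__ == "__main__":
    demo_agent = Agent(input_dim=4, output_dim=5)
    demo_agent.save_model("auv_agent.pt")   # store the current weights
    demo_agent.load_model("auv_agent.pt")   # restore them into the same architecture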