I'm bad at coding in Python and I'm new to PyTorch. The problem is that I can run this code on a CPU by defining accelerator = 'cpu', BUT when I set it to GPU the code gets stuck on _ = iter(train_loader). I don't have an Nvidia GPU, and I have been trying to run this on Google Colab, but it can't run on the T4 GPU either: it gets stuck on _ = iter(train_loader) and the error I get is

RuntimeError: CUDA error: initialization error

You can easily run this code yourself if you have the required modules installed.
Please help if you know how to use GPU acceleration in PyTorch Lightning.
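For context, here is a minimal sketch of the GPU check and Trainer setup I am trying to get working (the accelerator/devices arguments are the standard ones from recent PyTorch Lightning versions; devices=1 is assumed here, the full code below only passes accelerator='gpu', and model/train_loader refer to the objects built in that code):

import torch
from pytorch_lightning import Trainer

# Sanity check that PyTorch can see a CUDA device at all.
print(torch.cuda.is_available())

# The Trainer configuration I am aiming for.
trainer = Trainer(accelerator='gpu', devices=1)
# trainer.fit(model, train_loader)  # this is where everything hangs on the GPU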
import os
import pandas as pd
import numpy as np
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from pytorch_lightning import LightningModule, Trainer
from typing import List, Tuple, Generator
import random
import multiprocessing as mp
# Add CUDA-related environment variable settings
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["TORCH_USE_CUDA_DSA"] = "1"
print(torch.cuda.is_available())
avg_reward_list = [] # List to store average rewards
avg_total_actions = [] # List to store average total actions
episode_times = []
num_episodes = 5000
# Define the calculate_profit function
def calculate_profit(next_price, current_price, amount_held):
    if amount_held > 0:
        return (current_price - next_price) * amount_held
    elif amount_held < 0:
        return (next_price - current_price) * abs(amount_held)
    else:
        return 0
# Check if CUDA (GPU) is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class DQN(nn.Module):
    def __init__(self, in_states, h1_nodes, out_actions, dropout_prob=0.0):
        super().__init__()
        self.fc1 = nn.Linear(in_states, h1_nodes)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(h1_nodes, out_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x.float()))
        x = self.dropout(x)
        x = self.fc2(x)
        return x
class CustomEnv(gym.Env):
    def __init__(self, df, initial_balance=10000, lookback_window_size=1):
        super(CustomEnv, self).__init__()
        self.df = df.dropna().reset_index()
        self.df_total_steps = 100
        self.initial_balance = initial_balance
        self.lookback_window_size = lookback_window_size
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.lookback_window_size,), dtype=np.float32)
        self.current_step = None
        self.total_actions = 0
        self.current_price = 0
        self.net_worth = 0
        self.avg_total_actions = 0
        self.avg_total_actionss = 0
        self.not_profitable_profits = 0
        self.profitable_profits = 0
        self.step_count = 0
        self.avg_total_actionss_array = 0
        self.profit_factor = 0
        self.episode_rewards_array = []  # List to store rewards for this episode
        self.action_total_array = []  # List to store total actions for this episode
        self.net_worth_array = []  # List to store net worth for this episode
        self.memory = []  # Replay buffer used by remember()/sample_batch() below (was never initialized)
        self.buffer_size = 10000  # Replay buffer capacity; no value was given originally, this one is assumed
        self.gamma = 0.90
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999999
        h1_nodes = 80
        in_states = 1
        out_actions = 3
        learning_rate = 0.001
        self.dropout_prob = 0.5
        self.model = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.target_model = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.batch_size = 1000
    def remember(self, experience):
        self.memory.append(experience)
        if len(self.memory) > self.buffer_size:
            self.memory.pop(0)  # Remove oldest experience if buffer exceeds size

    def sample_batch(self, batch_size):
        return random.sample(self.memory, min(len(self.memory), batch_size))
    def reset(self):
        self.net_worth = self.initial_balance
        self.current_step = self.lookback_window_size
        self.total_actions = random.uniform(-10, 10)
        self.current_price = self.df.loc[self.current_step, 'close']
        return self._next_observation()

    def _next_observation(self):
        close_prices = self.df.loc[self.current_step - self.lookback_window_size:self.current_step - 1, 'Normalized_SMA_Slope'].values
        return torch.tensor(close_prices, dtype=torch.float32)

    def step(self, action):
        self.step_count += 1
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        self.current_step += 1
        if self.current_step >= len(self.df):
            return self._next_observation(), 0, True, {}
        obs = self._next_observation()
        self.total_actions = (action - 1) * 10  # Convert action index to total actions
        next_price = self.df.loc[self.current_step + 1, 'close'] if self.current_step + 1 < len(self.df) else self.current_price
        self.current_price = self.df.loc[self.current_step, 'close']
        holding = self.total_actions / 10
        profit = calculate_profit(next_price, self.current_price, holding)
        self.net_worth += profit
        reward = profit
        if self.current_step >= self.df_total_steps or self.net_worth < 100:
            done = 1
        else:
            done = 0
        self.avg_total_actions = np.mean(self.total_actions)
        self.train_dqn(obs, action, reward, done)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        if profit > 0:  # and avg_action < 1 and avg_action > -1:
            # self.profitable_profits.append(profit)
            self.profitable_profits += profit
            # profit = profit*2
        else:
            self.not_profitable_profits -= profit
        self.profit_factor = ((self.profitable_profits / self.not_profitable_profits) if self.not_profitable_profits != 0 else 0.0) - 1
        if self.step_count > self.batch_size:
            self.target_model.load_state_dict(self.model.state_dict())
            self.step_count = 0
        # print(f'Step: {self.current_step}, Net Worth: {round(self.net_worth, 1)}, PRICE: {round(self.current_price, 1)}, action: {round(self.total_actions, 1)}, Profit Factor: {self.profit_factor}')
        # print(f'working!')
        self.episode_rewards_array.append(reward)
        self.action_total_array.append(self.total_actions)
        self.net_worth_array.append(self.net_worth)
        return obs, reward, done, {}
    def train_dqn(self, obs, action, reward, done):  # nothing wrong here
        obs = torch.tensor(obs).unsqueeze(0).float()
        action = torch.tensor([action], dtype=torch.int64)
        reward = torch.tensor([reward], dtype=torch.float32).requires_grad_()
        next_obs = obs
        done = torch.tensor([done], dtype=torch.float32).requires_grad_()
        # Reshape obs to match the input size expected by fc1 layer
        obs = obs.view(-1, self.lookback_window_size)
        q_values = self.model(obs)
        next_q_values = self.model(next_obs)
        q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)
        next_q_value = next_q_values.max(1)[0].detach()  # Detach next_q_value
        expected_q_value = reward + self.gamma * next_q_value * (1.0 - done)
        loss = nn.functional.mse_loss(q_value, expected_q_value)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
class RLDataset(IterableDataset):
    def __init__(self, env: CustomEnv, sample_size: int = 200) -> None:
        self.epoch_count = 0
        self.env = env
        self.sample_size = sample_size

    def __iter__(self) -> Generator[Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], None, None]:
        obs = self.env.reset()
        for _ in range(self.env.df_total_steps):
            if random.random() < self.env.epsilon:  # Ensure to use self.env.epsilon
                action = self.env.action_space.sample()  # Random action
            else:
                action = self.env.model(torch.FloatTensor(obs)).argmax().item()
            next_obs, reward, done, _ = self.env.step(action)
            yield obs, action, reward, next_obs, done
            if done == 1:
                self.epoch_count += 1
                # Calculate average reward for this episode
                avg_reward = np.mean(self.env.episode_rewards_array)
                avg_reward_list.append(avg_reward)  # Append average reward to list
                avg_total_actions = np.mean(self.env.action_total_array)
                # print(f" End of episode {self.epoch_count}! Net Worth={round(self.env.net_worth, 2)}! Epsilon={self.env.epsilon} Avg Reward={avg_reward}, Avg Position={avg_total_actions}, profit factor={self.env.profit_factor}")
                # Save episode data to the database
                # conn = sqlite3.connect('trading_data.db')
                # cursor = conn.cursor()
                # rounded_net_worth = round(self.env.net_worth, 2)
                # cursor.execute('''
                #     INSERT INTO data (episode, net_worth, steps, epsilon, avg_reward, avg_total_actions, profit_factor)
                #     VALUES (?, ?, ?, ?, ?, ?, ?)
                # ''', (self.epoch_count, rounded_net_worth, self.env.current_step, self.env.epsilon, avg_reward, avg_total_actions, self.env.profit_factor))
                # conn.commit()
                # conn.close()
                obs = self.env.reset()
            else:
                obs = next_obs
class DQNLit(LightningModule):
    def __init__(self, env: CustomEnv, batch_size) -> None:
        super().__init__()
        self.env = env
        self.batch_size = batch_size  # Needed by train_dataloader() below (was never stored in the original)
        h1_nodes = 80
        in_states = 1
        out_actions = 3
        self.dropout_prob = 0.5  # this kinda broke everything?
        self.net = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.target_net = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)

    def forward(self, x):
        return self.net(x.float())

    def dqn_mse_loss(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> torch.Tensor:
        obs, action, reward, next_obs, done = batch
        # Convert reward and done to Float data type
        reward = reward.float()
        done = done.float()
        q_values = self.net(obs)
        next_q_values = self.target_net(next_obs).max(1)[0]
        expected_q_values = reward + (1 - done) * self.env.gamma * next_q_values
        return nn.MSELoss()(q_values.gather(1, action.unsqueeze(1)).squeeze(1), expected_q_values.detach())

    def training_step(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], batch_idx: int):
        loss = self.dqn_mse_loss(batch)
        return loss

    def configure_optimizers(self):
        return optim.Adam(self.net.parameters(), lr=0.001)

    def train_dataloader(self):
        num_workers = 1  # Choose the number of workers you want
        return DataLoader(RLDataset(self.env), batch_size=self.batch_size, num_workers=num_workers, persistent_workers=True)
def main():
    num_rows = 2000
    # Create synthetic data
    data = {
        'close': np.full(num_rows, 100),  # Set close to 100 for all rows
        # 'Normalized_SMA_Slope': np.full(num_rows, 0.1)  # Set Normalized_SMA_Slope to 0.1 for all rows
    }
    # Create a DataFrame from the synthetic data
    df = pd.DataFrame(data)
    # Set a specific value for index 0
    specific_value = 50
    df.at[0, 'close'] = specific_value
    # Calculate Simple Moving Average (SMA)
    window_size = 10
    df['SMA'] = df['close'].rolling(window=window_size).mean()
    # Calculate the Simple Moving Average slope for normalization
    sma_slope = df['SMA'] - df['SMA'].shift(1)
    # SMA slope for normalization
    window_long = window_size * 10
    sma_slope_long = df['SMA'] - df['SMA'].shift(window_long)
    # Calculate the standard deviation of long-term SMA slope
    stdev_sma_slope_long = sma_slope_long.rolling(window=window_long, min_periods=1).std()
    # Normalize the Simple Moving Average slope
    normalized_sma_slope = sma_slope / stdev_sma_slope_long
    # Add the normalized_sma_slope to df as a new column
    df['Normalized_SMA_Slope'] = normalized_sma_slope
    # Define the lookback window size
    lookback_window_size = 1
    # Initialize the environment with the dataset and lookback window size
    train_df = df[:-lookback_window_size]
    train_env = CustomEnv(train_df, lookback_window_size=lookback_window_size)
    batch_size = 1000
    print('started1')
    train_dataset = RLDataset(train_env)
    print('started2')
    train_loader = DataLoader(train_dataset, batch_size=batch_size, num_workers=1)
    print('started3')
    model = DQNLit(train_env, batch_size)
    _ = iter(train_loader)  # Call iter(train_loader) to avoid the RuntimeError
    trainer = Trainer(accelerator='gpu')  # , max_epochs=200, strategy='ddp_notebook')
    print('started4')
    trainer.fit(model, train_loader)
    print('started5')
if __name__ == '__main__':
    # Required on Windows when using multiprocessing
    # mp.set_start_method("spawn")
    main()
train_loader = DataLoader(train_dataset, batch_size=batch_size)  # , num_workers=1

The commented-out num_workers=1 was the issue: it creates a second (worker) process, and that second instance is what causes the hang and the CUDA initialization error. BUT setting it explicitly to num_workers=0 raises an error saying num_workers needs to be greater than 0, which seems like a bug (it most likely comes from the persistent_workers=True in train_dataloader, which requires num_workers > 0). SO just REMOVE the num_workers parameter entirely; that is effectively num_workers=0, and then it works fine.
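Putting it together, this is a minimal sketch of the working setup (it reuses train_env, train_dataset, batch_size, and DQNLit exactly as defined in the code above; the only change is dropping num_workers so everything stays in the main process):

from torch.utils.data import DataLoader
from pytorch_lightning import Trainer

# No num_workers argument: this defaults to num_workers=0, so no worker
# subprocess is started (which is what seemed to trigger the hang and the
# CUDA initialization error).
train_loader = DataLoader(train_dataset, batch_size=batch_size)
model = DQNLit(train_env, batch_size)
trainer = Trainer(accelerator='gpu')
trainer.fit(model, train_loader)

If you really want worker processes, it would probably take a spawn start method instead of fork (as the commented-out mp.set_start_method("spawn") hints), but dropping num_workers was enough here.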