python, pytorch, reinforcement-learning, pytorch-lightning

PyTorch Lightning RuntimeError: CUDA error: initialization error. CPU works, though


I'm not very good at coding Python and I'm new to PyTorch. The problem is that I can run this code on a CPU by setting accelerator = 'cpu', BUT when I set it to GPU the code gets stuck on _ = iter(train_loader). I don't have an NVIDIA GPU locally, so I have been trying to run this on Google Colab, but it gets stuck on the same _ = iter(train_loader) line on the T4 GPU there.

RuntimeError: CUDA error: initialization error. You can easily reproduce this by running the code below if you have the required modules installed.

Please help if you know how to use GPU acceleration in PyTorch Lightning.
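
For context, switching between CPU and GPU in PyTorch Lightning comes down to the accelerator argument of the Trainer. A minimal sketch, assuming a reasonably recent Lightning version (devices=1 is my addition, not part of the original script):

import torch
from pytorch_lightning import Trainer

# Use the GPU only if CUDA is actually visible, otherwise fall back to the CPU.
accelerator = 'gpu' if torch.cuda.is_available() else 'cpu'
trainer = Trainer(accelerator=accelerator, devices=1)
# trainer.fit(model, train_loader)  # same call as in the full script below

The full script I'm running is below.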

import os
import pandas as pd
import numpy as np
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data.dataset import IterableDataset
from pytorch_lightning import LightningModule, Trainer
from typing import List, Tuple, Generator
import random
import multiprocessing as mp



# Add CUDA-related environment variable settings
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"
os.environ["TORCH_USE_CUDA_DSA"] = "1"
print(torch.cuda.is_available())

avg_reward_list = []  # List to store average rewards
avg_total_actions = []  # List to store average total actions

episode_times = []
num_episodes = 5000


# Define the calculate_profit function
def calculate_profit(next_price, current_price, amount_held):
    if amount_held > 0:
        return (current_price - next_price) * amount_held
    elif amount_held < 0:
        return (next_price - current_price) * abs(amount_held)
    else:
        return 0

# Check if CUDA (GPU) is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class DQN(nn.Module):
    def __init__(self, in_states, h1_nodes, out_actions, dropout_prob=0.0):  
        super().__init__()
        self.fc1 = nn.Linear(in_states, h1_nodes)
        self.dropout = nn.Dropout(dropout_prob)
        self.fc2 = nn.Linear(h1_nodes, out_actions)

    def forward(self, x):
        x = torch.relu(self.fc1(x.float()))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

class CustomEnv(gym.Env):
    def __init__(self, df, initial_balance=10000, lookback_window_size=1):
        super(CustomEnv, self).__init__()
        self.df = df.dropna().reset_index()
        self.df_total_steps = 100
        self.initial_balance = initial_balance
        self.lookback_window_size = lookback_window_size
        self.action_space = spaces.Discrete(3)  
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.lookback_window_size,), dtype=np.float32)
        self.current_step = None
        self.total_actions = 0
        self.current_price = 0
        self.net_worth = 0
        self.avg_total_actions = 0
        self.avg_total_actionss = 0
        self.not_profitable_profits = 0
        self.profitable_profits = 0
        self.step_count = 0
        self.avg_total_actionss_array = 0
        self.profit_factor = 0
        

        self.episode_rewards_array = []  # List to store rewards for this episode
        self.action_total_array = []  # List to store total actions for this episode
        self.net_worth_array = []  # List to store net worth for this episode

        self.gamma = 0.90
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.999999
        h1_nodes = 80
        in_states = 1
        out_actions = 3
        learning_rate = 0.001
        self.dropout_prob=0.5
        self.model = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.target_model = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.batch_size = 1000
        # Replay buffer used by remember() / sample_batch(); the cap of 10000 is an arbitrary choice.
        self.memory = []
        self.buffer_size = 10000

    def remember(self, experience):
        self.memory.append(experience)
        if len(self.memory) > self.buffer_size:
            self.memory.pop(0)  # Remove oldest experience if buffer exceeds size

    def sample_batch(self, batch_size):
        return random.sample(self.memory, min(len(self.memory), batch_size))

    def reset(self):
        self.net_worth = self.initial_balance
        self.current_step = self.lookback_window_size
        self.total_actions = random.uniform(-10, 10)
        self.current_price = self.df.loc[self.current_step, 'close']
        return self._next_observation()

    def _next_observation(self):
        close_prices = self.df.loc[self.current_step - self.lookback_window_size:self.current_step - 1, 'Normalized_SMA_Slope'].values
        return torch.tensor(close_prices, dtype=torch.float32)

    def step(self, action):
        self.step_count += 1
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        self.current_step += 1
        if self.current_step >= len(self.df):
            return self._next_observation(), 0, True, {}

        obs = self._next_observation()

        self.total_actions = (action - 1) * 10  # Convert action index to total actions

        next_price = self.df.loc[self.current_step + 1, 'close'] if self.current_step + 1 < len(self.df) else self.current_price
        self.current_price = self.df.loc[self.current_step, 'close']
        holding = self.total_actions / 10
        profit = calculate_profit(next_price, self.current_price, holding)
        self.net_worth += profit

        reward = profit

        if self.current_step >= self.df_total_steps or self.net_worth < 100:
            done = 1
        else:
            done = 0

        self.avg_total_actions = np.mean(self.total_actions)

        self.train_dqn(obs, action, reward, done)

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        if profit > 0: #and avg_action < 1 and avg_action > -1:
            #self.profitable_profits.append(profit)
            self.profitable_profits += profit 
            #profit = profit*2
        else:
            self.not_profitable_profits -= profit

        self.profit_factor = ((self.profitable_profits / self.not_profitable_profits) if self.not_profitable_profits != 0 else 0.0) - 1

        if self.step_count > self.batch_size:
            self.target_model.load_state_dict(self.model.state_dict())
            self.step_count = 0
        #print(f'Step: {self.current_step}, Net Worth: {round(self.net_worth, 1)}, PRICE: {round(self.current_price, 1)}, action: {round(self.total_actions, 1)}, Profit Factor: {self.profit_factor}')
        #print(f'working!')

        self.episode_rewards_array.append(reward)
        self.action_total_array.append(self.total_actions)
        self.net_worth_array.append(self.net_worth)

        return obs, reward, done, {}


    def train_dqn(self, obs, action, reward, done): #nothing wrong here 
        obs = torch.tensor(obs).unsqueeze(0).float()
        action = torch.tensor([action], dtype=torch.int64)
        reward = torch.tensor([reward], dtype=torch.float32)  # targets do not need gradients
        next_obs = obs
        done = torch.tensor([done], dtype=torch.float32)

        # Reshape obs to match the input size expected by fc1 layer
        obs = obs.view(-1, self.lookback_window_size)  

        q_values = self.model(obs)
        next_q_values = self.model(next_obs)
        q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)
        next_q_value = next_q_values.max(1)[0].detach()  # Detach next_q_value
        expected_q_value = reward + self.gamma * next_q_value * (1.0 - done)

        loss = nn.functional.mse_loss(q_value, expected_q_value)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()




class RLDataset(IterableDataset):
    def __init__(self, env: CustomEnv, sample_size: int = 200) -> None:
        self.epoch_count = 0
        self.env = env
        self.sample_size = sample_size

    def __iter__(self) -> Generator[Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], None, None]:
        obs = self.env.reset()
        for _ in range(self.env.df_total_steps):
            if random.random() < self.env.epsilon:  # Ensure to use self.env.epsilon
                action = self.env.action_space.sample()  # Random action
            else:
                action = self.env.model(torch.FloatTensor(obs)).argmax().item()
            next_obs, reward, done, _ = self.env.step(action)
            yield obs, action, reward, next_obs, done
            if done==1:
                self.epoch_count += 1
                # Calculate average reward for this episode
                avg_reward = np.mean(self.env.episode_rewards_array)
                avg_reward_list.append(avg_reward)  # Append average reward to list
                avg_total_actions = np.mean(self.env.action_total_array)

                # #print(f" End of episode {self.epoch_count}! Net Worth={round(self.env.net_worth, 2)}! Epsilon={self.env.epsilon} Avg Reward={avg_reward}, Avg Position={avg_total_actions}, profit factor={self.env.profit_factor}")
                # # Save episode data to the database
                # conn = sqlite3.connect('trading_data.db')
                # cursor = conn.cursor()
                # rounded_net_worth = round(self.env.net_worth, 2)
                # cursor.execute('''
                #     INSERT INTO data (episode, net_worth, steps, epsilon, avg_reward, avg_total_actions, profit_factor)
                #     VALUES (?, ?, ?, ?, ?, ?, ?)
                # ''', (self.epoch_count, rounded_net_worth, self.env.current_step, self.env.epsilon, avg_reward, avg_total_actions, self.env.profit_factor))
                # conn.commit()
                # conn.close()

                obs = self.env.reset()
            else:
                obs = next_obs

class DQNLit(LightningModule):
    def __init__(self, env: CustomEnv, batch_size) -> None:
        super().__init__()
        self.env = env
        self.batch_size = batch_size  # used by train_dataloader()

        h1_nodes = 80
        in_states = 1
        out_actions = 3
        self.dropout_prob = 0.5  # this kinda broke everything?
        self.net = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)
        self.target_net = DQN(in_states, h1_nodes, out_actions, dropout_prob=self.dropout_prob)

    def forward(self, x):
        return self.net(x.float())
    
    def dqn_mse_loss(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> torch.Tensor:
        obs, action, reward, next_obs, done = batch
        
        # Convert reward and done to Float data type
        reward = reward.float()
        done = done.float()
        
        q_values = self.net(obs)
        next_q_values = self.target_net(next_obs).max(1)[0]
        expected_q_values = reward + (1 - done) * self.env.gamma * next_q_values
        
        return nn.MSELoss()(q_values.gather(1, action.unsqueeze(1)).squeeze(1), expected_q_values.detach())

    def training_step(self, batch: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor], batch_idx: int):
        loss = self.dqn_mse_loss(batch)
        return loss

    def configure_optimizers(self):
        return optim.Adam(self.net.parameters(), lr=0.001)

    def train_dataloader(self):
        num_workers = 1  # Choose the number of workers you want
        return DataLoader(RLDataset(self.env), batch_size=self.batch_size, num_workers=num_workers, persistent_workers=True)




def main():
    num_rows = 2000
    # Create synthetic data
    data = {
        'close': np.full(num_rows, 100),  # Set close to 100 for all rows
        #'Normalized_SMA_Slope': np.full(num_rows, 0.1)  # Set Normalized_SMA_Slope to 0.1 for all rows
    }

    # Create a DataFrame from the synthetic data
    df = pd.DataFrame(data)

    # Set a specific value for index 0
    specific_value = 50
    df.at[0, 'close'] = specific_value

    # Calculate Simple Moving Average (SMA)
    window_size = 10
    df['SMA'] = df['close'].rolling(window=window_size).mean()

    # Calculate the Simple Moving Average slope for normalization
    sma_slope = df['SMA'] - df['SMA'].shift(1)

    # SMA slope for normalization
    window_long = window_size * 10
    sma_slope_long = df['SMA'] - df['SMA'].shift(window_long)

    # Calculate the standard deviation of long-term SMA slope
    stdev_sma_slope_long = sma_slope_long.rolling(window=window_long, min_periods=1).std()

    # Normalize the Simple Moving Average slope
    normalized_sma_slope = sma_slope / stdev_sma_slope_long

    # Add the normalized_sma_slope to df as a new column
    df['Normalized_SMA_Slope'] = normalized_sma_slope

    # Define the lookback window size
    lookback_window_size = 1


    # Initialize the environment with the dataset and lookback window size
    train_df = df[:-lookback_window_size]
    train_env = CustomEnv(train_df, lookback_window_size=lookback_window_size)
    batch_size = 1000
    print('started1')
    train_dataset = RLDataset(train_env)
    print('started2')
    train_loader = DataLoader(train_dataset, batch_size=batch_size, num_workers=1)
    print('started3')

    model = DQNLit(train_env, batch_size)

    _ = iter(train_loader)  # Call iter(train_loader) to avoid the RuntimeError
    trainer = Trainer(accelerator='gpu')#, max_epochs=200, strategy='ddp_notebook')
    print('started4')
    trainer.fit(model, train_loader)
    print('started5')

if __name__ == '__main__':
    # Required on Windows when using multiprocessing
    #mp.set_start_method("spawn")
    main()

Solution

  • The culprit is the num_workers=1 argument to the DataLoader:

    train_loader = DataLoader(train_dataset, batch_size=batch_size)  # removed num_workers=1

    With num_workers=1 the DataLoader spawns a worker process in addition to the main process, so two instances end up touching CUDA, and the forked worker cannot initialize it again, which is what triggers the error. Setting num_workers=0 instead raises an error saying num_workers needs to be greater than 0, which looks like a bug, so just remove the num_workers parameter entirely; the default behaves like num_workers=0 (loading in the main process) and then everything runs fine.
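
    For completeness, a minimal sketch of the working end of main(), assuming the rest of the script above is unchanged (devices=1 is my addition, not in the original):

    train_loader = DataLoader(train_dataset, batch_size=batch_size)  # no num_workers: data loads in the main process
    model = DQNLit(train_env, batch_size)
    trainer = Trainer(accelerator='gpu', devices=1)  # picks up the T4 on Colab
    trainer.fit(model, train_loader)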