python, machine-learning, linear-regression, gradient-descent

My Python implementation of gradient descent is not working well


I am trying to create a linear regression model that uses batch gradient descent, but the error (MSE) never decreases. LinearModel is just a template class that initializes the hyperparameters (step_size=0.001, max_iter=10000, eps=0.001, theta_0=None, verbose=True).
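The LinearModel base class is not shown in the question. As a reference point, here is a minimal sketch of what such a template might look like, assuming it does nothing more than store the hyperparameters listed above:

class LinearModel:
  """Hypothetical template class (an assumption; the actual base class is not shown)."""

  def __init__(self, step_size=0.001, max_iter=10000, eps=0.001,
               theta_0=None, verbose=True):
    self.step_size = step_size  # learning rate
    self.max_iter = max_iter    # maximum number of iterations
    self.eps = eps              # convergence threshold
    self.theta = theta_0        # initial parameters (None means "initialize them in fit")
    self.verbose = verbose      # print progress while fitting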

import numpy as np

# The data is stored in an array of shape (m, n), where m is the number of rows and n the number of columns
# x = array((m, n))
# y = array((m, 1))

class LinearRegression(LinearModel):
  """Linear regression with Gradient Descent."""

  def fit(self, x, y):
    """Run Gradient Descent Method to minimize J(theta) for Linear Regression.

    :param x: Training example inputs. Shape (m, n).
    :param y: Training example labels. Shape (m,).
    """

    def squared_error(theta, x, y):
      # Mean squared error: the average of the squared differences between the
      # predicted values x.dot(theta) and the actual values y
      return np.mean((x.dot(theta) - y)**2)

    # Define a function to calculate the partial derivatives of the squared error with respect to each parameter
    def grad_squared_error(theta, x,y):
      m,n = x.shape
      # Initialize a list to store the partial derivatives
      grad = np.zeros(n)

      for j in range(n):
        # Update the partial derivatives using the chain rule
        grad[j] += ((x.dot(theta)) - y).dot(x[:,j][np.newaxis].T) #np.newaxis to make the numpy array 2D for Transpose

      # Return the partial derivatives divided by (2 * the number of data points)
      return [g / (2*m) for g in grad]


    m,n = x.shape
    if self.theta == None:
      self.theta = np.zeros(n)


    decay_factor = 0.9  # Adjust this as needed
    decay_interval = 10  # Adjust this as needed

    for i in range(self.max_iter):
      # Calculate the partial derivatives of the squared error using the current parameter values
      grad = grad_squared_error(self.theta, x,y)

      # Update each parameter value using gradient descent formula
      for j in range(len(self.theta)):
        self.theta[j] -= self.step_size * grad[j]

      # Every decay_interval iterations, decay the step size and print the current parameter values and squared error
      if self.verbose and i % decay_interval == 0:
        self.step_size *= decay_factor
        print(f"Iteration {i+1}: theta = {self.theta}, squared error = {squared_error(self.theta, x,y)}")


      if squared_error(self.theta, x,y) < self.eps:
        if self.verbose:
          print("Converged With Parameters:")
          print(f"Iteration {i+1}: theta = {self.theta}, squared error = {squared_error(self.theta, x,y)}")
        break



  def predict(self, x):
      """Make a prediction given new inputs x.

      :param x: Inputs of shape (m, n).
      :return:  Outputs of shape (m,).
      """
      return np.dot(x, self.theta)

The last few output values are shown below.

Iteration 9911: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9921: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9931: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9941: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9951: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9961: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9971: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9981: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878
Iteration 9991: theta = [-0.47145182  5.37944606  6.10798726  5.03442352  4.33745377  1.48791336], squared error = 19565.226327190878

I have even tried decreasing the learning rate every 10 steps, but it is still not working. I used the Fish Market data from Kaggle: https://www.kaggle.com/datasets/tarunkumar1912/fish-dataset1?select=Fish_dataset.csv
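For context, the data preparation might look roughly like the sketch below. The column names (Weight as the target, the remaining numeric measurement columns as features) and the prepended intercept column are assumptions about how the dataset was used; they are not stated in the question.

import numpy as np
import pandas as pd

# Hypothetical preprocessing; the column names are assumptions about the Kaggle CSV
df = pd.read_csv("Fish_dataset.csv")
y = df["Weight"].to_numpy()                       # assumed target column
x = df.select_dtypes(include=np.number).drop(columns=["Weight"]).to_numpy()
x = np.hstack([np.ones((x.shape[0], 1)), x])      # prepend an intercept column

model = LinearRegression()
model.fit(x, y)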


Solution

  • So, I was able to get the code to work as intended.

    Here's the code:

    import numpy as np
    import matplotlib.pyplot as plt

    class LinearRegression2(LinearModel):
        """Linear regression with multiple methods."""

        def fit(self, x, y):
            """Run Gradient Descent Method to minimize J(theta) for Linear Regression.

            :param x: Training example inputs. Shape (m, n).
            :param y: Training example labels. Shape (m,).
            """

            m, n = x.shape
            if self.theta is None:
                self.theta = np.zeros(n)

            J_arr = []  # To store cost values for plotting
            e_arr = []  # To store MSE values for plotting
            J = 0
            error = 0

            if self.method == 'norm':
                # Closed-form solution via the normal equation
                self.theta = np.linalg.inv(x.T @ x) @ (x.T @ y)
                J = np.mean((y - x @ self.theta) ** 2) / 2.0
                error = np.mean((x @ self.theta - y) ** 2)

                if self.verbose:
                    print(f"J: {J}, squared error = {np.mean((y - x @ self.theta) ** 2)}, theta = {self.theta} ")

            if self.method == 'gdb':
                # Batch gradient descent
                decay_factor = 0.5  # Adjust this as needed (if needed)
                decay_interval = 10  # Adjust this as needed (if needed)

                for i in range(self.max_iter):
                    error = y - x @ self.theta
                    J = np.mean(error ** 2) / 2.0
                    J_arr.append(J)
                    e_arr.append(np.mean(error ** 2))

                    gradient = -x.T @ error / m
                    self.theta -= self.step_size * gradient

                    # if i % decay_interval == 0:
                    #     self.step_size *= decay_factor

                    if self.verbose and i % decay_interval == 0:
                        print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")

                    if J < self.eps:
                        if self.verbose:
                            print("Converged With Parameters:")
                            print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")
                        break

            if self.method == 'gds':
                # Stochastic gradient descent (one random example per update)
                decay_factor = 0.5  # Adjust this as needed
                decay_interval = 10  # Adjust this as needed
                flag = False

                for _ in range(self.max_iter):

                    for i in range(m):
                        idx = np.random.randint(0, m)
                        xi = x[idx:idx + 1]
                        yi = y[idx:idx + 1]
                        error = yi - xi @ self.theta
                        J = np.mean(error ** 2) / 2.0
                        J_arr.append(J)
                        e_arr.append(np.mean(error ** 2))

                        gradient = -(error @ xi) / m
                        self.theta -= self.step_size * gradient

                        # if i % decay_interval == 0:
                        #     self.step_size *= decay_factor

                        if self.verbose and i % decay_interval == 0:
                            print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")

                        if J < self.eps:
                            flag = True
                            break

                    if flag:
                        if self.verbose:
                            print("Converged With Parameters:")
                            print(f"J: {J}, squared error = {np.mean(error ** 2)}, theta = {self.theta} ")
                        break

            if self.plot and self.method != 'norm':
                fig, axes = plt.subplots(1, 2, constrained_layout=True, figsize=(15, 5))
                axes[0].set_title("J(theta)")
                axes[0].set_xlabel("iterations")
                axes[0].set_ylabel('J(theta)')
                axes[0].plot(J_arr, color='red')
                axes[1].set_title("mse")
                axes[1].set_xlabel("iterations")
                axes[1].set_ylabel('mse')
                axes[1].plot(e_arr, color='g')
            elif self.plot and self.method == 'norm':
                print('')
                print('----------------------------------------------------------')
                print('Normal Equation used to find "theta", Graph Not Available.')
            else:
                pass

        def predict(self, x):
            """Make a prediction given new inputs x.

            :param x: Inputs of shape (m, n).
            :return:  Outputs of shape (m,).
            """
            return x @ self.theta
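
    Since fit() reads self.method and self.plot but the LinearModel base class is not shown, a rough usage sketch might look like the following. Setting method and plot as plain attributes is an assumption, not the actual constructor signature:

    # Hypothetical usage; assumes LinearModel stores theta_0 in self.theta and that
    # self.method / self.plot can simply be set as attributes (the base class is not shown).
    model = LinearRegression2(step_size=0.001, max_iter=10000, eps=0.001, verbose=True)
    model.method = 'gdb'   # 'norm' = normal equation, 'gdb' = batch GD, 'gds' = stochastic GD
    model.plot = True
    model.fit(x, y)        # x shape (m, n), y shape (m,)
    y_pred = model.predict(x)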