djangoauthenticationoauth-2.0

Django OAuth2 Authentication Failing in Unit Tests - 401 'invalid_client' Error


I've been stuck on this issue for 3 days now, and I'm hoping someone can help me out. I'm trying to implement OAuth2 authentication for my Django backend with a REST interface. However, my unit tests for authentication are failing, and I'm getting a 401 error with the message 'invalid_client' instead of the expected 200 status code.

My user model is configured to use email as the user ID. Here's the relevant part of my user model:

class CustomUser(AbstractBaseUser, PermissionsMixin):
    email = models.EmailField(unique=True)
    is_active = models.BooleanField(default=True)
    is_staff = models.BooleanField(default=False)
    date_joined = models.DateTimeField(auto_now_add=True)
    roles = models.ManyToManyField(UserRole, related_name="roles")

    objects = CustomUserManager()

    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = []

    def __str__(self):
        return self.email

class CustomUserManager(BaseUserManager):
    def create_user(self, email, password=None, **extra_fields):
        if not email:
            raise ValueError('The Email field must be set')
        email = self.normalize_email(email)
        user = self.model( email=email, **extra_fields)
        if password:
            user.set_password(password)
        else:
            user.set_unusable_password()
        user.save(using=self._db)
        return user

    def create_superuser(self, email, password=None, **extra_fields):
        extra_fields.setdefault('is_staff', True)
        extra_fields.setdefault('is_superuser', True)

        if extra_fields.get('is_staff') is not True:
            raise ValueError('Superuser must have is_staff=True.')
        if extra_fields.get('is_superuser') is not True:
            raise ValueError('Superuser must have is_superuser=True.')

        return self.create_user(email, password, **extra_fields)`

My Django settings

from .settings import *

# Use an in-memory SQLite database for testing
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.sqlite3',
        'NAME': ':memory:',
    }
}

# Other testing-specific settings
DEBUG = True

TIME_ZONE = 'UTC'
USE_TZ = True

# Middleware to run migrations automatically
class RunMigrationsMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response
        self.run_migrations()

    def __call__(self, request):
        response = self.get_response(request)
        return response

    def run_migrations(self):
        from django.core.management import call_command
        call_command('migrate')

MIDDLEWARE = [
    'matchplan.test_settings.RunMigrationsMiddleware', 
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'oauth2_provider.middleware.OAuth2TokenMiddleware',  
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
]

# Django REST Framework settings
REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'oauth2_provider.contrib.rest_framework.OAuth2Authentication',
        'rest_framework.authentication.SessionAuthentication',
    ),
    'DEFAULT_PERMISSION_CLASSES': (
        'rest_framework.permissions.IsAuthenticated',
    ),
}

# Custom test runner
TEST_RUNNER = 'matchplan.test_runner.CustomTestRunner'

And, finally, the unit test

from rest_framework.test import APITestCase
from django.contrib.auth import get_user_model
from oauth2_provider.models import Application
from django.urls import reverse

class OAuth2Test(APITestCase):
    def setUp(self):
        self.user = get_user_model().objects.create_user(
            email='testuser@example.com',  # Provide the email argument
            password='testpass123'
        )
        self.application = Application.objects.create(
            name='Test Application',
            client_type=Application.CLIENT_CONFIDENTIAL,
            authorization_grant_type=Application.GRANT_PASSWORD,
            user=self.user
        )

    def test_password_grant(self):
        token_url = reverse('oauth2_provider:token')
        data = {
            'grant_type': 'password',
            'username': 'testuser@example.com',  # Use the email as the username
            'password': 'testpass123',
            'client_id': self.application.client_id,
            'client_secret': self.application.client_secret,
        }
        # Debug prints
        print(f"Client Type: {self.application.client_type}")
        print(f"Client ID: {self.application.client_id}")
        print(f"Client Secret: {self.application.client_secret}")

        response = self.client.post(token_url, data, format='json')
        print(response.json())  # Use response.json() to get the JSON data
        self.assertEqual(response.status_code, 200) # this assertion fails with 401

I've double-checked my OAuth2 configuration and client credentials, but I can't figure out why the authentication is failing in the unit tests. Any help or suggestions would be greatly appreciated!

Additional information

The test fails, returning a 401 status code with the error message 'invalid_client'.


Solution

  • The issue is at the point of verifying the hash by django auth hasher. When creating an oauth2 application, oauth2 automatically creates a hash for the client secret, then that secret (the one before hash) should be provided in the payload. ie, if the secret is youllneverguess and the hash is pbkdf2_sha256$7200...., then it expects the secret youllneverguess(not the hashed secret) to be provided in the payload with the client id which is used get the oauth2 application. Then, it encodes the provided secret (youllneverguess) and compares the hash with the one stored in the fetched application.

    so your solution would be:

    class OAuth2Test(APITestCase):
        def setUp(self):
            self.user = get_user_model().objects.create_user(
                email='testuser@example.com',  # Provide the email argument
                password='testpass123'
            ) # type: ignore
            self.application = Application.objects.create(
                name='Test Application',
                client_type=Application.CLIENT_CONFIDENTIAL,
                authorization_grant_type=Application.GRANT_PASSWORD,
                user=self.user,
                client_secret="youcanneverguess" # this can be an environment variable
            )
    
        def test_password_grant(self):
            token_url = reverse('oauth2_provider:token')
            data = {
                'grant_type': "password",
                'username': 'testuser@example.com',
                'password': 'testpass123',
                'client_id': self.application.client_id,
                # don't use self.application.client_secret since the field will return the hashed value 
                'client_secret': "youcanneverguess", # this can be an environment variable
            }
    
            response = self.client.post(token_url, data=data, format='json')
            print(response.json())
            self.assertEqual(response.status_code, 200)