keycloakfastapi

"Invalid token" in keycloak


I am trying to use keycloak in my FastAPI app My code

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from keycloak import KeycloakOpenID
import requests
import logging
import os

from .config import settings


# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Keycloak configuration
KEYCLOAK_SERVER_URL = settings.KEYCLOAK_SERVER_URL
KEYCLOAK_REALM = settings.KEYCLOAK_REALM
KEYCLOAK_CLIENT_ID = settings.KEYCLOAK_CLIENT_ID
KEYCLOAK_CLIENT_SECRET = settings.KEYCLOAK_CLIENT_SECRET
ALGORITHM = "RS256"
TOKEN_URL = f"{KEYCLOAK_SERVER_URL}/realms/fastapi-realm/protocol/openid-connect/token"


# Initialize KeycloakOpenID
keycloak_openid = KeycloakOpenID(
    server_url=f"{KEYCLOAK_SERVER_URL}",
    client_id=KEYCLOAK_CLIENT_ID,
    realm_name=KEYCLOAK_REALM,
    client_secret_key=KEYCLOAK_CLIENT_SECRET,
    verify=False
)
config_well_known = keycloak_openid.well_known()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_token(token: str = Depends(oauth2_scheme)):
    try:
        decoded_token = keycloak_openid.decode_token(
            token,validate=True,
        )
        username = decoded_token['preferred_username']
        logger.info(f"Decoded token: {decoded_token}")
        # Verify the issuer claim
        issuer = decoded_token["iss"]

        expected_issuer = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}"
        I# Token example -- token: {'exp': 1731303036, 'iat': 1731267036, 'jti': 'f1b71d25-4de6-4c03-b5f5-d9726b39d51f', 'iss': 'https://feast-keycloak.pimc-st.innodev.local/realms/feast-realm', 'aud': 'account', 'sub': 'ac48f45e-f26b-4380-bde8-e752febb6d18', 'typ': 'Bearer', 'azp': 'feast-client-id', 'session_state': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'acr': '1', 'allowed-origins': ['https://feast-frontend.pimc-st.innodev.local', '/*', 'http://localhost:5173'], 'realm_access': {'roles': ['default-roles-feast-realm', 'offline_access', 'uma_authorization']}, 'resource_access': {'account': {'roles': ['manage-account', 'manage-account-links', 'view-profile']}}, 'scope': 'profile email', 'sid': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'email_verified': False, 'name': 'A B', 'preferred_username': 'my_username', 'given_name': 'A', 'family_name': 'B', 'email': 'aamoskalenko@inno.tech'}
        logger.info(f"XXX_ issuer={issuer}")
        logger.info(f"XXX_ expected_issuer={expected_issuer}")
        if issuer != expected_issuer:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer")
        
        logger.info(f"username: {username}")
        return decoded_token
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

In my main.py I have the following code

from fastapi import Depends, FastAPI, HTTPException, status, Security


from .keycloak import verify_token, oauth2_scheme, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET, TOKEN_URL

app = FastAPI()

@app.post("/project", response_model=schemas.Project, tags=["project",])
def create_project(project: schemas.CreateProject, db: Session = Depends(get_db), payload: dict = Security(verify_token)):
    ...


@app.post("/login")
def get_token(body: schemas.Login):
    data = {
        "grant_type": "password", # TODO: clarify grant_type client_credentials (requires only client id and secret or password - requires password and login)
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "password": body.password,
        "username": body.login
    }
    response = requests.post(TOKEN_URL, data=data,  verify=False)
    return JSONResponse(status_code=response.status_code, content=response.json())

I am obtaining token with help of /login method Then I am applying token like this: enter image description here

For requet /project I have an error:

"Invalid token"

How to fix the error?


Solution

  • This 3 Steps can verify token

    Step 1: Fetch the public key certificate from Keycloak to verify the JWT token signature.

    Step 2: Convert the certificate into a usable public key format.

    Step 3: Decode and verify the JWT token signature using the public key, while skipping audience and issuer checks.

        # Step 1: Get public key for signature verification
        certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
        certs_response = requests.get(certs_url)
        certs_response.raise_for_status()
        public_key_data = certs_response.json()
        certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
    
        # Step 2: Convert certificate to public key
        cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
        public_key = cert.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )
    
        # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
        jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            options={
                'verify_aud': False,
                'verify_iss': False
            }
        )
    

    Launching Key cloak by Docker Compose

    version: '3.8'
    
    services:
      postgres:
        image: postgres:15.6
        container_name: postgres_db
        restart: always
        ports:
          - "5432:5432"
        volumes:
          - postgres_data:/var/lib/postgresql/data
        environment:
          POSTGRES_DB: keycloak
          POSTGRES_USER: keycloak
          POSTGRES_PASSWORD: password
    
      keycloak_web:
        image: quay.io/keycloak/keycloak:26.0.5
        container_name: keycloak_web
        environment:
          KC_DB: postgres
          KC_DB_URL: jdbc:postgresql://postgres:5432/keycloak
          KC_DB_USERNAME: keycloak
          KC_DB_PASSWORD: password
    
          KC_HOSTNAME: localhost
          KC_HOSTNAME_STRICT: false
          KC_HOSTNAME_STRICT_HTTPS: false
    
          KC_LOG_LEVEL: info
          KC_METRICS_ENABLED: true
          KC_HEALTH_ENABLED: true
          KEYCLOAK_ADMIN: admin
          KEYCLOAK_ADMIN_PASSWORD: admin
        command: start-dev
        depends_on:
          - postgres
        ports:
          - 8080:8080
    
    volumes:
      postgres_data:
    

    Create realm & user

    realm

    fastapi-realm
    

    user

    username: user1
    password: 1234
    

    enter image description here

    Install dependency in Python 3.12 with conda

    pip install fastapi uvicorn requests python-keycloak python-jose cryptography
    

    enter image description here

    FastAPI server code

    api-server.py

    from fastapi import FastAPI, HTTPException, Depends
    from pydantic import BaseModel
    from starlette.responses import JSONResponse
    from keycloak import KeycloakOpenID
    from jose import jwt, JWTError
    from cryptography import x509
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import serialization
    
    import requests
    import json
    
    app = FastAPI()
    
    # Configuration variables
    KEYCLOAK_URL = 'http://localhost:8080'
    REALM = 'fastapi-realm'
    TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
    KEYCLOAK_CLIENT_ID = 'admin-cli'
    
    # Keycloak service class
    class KeycloakService:
        def __init__(self):
            self.keycloak_url = KEYCLOAK_URL
            self.realm = REALM
    
        def decode_and_print_token(self, token):
            try:
                # Retrieve claims without verifying the signature or audience
                decoded_token = jwt.get_unverified_claims(token)
                print("Decoded JWT token:")
                print(json.dumps(decoded_token, indent=4))
                return decoded_token
            except JWTError as e:
                print(f"Error decoding token: {e}")
                return None
            
        def validate_token(self, token: str, client_id: str) -> bool:
            decoded_token = self.decode_and_print_token(token)
            if not decoded_token:
                print("Failed to decode token.")
                return False
    
            try:
                # Extract issuer, azp, and aud directly from decoded token
                issuer = decoded_token.get('iss')
                authorized_party = decoded_token.get('azp')
                audience = decoded_token.get('aud')
    
                print("Issuer (iss) in token:", issuer)
                print("Authorized party (azp) in token:", authorized_party)
                print("Audience (aud) in token:", audience)
    
                expected_issuer = f"{self.keycloak_url}/realms/{self.realm}"
                if issuer != expected_issuer:
                    print(f"Invalid issuer. Expected: {expected_issuer}, Got: {issuer}")
                    return False
    
                # Allow `aud` to be `None` for user tokens, or match "account" or the client_id for client tokens
                if audience is None:
                    print("Audience is None, assuming this is a user token without an audience.")
                elif audience not in ["account", client_id]:
                    print(f"Invalid audience. Expected: 'account' or '{client_id}', Got: {audience}")
                    return False
    
                # Step 1: Get public key for signature verification
                certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
                certs_response = requests.get(certs_url)
                certs_response.raise_for_status()
                public_key_data = certs_response.json()
                certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
    
                # Step 2: Convert certificate to public key
                cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
                public_key = cert.public_key().public_bytes(
                    encoding=serialization.Encoding.PEM,
                    format=serialization.PublicFormat.SubjectPublicKeyInfo
                )
    
                # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
                jwt.decode(
                    token,
                    public_key,
                    algorithms=['RS256'],
                    options={
                        'verify_aud': False,
                        'verify_iss': False
                    }
                )
                print("Token signature is valid.")
                return True
            except JWTError as e:
                print(f"Error validating token: {e}")
                return False
            except requests.RequestException as e:
                print(f"Error fetching public key: {e}")
                return False
    
    keycloak_service = KeycloakService()
    
    # Schemas
    class Login(BaseModel):
        login: str
        password: str
    
    class VerifyToken(BaseModel):
        token: str
        client_id: str
    
    # Login endpoint
    @app.post("/login")
    def get_token(body: Login):
        data = {
            "grant_type": "password",
            "client_id": KEYCLOAK_CLIENT_ID,
            "username": body.login,
            "password": body.password
        }
        response = requests.post(TOKEN_URL, data=data, verify=False)
        if response.status_code == 200:
            return JSONResponse(status_code=200, content=response.json())
        else:
            raise HTTPException(status_code=response.status_code, detail=response.json())
    
    # Verify endpoint
    @app.post("/verify")
    def verify_token(body: VerifyToken):
        is_valid = keycloak_service.validate_token(body.token, body.client_id)
        return {"valid": is_valid}
    
    

    Launching FastAPI server

    uvicorn api-server:app --reload
    

    enter image description here

    API document URL

    http://localhost:8000/docs#/
    

    enter image description here

    Get Token by Postman

    URL

    POST http://localhost:8000/login
    

    Input Body

    {
      "login": "user1",
      "password": "1234"
    }
    

    Script for assign user_token variable

    var jsonData = JSON.parse(responseBody);
    
    if (jsonData.access_token) {
        postman.setEnvironmentVariable("user_token", jsonData.access_token);
    } else {
        console.error("access_token not found in the response");
    }
    

    enter image description here

    enter image description here

    enter image description here

    Verify Token by Postman

    URL

    POST http://localhost:8000/verify
    

    Input Body

    {
      "token": "{{user_token}}",
      "client_id": "admin-cli"
    }
    

    enter image description here

    So the user token verified with OK (true) value.

    Token Valid Result

    {
        "valid": true
    }