I am trying to use Keycloak in my FastAPI app. My code:
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
from keycloak import KeycloakOpenID
import requests
import logging
import os
from .config import settings
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Keycloak configuration (every value is read from the project settings object)
KEYCLOAK_SERVER_URL = settings.KEYCLOAK_SERVER_URL
KEYCLOAK_REALM = settings.KEYCLOAK_REALM
KEYCLOAK_CLIENT_ID = settings.KEYCLOAK_CLIENT_ID
KEYCLOAK_CLIENT_SECRET = settings.KEYCLOAK_CLIENT_SECRET
ALGORITHM = "RS256"
# Build the token endpoint from the configured realm. A hard-coded realm name
# here would issue tokens from a different realm than the one `keycloak_openid`
# validates against whenever KEYCLOAK_REALM is set to anything else.
TOKEN_URL = f"{KEYCLOAK_SERVER_URL}/realms/{KEYCLOAK_REALM}/protocol/openid-connect/token"

# Initialize KeycloakOpenID
keycloak_openid = KeycloakOpenID(
    server_url=f"{KEYCLOAK_SERVER_URL}",
    client_id=KEYCLOAK_CLIENT_ID,
    realm_name=KEYCLOAK_REALM,
    client_secret_key=KEYCLOAK_CLIENT_SECRET,
    # NOTE(review): verify=False disables TLS certificate verification for all
    # calls to Keycloak. Acceptable only for local/self-signed setups; point
    # this at a CA bundle before using it anywhere else.
    verify=False,
)
# Fetches the realm's OpenID Connect discovery document at import time.
config_well_known = keycloak_openid.well_known()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_token(token: str = Depends(oauth2_scheme)):
    """Validate a Keycloak bearer token and return its decoded claims.

    Args:
        token: Raw bearer token extracted from the request by ``oauth2_scheme``.

    Returns:
        The decoded token claims as returned by ``keycloak_openid.decode_token``.

    Raises:
        HTTPException: 401 with detail "Invalid token" when decoding or
            validation fails or a required claim is missing; 401 with detail
            "Invalid issuer" when the ``iss`` claim does not match the
            configured server URL and realm.
    """
    try:
        decoded_token = keycloak_openid.decode_token(token, validate=True)
        username = decoded_token["preferred_username"]
        issuer = decoded_token["iss"]
    except Exception as e:
        # Log the underlying cause with its traceback. The client only ever
        # sees "Invalid token", so without this the real reason (wrong realm,
        # unreachable Keycloak, expired token, missing claim, ...) is lost.
        logger.exception("Token validation failed")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token"
        ) from e

    # Full claims may contain personal data (name, e-mail), so keep them out
    # of INFO-level logs.
    logger.debug("Decoded token: %s", decoded_token)

    # Verify the issuer claim. A trailing slash on the configured server URL
    # would otherwise produce "...//realms/..." and never match.
    expected_issuer = f"{str(KEYCLOAK_SERVER_URL).rstrip('/')}/realms/{KEYCLOAK_REALM}"
    # Token example -- token: {'exp': 1731303036, 'iat': 1731267036, 'jti': 'f1b71d25-4de6-4c03-b5f5-d9726b39d51f', 'iss': 'https://feast-keycloak.pimc-st.innodev.local/realms/feast-realm', 'aud': 'account', 'sub': 'ac48f45e-f26b-4380-bde8-e752febb6d18', 'typ': 'Bearer', 'azp': 'feast-client-id', 'session_state': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'acr': '1', 'allowed-origins': ['https://feast-frontend.pimc-st.innodev.local', '/*', 'http://localhost:5173'], 'realm_access': {'roles': ['default-roles-feast-realm', 'offline_access', 'uma_authorization']}, 'resource_access': {'account': {'roles': ['manage-account', 'manage-account-links', 'view-profile']}}, 'scope': 'profile email', 'sid': 'b36cd197-247d-447e-9f3d-6cf1fecae7d6', 'email_verified': False, 'name': 'A B', 'preferred_username': 'my_username', 'given_name': 'A', 'family_name': 'B', 'email': 'aamoskalenko@inno.tech'}
    logger.info("XXX_ issuer=%s", issuer)
    logger.info("XXX_ expected_issuer=%s", expected_issuer)
    if issuer != expected_issuer:
        # Raised outside the try block so it is not rewritten to "Invalid token".
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid issuer"
        )

    logger.info("username: %s", username)
    return decoded_token
In my main.py I have the following code:
from fastapi import Depends, FastAPI, HTTPException, status, Security
from .keycloak import verify_token, oauth2_scheme, KEYCLOAK_CLIENT_ID, KEYCLOAK_CLIENT_SECRET, TOKEN_URL
app = FastAPI()
# POST /project: `Security(verify_token)` makes FastAPI run `verify_token`
# before the handler and pass its return value in as `payload`.
@app.post("/project", response_model=schemas.Project, tags=["project",])
def create_project(project: schemas.CreateProject, db: Session = Depends(get_db), payload: dict = Security(verify_token)):
# Handler body is elided in this snippet.
...
@app.post("/login")
def get_token(body: schemas.Login):
    """Exchange a login/password pair for Keycloak tokens (password grant).

    Args:
        body: Request body carrying ``login`` and ``password``.

    Returns:
        A ``JSONResponse`` that mirrors the status code and JSON body returned
        by the Keycloak token endpoint.

    Raises:
        HTTPException: 502 when Keycloak cannot be reached within the timeout
            or answers with a body that is not JSON.
    """
    data = {
        "grant_type": "password",  # TODO: clarify grant_type client_credentials (requires only client id and secret or password - requires password and login)
        "client_id": KEYCLOAK_CLIENT_ID,
        "client_secret": KEYCLOAK_CLIENT_SECRET,
        "password": body.password,
        "username": body.login,
    }
    try:
        # NOTE(review): verify=False disables TLS certificate verification;
        # acceptable only for local/self-signed setups.
        # The timeout keeps a stalled Keycloak from hanging the worker forever.
        response = requests.post(TOKEN_URL, data=data, verify=False, timeout=10)
    except requests.RequestException as e:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Could not reach Keycloak",
        ) from e

    try:
        content = response.json()
    except ValueError as e:
        # E.g. an HTML error page from a proxy in front of Keycloak.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail="Keycloak returned a non-JSON response",
        ) from e

    return JSONResponse(status_code=response.status_code, content=content)
I obtain a token with the /login endpoint. Then I apply the token like this:
For the request to /project
I get an error:
"Invalid token"
How to fix the error?
These three steps verify the token:
Step 1: Fetch the public key certificate from Keycloak to verify the JWT token signature.
Step 2: Convert the certificate into a usable public key format.
Step 3: Decode and verify the JWT token signature using the public key, while skipping audience and issuer checks.
# Excerpt of a method body: `self.keycloak_url`, `self.realm` and `token` come
# from the enclosing scope.
# Step 1: Get public key for signature verification
certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
certs_response = requests.get(certs_url)
certs_response.raise_for_status()
public_key_data = certs_response.json()
# Wraps the first `x5c` entry of the first listed key in PEM armor.
# NOTE(review): presumably this should be the key whose `kid` matches the
# token header rather than whichever key is listed first -- confirm.
certificate_pem = f"-----BEGIN CERTIFICATE-----\n{public_key_data['keys'][0]['x5c'][0]}\n-----END CERTIFICATE-----"
# Step 2: Convert certificate to public key
cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
public_key = cert.public_key().public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
)
# Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
# The decoded claims are discarded; the call is made only for its validation.
jwt.decode(
token,
public_key,
algorithms=['RS256'],
options={
'verify_aud': False,
'verify_iss': False
}
)
version: '3.8'

services:
  # Database backing Keycloak.
  postgres:
    image: postgres:15.6
    container_name: postgres_db
    restart: always
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    environment:
      POSTGRES_DB: keycloak
      POSTGRES_USER: keycloak
      POSTGRES_PASSWORD: password

  # Keycloak in development mode (start-dev), reachable on http://localhost:8080.
  keycloak_web:
    image: quay.io/keycloak/keycloak:26.0.5
    container_name: keycloak_web
    environment:
      KC_DB: postgres
      KC_DB_URL: jdbc:postgresql://postgres:5432/keycloak
      KC_DB_USERNAME: keycloak
      KC_DB_PASSWORD: password
      KC_HOSTNAME: localhost
      # Boolean-looking values are quoted so YAML passes them through as
      # strings instead of converting them to booleans.
      KC_HOSTNAME_STRICT: "false"
      KC_HOSTNAME_STRICT_HTTPS: "false"
      KC_LOG_LEVEL: info
      KC_METRICS_ENABLED: "true"
      KC_HEALTH_ENABLED: "true"
      KEYCLOAK_ADMIN: admin
      KEYCLOAK_ADMIN_PASSWORD: admin
    command: start-dev
    depends_on:
      - postgres
    ports:
      - "8080:8080"

volumes:
  postgres_data:
Realm: fastapi-realm
User:
username: user1
password: 1234
pip install fastapi uvicorn requests python-keycloak python-jose cryptography
api-server.py
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from starlette.responses import JSONResponse
from keycloak import KeycloakOpenID
from jose import jwt, JWTError
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
import requests
import json
# FastAPI application that the /login and /verify routes are registered on.
app = FastAPI()
# Configuration variables
# Base URL of the Keycloak server.
KEYCLOAK_URL = 'http://localhost:8080'
# Realm used to build the token URL here and the issuer/certs URLs in KeycloakService.
REALM = 'fastapi-realm'
# Token endpoint of the realm; /login posts the password grant to it.
TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"
# Client id sent with the password grant; no client secret is sent with it.
KEYCLOAK_CLIENT_ID = 'admin-cli'
# Keycloak service class
class KeycloakService:
    """Validate Keycloak-issued JWTs against the realm's published keys."""

    # Seconds to wait for Keycloak's certs endpoint before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self):
        self.keycloak_url = KEYCLOAK_URL
        self.realm = REALM

    def decode_and_print_token(self, token):
        """Print and return the token's claims WITHOUT verifying the token.

        Args:
            token: Encoded JWT.

        Returns:
            The claims dict, or ``None`` when the token cannot be parsed.
        """
        try:
            # Retrieve claims without verifying the signature or audience
            decoded_token = jwt.get_unverified_claims(token)
        except JWTError as e:
            print(f"Error decoding token: {e}")
            return None
        print("Decoded JWT token:")
        print(json.dumps(decoded_token, indent=4))
        return decoded_token

    def validate_token(self, token: str, client_id: str) -> bool:
        """Check issuer, audience and signature of *token*.

        Args:
            token: Encoded JWT to validate.
            client_id: Client id accepted as audience (besides "account").

        Returns:
            ``True`` when the issuer matches this realm, the audience is
            acceptable (or absent) and the signature verifies against the
            realm's published key; ``False`` otherwise. Never raises for an
            invalid token or an unreachable or malformed certs endpoint.
        """
        decoded_token = self.decode_and_print_token(token)
        if not decoded_token:
            print("Failed to decode token.")
            return False

        # Extract issuer, azp, and aud directly from decoded token
        issuer = decoded_token.get('iss')
        authorized_party = decoded_token.get('azp')
        audience = decoded_token.get('aud')
        print("Issuer (iss) in token:", issuer)
        print("Authorized party (azp) in token:", authorized_party)
        print("Audience (aud) in token:", audience)

        expected_issuer = f"{self.keycloak_url}/realms/{self.realm}"
        if issuer != expected_issuer:
            print(f"Invalid issuer. Expected: {expected_issuer}, Got: {issuer}")
            return False

        # Allow `aud` to be `None` for user tokens, or match "account" or the client_id for client tokens
        if audience is None:
            print("Audience is None, assuming this is a user token without an audience.")
        else:
            # `aud` may be a single string or a list of strings; a plain
            # `audience not in [...]` test would reject every list.
            audiences = [audience] if isinstance(audience, str) else list(audience)
            if not any(aud in ("account", client_id) for aud in audiences):
                print(f"Invalid audience. Expected: 'account' or '{client_id}', Got: {audience}")
                return False

        try:
            public_key = self._fetch_public_key(token)
            # Step 3: Use jwt.decode only for signature verification, disabling audience and issuer checks
            # (both were already checked above).
            jwt.decode(
                token,
                public_key,
                algorithms=['RS256'],
                options={
                    'verify_aud': False,
                    'verify_iss': False,
                },
            )
        except JWTError as e:
            print(f"Error validating token: {e}")
            return False
        except (requests.RequestException, KeyError, IndexError, ValueError) as e:
            # Network failure, or a certs response that is not JSON or lacks
            # the expected keys / x5c entries.
            print(f"Error fetching public key: {e}")
            return False

        print("Token signature is valid.")
        return True

    def _fetch_public_key(self, token: str) -> bytes:
        """Return the PEM-encoded public key of the key that signed *token*."""
        # Step 1: Get public key for signature verification
        kid = jwt.get_unverified_header(token).get('kid')
        certs_url = f"{self.keycloak_url}/realms/{self.realm}/protocol/openid-connect/certs"
        certs_response = requests.get(certs_url, timeout=self.REQUEST_TIMEOUT)
        certs_response.raise_for_status()
        keys = certs_response.json()['keys']

        # The certs endpoint can list several keys, so pick the one named in
        # the token header instead of relying on list order. Fall back to the
        # first key only when the token carries no `kid`.
        if kid is None:
            signing_key = keys[0]
        else:
            signing_key = next((key for key in keys if key.get('kid') == kid), None)
            if signing_key is None:
                raise KeyError(f"no published key with kid {kid!r}")
        certificate_pem = f"-----BEGIN CERTIFICATE-----\n{signing_key['x5c'][0]}\n-----END CERTIFICATE-----"

        # Step 2: Convert certificate to public key
        cert = x509.load_pem_x509_certificate(certificate_pem.encode('utf-8'), default_backend())
        return cert.public_key().public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
# Module-level service instance; the /verify endpoint calls its validate_token().
keycloak_service = KeycloakService()
# Schemas
# Request body of POST /login.
class Login(BaseModel):
# Sent to Keycloak as the "username" form field.
login: str
# Sent to Keycloak as the "password" form field.
password: str
# Request body of POST /verify.
class VerifyToken(BaseModel):
# Encoded JWT to validate.
token: str
# Client id accepted as the token audience (passed to validate_token).
client_id: str
# Login endpoint
@app.post("/login")
def get_token(body: Login):
    """Exchange a login/password pair for Keycloak tokens (password grant).

    Args:
        body: Request body carrying ``login`` and ``password``.

    Returns:
        A ``JSONResponse`` with Keycloak's token payload on success.

    Raises:
        HTTPException: with Keycloak's own status code and JSON body when the
            grant is rejected; 502 when Keycloak cannot be reached within the
            timeout or answers with a body that is not JSON.
    """
    data = {
        "grant_type": "password",
        "client_id": KEYCLOAK_CLIENT_ID,
        "username": body.login,
        "password": body.password,
    }
    try:
        # The timeout keeps a stalled Keycloak from hanging the worker forever.
        # NOTE(review): verify=False disables TLS certificate verification; it
        # has no effect on the plain-http URL configured in this file.
        response = requests.post(TOKEN_URL, data=data, verify=False, timeout=10)
    except requests.RequestException as e:
        raise HTTPException(status_code=502, detail=f"Could not reach Keycloak: {e}") from e

    try:
        payload = response.json()
    except ValueError as e:
        # E.g. an HTML error page instead of the expected JSON document.
        raise HTTPException(status_code=502, detail="Keycloak returned a non-JSON response") from e

    if response.status_code == 200:
        return JSONResponse(status_code=200, content=payload)
    raise HTTPException(status_code=response.status_code, detail=payload)
# Verify endpoint
@app.post("/verify")
def verify_token(body: VerifyToken):
    """Report whether the submitted token passes ``KeycloakService.validate_token``.

    Returns:
        ``{"valid": <bool>}`` with the validation outcome.
    """
    return {"valid": keycloak_service.validate_token(body.token, body.client_id)}
uvicorn api-server:app --reload
API document URL
http://localhost:8000/docs#/
URL
POST http://localhost:8000/login
Input Body
{
"login": "user1",
"password": "1234"
}
Postman script that assigns the user_token variable:
// Store the access token from the /login response in the `user_token`
// environment variable so later requests can reference {{user_token}}.
var parsedBody = JSON.parse(responseBody);
if (!parsedBody.access_token) {
    console.error("access_token not found in the response");
} else {
    postman.setEnvironmentVariable("user_token", parsedBody.access_token);
}
URL
POST http://localhost:8000/verify
Input Body
{
"token": "{{user_token}}",
"client_id": "admin-cli"
}
So the user token is verified successfully; the response is:
{
"valid": true
}