I have Apicurio Schema Registry 3.0.7 installed, and I'm trying to use the following code to publish a v2 of schema where GROUP = default, ARTIFACT_ID = com.versa.apicurio.confluent.Employee
here is the code :
import requests
import json
import urllib.parse
# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION – update these values!
# ─────────────────────────────────────────────────────────────────────────────
KEYCLOAK_URL = "https://keycloak.vkp.versa-vani.com/realms/readonly-realm"
CLIENT_ID = "apicurio-registry"
CLIENT_SECRET = "<secret>" # ← your client secret here
REGISTRY_URL = "https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3"
GROUP = "default"
ARTIFACT_ID = "com.versa.apicurio.confluent.Employee"
# ─────────────────────────────────────────────────────────────────────────────
def get_token():
"""
Use client_credentials grant to obtain a service-account token.
"""
token_url = f"{KEYCLOAK_URL}/protocol/openid-connect/token"
data = {
"grant_type": "client_credentials"
}
# Use HTTP Basic auth with client_id:client_secret
resp = requests.post(token_url, data=data,
auth=(CLIENT_ID, CLIENT_SECRET))
resp.raise_for_status()
return resp.json()["access_token"]
def publish_schema(token):
"""
Publish (register) an Avro schema under GROUP/artifactId.
If artifact exists → create new version.
If not → create artifact first time.
"""
url_check = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}"
headers = {
"Authorization": f"Bearer {token}"
}
check_response = requests.get(url_check, headers=headers)
print(f" check_response.status_code : {check_response.status_code}")
schema = {
"type": "record",
"name": "Employee",
"namespace": "com.versa.apicurio.confluent",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": "string"},
{"name": "salary", "type": ["null", "float"], "default": None},
{"name": "age", "type": ["null", "int"], "default": None},
{
"name": "department",
"type": {
"type": "enum",
"name": "DepartmentEnum",
"symbols": ["HR", "ENGINEERING", "SALES"]
}
},
{"name": "email", "type": ["null", "string"], "default": None},
{"name": "new_col", "type": ["null", "string"], "default": None},
{"name": "new_col2", "type": ["null", "string"], "default": None},
{"name": "new_col3", "type": ["null", "string"], "default": None},
]
}
schema_str = json.dumps(schema)
# If artifact exists, upload a new version
if check_response.status_code == 200:
print("Artifact exists, uploading a new version...")
url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions"
headers_publish = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# For version updates in Apicurio 3.0.7, send the schema directly as content
resp = requests.post(
url_publish,
headers=headers_publish,
data=schema_str # Send the schema JSON as the request body
)
print("Publish response:", resp.status_code, resp.text)
elif check_response.status_code == 404:
print("Artifact not found, creating new artifact...")
url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts"
# For creating a new artifact
headers_create = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# Add X-Registry-ArtifactId header for the new artifact
headers_create["X-Registry-ArtifactId"] = ARTIFACT_ID
# Add X-Registry-ArtifactType header
headers_create["X-Registry-ArtifactType"] = "AVRO"
# Send the schema directly as the request body
resp = requests.post(
url_publish,
headers=headers_create,
data=schema_str # Send the schema JSON as the request body
)
else:
print(f"Unexpected error while checking artifact: {check_response.status_code} - {check_response.text}")
check_response.raise_for_status()
return
print("Publish response:", resp.status_code, resp.text)
if resp.ok:
print("Schema published successfully!")
if 'globalId' in resp.json():
print("Registered globalId:", resp.json().get("globalId"))
else:
print("Error publishing schema:", resp.status_code, resp.text)
resp.raise_for_status()
def get_schemas_and_versions(token, artifact_id):
"""
List all artifacts and their versions with globalIds in the specified GROUP.
For each version, print the schema content.
"""
url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts"
headers = {
"Authorization": f"Bearer {token}",
"Accept": "application/json"
}
resp = requests.get(url, headers=headers)
print("Get schemas response:", resp.status_code)
resp.raise_for_status()
# Print the raw response for debugging
print("Raw artifacts response:", json.dumps(resp.json(), indent=2))
# Handle both possible response formats
artifacts_data = resp.json()
print(" type(resp.json()) -> ", type(resp.json()))
print(f" artifacts_data : {artifacts_data}")
if isinstance(artifacts_data, dict):
artifacts = artifacts_data.get("artifacts", [])
elif isinstance(artifacts_data, list):
artifacts = artifacts_data
else:
print("Unexpected response format for artifacts list!")
return
print(f"Found {len(artifacts)} artifacts in group `{GROUP}`:")
for art in artifacts:
print(f" - Artifact: {art}")
# Handle both dict and string artifact representations
if isinstance(art, dict):
artifact_id_resp = art.get('artifactId')
created_by = art.get('createdBy', '<unknown>')
else:
artifact_id_resp = art
created_by = '<unknown>'
print(f" artifactId : {artifact_id_resp}, created_by : {created_by}")
if not artifact_id_resp:
print(f" - Skipping artifact with missing ID: {art}")
continue
if artifact_id != artifact_id_resp:
continue
print(f" - Artifact ID: {artifact_id_resp}, createdBy: {created_by}")
# Get all versions for this artifact
versions_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions"
versions_resp = requests.get(versions_url, headers=headers)
print("Get versions response:", versions_resp.status_code)
if versions_resp.status_code == 200:
versions_data = versions_resp.json().get('versions', [])
print(f"Found {len(versions_data)} versions for artifact `{artifact_id_resp}`:")
print(f" - Versions data: {versions_data}")
version_ids = [v.get('version') for v in versions_data]
print(f" Versions: {version_ids}")
# For each version, print the globalId and schema content
for version in version_ids:
# version_meta_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}/meta"
version_meta_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}"
version_meta_resp = requests.get(version_meta_url, headers=headers)
print(" Get version metadata response:", version_meta_resp.status_code)
if version_meta_resp.status_code == 200:
version_meta = version_meta_resp.json()
global_id = version_meta.get('globalId', '<not found>')
print(f" - Version: {version}, globalId: {global_id}")
# Fetch and print the schema content
content_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}/content"
content_resp = requests.get(content_url, headers=headers)
if content_resp.status_code == 200:
print(f" Schema content for version {version}:\n{content_resp.text}")
else:
print(f" Failed to get schema content for version {version}: {content_resp.status_code} - {content_resp.text}")
else:
print(f" - Failed to get version {version} metadata: {version_meta_resp.status_code} - {version_meta_resp.text}")
else:
print(f" Failed to get versions: {versions_resp.status_code} - {versions_resp.text}")
return artifacts
# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
token = get_token()
print("Token acquired, length:", len(token))
print(" toekn : ", token)
# Register your Employee schema
publish_schema(token)
print("calling - get_schemas_and_versions")
# List existing schemas
get_schemas_and_versions(token, ARTIFACT_ID)
# Optionally, list all schemas
# get_schemas(token)
Note - the first version of the schema gets published successfully, but the 2nd version is failing .. here is the error
check_response.status_code : 200
Artifact exists, uploading a new version...
Publish response: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"}
Publish response: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"}
Error publishing schema: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"}
Traceback (most recent call last):
File "/Users/karanalang/Documents/Technology/apicurio-schema-registry/helm-charts/keycloak/consolidate-tls-admin-confluent-v3.py", line 405, in <module>
publish_schema(token)
File "/Users/karanalang/Documents/Technology/apicurio-schema-registry/helm-charts/keycloak/consolidate-tls-admin-confluent-v3.py", line 124, in publish_schema
resp.raise_for_status()
File "/opt/homebrew/lib/python3.9/site-packages/requests/models.py", line 1024, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3/groups/default/artifacts/com.versa.apicurio.confluent.Employee/version
Issue seems to be with this part of the code :
if check_response.status_code == 200:
print("Artifact exists, uploading a new version...")
url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions"
headers_publish = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# For version updates in Apicurio 3.0.7, send the schema directly as content
resp = requests.post(
url_publish,
headers=headers_publish,
data=schema_str # Send the schema JSON as the request body
)
print("Publish response:", resp.status_code, resp.text)
How do i debug/fix this ?
tia!
This change worked for me ->
if check_response.status_code == 200:
print("Artifact exists, uploading a new version...")
# According to Apicurio Registry 3.0.x docs, version endpoint
url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions"
# For version content - use the nested structure the API is expecting
version_content = {
"content": {
"content": schema_str, # Send schema as string in nested content structure
"contentType": "application/json"
}
}
resp = requests.post(
url_publish,
headers={
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
},
json=version_content
)