pythonapache-kafkaapicurio-registry

Apicurio Schema Registry 3.0.7 - unable to update multiple versions of schema for artifactId


I have Apicurio Schema Registry 3.0.7 installed, and I'm trying to use the following code to publish a v2 of schema where GROUP = default, ARTIFACT_ID = com.versa.apicurio.confluent.Employee

here is the code :

import requests
import json
import urllib.parse

# ─────────────────────────────────────────────────────────────────────────────
# CONFIGURATION – update these values!
# ─────────────────────────────────────────────────────────────────────────────
KEYCLOAK_URL   = "https://keycloak.vkp.versa-vani.com/realms/readonly-realm"
CLIENT_ID      = "apicurio-registry"
CLIENT_SECRET  = "<secret>"  # ← your client secret here

REGISTRY_URL   = "https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3"

GROUP          = "default"
ARTIFACT_ID    = "com.versa.apicurio.confluent.Employee"

# ─────────────────────────────────────────────────────────────────────────────
def get_token():
    """
    Use client_credentials grant to obtain a service-account token.
    """
    token_url = f"{KEYCLOAK_URL}/protocol/openid-connect/token"
    data = {
        "grant_type": "client_credentials"
    }
    # Use HTTP Basic auth with client_id:client_secret
    resp = requests.post(token_url, data=data,
                         auth=(CLIENT_ID, CLIENT_SECRET))
    resp.raise_for_status()
    return resp.json()["access_token"]


def publish_schema(token):
    """
    Publish (register) an Avro schema under GROUP/artifactId.
    If artifact exists → create new version.
    If not → create artifact first time.
    """
    url_check = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}"
    headers = {
        "Authorization": f"Bearer {token}"
    }
    check_response = requests.get(url_check, headers=headers)
    print(f" check_response.status_code : {check_response.status_code}")
    schema = {
        "type": "record",
        "name": "Employee",
        "namespace": "com.versa.apicurio.confluent",
        "fields": [
            {"name": "id",         "type": "int"},
            {"name": "name",       "type": "string"},
            {"name": "salary",     "type": ["null", "float"],  "default": None},
            {"name": "age",        "type": ["null", "int"],    "default": None},
            {
                "name": "department",
                "type": {
                    "type": "enum",
                    "name": "DepartmentEnum",
                    "symbols": ["HR", "ENGINEERING", "SALES"]
                }
            },
            {"name": "email",      "type": ["null", "string"], "default": None},
            {"name": "new_col",    "type": ["null", "string"], "default": None},
            {"name": "new_col2",   "type": ["null", "string"], "default": None},
            {"name": "new_col3",   "type": ["null", "string"], "default": None},
        ]
    }
    
    schema_str = json.dumps(schema)
    
    # If artifact exists, upload a new version
    if check_response.status_code == 200:
        print("Artifact exists, uploading a new version...")
        url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions"
        
        headers_publish = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        
        # For version updates in Apicurio 3.0.7, send the schema directly as content
        resp = requests.post(
            url_publish,
            headers=headers_publish,
            data=schema_str  # Send the schema JSON as the request body
        )
        
        print("Publish response:", resp.status_code, resp.text)
    elif check_response.status_code == 404:
        print("Artifact not found, creating new artifact...")
        url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts"
        
        # For creating a new artifact
        headers_create = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        
        # Add X-Registry-ArtifactId header for the new artifact
        headers_create["X-Registry-ArtifactId"] = ARTIFACT_ID
        
        # Add X-Registry-ArtifactType header
        headers_create["X-Registry-ArtifactType"] = "AVRO"
        
        # Send the schema directly as the request body
        resp = requests.post(
            url_publish,
            headers=headers_create,
            data=schema_str  # Send the schema JSON as the request body
        )
    else:
        print(f"Unexpected error while checking artifact: {check_response.status_code} - {check_response.text}")
        check_response.raise_for_status()
        return
        
    print("Publish response:", resp.status_code, resp.text)
    if resp.ok:
        print("Schema published successfully!")
        if 'globalId' in resp.json():
            print("Registered globalId:", resp.json().get("globalId"))
    else:
        print("Error publishing schema:", resp.status_code, resp.text)
    
    resp.raise_for_status()




def get_schemas_and_versions(token, artifact_id):
    """
    List all artifacts and their versions with globalIds in the specified GROUP.
    For each version, print the schema content.
    """
    url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json"
    }

    resp = requests.get(url, headers=headers)
    print("Get schemas response:", resp.status_code)
    resp.raise_for_status()

    # Print the raw response for debugging
    print("Raw artifacts response:", json.dumps(resp.json(), indent=2))

    # Handle both possible response formats
    artifacts_data = resp.json()

    print(" type(resp.json()) -> ", type(resp.json()))

    print(f" artifacts_data : {artifacts_data}")

    if isinstance(artifacts_data, dict):
        artifacts = artifacts_data.get("artifacts", [])
    elif isinstance(artifacts_data, list):
        artifacts = artifacts_data
    else:
        print("Unexpected response format for artifacts list!")
        return

    print(f"Found {len(artifacts)} artifacts in group `{GROUP}`:")

    for art in artifacts:

        print(f"  - Artifact: {art}")

        # Handle both dict and string artifact representations
        if isinstance(art, dict):
            artifact_id_resp = art.get('artifactId')
            created_by = art.get('createdBy', '<unknown>')
            

        else:
            artifact_id_resp = art
            created_by = '<unknown>'

        print(f" artifactId : {artifact_id_resp}, created_by : {created_by}")

        if not artifact_id_resp:
            print(f"  - Skipping artifact with missing ID: {art}")
            continue

        if artifact_id != artifact_id_resp:
            continue

        print(f"  - Artifact ID: {artifact_id_resp}, createdBy: {created_by}")

        # Get all versions for this artifact
        versions_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions"
        versions_resp = requests.get(versions_url, headers=headers)

        print("Get versions response:", versions_resp.status_code)

        if versions_resp.status_code == 200:
            versions_data = versions_resp.json().get('versions', [])

            print(f"Found {len(versions_data)} versions for artifact `{artifact_id_resp}`:")
            print(f"  - Versions data: {versions_data}")

            version_ids = [v.get('version') for v in versions_data]
            print(f" Versions: {version_ids}")

            # For each version, print the globalId and schema content
            for version in version_ids:
                # version_meta_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}/meta"
                version_meta_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}"

                version_meta_resp = requests.get(version_meta_url, headers=headers)
                
                print(" Get version metadata response:", version_meta_resp.status_code)
                if version_meta_resp.status_code == 200:
                    version_meta = version_meta_resp.json()
                    global_id = version_meta.get('globalId', '<not found>')
                    print(f"      - Version: {version}, globalId: {global_id}")

                    # Fetch and print the schema content
                    content_url = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{urllib.parse.quote(str(artifact_id_resp), safe='')}/versions/{version}/content"
                    content_resp = requests.get(content_url, headers=headers)
                    if content_resp.status_code == 200:
                        print(f"        Schema content for version {version}:\n{content_resp.text}")
                    else:
                        print(f"        Failed to get schema content for version {version}: {content_resp.status_code} - {content_resp.text}")
                else:
                    print(f"      - Failed to get version {version} metadata: {version_meta_resp.status_code} - {version_meta_resp.text}")
        else:
            print(f"    Failed to get versions: {versions_resp.status_code} - {versions_resp.text}")

    return artifacts



# ─────────────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    token = get_token()
    print("Token acquired, length:", len(token))

    print(" toekn : ", token)

    # Register your Employee schema
    publish_schema(token)
    
    print("calling - get_schemas_and_versions")
    # List existing schemas
    get_schemas_and_versions(token, ARTIFACT_ID)
    # Optionally, list all schemas
    # get_schemas(token)

Note - the first version of the schema gets published successfully, but the 2nd version is failing .. here is the error

check_response.status_code : 200
Artifact exists, uploading a new version...
Publish response: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"}
Publish response: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"}
Error publishing schema: 400 {"detail":"MissingRequiredParameterException: Request is missing a required parameter: content","title":"Request is missing a required parameter: content","status":400,"name":"MissingRequiredParameterException"}
Traceback (most recent call last):
  File "/Users/karanalang/Documents/Technology/apicurio-schema-registry/helm-charts/keycloak/consolidate-tls-admin-confluent-v3.py", line 405, in <module>
    publish_schema(token)
  File "/Users/karanalang/Documents/Technology/apicurio-schema-registry/helm-charts/keycloak/consolidate-tls-admin-confluent-v3.py", line 124, in publish_schema
    resp.raise_for_status()
  File "/opt/homebrew/lib/python3.9/site-packages/requests/models.py", line 1024, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://apicurio-sr.vkp.versa-vani.com/apis/registry/v3/groups/default/artifacts/com.versa.apicurio.confluent.Employee/version

Issue seems to be with this part of the code :

if check_response.status_code == 200:
        print("Artifact exists, uploading a new version...")
        url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions"
        
        headers_publish = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }
        
        # For version updates in Apicurio 3.0.7, send the schema directly as content
        resp = requests.post(
            url_publish,
            headers=headers_publish,
            data=schema_str  # Send the schema JSON as the request body
        )
        
        print("Publish response:", resp.status_code, resp.text)

How do i debug/fix this ?

tia!


Solution

  • This change worked for me ->

    if check_response.status_code == 200:
            print("Artifact exists, uploading a new version...")
            
            # According to Apicurio Registry 3.0.x docs, version endpoint
            url_publish = f"{REGISTRY_URL}/groups/{GROUP}/artifacts/{ARTIFACT_ID}/versions"
            
            # For version content - use the nested structure the API is expecting
            version_content = {
                "content": {
                    "content": schema_str,  # Send schema as string in nested content structure
                    "contentType": "application/json"
                }
            }
            
            resp = requests.post(
                url_publish,
                headers={
                    "Authorization": f"Bearer {token}",
                    "Content-Type": "application/json"
                },
                json=version_content
            )