javaamazon-web-servicesspring-bootapache-kafkaaws-msk

Cannot able to access AWS Glue Schema Registery using Spring Boot


I have created a Schema Registry using the AWS CLI, but cannot able to access it using the SpringBoot. It is showing me an error that is "Failed to get schemaVersionId by schema definition for schema name = hydra-testschema"

Creating Schema using the CLI

enter image description here

Then I am using SpringBoot Application to test the schema. Here is the schema structure and sample code.

Customer.avsc

{
  "type": "record",
  "namespace": "ABC_Organization",
  "name": "Employee",
  "fields": [
    {
      "name": "Name",
      "type": "string"
    },
    {
      "name": "Age",
      "type": "int"
    },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "addressRecord",
        "fields": [
          {
            "name": "street",
            "type": "string"
          },
          {
            "name": "zipcode",
            "type": "int"
          }
        ]
      }
    }
  ]
}

SampleProducer.java





public class SampleMskProducer{
    private static final Properties properties = new Properties();
    private final static Logger LOGGER = LoggerFactory.getLogger(org.apache.kafka.clients.producer.Producer.class.getName());

    public static void main(String[] args) throws Exception {

        String username = "BrokerUserName";
        String password = "Password";

        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);

        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");

        // Setting kafka properties
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Broker");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "usa-east-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "sandbox");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "hydra-testschemaa");
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);

        SystemPropertiesCredentialsProvider systemPropertiesCredentialsProvider=new SystemPropertiesCredentialsProvider();

//Passing the secrets
        System.setProperty("aws.accessKeyId", "A");
        System.setProperty("aws.secretAccessKey", "wC");

// Your AWS SDK or AWS-related code here

        // Declearing and parsing the Schema fields for generic record builder.
        Schema schema_customer = null;
        try {
            schema_customer = new Parser().parse(new File("Customer.avsc"));
        } catch (IOException e) {
            e.printStackTrace();
        }
     
        GenericRecord customer = new GenericData.Record(schema_customer);
        GenericRecord addressRecord = new GenericData.Record(schema_customer.getField("address").schema());
        LOGGER.info("Generic records phase completed...");

        Random rand = new Random();
        int zipcodeValue = 9999;
        int ageMaxValue = 100;

        // Initializing the Producer client, build records and publishing the message to the kafka broker
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            final ProducerRecord<String, GenericRecord> record = new ProducerRecord<String, GenericRecord>("hydra.proxy.updates", customer);
            LOGGER.info("Starting to send records...");
            for (int i = 0; i < 10000; i++) {
                addressRecord.put("street", "city-" + i);
                addressRecord.put("zipcode", rand.nextInt(zipcodeValue));
                customer.put("Name","name-"+ i);
                customer.put("Age",rand.nextInt(ageMaxValue));
                customer.put("address", addressRecord);
                producer.send(record, new ProducerCallback());
            }
            producer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Defining method for retrieving secret from Secretmanager

    // Callback class for producer client for logging.
    private static class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e) {
            if (e == null) {
                LOGGER.info("Received new metadata. \t" +
                "Topic:" + recordMetaData.topic() + "\t" +
                "Partition: " + recordMetaData.partition() + "\t" +
                "Offset: " + recordMetaData.offset() + "\t" +
                "Timestamp: " + recordMetaData.timestamp());
            } else {
                LOGGER.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }
}

I am getting the following error after executing:

enter image description here

Can anyone please check ?


Solution

  • Your AWS_REGION value in your configuration should be us-east-1 instead of usa-east-1 - that's why the UnknownHostException occurs in your screenshot. AWS does not have a region usa-east-1, hence the host you want to reach is unknown.