javajsonapache-kafkaopensearchhttp-status-code-406

Getting status code 406 every time i send a message with kafka


Hi i'm trying to write a RESTful service with Kafka. I'm writing my code to make it store the messages received from kafka by the consumer into an open search index every time i make a GET or a POST request, but i'm constantly receiving this error:

Result: {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}

This is my consumer:

    public class Main {
    static Client client = new Client();

    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException {

        String servers = "localhost:9092";
        String groupId = "id";
        String topic = "mytopic";

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));

        Logger logger = LoggerFactory.getLogger(Main.class.getName());

        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key:" + record.key() + ", Value:" + record.value());
                logger.info("Partition:" + record.partition() + ", Offset:" + record.offset());

                String result = client.run("https://localhost:9200/index3/_create/" + record.offset(), "admin", "admin", record);
                System.out.println("Result: " + result);
            }
        }
    }
}

Is there a way to avoid this??

My message is "Request":"valid", i tried to write it like a key-value json but i'm wrong

EDIT: i tried to set the Content type to "application/json" but it throws the same error!

This my client:

    public String run(String url, String username, String password, ConsumerRecord<String, String> record) throws IOException, NoSuchAlgorithmException, KeyManagementException {

        //TrustManager per accettare tutti i certificati
        TrustManager[] trustAllCerts = new TrustManager[]{new Client()};

        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustAllCerts, new SecureRandom());

        //Client
        OkHttpClient client = new OkHttpClient.Builder()
                .connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
                .sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustAllCerts[0])
                .hostnameVerifier((hostname, session) -> true)
                .build();

        //Authentication
        String credentials = username + ":" + password;
        String encode = Base64.getEncoder().encodeToString(credentials.getBytes());
        String authHeader = "Basic " + encode;


        RequestBody requestBody = new FormBody.Builder()
                .add("Message:", "n°" + record.offset())
                .build();

        Request request = new Request.Builder()
                .url(url)
                .header("Authorization", authHeader)
                .header("Content-Type", "application/json")
                .post(requestBody) //Body della post
                .build();

        try (Response response = client.newCall(request).execute()) {
            return response.body().string();
        }
    }
}

Solution

  • I solved making my request body a JsonObject using GSon dependencies:

       <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.10.1</version>
        </dependency>
    

    And that's the code:

    JsonObject json = new JsonObject();
        json.addProperty("Message n°", record.offset());
        RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), json.toString());
    
        Request request = new Request.Builder()
                .url(url)
                .header("Authorization", authHeader)
                .post(requestBody) 
                .build();