Hi, I'm trying to write a RESTful service with Kafka. My code is meant to store the messages the consumer receives from Kafka in an OpenSearch index every time I make a GET or a POST request, but I keep getting this error:
Result: {"error":"Content-Type header [application/x-www-form-urlencoded] is not supported","status":406}
This is my consumer:
public class Main {
    static Client client = new Client();

    public static void main(String[] args) throws IOException, NoSuchAlgorithmException, KeyManagementException {
        String servers = "localhost:9092";
        String groupId = "id";
        String topic = "mytopic";
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList(topic));
        Logger logger = LoggerFactory.getLogger(Main.class.getName());
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> record : records) {
                logger.info("Key:" + record.key() + ", Value:" + record.value());
                logger.info("Partition:" + record.partition() + ", Offset:" + record.offset());
                String result = client.run("https://localhost:9200/index3/_create/" + record.offset(), "admin", "admin", record);
                System.out.println("Result: " + result);
            }
        }
    }
}
Is there a way to avoid this?
My message is "Request":"valid". I tried to write it as a key-value JSON pair, but I must be doing something wrong.
EDIT: I tried setting the Content-Type to "application/json", but it throws the same error!
This is my client:
    public String run(String url, String username, String password, ConsumerRecord<String, String> record) throws IOException, NoSuchAlgorithmException, KeyManagementException {
        // TrustManager that accepts all certificates
        TrustManager[] trustAllCerts = new TrustManager[]{new Client()};
        SSLContext sslContext = SSLContext.getInstance("TLS");
        sslContext.init(null, trustAllCerts, new SecureRandom());
        // Client
        OkHttpClient client = new OkHttpClient.Builder()
                .connectionSpecs(Arrays.asList(ConnectionSpec.MODERN_TLS, ConnectionSpec.COMPATIBLE_TLS))
                .sslSocketFactory(sslContext.getSocketFactory(), (X509TrustManager) trustAllCerts[0])
                .hostnameVerifier((hostname, session) -> true)
                .build();
        // Authentication
        String credentials = username + ":" + password;
        String encode = Base64.getEncoder().encodeToString(credentials.getBytes());
        String authHeader = "Basic " + encode;
        RequestBody requestBody = new FormBody.Builder()
                .add("Message:", "n°" + record.offset())
                .build();
        Request request = new Request.Builder()
                .url(url)
                .header("Authorization", authHeader)
                .header("Content-Type", "application/json")
                .post(requestBody) // Body of the POST
                .build();
        try (Response response = client.newCall(request).execute()) {
            return response.body().string();
        }
    }
}
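To see why the server keeps complaining, it may help to compare what a form body and a JSON body look like on the wire. The sketch below is not OkHttp code: BodyFormats and its methods are hypothetical names, and the JDK's URLEncoder is used only as an approximation of what a form builder produces for the "Message:" field above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of OkHttp or OpenSearch.
final class BodyFormats {

    // Approximates a form-encoded body: percent-encoded key=value.
    static String formEncoded(String key, String value) {
        return URLEncoder.encode(key, StandardCharsets.UTF_8)
                + "=" + URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    // Builds a one-field JSON document (escapes only backslash and double quote).
    static String json(String key, String value) {
        return "{\"" + escape(key) + "\":\"" + escape(value) + "\"}";
    }

    private static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    public static void main(String[] args) {
        long offset = 42; // stand-in for record.offset()
        String value = "n\u00B0" + offset; // \u00B0 is the degree sign
        System.out.println(formEncoded("Message:", value)); // Message%3A=n%C2%B042
        System.out.println(json("Message", value));
    }
}
```

The first line printed is not a JSON document, which is consistent with the server rejecting the request no matter which Content-Type header is set by hand.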
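I solved it by building the request body as a JsonObject with Gson. FormBody always sends application/x-www-form-urlencoded, and OkHttp takes the Content-Type from the body, so the header I set by hand was ignored. This is the dependency: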
<dependency>
    <groupId>com.google.code.gson</groupId>
    <artifactId>gson</artifactId>
    <version>2.10.1</version>
</dependency>
And this is the code:
JsonObject json = new JsonObject();
json.addProperty("Message n°", record.offset());
RequestBody requestBody = RequestBody.create(MediaType.parse("application/json"), json.toString());
Request request = new Request.Builder()
        .url(url)
        .header("Authorization", authHeader)
        .post(requestBody)
        .build();
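For anyone who would rather not pull in OkHttp or Gson, the same idea (a JSON body with a matching Content-Type) can probably be sketched with the JDK's own java.net.http API (Java 11+). This is a different client from the one in the question, and it only builds the request without sending it. JsonRequestSketch, basicAuth, build and the "offset" field name are made up for illustration, and the trust-all-certificates setup from the question is not covered here.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch using the JDK HTTP client types instead of OkHttp.
final class JsonRequestSketch {

    // Builds the value of a Basic Authorization header.
    static String basicAuth(String user, String password) {
        String credentials = user + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    // Builds (but does not send) a POST whose body is a small JSON document.
    static HttpRequest build(String url, String user, String password, long offset) {
        String json = "{\"offset\":" + offset + "}"; // assumed field name
        return HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", basicAuth(user, password))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request =
                build("https://localhost:9200/index3/_create/7", "admin", "admin", 7L);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue("Content-Type").orElse("(none)"));
    }
}
```

Sending it would still need an HttpClient configured with an SSLContext that trusts the server's certificate, as in the OkHttp client above.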