I am trying to write an AWS Lambda function, triggered by an S3 event, that puts messages into a Kafka topic. The Lambda function is triggered and does not report any error, but I cannot see the messages in the Kafka topic.
Here is my Lambda function:
String srcBucket = record.getS3().getBucket().getName();
String srcKey = record.getS3().getObject().getUrlDecodedKey();
System.out.println("Bucket is " + srcBucket + " and Key is " + srcKey);
// Assign topicName to string variable
String topicName = "AWSKafkaTutorialTopic";
// create instance for properties to access producer configs
Properties props = new Properties();
props.put("bootstrap.servers",
"b-3.205147-riskaudit.rtyrty.c5.kafka.us-east-1.amazonaws.com:9092,b-4.205147-riskaudit.rtyt.c5.kafka.us-east-1.amazonaws.com:9092,b-5.205147-tryrt.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092");
System.out.println("bootstrap.servers successfully");
// Set acknowledgements for producer requests.
props.put("acks", "all");
// Do not retry failed requests (retries = 0).
props.put("retries", 0);
// Maximum size, in bytes, of a batch of records sent to one partition
props.put("batch.size", 16384);
// Wait up to 1 ms before sending so records can be batched into fewer requests
props.put("linger.ms", 1);
// The buffer.memory controls the total amount of memory available to the
// producer for buffering.
props.put("buffer.memory", 33554432);
System.out.println("before key.serializer successfully");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
System.out.println("after key.serializer successfully");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
System.out.println("Inside loop successfully");
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
}
System.out.println("Message sent successfully");
producer.close();
return "Message Pushed success fully";
My Lambda function runs up to the for loop, but I cannot see what happens after that. Please help.
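The code looks OK to me.
You could try adding props.put("producer.type", "async"); but note that producer.type is a setting of the old Scala producer. The Java KafkaProducer used here always sends asynchronously, so this setting is unlikely to change anything.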
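One possible reason the function appears to stop at the loop: if the brokers cannot be reached, send() can block for up to max.block.ms (60 seconds by default) while it waits for topic metadata, which may be longer than the Lambda timeout. A minimal sketch of producer properties with shorter timeouts is below, so that a connectivity problem shows up as an error rather than a silent hang. The class name, the timeout values, and the broker address in main are assumptions for illustration, not values from your setup, and delivery.timeout.ms assumes kafka-clients 2.1 or newer.

```java
import java.util.Properties;

final class FailFastProducerConfig {

    // Builds producer properties with short timeouts so that an unreachable
    // cluster produces an error quickly instead of blocking for a long time.
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Upper bound on how long send() may block waiting for metadata
        // or buffer space (the default is 60000 ms). Illustrative value.
        props.put("max.block.ms", "5000");
        // How long to wait for a broker response to a request. Illustrative value.
        props.put("request.timeout.ms", "5000");
        // Upper bound on reporting success or failure after send() returns;
        // must be at least linger.ms + request.timeout.ms. Illustrative value.
        props.put("delivery.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical broker address, for demonstration only.
        Properties props = build("broker-1.example.internal:9092");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The returned Properties object can be passed to new KafkaProducer<String, String>(props) in place of the hand-built one. Keep max.block.ms below the Lambda timeout so the exception is logged before the function is killed.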
Also, you might not be running your Lambda function in the VPC in which MSK is launched.
Please check the IAM policy as well: try attaching AWSLambdaVPCAccessExecutionRole to the function's execution role.
Check the security group too, so that the Lambda function is allowed to reach the brokers on port 9092.
Once all of this is set up, the code should start working.
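To check the VPC and security group setup independently of Kafka, you can test from inside the Lambda function whether a plain TCP connection to each broker can be opened. The following is a rough sketch using only the Java standard library. The class and method names are hypothetical, and a successful connection only shows that the port is reachable, not that the Kafka client is configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class BrokerReachability {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers timeouts, refused connections and unresolvable host names.
            System.out.println("Cannot reach " + host + ":" + port + " -> " + e);
            return false;
        }
    }

    // Checks every "host:port" entry of a bootstrap.servers style string.
    static boolean allReachable(String bootstrapServers, int timeoutMs) {
        boolean ok = true;
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // Use & (not &&) so every broker is tried and logged.
            ok &= canConnect(host, port, timeoutMs);
        }
        return ok;
    }
}
```

Calling BrokerReachability.allReachable(...) with the same string you pass as bootstrap.servers, before creating the producer, prints one line per broker that cannot be reached. If the connections time out, the VPC, subnet, or security group configuration is the likely cause rather than the producer code.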