amazon-web-services apache-kafka aws-lambda kafka-producer-api amazon-msk

Unable to produce messages to a Kafka topic from an AWS Lambda function


I am trying to write a Lambda function, triggered by an S3 event, that puts messages into a Kafka topic. The function is triggered and reports no errors, but I cannot see the messages in the Kafka topic.

Here is my Lambda function:

String srcBucket = record.getS3().getBucket().getName();

        String srcKey = record.getS3().getObject().getUrlDecodedKey();

        System.out.println("Bucket is " + srcBucket + "  and Key is " + srcKey);
        // Assign topicName to string variable
        String topicName = "AWSKafkaTutorialTopic";

        // create instance for properties to access producer configs
        Properties props = new Properties();

        props.put("bootstrap.servers",
                "b-3.205147-riskaudit.rtyrty.c5.kafka.us-east-1.amazonaws.com:9092,b-4.205147-riskaudit.rtyt.c5.kafka.us-east-1.amazonaws.com:9092,b-5.205147-tryrt.xt08nj.c5.kafka.us-east-1.amazonaws.com:9092");
        System.out.println("bootstrap.servers successfully");
        // Set acknowledgements for producer requests.
        props.put("acks", "all");

        // Number of automatic retries after a failed send; 0 disables retries
        props.put("retries", 0);

        // Maximum size in bytes of a batch of records sent to one partition
        props.put("batch.size", 16384);

        // Wait up to 1 ms for more records so they can be batched into fewer requests
        props.put("linger.ms", 1);

        // The buffer.memory controls the total amount of memory available to the
        // producer for buffering.
        props.put("buffer.memory", 33554432);

        System.out.println("before key.serializer successfully");

        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        System.out.println("after  key.serializer successfully");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        System.out.println("Inside loop successfully");
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)));
        }
        System.out.println("Message sent successfully");
        producer.close();

        return "Message Pushed success fully";

My Lambda function runs up to the for loop, but I cannot see what happens after that. Please help.


Solution

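  • The code looks fine to me. You can try adding props.put("producer.type", "async");, though that property comes from the older Scala producer and the Java KafkaProducer already sends asynchronously. The more likely cause is that your Lambda function is not running in the VPC in which the MSK cluster was launched. Also check the IAM policy: try attaching AWSLambdaVPCAccessExecutionRole to the function's execution role. Finally, check the security groups, so that the function is allowed to reach the brokers on port 9092.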

    Once all of this is set up, the code should start working.
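
To confirm whether the VPC or security group setup is the problem, it can help to test plain TCP connectivity to the brokers from inside the Lambda function before creating the producer. The following is only a minimal sketch using the standard library: the class name BrokerReachability, its two methods, and the 3000 ms timeout are hypothetical names and values chosen for illustration, and "localhost:9092" is a placeholder for the cluster's real bootstrap.servers string.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: checks TCP reachability of each broker in a bootstrap.servers string. */
class BrokerReachability {

    /** Splits "host1:9092,host2:9092" into unresolved host/port pairs. */
    static List<InetSocketAddress> parse(String bootstrapServers) {
        List<InetSocketAddress> brokers = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("Missing port in: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            brokers.add(InetSocketAddress.createUnresolved(host, port));
        }
        return brokers;
    }

    /** Returns true if a TCP connection to the broker opens within timeoutMs. */
    static boolean isReachable(InetSocketAddress broker, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Resolve the host name here so that DNS failures also count as "not reachable"
            socket.connect(new InetSocketAddress(broker.getHostString(), broker.getPort()), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unknown hosts
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder address; substitute the cluster's real bootstrap.servers value
        String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
        for (InetSocketAddress broker : parse(bootstrapServers)) {
            boolean ok = isReachable(broker, 3000);
            System.out.println(broker.getHostString() + ":" + broker.getPort()
                    + (ok ? " reachable" : " NOT reachable"));
        }
    }
}
```

If every broker is reported as not reachable when this runs inside the function, the cause is probably networking (VPC, subnets, or security groups) rather than the producer code. A successful connection only shows that the port is open; it does not prove that Kafka will accept the produce requests.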