Setting Python KafkaProducer sasl mechanism property


The SASL mechanism we are using is SCRAM-SHA-256, but the Kafka producer only accepts PLAIN, GSSAPI, or OAUTHBEARER as the value of sasl_mechanism.

The following config fails with this error:

    sasl_mechanism must be in PLAIN, GSSAPI, OAUTHBEARER

Config:

    from kafka import KafkaProducer

    ssl_produce = KafkaProducer(bootstrap_servers='brokerCName:9093',
                                security_protocol='SASL_SSL',
                                ssl_cafile='pemfilename.pem',
                                sasl_mechanism='SCRAM-SHA-256',
                                sasl_plain_username='username',
                                sasl_plain_password='secret')

How can I specify the correct SASL mechanism?

Thanks


Solution

  • Updated answer for kafka-python v2.0.0+

    Since 2.0.0, kafka-python supports both SCRAM-SHA-256 and SCRAM-SHA-512.
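
    With kafka-python 2.0.0 or later, the configuration from the question should be accepted as written. A minimal sketch, assuming the same placeholder broker address, CA file, and credentials as in the question. The helper names scram_config and make_producer are hypothetical, and the kafka import is deferred so the settings can be built and inspected without a reachable broker:

```python
def scram_config(bootstrap_servers, username, password, cafile,
                 mechanism="SCRAM-SHA-256"):
    """Build KafkaProducer keyword arguments for SASL/SCRAM over TLS."""
    if mechanism not in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
        raise ValueError("mechanism must be SCRAM-SHA-256 or SCRAM-SHA-512")
    return {
        "bootstrap_servers": bootstrap_servers,
        "security_protocol": "SASL_SSL",
        "ssl_cafile": cafile,
        "sasl_mechanism": mechanism,
        # kafka-python reuses the sasl_plain_* options for SCRAM credentials
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }


def make_producer(**kwargs):
    """Create a producer; needs kafka-python >= 2.0.0 and a reachable broker."""
    from kafka import KafkaProducer  # deferred import (hypothetical helper)
    return KafkaProducer(**scram_config(**kwargs))
```

    For example, make_producer(bootstrap_servers='brokerCName:9093', username='username', password='secret', cafile='pemfilename.pem') would mirror the producer from the question.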


    Previous answer for older versions of kafka-python

    As far as I understand, you are using the kafka-python client. From its source code, I can see that sasl_mechanism='SCRAM-SHA-256' is not a valid option:

        """
        ...
        sasl_mechanism (str): Authentication mechanism when security_protocol
            is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
            PLAIN, GSSAPI, OAUTHBEARER.
        ...
        """
    
        if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'):
            assert self.config['sasl_mechanism'] in self.SASL_MECHANISMS, (
                'sasl_mechanism must be in ' + ', '.join(self.SASL_MECHANISMS)) 
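
    The assertion above is why older releases reject SCRAM-SHA-256 outright. To see which side of the 2.0.0 boundary an environment is on, a rough sketch like the following may help. supports_scram and installed_kafka_python_version are hypothetical helper names, and the version parsing is deliberately simplistic (it only reads the leading digits of major.minor.patch):

```python
from importlib.metadata import PackageNotFoundError, version
from itertools import takewhile


def supports_scram(version_string):
    """Return True if a kafka-python version string is 2.0.0 or newer."""
    parts = []
    for piece in version_string.split(".")[:3]:
        # Keep only the leading digits, so "0rc1" is read as 0
        digits = "".join(takewhile(str.isdigit, piece))
        parts.append(int(digits) if digits else 0)
    parts += [0] * (3 - len(parts))  # pad "2" or "2.0" to three components
    return tuple(parts) >= (2, 0, 0)


def installed_kafka_python_version():
    """Return the installed kafka-python version string, or None if absent."""
    try:
        return version("kafka-python")
    except PackageNotFoundError:
        return None
```

    If installed_kafka_python_version() returns a string for which supports_scram is False, upgrading the package is probably simpler than switching clients.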
    

    One quick workaround is to use the confluent-kafka client, which supports SCRAM-SHA-256 through its sasl.mechanisms setting:

    from confluent_kafka import Producer 
    
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    conf = {
        'bootstrap.servers': 'localhost:9092',
        'security.protocol': 'SASL_SSL',
        'sasl.mechanisms': 'SCRAM-SHA-256',
        'sasl.username': 'yourUsername',
        'sasl.password': 'yourPassword', 
        # any other config you like ..
    }
    
    p = Producer(**conf)
     
    # Rest of your code goes here..
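
    As one possible way to fill in the rest, the sketch below sends a single message and records whether the broker acknowledged it. It assumes a conf dict like the one above. delivery_report and send_one are hypothetical names, and the confluent_kafka import is deferred so the callback can be exercised without the library installed:

```python
delivered = []  # (topic, partition, offset) for each acknowledged message
failed = []     # error objects for messages that could not be delivered


def delivery_report(err, msg):
    """Delivery callback: invoked once per produced message."""
    if err is not None:
        failed.append(err)
    else:
        delivered.append((msg.topic(), msg.partition(), msg.offset()))


def send_one(conf, topic, value):
    """Produce a single message and block until it is delivered or fails."""
    from confluent_kafka import Producer  # needs confluent-kafka installed
    producer = Producer(conf)
    producer.produce(topic, value=value, on_delivery=delivery_report)
    producer.flush()  # serves delivery callbacks until the queue is empty
```

    Calling send_one(conf, 'my-topic', b'hello') (the topic name is a placeholder) and then checking failed is a quick way to confirm that the SCRAM credentials were accepted.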