I'm trying to connect from Node.js to a Confluent Cloud Kafka cluster that is configured with an OpenID identity provider.
I'm able to connect when using the kafka-console-consumer command (as in the Confluent Cloud example):
# ./kafka.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.login.connect.timeout.ms=15000
sasl.oauthbearer.token.endpoint.url=https://myidp.example.com/oauth2/default/v1/token
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId="MY_CLIENT_ID" \
    clientSecret="MY_CLIENT_SECRET" \
    scope="SCOPE" \
    extension_logicalCluster="CLUSTER_ID" \
    extension_identityPoolId="IDENTITY_POOL_ID";
kafka-console-consumer \
    --bootstrap-server KAFKA_BROKER \
    --consumer.config ./kafka.properties \
    --topic MY_TOPIC \
    --group GROUP_NAME
Currently I'm following the KafkaJS OAUTHBEARER example (https://kafka.js.org/docs/configuration#oauthbearer-example), but I'm getting lost when trying to add the SASL extensions
(i.e. extension_logicalCluster and extension_identityPoolId).
My code so far:
// oauthBearerProviderOpenId.ts
import { Issuer, TokenSet } from 'openid-client';

export interface oauthBearerProviderOpenIdOptions {
  issuer: string;
  clientId: string;
  clientSecret: string;
  refreshThresholdMs: number;
  scope: string;
  logicalCluster: string;
  identityPoolId: string;
}

export const oauthBearerProviderOpenId = async (options: oauthBearerProviderOpenIdOptions) => {
  const issuer = await Issuer.discover(options.issuer);
  const { Client } = issuer;
  const client = new Client({
    client_id: options.clientId,
    client_secret: options.clientSecret,
  });

  let tokenPromise: Promise<string>;
  let tokenSet: TokenSet | null = null;

  async function refreshToken(): Promise<string> {
    try {
      // Always request a new token: on a scheduled refresh the old one is about to expire.
      tokenSet = await client.grant({
        grant_type: 'client_credentials',
        scope: options.scope,
      });
      if (!tokenSet.access_token) {
        throw new Error('Unable to fetch access_token');
      }
      // expires_in is in seconds, while setTimeout expects milliseconds.
      if (tokenSet.expires_in != null) {
        const delayMs = Math.max(tokenSet.expires_in * 1000 - options.refreshThresholdMs, 0);
        setTimeout(() => {
          tokenPromise = refreshToken();
        }, delayMs);
      }
      return tokenSet.access_token;
    } catch (error) {
      tokenSet = null;
      console.error(error);
      throw error;
    }
  }

  tokenPromise = refreshToken();

  return async function () {
    return {
      // Note: logicalCluster and identityPoolId are not passed anywhere yet.
      value: await tokenPromise,
    };
  };
};
ERROR LOG
SASL OAUTHBEARER authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY
KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Authentication failed: 1 extensions are invalid! They are: logicalCluster: CLUSTER_ID_MISSING_OR_EMPTY
After digging into the KafkaJS code, I found that KafkaJS does support SASL extensions (see: https://github.com/tulios/kafkajs/blob/ff3b1117f316d527ae170b550bc0f772614338e9/src/protocol/sasl/oauthBearer/request.js#LL50C53-L50C53).
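For context, the extensions travel in the same SASL OAUTHBEARER client message as the bearer token, as extra key=value pairs. The snippet below is only my own illustration of that layout, based on my reading of RFC 7628 and the linked request.js; it is not KafkaJS's actual code, and `buildInitialClientResponse` and the sample values are hypothetical names.

```typescript
// Illustrative sketch only: mimics the layout of the SASL/OAUTHBEARER
// initial client response. The function name is hypothetical.
const SEPARATOR = "\u0001"; // SOH control character separates key=value pairs

type SaslExtensions = { [key: string]: string };

function buildInitialClientResponse(token: string, extensions: SaslExtensions = {}): string {
  // Each extension becomes a "key=value" pair placed after the auth pair.
  const pairs = Object.entries(extensions).map(([key, value]) => `${key}=${value}`);
  const body = [`auth=Bearer ${token}`, ...pairs].join(SEPARATOR);
  // "n,," is the header without an authorization identity; two separators end the message.
  return `n,,${SEPARATOR}${body}${SEPARATOR}${SEPARATOR}`;
}

const message = buildInitialClientResponse("TOKEN", {
  logicalCluster: "CLUSTER_ID",
  identityPoolId: "IDENTITY_POOL_ID",
});

// Print the message with the separator made visible as "|".
console.log(message.split(SEPARATOR).join("|"));
```

If the extensions object is missing, only the auth pair is sent, which would explain why the broker reports logicalCluster as missing or empty.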
To pass the SASL extensions, return them from the provider function alongside the token, as in the following code (based on the KafkaJS oauthBearerProvider example, https://kafka.js.org/docs/configuration#oauthbearer-example):
import { AccessToken, ClientCredentials } from "simple-oauth2";

export interface OauthBearerProviderOptions {
  clientId: string;
  clientSecret: string;
  host: string;
  path: string;
  refreshThresholdMs: number;
  scope?: string;
  saslExtension?: { [key: string]: string };
}

export const oauthBearerProvider = (options: OauthBearerProviderOptions) => {
  const client = new ClientCredentials({
    client: {
      id: options.clientId,
      secret: options.clientSecret,
    },
    auth: {
      tokenHost: options.host,
      tokenPath: options.path,
    },
    options: {
      authorizationMethod: 'body',
    },
  });

  let tokenPromise: Promise<string>;
  let accessToken: AccessToken | null = null;

  async function refreshToken(): Promise<string> {
    try {
      // Client-credentials responses usually carry no refresh token, so fetch a
      // new token when none is cached or the cached one is about to expire,
      // rather than calling accessToken.refresh().
      if (accessToken == null || accessToken.expired(options.refreshThresholdMs / 1000)) {
        accessToken = await client.getToken({
          scope: options.scope,
        });
      }

      // Schedule the next refresh shortly before the token expires.
      const nextRefresh = (accessToken.token.expires_in as number) * 1000 - options.refreshThresholdMs;
      setTimeout(() => {
        tokenPromise = refreshToken();
      }, nextRefresh);

      return accessToken.token.access_token as string;
    } catch (error) {
      const e = error as any;
      accessToken = null;
      // Not every error carries a payload, so fall back to the error itself.
      console.error(e?.data?.payload?.toString() ?? e);
      throw error;
    }
  }

  tokenPromise = refreshToken();

  return async function () {
    return {
      value: await tokenPromise,
      extensions: options.saslExtension, // <--- pass the SASL extensions as key/value pairs
    };
  };
};
Example SASL extensions object:
{
  "logicalCluster": "<CLUSTER_ID>",
  "identityPoolId": "<IDENTITY_POOL_ID>"
}
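To put it together, the provider goes into the `sasl` section of the KafkaJS client configuration. Here is a minimal sketch under a few assumptions of mine: the client id, the broker address with its port, the `buildKafkaConfig` helper, and the stubbed `fakeProvider` are all placeholders, and the local types only approximate the shape KafkaJS expects so that the snippet runs without any dependency installed.

```typescript
// Local stand-ins for the shapes KafkaJS expects (assumption: simplified).
type OauthBearerToken = { value: string; extensions?: { [key: string]: string } };
type OauthBearerProvider = () => Promise<OauthBearerToken>;

interface KafkaClientConfig {
  clientId: string;
  brokers: string[];
  ssl: boolean;
  sasl: { mechanism: "oauthbearer"; oauthBearerProvider: OauthBearerProvider };
}

// Hypothetical helper: builds a config object for SASL_SSL + OAUTHBEARER.
function buildKafkaConfig(broker: string, provider: OauthBearerProvider): KafkaClientConfig {
  return {
    clientId: "my-app", // placeholder
    brokers: [broker],
    ssl: true, // corresponds to security.protocol=SASL_SSL
    sasl: { mechanism: "oauthbearer", oauthBearerProvider: provider },
  };
}

// Stub standing in for oauthBearerProvider({...}) so no network call is made.
const fakeProvider: OauthBearerProvider = async () => ({
  value: "TOKEN",
  extensions: { logicalCluster: "<CLUSTER_ID>", identityPoolId: "<IDENTITY_POOL_ID>" },
});

const kafkaConfig = buildKafkaConfig("KAFKA_BROKER:9092", fakeProvider);
// With kafkajs installed: const kafka = new Kafka(kafkaConfig);
```

In a real application, replace `fakeProvider` with the result of `oauthBearerProvider({ ..., saslExtension: { logicalCluster, identityPoolId } })` from the code above.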