Set up Kafka OAuth 2.0/OIDC authentication with AWS IAM using Outbound Identity Federation

Use AWS IAM Outbound Identity Federation to authenticate Apache Kafka® clients with OAuth 2.0/OIDC. AWS IAM principals can connect to Aiven for Apache Kafka® without managing separate credentials by using short-lived JSON Web Tokens (JWTs) issued by AWS Security Token Service (AWS STS).

Prerequisites

Before you begin, make sure you have:

Define the audience value, typically as your Kafka service hostname. Replace SERVICE_NAME with your value:

AUDIENCE=$(avn service get SERVICE_NAME --json | jq -r .user_config.kafka.sasl_oauthbearer_expected_audience)

Use the same audience value when you configure AWS, Aiven for Apache Kafka, and your Kafka client.

Enable Outbound Identity Federation in AWS

Enable Outbound Identity Federation for your AWS account:

aws iam enable-outbound-web-identity-federation

Verify IAM permissions

Make sure the AWS principal that requests the token has permission to call sts:GetWebIdentityToken with the configured audience.

The following example policy grants the minimum required permissions:

{
  "Version": "2012-10-17",
  "Statement": [{
    "Effect": "Allow",
    "Action": "sts:GetWebIdentityToken",
    "Resource": "*",
    "Condition": {
      "ForAllValues:StringEquals": {
        "sts:IdentityTokenAudience": "${AUDIENCE}"
      },
      "NumericLessThanEquals": {
        "sts:DurationSeconds": 3600
      }
    }
  }]
}
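The `${AUDIENCE}` placeholder in the policy must be replaced with your literal audience value before you attach the policy, because the shell variable is not expanded inside a JSON document. As a minimal sketch, the following Python snippet renders the same policy with the audience filled in. The helper name `build_token_policy` and the example hostname are assumptions for illustration, not part of any AWS SDK.

```python
import json


def build_token_policy(audience: str, max_duration_seconds: int = 3600) -> dict:
    """Return the example policy with the audience filled in.

    build_token_policy is a hypothetical helper, not an AWS SDK function.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sts:GetWebIdentityToken",
                "Resource": "*",
                "Condition": {
                    "ForAllValues:StringEquals": {
                        "sts:IdentityTokenAudience": audience
                    },
                    "NumericLessThanEquals": {
                        "sts:DurationSeconds": max_duration_seconds
                    },
                },
            }
        ],
    }


if __name__ == "__main__":
    # Example hostname only; use your own audience value.
    print(json.dumps(build_token_policy("kafka-example.aivencloud.com"), indent=2))
```

You can save the printed JSON to a file and attach it to the IAM principal with your usual tooling.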

Retrieve the issuer URL

Get the issuer URL from your AWS account configuration:

ISSUER_URL=$(aws iam get-outbound-web-identity-federation-info | jq -r .IssuerIdentifier)

The issuer URL has the form https://<uuid>.tokens.sts.global.api.aws.
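Before you pass the issuer URL to other commands, a quick sanity check can catch copy-and-paste mistakes. The following Python sketch checks that a value matches the form described above and derives the JWKS endpoint by appending `/.well-known/jwks.json`, the path used in the service configuration in this guide. The function names and the sample UUID are hypothetical.

```python
from urllib.parse import urlparse

# Host suffix taken from the issuer URL form described in this guide.
STS_ISSUER_SUFFIX = ".tokens.sts.global.api.aws"


def looks_like_sts_issuer(issuer_url: str) -> bool:
    """Return True if the URL has the form https://<uuid>.tokens.sts.global.api.aws."""
    parsed = urlparse(issuer_url)
    host = parsed.hostname or ""
    return (
        parsed.scheme == "https"
        and host.endswith(STS_ISSUER_SUFFIX)
        and len(host) > len(STS_ISSUER_SUFFIX)
    )


def jwks_endpoint_url(issuer_url: str) -> str:
    """Derive the JWKS endpoint from the issuer URL (hypothetical helper)."""
    parsed = urlparse(issuer_url)
    if parsed.scheme != "https" or not parsed.hostname:
        raise ValueError(f"not an https URL: {issuer_url!r}")
    return issuer_url.rstrip("/") + "/.well-known/jwks.json"


if __name__ == "__main__":
    # Sample value only; use the issuer URL returned for your AWS account.
    issuer = "https://01234567-89ab-cdef-0123-456789abcdef.tokens.sts.global.api.aws"
    print(looks_like_sts_issuer(issuer))
    print(jwks_endpoint_url(issuer))
```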

Configure OIDC for Aiven for Apache Kafka

Configure your Kafka service with the OIDC issuer, JSON Web Key Set (JWKS) endpoint, audience, and subject claim.

Replace PROJECT_NAME and SERVICE_NAME with your values:

avn service update --project PROJECT_NAME SERVICE_NAME \
  -c kafka.sasl_oauthbearer_jwks_endpoint_url="${ISSUER_URL}/.well-known/jwks.json" \
  -c kafka.sasl_oauthbearer_expected_issuer="${ISSUER_URL}" \
  -c kafka.sasl_oauthbearer_expected_audience="${AUDIENCE}" \
  -c kafka.sasl_oauthbearer_sub_claim_name="sub"

Note: This command triggers a rolling restart of your Apache Kafka brokers. To minimize impact, apply it during a maintenance window.

For details about each parameter, see OIDC parameters.

Configure Kafka ACLs for your IAM principal

The JWT issued by AWS STS contains the IAM principal Amazon Resource Name (ARN) as the sub claim. Aiven for Apache Kafka uses this value as the Kafka principal, so configure your access control lists (ACLs) to reference the full ARN prefixed with User:.
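To find the exact principal string to use in your ACLs, you can read the `sub` claim from a token and prefix it with `User:`. The following Python sketch uses only the standard library and decodes the token payload without verifying the signature, so it is suitable for inspection only. The function names are hypothetical.

```python
import base64
import json


def decode_jwt_claims(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying its signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("expected a JWT with three dot-separated segments")
    payload = parts[1]
    # JWT segments are base64url-encoded without padding; restore it.
    padded = payload + "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def kafka_principal(claims: dict, sub_claim_name: str = "sub") -> str:
    """Build the Kafka ACL principal from the configured subject claim."""
    return f"User:{claims[sub_claim_name]}"
```

For example, passing a token whose `sub` claim is `arn:aws:iam::123456789012:user/my-iam-user` to `kafka_principal(decode_jwt_claims(token))` returns `User:arn:aws:iam::123456789012:user/my-iam-user`.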

Use the Aiven CLI to add the required ACLs. Replace PROJECT_NAME, SERVICE_NAME, and the ARN with your values:

PRINCIPAL="User:arn:aws:iam::123456789012:user/my-iam-user"

# Allow topic describe operations.
avn service kafka-acl-add --project PROJECT_NAME SERVICE_NAME \
  --permission allow \
  --principal "${PRINCIPAL}" \
  --operation Describe \
  --resource-type topic \
  --pattern-type literal \
  --resource-name my-topic

# Allow produce operations.
avn service kafka-acl-add --project PROJECT_NAME SERVICE_NAME \
  --permission allow \
  --principal "${PRINCIPAL}" \
  --operation Write \
  --resource-type topic \
  --pattern-type literal \
  --resource-name my-topic

# Allow consume operations from the topic.
avn service kafka-acl-add --project PROJECT_NAME SERVICE_NAME \
  --permission allow \
  --principal "${PRINCIPAL}" \
  --operation Read \
  --resource-type topic \
  --pattern-type literal \
  --resource-name my-topic

# Allow consume operations with the consumer group.
avn service kafka-acl-add --project PROJECT_NAME SERVICE_NAME \
  --permission allow \
  --principal "${PRINCIPAL}" \
  --operation Read \
  --resource-type group \
  --pattern-type literal \
  --resource-name my-consumer-group

To verify that the ACLs are in place, run the following command:

avn service kafka-acl-list --project PROJECT_NAME SERVICE_NAME

The output is similar to the following:

ID             PERMISSION_TYPE PRINCIPAL                                             OPERATION RESOURCE_TYPE PATTERN_TYPE RESOURCE_NAME     HOST
============== =============== ===================================================== ========= ============= ============ ================= ====
acl5baf3a5cada ALLOW           User:arn:aws:iam::123456789012:user/my-iam-user       Describe  Topic         LITERAL      my-topic          *
acl5baf3a77c15 ALLOW           User:arn:aws:iam::123456789012:user/my-iam-user       Write     Topic         LITERAL      my-topic          *
acl5baf3a60d8f ALLOW           User:arn:aws:iam::123456789012:user/my-iam-user       Read      Topic         LITERAL      my-topic          *
acl5baf3ab8316 ALLOW           User:arn:aws:iam::123456789012:user/my-iam-user       Read      Group         LITERAL      my-consumer-group *

Configure the Kafka client

Your client must request a JWT from AWS STS at runtime and use it to authenticate with the Kafka service by using the OAUTHBEARER SASL mechanism.
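When a connection attempt fails, it can help to compare the token's claims with the values configured on the Kafka service before you debug the client itself. The following Python sketch checks a decoded claims dictionary against the expected issuer and audience. It assumes the token carries the standard JWT claims `iss`, `aud`, and `exp`; the function name `find_claim_mismatches` is hypothetical.

```python
import time
from typing import List, Optional


def find_claim_mismatches(
    claims: dict,
    expected_issuer: str,
    expected_audience: str,
    now: Optional[float] = None,
) -> List[str]:
    """Return human-readable problems; an empty list means the claims look consistent.

    Assumes the standard JWT claims iss, aud, and exp are present in the token.
    """
    now = time.time() if now is None else now
    problems = []

    if claims.get("iss") != expected_issuer:
        problems.append(f"issuer mismatch: {claims.get('iss')!r}")

    # In a JWT, aud can be a single string or a list of strings.
    aud = claims.get("aud")
    audiences = [aud] if isinstance(aud, str) else list(aud or [])
    if expected_audience not in audiences:
        problems.append(f"audience mismatch: {audiences!r}")

    exp = claims.get("exp")
    if exp is None or exp <= now:
        problems.append("token is expired or has no exp claim")

    return problems
```

You can pass this function the dictionary produced by decoding a token without signature verification. The check is a debugging aid only; the Kafka brokers perform the actual validation.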

Python example

Install the required libraries:

pip install confluent-kafka boto3 PyJWT

Download the CA certificate for your service from the Aiven Console and save it as ca.pem. Then use the following code:

from confluent_kafka import Producer, Consumer
import boto3
import jwt

# Aiven Kafka config
KAFKA_BOOTSTRAP = "<your-service-hostname>:<port>"
CA_CERT_PATH = "ca.pem"  # Download from the Aiven Console
AUDIENCE = "<your-kafka-service-hostname>"  # Must match the server-side audience setting


def fetch_token_from_aws_sts(config):
    """
    Fetch a short-lived JWT from AWS STS.

    Equivalent to:
        aws sts get-web-identity-token \
            --audience "<AUDIENCE>" \
            --signing-algorithm RS256 \
            --duration-seconds 3600
    """
    sts_client = boto3.client("sts")
    response = sts_client.get_web_identity_token(
        Audience=[AUDIENCE],
        DurationSeconds=3600,
        SigningAlgorithm="RS256",
    )
    token = response["WebIdentityToken"]
    expiry = response["Expiration"].timestamp()
    return token, expiry


# Verify that token retrieval works before connecting.
token, expiry = fetch_token_from_aws_sts(None)
decoded = jwt.decode(token, options={"verify_signature": False})
print(decoded)

# Client config
kafka_config = {
    "bootstrap.servers": KAFKA_BOOTSTRAP,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "OAUTHBEARER",
    "oauth_cb": fetch_token_from_aws_sts,
    "ssl.ca.location": CA_CERT_PATH,
}

# Producer example
delivery_errors = []


def delivery_callback(err, msg):
    if err:
        delivery_errors.append(err)
    else:
        print(
            f"Message produced to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
        )


producer = Producer(kafka_config)
producer.produce("my-topic", key="key", value="hello from OIDC", callback=delivery_callback)
producer.flush()

if delivery_errors:
    raise RuntimeError(f"Message delivery failed: {delivery_errors[0]}")

# Consumer example
consumer_config = {
    **kafka_config,
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_config)
consumer.subscribe(["my-topic"])
try:
    while True:
        msg = consumer.poll(timeout=5.0)
        if msg is None:
            print("No message received.")
            break
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"Received: {msg.value().decode('utf-8')}")
finally:
    consumer.close()

Java example

Implement a custom AuthenticateCallbackHandler that calls GetWebIdentityToken and refreshes the token before it expires.

Pass the handler class to your Kafka client configuration:

bootstrap.servers=<your-service-hostname>:<port>
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
sasl.login.callback.handler.class=<your-handler-that-calls-GetWebIdentityToken-and-refreshes-the-token-before-it-expires>

Configure your handler to:

  1. Call the AWS SDK StsClient.getWebIdentityToken() with the configured audience and a signing algorithm.
  2. Return the token and its expiration time.
  3. Refresh the token before it expires to avoid authentication failures.
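The refresh logic in the steps above does not depend on the client language. As an illustration only, the following sketch shows one way to cache a token and fetch a new one shortly before it expires. It is written in Python to match the earlier example rather than as a Java handler. The class name `CachedToken`, the five-minute refresh margin, and the injected `fetch` and `clock` callables are assumptions for this sketch.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Cache a (token, expiry) pair and refetch it shortly before it expires.

    fetch must return (token, expiry), with expiry in seconds since the epoch.
    """

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        refresh_margin_seconds: float = 300.0,  # assumed margin; tune as needed
        clock: Callable[[], float] = time.time,
    ) -> None:
        self._fetch = fetch
        self._margin = refresh_margin_seconds
        self._clock = clock
        self._token: Optional[str] = None
        self._expiry: float = 0.0

    def get(self) -> Tuple[str, float]:
        """Return a token that is valid for at least the refresh margin."""
        now = self._clock()
        if self._token is None or now >= self._expiry - self._margin:
            self._token, self._expiry = self._fetch()
        return self._token, self._expiry
```

A handler built along these lines would call `get()` each time the client asks for a token, so the call to AWS STS happens only when the cached token is missing or close to expiry.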

Optional: Disable other authentication mechanisms

To use only OAuth 2.0/OIDC authentication, you can now disable all other SASL authentication mechanisms. Replace SERVICE_NAME with your value:

avn service update SERVICE_NAME \
  -c "kafka_sasl_mechanisms.plain=false" \
  -c "kafka_sasl_mechanisms.scram_sha_256=false" \
  -c "kafka_sasl_mechanisms.scram_sha_512=false"

Related pages