Issues using the Aiven custom sink connector with Confluent Cloud (Avro data only)

Hi everyone,

I am facing an issue while adding the Aiven OpenSearch custom sink connector to Confluent Cloud (my topic has Avro data).

With JSON data on the topic everything works fine, but with Avro data the connector shows as running while all messages stay stuck in the "messages behind" section. No error explains why it's happening.

Config used:
{
    "connection.password": "PASSWORD",
    "connection.url": "OS_URL",
    "connection.username": "USER",
    "key.ignore": "true",
    "topics": "TOPIC",
    "type.name": "os_v1",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "API_KEY:API_SECRET",
    "value.converter.schema.registry.url": "schema.registry.url",
    "value.converter.schemas.enable": "true"
}
Any help will be appreciated. Thanks in advance.

OpenSearch is set up on AWS.

Some WARN/ERROR logs for the sink from the Confluent console:
WARN
These configurations ‘[metrics.context.resource.connector, metrics.context.resource.version, confluent.telemetry.exporter.cloud.client.attempts.max, confluent.telemetry.exporter.cloud.client.retry.delay.seconds, metrics.context.resource.type, metrics.context.resource.commit.id, metrics.context.resource.task, confluent.telemetry.labels.physical_cluster_id, metrics.context.connect.kafka.cluster.id, metrics.context.connect.group.id, confluent.telemetry.labels.k8s.pod.name, confluent.telemetry.exporter.cloud.api.key, confluent.telemetry.labels.app, confluent.telemetry.exporter.cloud.type, confluent.telemetry.exporter.cloud.client.base.url, confluent.telemetry.exporter.cloud.api.secret, confluent.telemetry.exporter.cloud.enabled]’ were supplied but are not used yet.
WARN
Ignoring redefinition of existing telemetry label connect.version
ERROR
WorkerSinkTask{id=id} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Tolerance exceeded in error handler
Tolerance exceeded in error handler


Hi Rajneesh,

Thanks for the question! The ERROR you mentioned

WorkerSinkTask{id=id} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Tolerance exceeded in error handler

is quite generic and unfortunately doesn't give much information.
I tried a similar setup with an Aiven for Apache Kafka service and an Aiven for OpenSearch service, over a topic in Avro format, and was able to successfully sink data with the following configuration:

{
    "connection.username": "[USERNAME]",
    "connection.password": "[PWD]",
    "connection.url": "[PROTOCOL]://[HOSTNAME]:[PORT]",
    "name": "[CONNECTOR_NAME]",
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "1",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "[PROTOCOL]://[SCHEMA_REGISTRY_HOSTNAME]:[SCHEMA_REGISTRY_PORT]",
    "key.converter.basic.auth.credentials.source": "USER_INFO",
    "key.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USR]:[SCHEMA_REGISTRY_PWD]",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "[PROTOCOL]://[SCHEMA_REGISTRY_HOSTNAME]:[SCHEMA_REGISTRY_PORT]",
    "value.converter.basic.auth.credentials.source": "USER_INFO",
    "value.converter.schema.registry.basic.auth.user.info": "[SCHEMA_REGISTRY_USR]:[SCHEMA_REGISTRY_PWD]",
    "topics": "[TOPIC_NAME]",
    "transforms": "transform",
    "transforms.transform.field": "mykeyfield",
    "transforms.transform.type": "org.apache.kafka.connect.transforms.ExtractField$Key"
}

The extract transformation is needed to provide OpenSearch with a document id that is not a struct.
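
As an illustration (the field name mykeyfield matches the transform config above; the value is purely hypothetical), a record key arriving as an Avro struct would look like:

{
    "mykeyfield": "order-42"
}

After ExtractField$Key runs, the connector sees just the plain value order-42 as the record key, which OpenSearch can then use directly as the document id.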


Hi Francesco Tisiot,

Thanks for the response, it helps a lot. I was missing the transforms values in my config, and because of that I am getting this error.

One more request: can you please point me to any documentation regarding these transforms, as I am new to this?

Thanks in advance.

Hi @Rajneesh_kumar_Sharm,

I'm not sure your error is related to the missing transformation. Usually, when the problem is in the transform, it manifests itself with the connector failing and an error about the inability to parse the data.

The error you shared seems more general:

WorkerSinkTask{id=id} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Tolerance exceeded in error handler

Could you please investigate if there are other, more detailed, errors showing in the logs?
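
One way to surface the underlying cause is Kafka Connect's built-in error-reporting settings. A minimal sketch of the relevant properties to add to the sink configuration (the dead letter queue topic name is a placeholder you would choose yourself, and setting "errors.tolerance" to "all" keeps the task running while routing bad records to the DLQ):

{
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "errors.deadletterqueue.topic.name": "[DLQ_TOPIC_NAME]",
    "errors.deadletterqueue.context.headers.enable": "true"
}

With these enabled, each failing record is logged and written to the dead letter queue with context headers describing which step (converter, transform, or sink) failed, instead of the task simply dying with "Tolerance exceeded in error handler".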

You are facing issues with the Aiven OpenSearch sink connector in Confluent Cloud when using Avro data: the connector seems to run but doesn't process messages correctly, and the error messages provided don't make the cause clear. Check the Avro converter configuration and ensure it's set up correctly:

{
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.basic.auth.user.info": "API_KEY:API_SECRET",
    "value.converter.schema.registry.url": "schema.registry.url",
    "value.converter.schemas.enable": "true"
}
Beyond the converter settings:

- Verify schema compatibility between the producer and the sink connector (see the sketch below).
- Inspect the Connect logs for more detailed errors.
- Check the OpenSearch setup on AWS, making sure the connection URL, username, and password are correct.
- Make sure the Avro data in the Kafka topic is valid, and handle schema evolution if the schema has changed.

If the issue persists, consider reaching out to Aiven or Confluent Cloud support for further assistance.
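
For the schema-compatibility check: with the default subject naming strategy, the AvroConverter looks up the value schema under the subject [TOPIC]-value in Schema Registry. A minimal registered Avro schema might look like this (record and field names here are purely illustrative):

{
    "type": "record",
    "name": "ExampleValue",
    "fields": [
        { "name": "id", "type": "string" },
        { "name": "amount", "type": "double" }
    ]
}

If the messages on the topic were not produced with the Schema Registry Avro serializer (for example, plain JSON bytes), the converter cannot deserialize them, and that failure surfaces as the generic "Tolerance exceeded in error handler" error.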