
Connect Apache Kafka® to Aiven for ClickHouse®

Integrate Aiven for ClickHouse® with either an Aiven for Apache Kafka® service located in the same project or an external Apache Kafka endpoint.

For a different use case

Need to deliver data from Apache Kafka® topics to a ClickHouse database for efficient querying and analysis? Use a ClickHouse sink connector.

A single Aiven for ClickHouse instance can connect to multiple Kafka clusters, each with different authentication mechanisms and credentials. Behind the scenes, the integration between Aiven for ClickHouse and Apache Kafka services relies on the ClickHouse Kafka engine.
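
On Aiven, the integration creates and manages these connector tables for you, but conceptually each one is a ClickHouse Kafka engine table. A hand-written equivalent would look roughly like the following sketch; all names, addresses, and setting values here are illustrative placeholders:

-- Sketch of a Kafka engine table; on Aiven the integration creates the real one
CREATE TABLE kafka_connector (id UInt64, name String)
ENGINE = Kafka
SETTINGS
    kafka_broker_list = 'kafka-demo-host:9092',  -- Kafka service URI (placeholder)
    kafka_topic_list = 'topic1',                 -- topic(s) to consume from
    kafka_group_name = 'mygroup',                -- consumer group name
    kafka_format = 'JSONEachRow';                -- input/output data format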

note

Aiven for ClickHouse service integrations are available for Startup plans and higher.

Prerequisites

  • Aiven for ClickHouse and Aiven for Apache Kafka services to integrate, both on Startup plans or higher

  • At least one topic in the Apache Kafka service


Variables

Variables used to set up and configure the integration:

| Variable | Description |
| -------- | ----------- |
| CLICKHOUSE_SERVICE_NAME | Name of your Aiven for ClickHouse service. |
| KAFKA_SERVICE_NAME | Name of the Apache Kafka service you use for the integration. |
| PROJECT | Name of the Aiven project where your services are located. |
| CONNECTOR_TABLE_NAME | Name of the Kafka engine virtual table that is used as a connector. |
| DATA_FORMAT | Input/output data format in which data is accepted into Aiven for ClickHouse. See Reference. |
| CONSUMER_GROUP_NAME | Name of the consumer group. Each message is delivered once per consumer group. |

Create an integration

To connect Aiven for ClickHouse and Aiven for Apache Kafka by enabling a data service integration, see Create data service integrations.

When you create the integration, a database is automatically added to your Aiven for ClickHouse service. Its name is service_KAFKA_SERVICE_NAME, where KAFKA_SERVICE_NAME is the name of your Apache Kafka service. In this database, you create virtual connector tables, which is also part of creating the integration in the Aiven Console. You can have up to 400 such tables for receiving and sending messages from multiple topics.
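
For example, assuming your Apache Kafka service is named kafka-demo (a placeholder), you can verify the integration database and list its connector tables from any SQL client connected to ClickHouse:

SHOW DATABASES;

-- The virtual connector tables live inside the integration database
SHOW TABLES FROM `service_kafka-demo`;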

Update integration settings

After creating the integration and configuring your tables, you can edit both mandatory integration settings and optional integration settings at the table level, either in the Aiven Console or the Aiven CLI.

  1. Log in to the Aiven Console, and go to your project.

  2. On the Services page, select an Aiven for ClickHouse service that includes a table to be edited.

  3. On your service's page, click Databases and tables in the sidebar.

  4. Find the database that includes the table to be edited, and expand it to display the tables inside.

  5. Find the table and click the edit icon next to it.

  6. In the displayed window, update existing settings or add new ones, and save your changes.

Read and store data

In Aiven for ClickHouse, you can consume messages by running a SELECT query. Replace KAFKA_SERVICE_NAME and CONNECTOR_TABLE_NAME with your values and run:

SELECT * FROM service_KAFKA_SERVICE_NAME.CONNECTOR_TABLE_NAME

However, the messages are only read once (per consumer group). If you want to store the messages for later, you can send them into a separate ClickHouse table with the help of a materialized view.

For example, run the following to create a destination table:

CREATE TABLE destination (id UInt64, name String)
ENGINE = ReplicatedMergeTree()
ORDER BY id;

Add a materialized view to bring the data from the connector table:

CREATE MATERIALIZED VIEW materialised_view TO destination AS
SELECT *
FROM service_KAFKA_SERVICE_NAME.CONNECTOR_TABLE_NAME;

Messages consumed from the Apache Kafka topic are now read automatically and sent directly into the destination table.
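
To verify the pipeline, query the destination table; rows accumulate as the materialized view processes new Kafka messages:

-- Rows appear here shortly after messages arrive on the topic
SELECT * FROM destination ORDER BY id LIMIT 10;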

For more information on materialized views, see Create materialized views in ClickHouse®.

note

ClickHouse is strict about the characters allowed in database and table names. Wrap names in backticks in your queries, particularly when a name contains dashes.
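
For example, with a Kafka service named kafka-demo and a connector table named edge-measurements (both placeholder names), the query needs backticks:

SELECT * FROM `service_kafka-demo`.`edge-measurements`;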

Write data back to the topic

You can also write entries from a ClickHouse table back into the Apache Kafka topic. Replace KAFKA_SERVICE_NAME and CONNECTOR_TABLE_NAME with your values:

INSERT INTO service_KAFKA_SERVICE_NAME.CONNECTOR_TABLE_NAME(id, name)
VALUES (1, 'Michelangelo')
warning

Writing to more than one topic is not supported.
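
You can also write existing rows in bulk with an INSERT ... SELECT statement. A minimal sketch, reusing the destination table created earlier:

-- Produce every row of the ClickHouse table to the Kafka topic
INSERT INTO service_KAFKA_SERVICE_NAME.CONNECTOR_TABLE_NAME (id, name)
SELECT id, name
FROM destination;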

Reference

Mandatory integration settings


  • name - name of the connector table
  • columns - array of columns with names and types
  • topics - array of topics to pull data from
  • data_format - format for input data (see supported formats)
  • group_name - consumer group name to be created on your behalf
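
These settings correspond to the Kafka engine table that the integration creates. To inspect the resulting table definition, including its columns, topics, data format, and consumer group, you can run the following (service and table names are placeholders):

SHOW CREATE TABLE `service_kafka-demo`.`my-connector-table`;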

Optional integration settings


| Name | Type | Description | Default | Example | Allowed values / Range |
| ---- | ---- | ----------- | ------- | ------- | ---------------------- |
| auto_offset_reset | string | Action to take when there is no initial offset in the offset store or the desired offset is out of range | earliest | latest | smallest, earliest, beginning, largest, latest, end |
| date_time_input_format | string | Method to read DateTime from text input formats | basic | best_effort | basic, best_effort, best_effort_us |
| handle_error_mode | string | How to handle errors for the Kafka engine | default | stream | default, stream |
| max_block_size | integer | Number of rows collected by polls for flushing data from Kafka | 0 | 100000 | 0 - 1_000_000_000 |
| max_rows_per_message | integer | Maximum number of rows produced in one Kafka message for row-based formats | 1 | 100000 | 1 - 1_000_000_000 |
| num_consumers | integer | Number of consumers per table per replica | 1 | 4 | 1 - 10 |
| poll_max_batch_size | integer | Maximum amount of messages to be polled in a single Kafka poll | 0 | 10000 | 0 - 1_000_000_000 |
| poll_max_timeout_ms | integer | Timeout in milliseconds for a single poll from Kafka. Defaults to stream_flush_interval_ms (500 ms). | 0 | 1000 | 0 - 30_000 |
| skip_broken_messages | integer | Skip at least this number of broken messages from the Kafka topic per block | 0 | 10000 | 0 - 1_000_000_000 |
| thread_per_consumer | boolean | Provide an independent thread for each consumer. All consumers run in the same thread by default. | false | true | true, false |
| producer_batch_size | integer | Max size in bytes of a batch of messages sent to Kafka. If exceeded, the batch is sent. | 1000000 | 1000000 | 0 - 2_147_483_647 |
| producer_batch_num_messages | integer | Max number of messages in a batch sent to Kafka. If exceeded, the batch is sent. | 10000 | 10000 | 1 - 1_000_000 |
| producer_compression_codec | string | Compression codec to use for the Kafka producer | none | zstd | none, gzip, lz4, snappy, zstd |
| producer_compression_level | integer | Compression level for the Kafka producer. The range depends on producer_compression_codec. | Codec-dependent; -1 if producer_compression_codec is set to none | 5 | Codec-dependent ranges: gzip: 0 - 9; lz4: 0 - 12; snappy: only 0; none: -1 - 12 |
| producer_linger_ms | integer | Time in ms to wait for additional messages before sending a batch. If exceeded, the batch is sent. | 5 | 5 | 0 - 900_000 |
| producer_queue_buffering_max_messages | integer | Max number of messages allowed in the producer queue | 100000 | 100000 | 0 - 2_147_483_647 |
| producer_queue_buffering_max_kbytes | integer | Max size of the producer queue in kilobytes | 1048576 | 1048576 | 0 - 2_147_483_647 |
| producer_request_required_acks | integer | Number of acknowledgments required from Kafka brokers for a message to be considered successful | -1 | 1 | -1 - 1000 |

Formats supporting the integration

When connecting ClickHouse® to Kafka® using Aiven integrations, data exchange requires using specific formats. Check the supported formats for input and output data in Formats for ClickHouse®-Kafka® data exchange.