Oct 13, 2021
Manage Apache Kafka® Connect connectors with kcctl
kcctl lets you manage all your Apache Kafka Connect instances in one command line tool, whether on-prem, Aiven, or other clouds. Find out how to use it.
Apache Kafka is widely used as a company data backbone with Kafka Connect acting as a bridge. This way Kafka can be integrated with other technologies easily, reliably and scalably. The Kafka Connect REST APIs provide a way to manage connectors via web calls, but crafting URLs on a terminal can sometimes be tricky.
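To make the point about crafting URLs concrete, here is a minimal sketch of what talking to the Kafka Connect REST API directly can look like. The endpoint paths (/connectors, /connectors/NAME/status, /connectors/NAME/pause) belong to the standard Kafka Connect REST API. The connect_url helper, the CONNECT_URL variable and the my-connector name are hypothetical and used only for illustration.

```shell
# Hypothetical helper: build a Kafka Connect REST URL from a base URL and a path.
# CONNECT_URL is an assumed variable holding the Kafka Connect endpoint.
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

connect_url() {
  # Drop one trailing slash from the base and one leading slash from the path,
  # so the result never contains a doubled "//" at the join.
  printf '%s/%s\n' "${CONNECT_URL%/}" "${1#/}"
}

# Raw REST calls, commented out so the sketch has no side effects:
# curl -s "$(connect_url connectors)"                            # list connectors
# curl -s "$(connect_url connectors/my-connector/status)"        # connector status
# curl -s -X PUT "$(connect_url connectors/my-connector/pause)"  # pause a connector
```

Every operation needs its own path and HTTP verb, which is exactly the bookkeeping a dedicated command line tool takes off your hands.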
In this blog post we explore kcctl, a new open source command line tool for Kafka Connect. You'll find out how to integrate it with Apache Kafka and manage connections to other systems.
Aiven offers similar functionality with the Aiven Command Line Interface. You can use the CLI to create, drop, change, verify, pause and restore any Kafka Connect connector running on Aiven services. If all your Kafka Connect instances are Aiven services, then the Aiven command line interface is all you need.
But if you want to use the same tool for any Kafka Connect instance, be it on-premises, on Aiven, or in other cloud providers, then kcctl is your friend.
Create an Apache Kafka instance with Kafka Connect
To start following the process in this article, make sure you already have an Apache Kafka environment with Kafka Connect up and running. If you don't have one, don't worry: Aiven can provide one in minutes. Just create one in the Aiven Console, or in the Aiven Command Line Interface with the following command:
avn service create demo-kafka \
  --service-type kafka \
  --cloud google-europe-west3 \
  --plan business-4 \
  -c kafka_connect=true \
  -c kafka.auto_create_topics_enable=true
This command creates an Aiven for Apache Kafka cluster named demo-kafka with three nodes (using the business-4 plan) in the google-europe-west3 region, and enables both Kafka Connect and the automatic creation of topics. With Aiven, you can deploy Kafka Connect as part of your Kafka cluster for business and premium plans, or as a separate, standalone cluster. To read more about Aiven for Apache Kafka and related Kafka Connect topics, check out the dedicated page.
Let's wait until the service is ready:
avn service wait demo-kafka
Install kcctl
At the time of writing, kcctl is an early access release. The current set of installation instructions can be found in its GitHub repository.
Once kcctl is installed, add its bin subdirectory to your PATH and test that it works by executing the following in a terminal:
kcctl
If we did everything correctly, then we should see the usage information. Now it's time to connect to our Kafka Connect cluster.
Connect
To plug in to Kafka Connect, first retrieve the cluster URL. You can get it with the Aiven CLI, using jq to parse the JSON output:
avn service get demo-kafka --json | jq '.connection_info.kafka_connect_uri'
Now create a kcctl configuration context by executing the following command, replacing the cluster parameter accordingly.
kcctl config set-context \
  --cluster https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443 \
  my_kafka_cluster
The above creates a context named my_kafka_cluster pointing to the demo-kafka instance. To verify the configuration:
kcctl info
This retrieves the definition of the current kcctl configuration context:
URL:              https://avnadmin:PASSWORD@demo-kafka-<PROJECT_NAME>.aivencloud.com:443
Version:          2.7.2-SNAPSHOT
Commit:           d15ddddd3ef3f5ef
Kafka Cluster ID: -DvILyiXQxSpnFSK9M1qgQ
Create a data source in PostgreSQL
To see the connectors in action, create a PostgreSQL database and configure a Kafka Connect JDBC source connector to bring the data into Kafka. The connector takes data from a table named pasta stored in a PostgreSQL database and writes it to a Kafka topic.
If you don't have a PostgreSQL database handy, you can create one at Aiven with the following Aiven CLI command:
avn service create demo-pg \
  --service-type pg \
  --cloud google-europe-west3 \
  --plan hobbyist
Once the demo-pg PostgreSQL instance is up and running (use avn service wait demo-pg to wait for it), connect to it:
avn service cli demo-pg
Now create a sample pasta table and fill it with data, using the following statements in our terminal:
create table pasta (id serial, name varchar, cooking_minutes int);
insert into pasta (name, cooking_minutes) values ('spaghetti', 8);
insert into pasta (name, cooking_minutes) values ('spaghettini', 6);
insert into pasta (name, cooking_minutes) values ('fusilli', 9);
insert into pasta (name, cooking_minutes) values ('trofie', 5);
Create a new Kafka Connect connector
Once the source data is available, create the Kafka Connect JDBC source connector, which reads the pasta table in incrementing mode based on the id column. To get the required PostgreSQL connection details such as hostname, port, user and password, use this command:
avn service get demo-pg --format '{service_uri_params}'
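If you prefer to pull the individual values out of a single connection string, the following is a minimal sketch that splits a PostgreSQL URI using only shell parameter expansion. It assumes a URI shaped like postgres://USER:PASSWORD@HOST:PORT/DBNAME?sslmode=require and a password containing neither @ nor /. The parse_pg_uri function and the PG_* variable names are hypothetical.

```shell
# Hypothetical helper: split a PostgreSQL URI of the (assumed) form
#   postgres://USER:PASSWORD@HOST:PORT/DBNAME?sslmode=require
# into PG_USER, PG_PASSWORD, PG_HOST, PG_PORT and PG_DB.
# Assumption: the password contains neither "@" nor "/".
parse_pg_uri() {
  local uri="$1" rest creds hostport dbpart
  rest="${uri#*://}"        # drop the scheme
  creds="${rest%%@*}"       # USER:PASSWORD
  rest="${rest#*@}"         # HOST:PORT/DBNAME?params
  hostport="${rest%%/*}"    # HOST:PORT
  dbpart="${rest#*/}"       # DBNAME?params
  PG_USER="${creds%%:*}"
  PG_PASSWORD="${creds#*:}"
  PG_HOST="${hostport%%:*}"
  PG_PORT="${hostport##*:}"
  PG_DB="${dbpart%%\?*}"
}

# Example (assumption: the service exposes its URI in the service_uri field):
# parse_pg_uri "$(avn service get demo-pg --format '{service_uri}')"
# echo "$PG_HOST $PG_PORT"
```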
Create a file named my_jdbc_connect_source.json with the following JSON content (substituting <HOST>, <PORT> and <PASSWORD> with the actual values retrieved in the previous step):
{
  "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://<HOST>:<PORT>/defaultdb?sslmode=require",
  "connection.user": "avnadmin",
  "connection.password": "<PASSWORD>",
  "table.whitelist": "pasta",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "poll.interval.ms": "2000",
  "topic.prefix": "pg_source_"
}
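Rather than editing the placeholders by hand, you could generate the file from shell variables. The sketch below writes the same configuration with a here-document. The write_jdbc_source_config function is a hypothetical helper, and it assumes the password contains no double quotes or backslashes, which would need JSON escaping.

```shell
# Hypothetical helper: write the JDBC source connector configuration to a file.
# Arguments: host, port, password, and an optional output file name.
# Assumption: the password has no double quotes or backslashes (no JSON escaping is done).
write_jdbc_source_config() {
  local host="$1" port="$2" password="$3" out="${4:-my_jdbc_connect_source.json}"
  cat > "$out" <<EOF
{
  "connector.class": "io.aiven.connect.jdbc.JdbcSourceConnector",
  "connection.url": "jdbc:postgresql://${host}:${port}/defaultdb?sslmode=require",
  "connection.user": "avnadmin",
  "connection.password": "${password}",
  "table.whitelist": "pasta",
  "mode": "incrementing",
  "incrementing.column.name": "id",
  "poll.interval.ms": "2000",
  "topic.prefix": "pg_source_"
}
EOF
}

# Example with made-up values:
# write_jdbc_source_config "pg.example.com" "5432" "my-password"
```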
Now invoke the connector creation via kcctl in a new terminal window:
kcctl apply -f my_jdbc_connect_source.json --name pg-incremental-source
Verify that the connector was successfully created:
kcctl describe connector pg-incremental-source
The command output shows the pg-incremental-source connector in RUNNING state and all the details associated with it.
Check the data in Apache Kafka with kcat
You can also use kcat to check that a new Kafka topic called pg_source_pasta contains the same data stored in PostgreSQL. Start by downloading the required certificates:
avn service user-creds-download demo-kafka \
  --username avnadmin \
  -d certs
Then create a kcat.config file containing the following entries:
bootstrap.servers=<HOST>:<PORT>
security.protocol=ssl
ssl.key.location=certs/service.key
ssl.certificate.location=certs/service.cert
ssl.ca.location=certs/ca.pem
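The file needs one property per line. As a minimal sketch, it can be generated from the shell once you know the Kafka bootstrap address. The write_kcat_config function is a hypothetical helper, and the commented example assumes the Kafka service reports its address as HOST:PORT in the service_uri field.

```shell
# Hypothetical helper: write kcat.config for an SSL-secured Kafka service.
# Arguments: bootstrap server as HOST:PORT, certificate directory, output file.
write_kcat_config() {
  local bootstrap="$1" certs_dir="${2:-certs}" out="${3:-kcat.config}"
  cat > "$out" <<EOF
bootstrap.servers=${bootstrap}
security.protocol=ssl
ssl.key.location=${certs_dir}/service.key
ssl.certificate.location=${certs_dir}/service.cert
ssl.ca.location=${certs_dir}/ca.pem
EOF
}

# Example (assumption: service_uri for a Kafka service is HOST:PORT):
# write_kcat_config "$(avn service get demo-kafka --format '{service_uri}')"
```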
Now read from the pg_source_pasta topic with the following kcat invocation:
kcat -F kcat.config -C -t pg_source_pasta
If we now insert some rows in the PostgreSQL pasta table, the same changes appear in Kafka and show up in the kcat output.
Managing Kafka Connect connectors with kcctl
Creating connectors is only part of the game with kcctl: you can also manage them! Need a list of all the connectors deployed? Just run the following command:
kcctl get connectors
Need to pause and resume connectors? The command below, for example, pauses the one named pg-incremental-source:
kcctl pause connector pg-incremental-source
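Pausing and resuming often go together, for example around a maintenance task on the source database. Below is a minimal sketch that pauses a connector, runs a command, then resumes the connector, using kcctl's pause connector and resume connector subcommands. The with_connector_paused function and the KCCTL override variable are hypothetical names introduced here. Setting KCCTL=echo gives a dry run that only prints the commands.

```shell
# KCCTL is an assumed override hook: set KCCTL=echo to dry-run the sketch.
KCCTL="${KCCTL:-kcctl}"

# Hypothetical helper: pause a connector, run the given command, then resume it.
# Usage: with_connector_paused CONNECTOR_NAME COMMAND [ARGS...]
with_connector_paused() {
  local name="$1" status
  shift
  "$KCCTL" pause connector "$name" || return 1
  "$@"                 # run the wrapped command while the connector is paused
  status=$?
  "$KCCTL" resume connector "$name" || return 1
  return "$status"     # report the wrapped command's exit status
}

# Example with a made-up maintenance script:
# with_connector_paused pg-incremental-source ./run_maintenance.sh
```

Resuming on its own is simply kcctl resume connector pg-incremental-source.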
What type of connectors can we create? Glad you asked! The full plugin list is available with:
kcctl get plugins
The command shows all the available connector plugins with their type (source or sink) and version. It's a quick way to check which managed Kafka Connect connector types you can create with Aiven for Apache Kafka.
TYPE    CLASS                                                                                VERSION
source  com.couchbase.connect.kafka.CouchbaseSourceConnector                                 4.0.6
source  com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector  2.1.3
source  com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector            2.1.3
source  com.google.pubsub.kafka.source.CloudPubSubSourceConnector                            2.7.2-SNAPSHOT
source  com.google.pubsublite.kafka.source.PubSubLiteSourceConnector                         2.7.2-SNAPSHOT
...
sink    io.aiven.kafka.connect.gcs.GcsSinkConnector                                          0.9.0
sink    io.aiven.kafka.connect.http.HttpSinkConnector                                        0.4.0
sink    io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector                           2.12.0
...
Wrapping up
The ability to manage Kafka Connect connectors from the terminal is just a few commands away. kcctl makes it easy to inspect, deploy, update, pause and resume any connector in our environments. This unifies the end-user experience for Apache Kafka instances deployed on-premises, self-hosted or in Aiven.
Further reading
- kcctl GitHub repository
- Aiven for Apache Kafka
- Kafka Connect REST APIs
- Also check out our other blog articles on Apache Kafka!