Sink connection to Cassandra with list of objects

How can I configure a Cassandra sink connector to write the following JSON structure from a Kafka topic into a Cassandra column that holds a list of objects?

JSON Structure:

{
    "my_list":
     [
         {"id":"1", "name":"George"},
         {"id":"2", "name":"Katelyn"}
     ]
}

Cassandra Column Definition:

CREATE TYPE my_key_space.my_type (
  id int,
  name text
);

my_list list<frozen<my_type>>;

4 Likes

Thanks a lot for your question @Pauli_Immonen. We’ll get back soon with a response.

1 Like

Hiya!
This is a bit complex because I don’t know the whole table layout or the topic structure, so I am guessing a bit here:

The main thing is that the Stream Reactor sink works by building a CQL statement from the KCQL you give it. This means that you insert into your list as you would in CQL, adjusted for KCQL :slight_smile: I’ve not tried inserting into a collection via a connector before (other than replacing it outright), so you’ll have to see if it works for you. I’d generally suggest a map type rather than a frozen list here, but it should not be an issue.

You’d select the list and insert it as the value into the right column of the table.

Assuming your table is called user_tbl with a structure like this:

`UUID row_id, my_list list<frozen<my_type>>`

and the Kafka records are in a topic called users, all with the same schema, then the KCQL would be something like this:

 "connect.cassandra.kcql": "INSERT INTO user_tbl SELECT uuid(), my_list FROM users"

More complete docs are available in the Aiven docs or the connector docs.
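
To make the shape of that setting concrete, here is a rough sketch that assembles a partial connector config in Python and prints it as JSON. The connector class is the one quoted elsewhere in this thread, `topics` is the standard Kafka Connect sink setting, and the table and topic names are the assumed ones from above. Connection settings are left out, and whether KCQL accepts `uuid()` is untested here.

```python
import json

# Partial sink config: table "user_tbl" and topic "users" are the assumed
# names from this reply; Cassandra connection settings are omitted.
connector_config = {
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "topics": "users",
    "connect.cassandra.kcql": "INSERT INTO user_tbl SELECT uuid(), my_list FROM users",
}

# Print the config in the JSON form a connector definition usually takes
print(json.dumps(connector_config, indent=2))
```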

2 Likes

I think I figured this out. The list in the JSON needs to be in string format for connect.cassandra.kcql to read the value of my_list from the Kafka topic:

 "my_list":
     "[{\"id\":\"1\", \"name\":\"George\"},
       {\"id\":\"2\", \"name\":\"Katelyn\"}
     ]"
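
A minimal sketch of this workaround on the producer side, assuming the message is built in Python: the nested list is serialized to a JSON string before the whole message is serialized. The helper name `stringify_field` is hypothetical.

```python
import json

def stringify_field(message, field):
    """Return a copy of message with message[field] replaced by its JSON string form."""
    result = dict(message)
    result[field] = json.dumps(message[field])
    return result

message = {
    "my_list": [
        {"id": "1", "name": "George"},
        {"id": "2", "name": "Katelyn"},
    ]
}

flat = stringify_field(message, "my_list")
# flat["my_list"] is now a str holding the JSON-encoded list
print(json.dumps(flat))
```

The original `message` is left untouched, so the nested form stays available to other consumers.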
3 Likes

Could you paste in your connector’s serialisation config, and the format of the records in your topic?
It could be that you’ve got the wrong one selected.

1 Like

I was happy for a while that I got it working, but after thinking about it some more, it does not sound right.

Here is a simplified version of the problem with all the relevant information:

Kafka connector configuration:

    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "connect.cassandra.kcql": "INSERT INTO my_table SELECT id, my_list FROM test-topic",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false"

Kafka message that works (copied from AKHQ):

{
  "id": 2,
  "my_list": "[{\"id\":2, \"name\": \"text\"},{\"id\":5,\"name\":\"text2\"}]"
}

How the message is sent.

import json

from kafka import KafkaProducer  # kafka-python client

producer = KafkaProducer(
  bootstrap_servers="server:port",
  security_protocol="SSL",
  ssl_cafile="ca.pem",
  ssl_certfile="service.cert",
  ssl_keyfile="service.key",
  # keys and values are JSON-encoded before being sent
  value_serializer=lambda v: json.dumps(v).encode('ascii'),
  key_serializer=lambda v: json.dumps(v).encode('ascii')
)
# simple_message is a JSON string; parse it into a dict before sending
producer.send("test-topic", key={"key": 1}, value=json.loads(simple_message))

Cassandra type and table

CREATE TYPE IF NOT EXISTS my_keyspace.my_type (
  id int,
  name text
  );
CREATE TABLE IF NOT EXISTS my_keyspace.my_table (
  id int,
  my_list list<frozen<my_type>>,
  primary key (id));

Kafka message that should work but does not

{
  "id": 8,
  "my_list": [
    {
      "id": 3,
      "name": "text"
    },
    {
      "id": 5,
      "name": "text2"
    }
  ]
}

Error caused by the message
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalArgumentException: A KCQL exception occurred. Invalid path my_list

1 Like

A map, my_list map<int,frozen<my_type>>, is the go-to structure instead of a list, and it works fine with the following nested JSON:

{
"my_list":{
   "1":{
    "my_type_field1": "something",
    "my_type_field2": "3",
    ...
    },
   "2":{
    "my_type_field1": "something else",
    "my_type_field2": "6",
    ...
    }
  }
}
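
If the data starts out as a list of objects, a sketch like the following could reshape it into that map form before producing. It assumes each object has an `id` field to use as the map key; the helper name `list_to_map` is hypothetical. JSON object keys are always strings, so the ids are converted with `str`.

```python
import json

def list_to_map(items, key_field="id"):
    """Turn a list of objects into a dict keyed by each object's key_field (as a string)."""
    return {str(item[key_field]): item for item in items}

my_list = [
    {"id": 3, "name": "text"},
    {"id": 5, "name": "text2"},
]

# Same message shape as before, but with my_list as a map instead of a list
message = {"id": 8, "my_list": list_to_map(my_list)}
print(json.dumps(message, indent=2))
```

Whether the value objects should still carry `id` depends on how my_type is defined.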
2 Likes