I was happy for a while that I got it working, but after thinking about it, the solution does not seem right.
Here is a simplified version of the problem with all the relevant information:
Kafka connector configuration:
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
"connect.cassandra.kcql": "INSERT INTO my_table SELECT id, my_list FROM test-topic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
Kafka message that works (copied from AKHQ):
{
"id": 2,
"my_list": "[{\"id\":2, \"name\": \"text\"},{\"id\":5,\"name\":\"text2\"}]"
}
How the message is sent:
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="server:port",
    security_protocol="SSL",
    ssl_cafile="ca.pem",
    ssl_certfile="service.cert",
    ssl_keyfile="service.key",
    value_serializer=lambda v: json.dumps(v).encode("ascii"),
    key_serializer=lambda v: json.dumps(v).encode("ascii"),
)
# simple_message holds the message as a JSON string
producer.send("test-topic", key={"key": 1}, value=json.loads(simple_message))
Cassandra type and table:
CREATE TYPE IF NOT EXISTS my_keyspace.my_type (
id int,
name text
);
CREATE TABLE IF NOT EXISTS my_keyspace.my_table (
id int,
my_list list<frozen<my_type>>,
primary key (id));
Kafka message that should work but does not:
{
  "id": 8,
  "my_list": [
    {
      "id": 3,
      "name": "text"
    },
    {
      "id": 5,
      "name": "text2"
    }
  ]
}
Error caused by that message:
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.IllegalArgumentException: A KCQL exception occurred. Invalid path my_list
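To make the difference between the two payloads concrete: in the working message my_list is a JSON-encoded string, while in the failing one it is a real JSON array. Below is a minimal sketch of that conversion on the producer side. The helper name stringify_field is hypothetical (it is not part of kafka-python or the connector), and the sketch only reproduces the shape of the accepted message; it does not explain why the array form is rejected.

```python
import copy
import json


def stringify_field(message, field="my_list"):
    """Return a copy of message with message[field] JSON-encoded as a string.

    Hypothetical helper for illustration only: it mimics the shape of the
    message that the sink currently accepts.
    """
    converted = copy.deepcopy(message)
    converted[field] = json.dumps(converted[field])
    return converted


# The nested form that currently triggers "Invalid path my_list".
nested = {
    "id": 8,
    "my_list": [{"id": 3, "name": "text"}, {"id": 5, "name": "text2"}],
}

flattened = stringify_field(nested)
# flattened["my_list"] is now a str containing the JSON array,
# matching the shape of the working message.
```

Sending flattened instead of nested with the producer above should give the same shape as the working message, but it is exactly the workaround that does not feel right to me.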