I am pushing Zeek logs to Kafka (3.1.0) topics, which works fine and as intended. Now I am trying to write them from Kafka to Cassandra (4.0.1) via the DataStax Apache Kafka® Connector (kafka-connect-cassandra-sink-1.4.0), but I am getting a strange mapping error (see below).
My connect-standalone.properties:
bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/kafka/plugins/kafka-connect-cassandra-sink-1.4.0.jar
My Cassandra connector properties file, cassandra-sink.properties:
name=cassandra-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=1
topics=dns, http
#transforming the Zeek field names so no error occurs
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceFiel>
"transforms.RenameField.renames": "id.orig_h:id_orig_h,id.orig_p:id_orig_p,id.resp_h:id_resp_h,id.resp_p:id_resp_p,AA:aa,Z:z,RA:ra,TC:tc,TS:ts,RD:rd"
#Mapping
topic.dns.zeek.dns.mapping= ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,proto=value,trans_id=value,rtt=value,query=value,qclass=value,qclass_name=value,qtype=value,qtype_name=value,rcode=value,rcode_name=value,aa=value,tc=value,rd=value,ra=value,z=value,answers=value,rejected=value
topic.http.zeek.http.mapping= ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,trans_depth=value,method=value,host=value,uri=value,version=value,user_agent=value,request_body_len=value,response_body_len=value,status_code=value,status_msg=value,tags=value,resp_fuids=value
topic.dns.zeek.dns.ttlTimeUnit=SECONDS
topic.http.zeek.http.ttlTimeUnit=SECONDS
topic.dns.zeek.dns.timestampTimeUnit=MICROSECONDS
topic.http.zeek.http.timestampTimeUnit=MICROSECONDS
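For reference, the way I read the DataStax mapping syntax is that each column is bound either to the whole key/value or to a single field inside it (column=key, column=value, or column=value.field_name). A field-level version of the dns mapping would then look roughly like the sketch below; I am not sure whether the nested dns wrapper in my records can be addressed with dotted paths like value.dns.uid, so treat those paths as assumptions:
#Sketch only - field-level mapping as I understand the docs; the value.dns.* paths are an assumption
topic.dns.zeek.dns.mapping= uid=value.dns.uid, ts=value.dns.ts, query=value.dns.query, id_orig_h=value.dns.id_orig_h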
Cassandra DB:
cqlsh> DESCRIBE KEYSPACE zeek
CREATE KEYSPACE zeek WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
CREATE TABLE zeek.dns (
uid text PRIMARY KEY,
aa text,
answers text,
id_orig_h text,
id_orig_p double,
id_resp_h text,
id_resp_p double,
proto text,
qclass int,
qclass_name text,
qtype int,
qtype_name text,
query text,
ra text,
rcode double,
rcode_name text,
rd text,
rejected text,
rtt int,
tc text,
trans_id double,
ts text,
ttls text,
z double
) WITH additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
CREATE TABLE zeek.http (
uid text PRIMARY KEY,
host text,
id_orig_h text,
id_orig_p double,
id_resp_h text,
id_resp_p double,
method text,
orig_fuids text,
request_body_len int,
resp_fuids text,
resp_mime_types text,
response_body_len int,
status_code int,
status_msg text,
tags text,
trans_depth int,
ts text,
uri text,
user_agent text,
version text
) WITH additional_write_policy = '99p'
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND cdc = false
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND default_time_to_live = 0
AND extensions = {}
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair = 'BLOCKING'
AND speculative_retry = '99p';
An example Zeek log that I am trying to write to Cassandra looks like this:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --from-beginning --property print.key=true --max-messages 1 \
> --topic dns
null {"dns": {"ts":1644391084.805351,"uid":"CUXnim2Q50AeUZoZXc","id.orig_h":"192.168.2.35","id.orig_p":55882,"id.resp_h":"192.168.2.1","id.resp_p":53,"proto":"udp","trans_id":8173,"rtt":0.01738905906677246,"query":"36.247.213.34.in-addr.arpa","qclass":1,"qclass_name":"C_INTERNET","qtype":12,"qtype_name":"PTR","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["ec2-34-213-247-36.us-west-2.compute.amazonaws.com"],"TTLs":[300.0],"rejected":false}}
Processed a total of 1 messages
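To double-check the structure, the same record can be piped through jq (assuming jq is installed); the value has exactly one top-level key, dns, with all the actual fields nested below it:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
>   --from-beginning --max-messages 1 --topic dns | jq 'keys'
[
  "dns"
]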
With Zeek, ZooKeeper, Kafka, and Cassandra running, I start Kafka Connect with:
bin/connect-standalone.sh config/connect-standalone.properties config/cassandra-sink.properties
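If it helps, the connector state can also be queried through the Connect REST API (I am assuming the default rest.port of 8083 here):
curl -s http://localhost:8083/connectors/cassandra-sink/status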
Kafka Connect runs without interruption but throws countless warnings like this:
[2022-02-09 13:43:50,615] WARN [cassandra-sink|task-0] Error decoding/mapping Kafka record SinkRecord{kafkaOffset=301, timestampType=CreateTime} ConnectRecord{topic='dns', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value={dns={AA=false, qclass_name=C_INTERNET, id.orig_p=22793, qtype_name=AAAA, qtype=28, rejected=false, id.resp_p=53, query=connectivity-check.ubuntu.com.sphairon.box, trans_id=17766, rcode=3, rcode_name=NXDOMAIN, TC=false, RA=false, uid=Ckb97wAbf0MDgGVW7, RD=true, proto=udp, id.orig_h=192.168.2.35, Z=0, qclass=1, ts=1.644393059147673E9, id.resp_h=192.168.2.1}}, valueSchema=null, timestamp=1644393059667, headers=ConnectHeaders(headers=)}: Primary key column uid cannot be mapped to null. Check that your mapping setting matches your dataset contents. (com.datastax.oss.kafka.sink.CassandraSinkTask:305)
I don't know why it says Primary key column uid cannot be mapped to null, since every record clearly contains a uid field (although, as the output above shows, the Kafka message key itself is null).
I tried changing the mapping to ts=dns.value, uid=dns.value, and so on for the other columns, but it didn't help.
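Spelled out as a property, that attempt looked roughly like this (truncated to the columns mentioned above; the remaining columns followed the same pattern):
topic.dns.zeek.dns.mapping= ts=dns.value, uid=dns.value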