
I am pushing Zeek logs to Kafka (3.1.0) topics, which works as intended. I am then trying to write them from Kafka to Cassandra (4.0.1) via the DataStax Apache Kafka® Connector (kafka-connect-cassandra-sink-1.4.0), where I get a strange mapping error (see below).

My connect-standalone.properties:

bootstrap.servers=localhost:9092
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/kafka/plugins/kafka-connect-cassandra-sink-1.4.0.jar

My Cassandra connector properties cassandra-sink.properties:

name=cassandra-sink
connector.class=com.datastax.oss.kafka.sink.CassandraSinkConnector
tasks.max=1
topics=dns, http
#transforming the zeek field names so no error occurs
"transforms": "RenameField",
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames": "id.orig_h:id_orig_h,id.orig_p:id_orig_p,id.resp_h:id_resp_h,id.resp_p:id_resp_p,AA:aa,Z:z,RA:ra,TC:tc,TS:ts,RD:rd"
#Mapping
topic.dns.zeek.dns.mapping=  ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,proto=value,trans_id=value,rtt=value,query=value,qclass=value,qclass_name=value,qtype=value,qtype_name=value,rcode=value,rcode_name=value,aa=value,tc=value,rd=value,ra=value,z=value,answers=value,rejected=value
topic.http.zeek.http.mapping= ts=value,uid=key,id_orig_h=value,id_orig_p=value,id_resp_h=value,id_resp_p=value,trans_depth=value,method=value,host=value,uri=value,version=value,user_agent=value,request_body_len=value,response_body_len=value,status_code=value,status_msg=value,tags=value,resp_fuids=value
topic.dns.zeek.dns.ttlTimeUnit=SECONDS
topic.http.zeek.http.ttlTimeUnit=SECONDS
topic.dns.zeek.dns.timestampTimeUnit=MICROSECONDS
topic.http.zeek.http.timestampTimeUnit=MICROSECONDS
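
(Note: the `"transforms": ...` lines above use JSON-style syntax, with quotes and colons, but a standalone `.properties` file expects plain `key=value` pairs. A sketch of the same rename transform in properties syntax, assuming the standard `ReplaceField$Value` SMT:)

```properties
# Java-properties syntax: no quotes, '=' instead of ':'
transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=id.orig_h:id_orig_h,id.orig_p:id_orig_p,id.resp_h:id_resp_h,id.resp_p:id_resp_p,AA:aa,Z:z,RA:ra,TC:tc,TS:ts,RD:rd
```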

Cassandra DB:

cqlsh> DESCRIBE KEYSPACE zeek

CREATE KEYSPACE zeek WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}  AND durable_writes = true;

CREATE TABLE zeek.dns (
    uid text PRIMARY KEY,
    aa text,
    answers text,
    id_orig_h text,
    id_orig_p double,
    id_resp_h text,
    id_resp_p double,
    proto text,
    qclass int,
    qclass_name text,
    qtype int,
    qtype_name text,
    query text,
    ra text,
    rcode double,
    rcode_name text,
    rd text,
    rejected text,
    rtt int,
    tc text,
    trans_id double,
    ts text,
    ttls text,
    z double
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';

CREATE TABLE zeek.http (
    uid text PRIMARY KEY,
    host text,
    id_orig_h text,
    id_orig_p double,
    id_resp_h text,
    id_resp_p double,
    method text,
    orig_fuids text,
    request_body_len int,
    resp_fuids text,
    resp_mime_types text,
    response_body_len int,
    status_code int,
    status_msg text,
    tags text,
    trans_depth int,
    ts text,
    uri text,
    user_agent text,
    version text
) WITH additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';

An example Zeek-log I am trying to write to Cassandra looks like this:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
> --from-beginning --property print.key=true --max-messages 1 \
> --topic dns
null    {"dns": {"ts":1644391084.805351,"uid":"CUXnim2Q50AeUZoZXc","id.orig_h":"192.168.2.35","id.orig_p":55882,"id.resp_h":"192.168.2.1","id.resp_p":53,"proto":"udp","trans_id":8173,"rtt":0.01738905906677246,"query":"36.247.213.34.in-addr.arpa","qclass":1,"qclass_name":"C_INTERNET","qtype":12,"qtype_name":"PTR","rcode":0,"rcode_name":"NOERROR","AA":false,"TC":false,"RD":true,"RA":true,"Z":0,"answers":["ec2-34-213-247-36.us-west-2.compute.amazonaws.com"],"TTLs":[300.0],"rejected":false}}
Processed a total of 1 messages
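
Note that the fields are nested under a top-level `dns` object, and the Kafka record key is `null`, so `uid=key` has nothing to map from. A minimal Python sketch (not the connector's actual code) of what a flatten transform with an underscore delimiter would do to such a record:

```python
import json

def flatten(obj, delimiter="_", prefix=""):
    """Recursively flatten nested dicts, joining key paths with delimiter."""
    flat = {}
    for key, value in obj.items():
        name = f"{prefix}{delimiter}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, delimiter, name))
        else:
            flat[name] = value
    return flat

record = json.loads(
    '{"dns": {"ts": 1644391084.805351, "uid": "CUXnim2Q50AeUZoZXc",'
    ' "id.orig_h": "192.168.2.35", "id.orig_p": 55882}}'
)
print(flatten(record))
# {'dns_ts': 1644391084.805351, 'dns_uid': 'CUXnim2Q50AeUZoZXc',
#  'dns_id.orig_h': '192.168.2.35', 'dns_id.orig_p': 55882}
```

After flattening like this, `uid` lives at the top level under a name such as `dns_uid`, which a mapping like `uid=value.dns_uid` can reach.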

With Zeek, ZooKeeper, Kafka, and Cassandra running, I start Kafka Connect with

bin/connect-standalone.sh config/connect-standalone.properties config/cassandra-sink.properties

This runs without interruption but logs countless warnings like this:

[2022-02-09 13:43:50,615] WARN [cassandra-sink|task-0] Error decoding/mapping Kafka record SinkRecord{kafkaOffset=301, timestampType=CreateTime} ConnectRecord{topic='dns', kafkaPartition=0, key=null, keySchema=Schema{STRING}, value={dns={AA=false, qclass_name=C_INTERNET, id.orig_p=22793, qtype_name=AAAA, qtype=28, rejected=false, id.resp_p=53, query=connectivity-check.ubuntu.com.sphairon.box, trans_id=17766, rcode=3, rcode_name=NXDOMAIN, TC=false, RA=false, uid=Ckb97wAbf0MDgGVW7, RD=true, proto=udp, id.orig_h=192.168.2.35, Z=0, qclass=1, ts=1.644393059147673E9, id.resp_h=192.168.2.1}}, valueSchema=null, timestamp=1644393059667, headers=ConnectHeaders(headers=)}: Primary key column uid cannot be mapped to null. Check that your mapping setting matches your dataset contents. (com.datastax.oss.kafka.sink.CassandraSinkTask:305)

I don't know why it says `Primary key column uid cannot be mapped to null`. I tried changing the mapping to `ts=dns.value`, `uid=dns.value`, etc., but it didn't help.

  • Well, in your mapping, `uid=key`, but in the logs, you clearly see `key=null`. Perhaps you should try mapping `uid=value.dns.uid` instead (not `dns.value`; `value` refers to the Kafka record's value, and the dot notation then accesses fields within it) – OneCricketeer Feb 09 '22 at 18:38
  • That's right, I must have done something wrong. Unfortunately, it still didn't work after the change. Even with JSON flattening in the cassandra-sink.properties this error occurs: `Required field 'value.dns.ts' (mapped to column ts) was missing from record (or may refer to an invalid function). Please remove it from the mapping. (com.datastax.oss.kafka.sink.CassandraSinkTask:305)` – kafkaconnecth8 Feb 10 '22 at 15:19
  • At least it's not complaining about the uid anymore. Are you sure all the records have a ts field? – OneCricketeer Feb 10 '22 at 15:22
  • I think it complains about the ts field because it comes before the uid field in the mapping. If I put the uid field first, it says the same but with `value.dns.uid (mapped to column uid) was missing ...` Yes, all records have a ts (timestamp) field – kafkaconnecth8 Feb 10 '22 at 15:27
  • I'm not really sure... I've not used the Cassandra connector (which hasn't had any new commit or release in several months, it seems). My best guess is that it won't accept nested data like that, or you have `value.http` records in your topic as well that need to be filtered out... so you'd want to try flattening it first, using a custom Connect transform, Kafka Streams, or ksqlDB. Alternatively, store your data in a Cassandra Map type where `dns` or `http` would be the key. – OneCricketeer Feb 12 '22 at 15:27
  • HTTP is in a separate Kafka topic, so that shouldn't be the problem. I am using `"transforms": "flatten", "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value", "transforms.flatten.delimiter": "."` to flatten the JSON as described in [the Confluent docs](https://docs.confluent.io/platform/current/connect/transforms/flatten.html), but it still gives the same WARN message. I will try and look into ksqlDB flattening, but I don't know if it will work when the Confluent flattening doesn't seem to work – kafkaconnecth8 Feb 14 '22 at 13:48
  • I think you'll want to use a delimiter other than period since the Cassandra mapping config uses periods as well to do something with the data. Ksql wouldn't be used to flatten, but rather parse and rename all the fields into your expected format for the tables (you can also run kafka connect as part of ksql server) – OneCricketeer Feb 14 '22 at 15:55
  • I got the flattening to work by removing the quotation marks from the transform and changing `:` to `=` :) The only problem I have now is that Zeek does not always fill all fields (because it currently has no information about the user agent, for example). Therefore some fields are missing from the Kafka messages and I get the error `...was missing from record`. How do I configure Zeek/Kafka to write a default value for these? Many thanks for your help so far – kafkaconnecth8 Feb 15 '22 at 14:57
  • Cool. That was the Flatten transform within Connect, then? For missing fields in JSON, there isn't a way to add defaults like there would be with Protobuf/Avro. I don't think Cassandra supports default column values, either – OneCricketeer Feb 15 '22 at 15:04
  • Yes, with Kafka Connect Transform. It seems it works even with some values missing, so my question is answered – kafkaconnecth8 Feb 17 '22 at 08:56
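
For reference, the working configuration described in the comments would look roughly like this in cassandra-sink.properties. This is a sketch, not the poster's exact file; the underscore delimiter is an assumption, chosen so the flattened field names contain no dots that would collide with the mapping syntax's own dot notation:

```properties
# Flatten the nested JSON value; '_' keeps dots out of the flattened names
transforms=flatten
transforms.flatten.type=org.apache.kafka.connect.transforms.Flatten$Value
transforms.flatten.delimiter=_

# Map columns from the flattened value fields (uid included, since key=null);
# the remaining columns follow the same value.dns_* pattern
topic.dns.zeek.dns.mapping=uid=value.dns_uid,ts=value.dns_ts,proto=value.dns_proto
```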
