I ingest a lot of data. It comes from many different sources, and all of it ultimately goes into BigQuery.
I pre-parse everything into .jsonl files: one line per record, with files named after their destination tables.
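For example, a line in bars.jsonl (profiled further down) looks something like this, with the values made up here:
{"FILING CODE": "", "LAST NAME": "SMITH", "NUMBER OF COFFEES": "1", "OFFICE CODE": "", "SOURCE FILE": "efile_1"}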
For a rough sense of scale, here's a sample from a dataset I'm doing now. (All the data below is real, just lightly redacted / cleaned up.)
% find json -type f -size +2000c -print0 | head -z | sort | wc --files0-from=-
2 387 4737 json/baz_1.jsonl
3 579 7055 json/baz_2.jsonl
1 193 2358 json/baz_3.jsonl
25 4835 58958 json/baz_4.jsonl
37 7161 87467 json/baz_5.jsonl
3 580 7072 json/baz_6.jsonl
15 2897 35393 json/baz_7.jsonl
129 24950 304262 json/baz_8.jsonl
3 373 4221 json/foo_1.jsonl
6 746 8491 json/foo_2.jsonl
224 42701 520014 total
% wc -l *.jsonl
11576 foos.jsonl
20 bars.jsonl
337770 bazzes.jsonl
349366 total
% du -m *.jsonl
3 foos.jsonl
1 bars.jsonl
93 bazzes.jsonl
This is relatively small for me. Other datasets are in the millions of rows / terabytes of data range.
Because the data comes from external sources that are often undocumented, often don't match their own specs, or are just plain messy (e.g. various sentinel values for null, multiple date formats in the same field), I don't really know the structure beforehand.
However, I want a nice, clean, efficient structure in my destination table: values cast to the correct type (integer/bool/date), REQUIRED/NULLABLE set correctly, columns that are really enums identified as such, stringified arrays converted into REPEATED columns, a good guess at what I can use effectively for partitioning / clustering, and so on.
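To make that concrete, for a table like bars the end product I'm after is a schema file along these lines (the types and modes here are illustrative guesses, and office_codes stands in for a stringified-array column, which this particular table doesn't actually have):

[
  {"name": "filing_code", "type": "STRING", "mode": "NULLABLE", "description": "enum-like, a handful of distinct values"},
  {"name": "last_name", "type": "STRING", "mode": "NULLABLE"},
  {"name": "number_of_coffees", "type": "INTEGER", "mode": "NULLABLE"},
  {"name": "office_codes", "type": "STRING", "mode": "REPEATED"},
  {"name": "source_file", "type": "STRING", "mode": "NULLABLE"}
]

plus a decision on which columns can actually be REQUIRED and which one (ideally a reliable date column) to use for partitioning / clustering.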
It inevitably requires some manual work on samples to infer what's actually going on, but my first pass for doing this is jq (version 1.6).
This is my current code:
~/.jq
def isempty(v):
  (v == null or v == "" or v == [] or v == {});

def isnotempty(v):
  (isempty(v) | not);

def remove_empty:
  walk(
    if type == "array" then
      map(select(isnotempty(.)))
    elif type == "object" then
      with_entries(select(isnotempty(.value)))  # Note: this will remove keys with empty values
    else .
    end
  );

# bag of words
def bow(stream):
  reduce stream as $word ({}; .[($word|tostring)] += 1);

# https://stackoverflow.com/questions/46254655/how-to-merge-json-objects-by-grouping-on-key-with-jq
def add_by(f):
  reduce .[] as $x ({}; ($x|f) as $f | .[$f] += [$x])
  | [.[] | add];

# takes array of {string: #, ...}
def merge_counts:
  map(. | to_entries) | flatten | add_by(.key) | from_entries;
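(With that file in place, the helpers behave like this; quick sanity checks, not part of the pipeline:)

% echo '{"a":"","b":[1,null,2],"c":{"d":null}}' | jq -c 'remove_empty'
{"b":[1,2]}
% echo '["x","x","y"]' | jq -c 'bow(.[])'
{"x":2,"y":1}
% echo '[{"5":3,"0":1},{"7":2}]' | jq -c 'merge_counts'
{"5":3,"0":1,"7":2}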
induce_schema.sh
(linebreaks added)
#!/bin/zsh
pv -cN ingestion -s "$(wc -l < "$1")" -l "$1" | \
jq -c --unbuffered --stream '{"name": (.[0]), "encoded_type": (.[1] | type),
  "tonumber": (.[1] | if (type == "string") then try(tonumber|type) catch type else null end),
  "chars": (.[1] | if (type == "string") then try(split("") | sort | unique | join("")) else null end),
  "length": (.[1] | length), "data": .[1]}' | \
# sed -r 's/[0-9]+(,|])/"array"\1/g' | awk '!_[$0]++' | sort | \
pv -cN grouping -l | \
jq -sc '. | group_by(.name, .encoded_type, .tonumber)[] | {"name": first|.name,
  "encoded_type": ([(first|.encoded_type), (first|.tonumber)] | unique - [null] | join("_")),
  "allchars": (map(.chars) | join("") | split("") | sort | unique | join("")),
  "count_null": (map(.data | select(. == null)) | length),
  "count_empty": (map(.data | select(. == [] or . == {} or . == "")) | length),
  "count_nonempty": (map(.data | select(. != null and . != "")) | length),
  "unique": (map(.data) | unique | length), "length": bow(.[] | .length)}' | \
pv -cN final -l | \
jq -sc '. | group_by(.name)[] | {"name": first|.name,
  "nullable": (map(.encoded_type) | contains(["null"])),
  "schemas_count": (map(. | select(.encoded_type != "null")) | length),
  "lengths": (map(.length) | merge_counts), "total_nonempty": (map(.count_nonempty) | add),
  "total_null": (map(.count_null) | add), "total_empty": (map(.count_empty) | add),
  "schemas": map(. | select(.encoded_type != "null") | del(.name))}'
Here's a partial output for bars.jsonl (linebreaks added for ease of reading):
{"name":["FILING CODE"],"nullable":false,"schemas_count":1,
"lengths":{"0":1930,"2":16},
"total_nonempty":16,"total_null":0,"total_empty":1930,
"schemas":[
{"encoded_type":"string","allchars":"EGPWX",
"count_null":0,"count_empty":1930,"count_nonempty":16,"unique":6,
"length":{"0":1930,"2":16}}
]}
{"name":["LAST NAME"],"nullable":true,"schemas_count":1,
"lengths":{"0":416,"5":4650,"6":5648,"7":4796,"4":1934,"8":3042,"9":1362,"10":570,"11":226,"3":284,"14":30,"12":70,"13":54,"16":20,"15":26,"17":10,"18":8,"2":4,"19":2},
"total_nonempty":22736,"total_null":2,"total_empty":416,
"schemas":[
{"encoded_type":"string","allchars":" ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"count_null":0,"count_empty":416,"count_nonempty":22736,"unique":6233,
"length":{"5":4650,"6":5648,"7":4796,"4":1934,"8":3042,"9":1362,"10":570,"11":226,"3":284,"14":30,"12":70,"0":416,"13":54,"16":20,"15":26,"17":10,"18":8,"2":4,"19":2}}
]}
{"name":["NUMBER OF COFFEES"],"nullable":false,"schemas_count":2,
"lengths":{"1":16,"0":4},
"total_nonempty":16,"total_null":0,"total_empty":4,
"schemas":[
{"encoded_type":"number_string","allchars":"1",
"count_null":0,"count_empty":0,"count_nonempty":16,"unique":1,
"length":{"1":16}},
{"encoded_type":"string","allchars":"",
"count_null":0,"count_empty":4,"count_nonempty":0,"unique":1,
"length":{"0":4}}
]}
{"name":["OFFICE CODE"],"nullable":false,"schemas_count":2,
"lengths":{"3":184,"0":22092},
"total_nonempty":1036,"total_null":0,"total_empty":22092,
"schemas":[
{"encoded_type":"number_string","allchars":"0123456789",
"count_null":0,"count_empty":0,"count_nonempty":852,"unique":254,
"length":{"3":852}},
{"encoded_type":"string","allchars":"0123456789ABCDEIJQRSX",
"count_null":0,"count_empty":22092,"count_nonempty":184,"unique":66,
"length":{"0":22092,"3":184}}
]}
{"name":["SOURCE FILE"],"nullable":true,"schemas_count":1,
"lengths":{"0":416,"7":22708},
"total_nonempty":22708,"total_null":23124,"total_empty":416,
"schemas":[
{"encoded_type":"string","allchars":"0123456789F_efil",
"count_null":0,"count_empty":416,"count_nonempty":22708,"unique":30,
"length":{"7":22708,"0":416}}
]}
...
The point of this is to get a summary of how an unknown dataset is structured and what's in it, which I can readily transform into my BigQuery table schema / load parameters, and which points at what I'll probably need to do next to turn the data into something cleaner and more usable than what I received.
This code works, but those -s (slurp) passes are really hard on server RAM. (They simply wouldn't work if the dataset were any larger than this; I added those parts just today. On the bazzes dataset, the pipeline uses about 20GB of total RAM, including swap.)
It also doesn't detect any of the date/time field types.
I believe it should be possible to make this far more efficient using @joelpurra's jq + GNU parallel approach and/or the jq cookbook's "reduce inputs" pattern, but I'm having difficulty figuring out how.
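To show where I've got stuck, here's my rough (untested) idea for replacing the first slurp/group_by stage with reduce inputs, keyed on the same (name, encoded_type, tonumber) combination; it only covers the counts and the length histogram, since unique and allchars would need a per-key set kept as well:

jq -nc '
  reduce inputs as $r ({};
    (($r.name | tostring) + "/" + $r.encoded_type + "/" + ($r.tonumber // "")) as $k
    | .[$k] |= {
        name: $r.name,
        encoded_type: $r.encoded_type,
        tonumber: $r.tonumber,
        count_null: ((.count_null // 0) + (if $r.data == null then 1 else 0 end)),
        count_empty: ((.count_empty // 0) + (if ($r.data == [] or $r.data == {} or $r.data == "") then 1 else 0 end)),
        count_nonempty: ((.count_nonempty // 0) + (if ($r.data != null and $r.data != "") then 1 else 0 end)),
        length: ((.length // {}) | .[($r.length | tostring)] += 1)
      })
  | .[]'

The final pass groups only by .name, so presumably it could be rewritten the same way, but I haven't managed to put the whole thing together.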
So, I'd appreciate advice on how to make this:
- more CPU & RAM efficient
- otherwise more useful (e.g. recognize date fields, which could be in almost any format; a rough sketch of the only approach I've come up with is below)
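For the date question specifically, the only idea I've had so far is a helper along these lines in ~/.jq (the regexes and format strings are just illustrative, not a real list), whose result I'd add as another field in the first --stream pass, e.g. "maybe_date": (.[1] | maybe_date), and then count per column with bow:

def maybe_date:
  if type == "string" then
    if test("^\\d{4}-\\d{2}-\\d{2}([T ]\\d{2}:\\d{2}(:\\d{2})?)?$") then "iso"
    elif test("^\\d{1,2}/\\d{1,2}/\\d{2,4}$") then "slashed"
    elif (try strptime("%d %b %Y") catch null) != null then "named_month"
    else null
    end
  else null
  end;

That obviously doesn't scale to "almost any format", which is why I'm asking.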