
I ingest a lot of data. It comes from lots of different sources, and all ultimately goes into BigQuery.

I pre-parse it into .jsonl files, one line per record, named by destination table.

For a rough sense of scale, here's a sample from a dataset I'm doing now. (All the data below is real, just lightly redacted / cleaned up.)

% find json -type f -size +2000c -print0 | head -z | sort |  wc --files0-from=-
  2   387   4737 json/baz_1.jsonl
  3   579   7055 json/baz_2.jsonl
  1   193   2358 json/baz_3.jsonl
 25  4835  58958 json/baz_4.jsonl
 37  7161  87467 json/baz_5.jsonl
  3   580   7072 json/baz_6.jsonl
 15  2897  35393 json/baz_7.jsonl
129 24950 304262 json/baz_8.jsonl
  3   373   4221 json/foo_1.jsonl
  6   746   8491 json/foo_2.jsonl
224 42701 520014 total

% wc -l *.jsonl
    11576 foos.jsonl
       20 bars.jsonl
   337770 bazzes.jsonl
   349366 total

% du -m *.jsonl
 3      foos.jsonl
 1      bars.jsonl
93      bazzes.jsonl

This is relatively small for me. Other datasets are in the millions of rows / terabytes of data range.

Because the data comes from external sources that are often undocumented, often don't match their specs, or are just plain messy (e.g. various sentinel values for null, multiple date formats in the same field), I don't really know the structure beforehand.

However, I want a clean, efficient structure in the destination table: values cast to the correct type (integer/bool/date), REQUIRED/NULLABLE set correctly, a sense of which columns are actually enums, stringified arrays converted into REPEATED columns, a good guess at what I can use effectively for partitioning/clustering, and so on.
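
For illustration, here's roughly the kind of BigQuery schema JSON I want to end up with (hypothetical field names loosely based on the bars columns below; filed_on would be the partitioning column):

[
  {"name": "last_name",         "type": "STRING",  "mode": "NULLABLE"},
  {"name": "filing_code",       "type": "STRING",  "mode": "NULLABLE"},
  {"name": "number_of_coffees", "type": "INTEGER", "mode": "NULLABLE"},
  {"name": "filed_on",          "type": "DATE",    "mode": "NULLABLE"},
  {"name": "office_codes",      "type": "STRING",  "mode": "REPEATED"}
]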

It inevitably requires some manual work on samples to infer what's actually going on, but my first pass for doing this is jq (version 1.6).

This is my current code:

~/.jq

def isempty(v):
 (v == null or v == "" or v == [] or v == {});

def isnotempty(v):
 (isempty(v) | not);

def remove_empty:
  walk(
   if type == "array" then
      map(select(isnotempty(.)))
   elif type == "object" then
      with_entries(select(isnotempty(.value))) # Note: this will remove keys with empty values
   else .
   end
  );

# bag of words
def bow(stream):
  reduce stream as $word ({}; .[($word|tostring)] += 1);

# https://stackoverflow.com/questions/46254655/how-to-merge-json-objects-by-grouping-on-key-with-jq
def add_by(f):
  reduce .[] as $x ({}; ($x|f) as $f | .[$f] += [$x])
  | [.[] | add];

# takes array of {string: #, ...}
def merge_counts:
  map(.|to_entries)|flatten | add_by(.key)|from_entries;
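
For a quick sanity check of what these helpers do (jq 1.6 loads ~/.jq automatically, so no include is needed; the inputs here are made up):

% echo '["a","b","a",3]' | jq -c 'bow(.[])'
{"a":2,"b":1,"3":1}

% echo '[{"0":4},{"1":16}]' | jq -c 'merge_counts'
{"0":4,"1":16}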

induce_schema.sh (linebreaks added)

#!/bin/zsh

pv -cN ingestion -s `wc -l < $1` -l $1 | \
jq  -c --unbuffered --stream '{"name": ( .[0]), "encoded_type":( .[1] | type),  \
   "tonumber": (.[1] |  if (type == "string") then try(tonumber|type) catch type else null end), \
   "chars": (.[1] | if(type=="string") then try(split("") | sort | unique | join("")) else null end), \
   "length":(.[1] | length),"data":.[1]}' |  \
# sed -r 's/[0-9]+(,|])/"array"\1/g' | awk '!_[$0]++' | sort | \
pv -cN grouping -l | \
jq -sc '. | group_by(.name,.encoded_type,.tonumber)[] | {"name":first|.name, \ 
   "encoded_type":([(first|.encoded_type),(first|.tonumber)]|unique - [null]|join("_")), \
   "allchars": (map(.chars) | join("")|split("")|sort|unique|join("")), \
   "count_null": (map(.data | select(.==null)) | length), \
   "count_empty": (map(.data | select(.==[] or . == {} or . == "")) | length), \
   "count_nonempty": (map(.data | select(. != null and . != "")) |length), \
   "unique": (map(.data)|unique|length), "length": bow(.[] | .length)  }' | \
pv -cN final -l | \
jq -sc '. | group_by(.name)[] | {"name":first|.name, \
   "nullable":(map(.encoded_type) | contains(["null"])), \
   "schemas_count":(map(. | select(.encoded_type != "null") )|length), \
   "lengths":(map(.length)|merge_counts), "total_nonempty":(map(.count_nonempty)|add), \
   "total_null":(map(.count_null)|add), "total_empty": (map(.count_empty) |add), \
   "schemas":map(. | select(.encoded_type != "null") | del(.name) )}'

Here's a partial output for bars.jsonl (linebreaks added for ease of reading):

{"name":["FILING CODE"],"nullable":false,"schemas_count":1,
 "lengths":{"0":1930,"2":16},
 "total_nonempty":16,"total_null":0,"total_empty":1930,
 "schemas":[
  {"encoded_type":"string","allchars":"EGPWX",
   "count_null":0,"count_empty":1930,"count_nonempty":16,"unique":6,
   "length":{"0":1930,"2":16}}
 ]}
{"name":["LAST NAME"],"nullable":true,"schemas_count":1,
 "lengths":{"0":416,"5":4650,"6":5648,"7":4796,"4":1934,"8":3042,"9":1362,"10":570,"11":226,"3":284,"14":30,"12":70,"13":54,"16":20,"15":26,"17":10,"18":8,"2":4,"19":2},
 "total_nonempty":22736,"total_null":2,"total_empty":416,
 "schemas":[
  {"encoded_type":"string","allchars":" ABCDEFGHIJKLMNOPQRSTUVWXYZ",
   "count_null":0,"count_empty":416,"count_nonempty":22736,"unique":6233,
   "length":{"5":4650,"6":5648,"7":4796,"4":1934,"8":3042,"9":1362,"10":570,"11":226,"3":284,"14":30,"12":70,"0":416,"13":54,"16":20,"15":26,"17":10,"18":8,"2":4,"19":2}}
 ]}
{"name":["NUMBER OF COFFEES"],"nullable":false,"schemas_count":2,
 "lengths":{"1":16,"0":4},
 "total_nonempty":16,"total_null":0,"total_empty":4,
 "schemas":[
  {"encoded_type":"number_string","allchars":"1",
   "count_null":0,"count_empty":0,"count_nonempty":16,"unique":1,
   "length":{"1":16}},
  {"encoded_type":"string","allchars":"",
   "count_null":0,"count_empty":4,"count_nonempty":0,"unique":1,
   "length":{"0":4}}
 ]}
{"name":["OFFICE CODE"],"nullable":false,"schemas_count":2,
 "lengths":{"3":184,"0":22092},
 "total_nonempty":1036,"total_null":0,"total_empty":22092,
 "schemas":[
  {"encoded_type":"number_string","allchars":"0123456789",
   "count_null":0,"count_empty":0,"count_nonempty":852,"unique":254,
   "length":{"3":852}}, 
  {"encoded_type":"string","allchars":"0123456789ABCDEIJQRSX",
   "count_null":0,"count_empty":22092,"count_nonempty":184,"unique":66,
   "length":{"0":22092,"3":184}}
 ]}
{"name":["SOURCE FILE"],"nullable":true,"schemas_count":1,
 "lengths":{"0":416,"7":22708},
 "total_nonempty":22708,"total_null":23124,"total_empty":416,
 "schemas":[
  {"encoded_type":"string","allchars":"0123456789F_efil",
   "count_null":0,"count_empty":416,"count_nonempty":22708,"unique":30,
   "length":{"7":22708,"0":416}}
 ]}
...

The point of this is to get a summary of how this unknown dataset is structured and what's in it, which I can readily transform into my BigQuery table schema and parameters, and which points to what I'll probably need to do next to turn the data into something cleaner and more usable than what I got.

This code works, but those -s (slurp) lines are really hard on server RAM. (They simply wouldn't work if the dataset were any larger than this; I added those parts just today. On the bazzes dataset, it uses about 20 GB of RAM in total, including swap.)

It also doesn't detect date/time field types, for example.

I believe it should be possible to make this far more efficient using @joelpurra's jq + parallel and/or the jq cookbook's reduce inputs, but I'm having difficulty figuring out how.

So, I'd appreciate advice on how to make this

  1. more CPU & RAM efficient
  2. otherwise more useful (e.g. recognize date fields, which could be in almost any format)
Sai

1 Answer


Using inputs is the way to go, whether or not any parallelization techniques are brought to bear.
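
For instance, here is one way (a sketch only, with ad hoc field names for the aggregates, not a drop-in replacement for your pipeline) to replace the slurp-based grouping pass with a single streaming reduce, so that memory scales with the number of distinct field names rather than with the number of records. It reads the output of your first jq --stream pass on stdin:

jq -nc '
  reduce inputs as $r ({};
    ($r.name | tostring) as $k
    | .[$k].types[$r.encoded_type] += 1          # records seen per encoded type
    | .[$k].lengths[$r.length | tostring] += 1   # bag of words over lengths
    | .[$k].nulls += (if $r.data == null then 1 else 0 end)
  )
'

The same reduce inputs pattern can replace the final -s pass as well.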

In the jq module for inducing structural schema that I wrote some time ago (https://gist.github.com/pkoppstein/a5abb4ebef3b0f72a6ed), there's a filter, schema/1, defined as:

def schema(stream):
  reduce stream as $x ("null";  typeUnion(.; $x|typeof));

This can therefore be used as suggested by this snippet:

jq -n 'include "schema"; schema(inputs)' FILESPECIFICATIONS

(This assumes that the file "schema.jq" defining the schema module has been appropriately installed.)

The point here is not so much that schema.jq might be adapted to your particular expectations, but that the above "def" can serve as a guide (whether or not using jq) for how to write an efficient schema-inference engine, in the sense of being able to handle a very large number of instances. That is, you basically have only to write a definition of typeof (which should yield the desired "type" in the most general sense), and of typeUnion (which defines how two types are to be combined).
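
For example (purely illustrative, and deliberately much simpler than the actual definitions in schema.jq), for flat, string-heavy data along the lines shown above, typeof and typeUnion might look like this:

def typeof:
  if type == "string" then
    if test("^-?[0-9]+$") then "integer_string"
    elif test("^-?[0-9]*\\.[0-9]+$") then "number_string"
    else "string"
    end
  else type
  end;

def typeUnion(a; b):
  if a == b then a
  elif a == "null" then b                # nulls never narrow the type
  elif b == "null" then a
  elif ([a, b] | sort) == ["integer_string", "number_string"] then "number_string"
  else "mixed"                           # flag for manual inspection
  end;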

Of course, inferring schemas can be a tricky business. In particular, schema(stream) will never fail, assuming the inputs are valid JSON. That is, whether or not the inferred schema will be useful depends largely on how it is used. I find an integrated approach based on these elements to be essential:

  1. a schema specification language;

  2. a schema inference engine that generates schemas that conform to (1);

  3. a schema-checker.

Further thoughts

schema.jq is simple enough to be tailored to more specific requirements, e.g. to infer dates.
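
For instance (again just a sketch, not something schema.jq does out of the box), typeof could be taught to recognize strings that look like dates or timestamps in a few common layouts:

def typeof:
  if type == "string" then
    if test("^[0-9]{4}-[0-9]{2}-[0-9]{2}$") then "date"            # 2020-10-16
    elif test("^[0-9]{1,2}/[0-9]{1,2}/[0-9]{4}$") then "date"      # 10/16/2020
    elif test("^[0-9]{4}-[0-9]{2}-[0-9]{2}[T ][0-9]{2}:[0-9]{2}") then "datetime"
    else "string"
    end
  else type
  end;

A stricter check could attempt strptime("%Y-%m-%d") under try/catch where jq's strptime is available, at some cost in speed.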

You might be interested in JESS ("JSON Extended Structural Schemas"), which combines a JSON-based specification language with jq-oriented tools: https://github.com/pkoppstein/JESS

peak
  • AFAICT, your code believes the types of the data it ingests. My problem is that the data I ingest is usually not typed — it's strings. I have to figure out e.g. whether I can convert all .foobar to numeric (or date or whatever) without loss; otherwise I waste too much space in the destination table, and won't be able to do type-specific operations (numeric comparison, date sharding, etc). – Sai Oct 16 '20 at 03:01
  • @sai - it looks like you’ve misunderstood both my posting and schema.jq. It might be simplest if you amended your Q with some examples of the variety of input you’re dealing with. – peak Oct 16 '20 at 03:59
  • @sai - I've added a paragraph which will hopefully make at least some things clearer. – peak Oct 16 '20 at 06:07