3

I have a JSON file with the following exemplified format,

{
    "Table1": {
        "Records": [
            {
                "Key1Tab1": "SomeVal",
                "Key2Tab1": "AnotherVal"
            },
            {
                "Key1Tab1": "SomeVal2",
                "Key2Tab1": "AnotherVal2"
            }
        ]
    },
    "Table2": {
        "Records": [
            {
                "Key1Tab1": "SomeVal",
                "Key2Tab1": "AnotherVal"
            },
            {
                "Key1Tab1": "SomeVal2",
                "Key2Tab1": "AnotherVal2"
            }
        ]
    }
}

The root keys are table names from an SQL database and its corresponding value is the rows. I want to split the JSON file into seperate parquet files each representing a table. Ie. Table1.parquet and Table2.parquet.

The big issue is the size of the file preventing me from loading it into memory. Hence, I tried to use dask.bag to accommodate for the nested structure of the file.

import dask.bag as db
from dask.distributed import Client
client = Client(n_workers=4)

lines = db.read_text("filename.json")

But assessing the output with lines.take(4) shows that dask can't read the new lines correct.

('{\n', '    "Table1": {\n', '        "Records": [\n', '            {\n')

I've tried to search for solutions to the specific problem but without luck.

Is there any chance that the splitting can be solved with dask or is there other tools that could do the job?

Nicolai Iversen
  • 349
  • 1
  • 4
  • 17

3 Answers3

1

As suggested here try the dask.dataframe.read_json() method

This may be sufficient, though I am unsure how it will behave if you don't have enough memory to store the entire resulting dataframe in-memory..

import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df = dd.read_json("filename.json")
df.to_parquet("filename.parquet", engine='pyarrow')

docs


If Dask doesn't process the file in chunks when on a single system (it may not happily do so as JSON is distinctly unfriendly to parse in such a way .. though I unfortunately don't have access to my test system to verify this) and the system memory is unable to handle the giant file, you may be able to extend the system memory with disk space by creating a big swapfile.

Note that this will create a ~300G file (increase count field for more) and be may be incredibly slow compared to memory (but perhaps still fast enough for your needs, especially if it's a 1-off).

# create and configure swapfile
dd if=/dev/zero of=swapfile.img bs=10M count=30000 status=progress
chmod 600 swapfile.img
mkswap swapfile.img
swapon swapfile.img
#
# run memory-greedy task
# ...
# ensure processes have exited
#
# disable and remove swapfile to reclaim disk space
swapoff swapfile.img  # may hang for a long time
rm swapfile.img 
mdurant
  • 27,272
  • 5
  • 45
  • 74
ti7
  • 16,375
  • 6
  • 40
  • 68
  • `df = dd.read_json("filename.json")` will indeed read the *whole* file in one go; the only exception is if you have line-delimited JSON, which is not the case. – mdurant Dec 02 '20 at 15:38
0

The problem is, that dask will split the file on newline characters by default, and you can't guarantee that this will not be in the middle of one of your tables. Indeed, even if you get it right, you still need to manipulate the resultant text to make complete JSON objects for each partition.

For example:

def myfunc(x):
    x = "".join(x)
    if not x.endswith("}"):
        x = x[:-2] + "}"
    if not x.startswith("{"):
        x = "{" + x
    return [json.loads(x)]

db.read_text('temp.json', 
             linedelimiter="\n    },\n", 
             blocksize=100).map_partitions(myfunc)

In this case, I have purposefully made the blocksize smaller than each part to demonstrate: you will get a JSON object or nothing for each partition.

_.compute()

[{'Table1': {'Records': [{'Key1Tab1': 'SomeVal', 'Key2Tab1': 'AnotherVal'},
    {'Key1Tab1': 'SomeVal2', 'Key2Tab1': 'AnotherVal2'}]}},
 {},
 {'Table2': {'Records': [{'Key1Tab1': 'SomeVal', 'Key2Tab1': 'AnotherVal'},
    {'Key1Tab1': 'SomeVal2', 'Key2Tab1': 'AnotherVal2'}]}},
 {},
 {},
 {}]

Of course, in your case you can immediately do something with the JSON rather than return it, or you can map to your writing function next in the chain.

mdurant
  • 27,272
  • 5
  • 45
  • 74
  • 1
    Something of this style could be a great intermediate solution (though a much larger block size, like 2x the largest db object may be more fitting to prevent the chance of one being in a boundary) .. or a custom solution with [ijson](https://pypi.org/project/ijson/) – ti7 Nov 30 '20 at 20:10
  • Exactly - once you have the text blocks correct, you can handle the data however is most convenient to get your job done. – mdurant Nov 30 '20 at 20:45
0

When working with large files the key to success is processing the data as a stream, i.e. in filter-like programs.

The JSON format is easy to parse. The following program reads the input char by char (I/O should be bufferred) and cuts the top-level JSON object to separate objects. It properly follows the data structure and not the formatting.

The demo program just prints "--NEXT OUTPUT FILE--", where real output file switch should be implemented. Whitespace stripping is implemented as a bonus.

import collections

OBJ = 'object'
LST = 'list'

def out(ch):
    print(ch, end='')

with open('json.data') as f:
    stack = collections.deque(); push = stack.append; pop = stack.pop
    esc = string = False
    while (ch := f.read(1)):
        if esc:
            esc = False
        elif ch == '\\':
            esc = True
        elif ch == '"':
            string = not string
        if not string:
            if ch in {' ', '\t', '\r', '\n'}:
                continue
            if ch == ',':
                if len(stack) == 1 and stack[0] == OBJ:
                    out('}\n')
                    print("--- NEXT OUTPUT FILE ---")
                    out('{')
                    continue
            elif ch == '{':
                push(OBJ)
            elif ch == '}':
                if pop() is not OBJ:
                    raise ValueError("unmatched { }")
            elif ch == '[':
                push(LST)
            elif ch == ']':
                if pop() is not LST:
                    raise ValueError("unmatched [ ]")
        out(ch)

Here is a sample output for my testfile:

{"key1":{"name":"John","surname":"Doe"}}
--- NEXT OUTPUT FILE ---
{"key2":"string \" ] }"}
--- NEXT OUTPUT FILE ---
{"key3":13}
--- NEXT OUTPUT FILE ---
{"key4":{"sub1":[null,{"l3":true},null]}}

The original file was:

{
    "key1": {
        "name": "John",
        "surname": "Doe"
    },  
    "key2": "string \" ] }", "key3": 13, 
    "key4": {
        "sub1": [null, {"l3": true}, null]
    }   
}
VPfB
  • 14,927
  • 6
  • 41
  • 75