
Problem Statement: I have around 500 ZIP files, each containing lots of XMLs. I am able to convert them to JSON and parse them to Parquet files, as in the example below for one nested JSON file.

I am also not able to process multiple files with Spark.

I have the code below, which flattens a whole JSON document into a pandas DataFrame, but now I have to run it over 150,000 files. When the JSON is very big, it takes around 2 minutes to flatten the whole document. Also, if I run it with Spark over an RDD of multiple files, it fails with either an OOM or a struct error.

Am I doing something wrong Spark-wise?

import io
import zipfile

import xmltodict
import pandas as pd

def parser(master_tree):
  flatten_tree_node = []
  def _process_leaves(tree:dict,prefix:str = "node", tree_node:dict = dict(), update:bool = True):
      is_nested = False
      if isinstance(tree,dict):
        # string values become columns named after the path to them;
        # dict values recurse into the same row without emitting it
        for k in tree.keys():
            if type(tree[k]) == str:
                colName = prefix + "_" + k
                tree_node[colName] = tree[k]
            elif type(tree[k]) == dict:
                prefix += "_" + k
                leave = tree[k]
                _process_leaves(leave,prefix = prefix, tree_node = tree_node, update = False)
        # list values fan out: each element continues with its own copy of the row accumulated so far
        for k in tree.keys():
            if type(tree[k]) == list:
                is_nested = True
                prefix += "_" + k
                for leave in tree[k]:
                    _process_leaves(leave,prefix = prefix, tree_node = tree_node.copy())
        # nodes reached from a list (or the root) that have no list children emit a completed row
        if not is_nested and update:
            flatten_tree_node.append(tree_node)

  _process_leaves(master_tree)
  df = pd.DataFrame(flatten_tree_node)
  df.columns = df.columns.str.replace("@", "_")
  df.columns = df.columns.str.replace("#", "_")
  return df


def extractor(file_name,file):
    data = file.decode('utf-8')
    d = bytes(bytearray(data, encoding='utf-8'))
    data = xmltodict.parse(d)
    flatten_data = parser(data)
    return (file_name,flatten_data)

def extract_files(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = file_obj.namelist()
    return [extractor(file_name,file_obj.open(file_name).read()) for file_name in files]
    
zip_rdd = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path','content').rdd

It fails here at collection time:

collected_data = zip_rdd.map(extract_files).collect()

The errors:

org.apache.spark.api.python.PythonException: 'struct.error: 'i' format requires -2147483648 <= number <= 2147483647'. Full traceback
 

or

java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:123

Although everything works fine when run on only a single file.

An example run of parsing nested JSON using the parser function is below:

Is there a way to make it memory- and speed-efficient?

import pandas as pd
tree = {
    "products":
    [

        {
            "id":"0",
            "name": "First",
            "emptylist":[],
            "properties" : 
            {
              "id" : "",
              "name" : ""
            }
        },
        {
            "id":"1",
            "name": "Second",
            "emptylist":[],
            "properties": 
            {
                "id" : "23",
                "name" : "a useful product",
                "features" :
                [
                    {
                        "name":"Features",
                        "id":"18",
                        "features":
                        [
                            {
                                "id":"1001",
                                "name":"Colour",
                                "value":"Black"
                            },
                            {
                                "id":"2093",
                                "name":"Material",
                                "value":"Plastic"
                            }
                        ]
                    },
                    {
                        "name":"Sizes",
                        "id":"34",
                        "features":
                        [
                            {
                                "id":"4736",
                                "name":"Length",
                                "value":"56"
                            },
                            {
                                "id":"8745",
                                "name":"Width",
                                "value":"76"
                            }
                        ]
                    }
                ]
            }
        },
        {
            "id":"2",
            "name": "Third",
            "properties" : 
            {
                "id" : "876",
                "name" : "another one",
                "features" : 
                [
                    {
                        "name":"Box",
                        "id":"937",
                        "features":
                        [
                            {
                                "id":"3758",
                                "name":"Amount",
                                "value":"1"
                            },
                            {
                                "id":"2222",
                                "name":"Packaging",
                                "value":"Blister"
                            }
                        ]
                    },
                    {
                        "name":"Features",
                        "id":"8473",
                        "features":
                        [
                            {
                                "id":"9372",
                                "name":"Colour",
                                "value":"White"
                            },
                            {
                                "id":"9375",
                                "name":"Position",
                                "value":"A"
                            },
                            {
                                "id":"2654",
                                "name":"Amount",
                                "value":"6"
                            }
                        ]
                    }
                ]
            }
        }
    ]
}


def parser(master_tree):
  flatten_tree_node = []
  def _process_leaves(tree:dict,prefix:str = "node", tree_node:dict = dict(), update:bool = True):
      is_nested = False
      if isinstance(tree,dict):
        for k in tree.keys():
            if type(tree[k]) == str:
                colName = prefix + "_" + k
                tree_node[colName] = tree[k]
            elif type(tree[k]) == dict:
                prefix += "_" + k
                leave = tree[k]
                _process_leaves(leave,prefix = prefix, tree_node = tree_node, update = False)
        for k in tree.keys():
            if type(tree[k]) == list:
                is_nested = True
                prefix += "_" + k
                for leave in tree[k]:
                    _process_leaves(leave,prefix = prefix, tree_node = tree_node.copy())
        if not is_nested and update:
            flatten_tree_node.append(tree_node)
        
  _process_leaves(master_tree)
  df = pd.DataFrame(flatten_tree_node)
  df.columns = df.columns.str.replace("@", "_")
  df.columns = df.columns.str.replace("#", "_")
  return df

print(parser(tree))

  node_products_id node_products_name  ... node_products_properties_features_features_name node_products_properties_features_features_value
0                1             Second  ...                                          Colour                                            Black
1                1             Second  ...                                        Material                                          Plastic
2                1             Second  ...                                          Length                                               56
3                1             Second  ...                                           Width                                               76
4                2              Third  ...                                          Amount                                                1
5                2              Third  ...                                       Packaging                                          Blister
6                2              Third  ...                                          Colour                                            White
7                2              Third  ...                                        Position                                                A
8                2              Third  ...                                          Amount                                                6
9                2              Third  ...                                             NaN                                              NaN

[10 rows x 9 columns]
  • what spark version are you on? – Matt Andruff Jun 22 '22 at 15:28
  • Apache Spark 3.2.1 –  Jun 22 '22 at 15:30
  • "Am i doing something wrong SPARK wise ?" - well, I see no Spark in your parser. So, yes... If you want to leverage Spark, you must use Spark functionality as much as possible instead of just feeding python to it. Your parser should be rewritten in Spark in order to make it efficient. – ZygD Jun 26 '22 at 12:21
  • But SPARK is not good at exploding unknown number depth of ARRAYTYPE and STRUCTTYPE. It will go OOM once depth or breath of node increases too much. And definitely i don't know the size and depth –  Jun 27 '22 at 12:45
  • Any reason to not use the `spark-xml` library: https://github.com/databricks/spark-xml except maybe no support for `zip` compression? Also, do not use `collect()` unless you are sure that the entire data that will be collected will fit into a single node. Sure shot way to get OOMed. – Amit Singh Jul 01 '22 at 06:36
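
As a rough, hedged illustration of ZygD's suggestion above to rewrite the parser in Spark (and of the explode cost the asker worries about): once the XML is loaded into a DataFrame with nested structs and arrays, for example via the spark-xml package Amit Singh mentions, a generic flatten could expand one level of nesting per pass. The flatten helper below is a sketch of that idea, not code from the thread.

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StructType

def flatten(df):
    # expand one struct or explode one array per pass until only flat columns remain
    while True:
        for field in df.schema.fields:
            if isinstance(field.dataType, StructType):
                df = df.select(
                    "*",
                    *[F.col(field.name + "." + c).alias(field.name + "_" + c)
                      for c in field.dataType.names]
                ).drop(field.name)
                break
            if isinstance(field.dataType, ArrayType):
                # explode_outer keeps rows whose arrays are empty or null
                df = df.withColumn(field.name, F.explode_outer(field.name))
                break
        else:  # no nested fields left
            return df

Each explode multiplies rows, so this inherits the memory pressure described in the comments once the nesting gets deep or wide.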

3 Answers


Do not collect this data; it will likely never fit in memory, since you are trying to pull all the data into the driver.

You can just save it to a file directly.

collected_data = zip_rdd.flatMap(extract_files).toDF(["column","names","go","here"])
collected_data.write.parquet("/path/to/folder")
– Matt Andruff
  • The problem with Spark is that I have to have the same fields, but I don't, as I am creating dataframes from nested JSON for which I don't know the schema. That is why I chose pandas, so that I can write separate parquet files and then read them through the Spark option "mergeSchema". – Jun 22 '22 at 15:23
  • don't call collect regardless of your path forward. It will never work. – Matt Andruff Jun 22 '22 at 15:30
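
Not from the thread, but as a minimal sketch of the write-separate-parquet-then-mergeSchema route described in the comment above: `flattened` is a hypothetical iterable of (file_name, pandas_df) pairs such as extractor() produces, and the paths are placeholders.

# each flattened file goes to its own parquet folder, with whatever columns it happens to have
for file_name, pdf in flattened:
    spark.createDataFrame(pdf).write.mode("overwrite").parquet("/path/to/per_file_parquet/" + file_name)

# mergeSchema reconciles the differing column sets into one schema on read
merged = spark.read.option("mergeSchema", "true").parquet("/path/to/per_file_parquet/*")
merged.write.parquet("/path/to/folder")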

I do not have Spark 3.2, but I'm aware of the features it possesses, and in this case they will make your life easy. unionByName with allowMissingColumns is a newer feature that will let you magically merge schemas.

collected_data = spark.createDataFrame( data = [], schema = [] )

zip_array = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path').collect() # this will likely fit in driver memory so it's OK to call. After all it's just a list of file paths.

for my_file in zip_array:
    collected_data = collected_data.unionByName( spark.createDataFrame(extract_files(my_file)), allowMissingColumns=True )
collected_data.write.parquet("/path/to/folder")
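
For reference, a tiny self-contained illustration (not part of the answer) of what allowMissingColumns=True does in the loop above; the toy rows are made up, loosely echoing the example data.

# two frames with different columns merge into one schema; missing values become null (Spark >= 3.1)
df1 = spark.createDataFrame([("0", "First")], ["id", "name"])
df2 = spark.createDataFrame([("23", "a useful product", "Black")], ["id", "name", "colour"])
df1.unionByName(df2, allowMissingColumns=True).show()
# columns: id, name, colour -- the df1 row gets colour = null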
– Matt Andruff
  • Thanks, tried it, but I am still getting OOM at the time of unionByName, as there are around 600 XMLs in one zip file. – Jun 22 '22 at 16:17
  • If there is a way of processing lots of ZIP files that is better than PySpark, I can look into that too. – Jun 22 '22 at 16:26

For better efficiency you want to use mapPartitions. There are a couple of reasons why, but this actually goes back to the map/reduce era. You want to create an iterator, as this can work at lower levels and can be optimized and pipelined better (hence the use of yield).

mapPartitions code will execute inside an executor and can only contain plain Python code. No Spark code is allowed, as you don't have access to the sparkContext in an executor. Sometimes this requires imports to be completed in the function itself, as the scope is local, not global.

If you are looking to save more memory, you might want to consider an alternative to xmltodict.parse(d) and re-write reformat. You could use a library that you initialise once per partition and re-use for the entire set of rows in the partition. This would be more efficient than the static call to xmltodict.parse(d), which just uses memory to create the struct only for it to be thrown away immediately by the garbage collector as it goes out of scope. (Google lists several alternatives you can review to determine which one best fits your needs; a sketch of the reuse pattern follows the code below.)


# keep 'content' as well as 'path' and do not collect(): the ZIP bytes stay distributed across executors
zip_df = spark.read.format('binaryFile').load('/home/me/sample.zip').select('path','content')

def reformat(partitionData):
    for row in partitionData:
        in_memory_data = io.BytesIO(row[1])
        file_obj = zipfile.ZipFile(in_memory_data, "r")
        for file_name in file_obj.namelist():
            yield extractor(file_name,file_obj.open(file_name).read())


collected_data = zip_df.rdd.mapPartitions(reformat).toDF(["file_name","flattened_data"])
collected_data.write.parquet("/path/to/folder")
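
A minimal sketch (not from the answer) of the "initialise once per partition and reuse" idea above; the answer doesn't name a concrete replacement for xmltodict, so lxml.etree is used here purely to show the reuse pattern.

import io, zipfile
from lxml import etree

def reformat_with_shared_parser(partitionData):
    xml_parser = etree.XMLParser(recover=True)   # built once per partition ...
    for row in partitionData:
        with zipfile.ZipFile(io.BytesIO(row[1]), "r") as zf:
            for file_name in zf.namelist():
                # ... and reused for every XML document in the partition
                root = etree.parse(zf.open(file_name), xml_parser).getroot()
                yield (file_name, etree.tostring(root))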
– Matt Andruff
  • I think it is better to try the code once. I am getting the below pickling error when using mapPartitions: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 121830.0 failed 4 times, most recent failure: Lost task 0.3 in stage 121830.0 (TID 785133) (10.32.1.97 executor 569): org.apache.spark.api.python.PythonException: 'TypeError: 'itertools.chain' object is not subscriptable' – Jun 30 '22 at 10:42
  • Try the updated code. I had a copy-paste error (`x[1]` should be `row[1]`). Sorry. Without sample data I can't really run the code here. – Matt Andruff Jun 30 '22 at 12:33