
I am processing a very large dataset in Spark: around 200 compressed JSON files (each ~8 GB uncompressed). I have created a main dataframe, largeDF, plus several additional dataframes to compute aggregates on nested attributes (which are arrays of structs). I want to perform a general stats computation (fill rates and group counts).

Each pass over the whole dataset takes ~20 minutes (loading the files, decompressing them, and performing the aggregations). With 50 fields this takes ages, because I keep changing my criteria and re-running the queries with additional filters again and again.

I want to rely on PySpark's lazy evaluation and avoid loading the data several times: ideally I would build one complex aggregation, apply it once to the whole dataset, and then convert all the results to Pandas. Even better, I would like to pre-define the jobs and have Spark process them in parallel (load once, compute everything), returning the result of each job separately.
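To make this concrete, the kind of single-pass aggregation I have in mind looks roughly like the sketch below (the column names country and status are just placeholders for my real fields):

from pyspark.sql import functions as F

# One pass over largeDF computes fill counts for several columns at once.
# Everything stays lazy until toPandas() (an action) triggers the read.
fill_counts = largeDF.agg(
    F.count(F.lit(1)).alias("total_rows"),
    F.count("country").alias("country_filled"),        # count() skips nulls
    F.count("status").alias("status_filled"),
    F.countDistinct("status").alias("status_distinct"),
).toPandas()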

This is not my main ETL; I am trying to extract the semantics of the dataset in order to write the actual ETL pipeline.

Compute 1: Calculate statistics and find the fill rate for all fields:

stats = largeDF.describe().toPandas()

Compute 2: Process simple fields with categorical data:

def group_count(df, col, limit, sort, skip_null):
    """Group the dataframe by the given column and count each group."""
    if skip_null:
        # Drop rows where the grouping column is null.
        df = df.where(df[col].isNotNull())
    if limit:
        # Restrict the number of input rows before grouping.
        df = df.limit(limit)
    df = df.groupBy(col).count()
    if sort:
        df = df.sort(col, ascending=False)
    return df.toPandas()

aggregations = {}
for col in group_count_list_of_columns:
    aggregations[col] = group_count(largeDF, col, limit=0, skip_null=True, sort=False)

Compute 3: Count and calculate the fill rate for nested fields:

from pyspark.sql import functions as F

def get_nested_fields(spDf, col: str, limit, othercols: tuple, stats=True):
    """Explode a nested array column and return either the exploded rows or their summary statistics."""
    spDf = spDf.where(spDf[col].isNotNull())
    # explode() produces one row per element of the array column.
    df = spDf.select(F.explode(col), *othercols)
    if limit:
        df = df.limit(limit)
    if stats:
        res = df.describe().toPandas()
    else:
        res = df.toPandas()
    return res

nested_fields_aggregate = {}
for col in nested_fields_lists:
    nested_fields_aggregate[col] = get_nested_fields(largeDF, col, limit=10**4, othercols=['name', 'id', 'timestamp'], stats=True)

This approach requires the whole dataset to be read multiple times. The result shapes are not the same, so I cannot join them. In theory there should be a way to reduce the total time, because none of the computations depend on each other.

  • What is ETL? I do not see how this is related to apache-spark, pyspark or nested-documents. You want to improve the speed, right? Give us a minimal working example, including code that produces sample data and the expected output; you can simulate the data. Am I right that the main tasks are reading the files (an IO task on the hard drive), decompressing them (a CPU-bound task) and doing the calculations (a CPU task)? Important to know: are there 20 separate data analyses, or do we need to join the 20 data files and do one analysis on them? – buhtz Jan 13 '21 at 08:58
  • Have you looked into reading it all in, doing your initial set of ETL, and then pickling the file? – hrokr Jan 14 '21 at 17:02
  • @buhtzsaysgetvaccinated My main ETL task is to load all the big files into a warehouse, which is not my current goal. For now, I want to perform several statistical analyses on the data. I already have the queries; I want an efficient way to execute them all together. Think of it as submitting several jobs at the same time and letting Spark optimize how they run. – PyGuy Jan 19 '21 at 17:33
  • You need to improve your question. It is still unclear and not reproducible. You do not even answer my simple question "What is ETL?". – buhtz Jan 20 '21 at 07:48

1 Answer


Every time you call toPandas() you trigger an action, and Spark reads the largeDF dataframe again from the source files. To avoid this, you can cache the dataframe with largeDF = largeDF.cache().
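As a minimal sketch of how this could look with the dataframes from the question (reusing group_count and group_count_list_of_columns as defined above):

# Mark the dataframe for caching; the JSON files are read, decompressed and
# materialized on the first action, and later actions reuse the cached copy.
largeDF = largeDF.cache()

# First action: loads and caches the data.
stats = largeDF.describe().toPandas()

# Subsequent actions run against the cached data instead of re-reading the files.
aggregations = {
    col: group_count(largeDF, col, limit=0, skip_null=True, sort=False)
    for col in group_count_list_of_columns
}

# Release the cached data when the analysis is done.
largeDF.unpersist()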
