
I have the following partition strategy in an ADLS Gen2 store:

dir_parquet = "abfss://blah.windows.net/container_name/project=cars/make=*/model=*/*.parquet"

This loads the already-partitioned data into a dataframe accordingly. I am aware of using .filepath(n) in SQL to achieve this, but I effectively need the same thing in a notebook dataframe.

How can I keep the project, make and model values in the dataframe as separate columns?

According to this other SO thread, setting .option("mergeSchema", "true") on read should work; however, it did not.
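
For clarity, this is roughly the read I attempted (same placeholder path as above; spark is the notebook's SparkSession):

# read with mergeSchema; the partition folders still do not show up as columns
df = (spark.read
    .option("mergeSchema", "true")
    .parquet(dir_parquet))

df.printSchema()  # project, make and model do not appear as columns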

Thanks.


2 Answers


Have you tried to add a base path to the call:

val dataFrame = sparkSession.read
  .option("basePath", path)
  .parquet(path + "/day=*/hour=*/platform=*/*.parquet")

If I do not add the base path, I do not get the day/hour/platform columns; with the option, I do.

This article explains why it does not work without the base path, but forgets to mention that you can add it manually.
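
Since the question is about a PySpark notebook, here is a rough equivalent sketch using the question's placeholder path, assuming the base path is the directory just above the first key=value folder so that project, make and model are all discovered as partition columns:

base_path = "abfss://blah.windows.net/container_name"

# basePath tells Spark where partition discovery starts, so the
# project/make/model folders below it become columns in the dataframe
df = (spark.read
      .option("basePath", base_path)
      .parquet(base_path + "/project=cars/make=*/model=*/*.parquet"))

df.select("project", "make", "model").distinct().show()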

Observer

Since I received no answer to this and could not find an official means of doing it, I wrote the code below.

People with this problem may also find it useful to recursively list blob directories; if so, please see the deep_ls function here (not my code).

import pyspark
import pyspark.sql.functions as F
from typing import List

def load_dataframes_with_partition_steps(dir_urls:List[str]) -> List[pyspark.sql.dataframe.DataFrame]:
    """
    Written by: Paul Wilson, 2022-07-29

    Takes in a list of blob directories including their partition steps and returns a list of dataframes with the associated
    partition steps as columns in each dataframe.

    Ex. input...:
        ['abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual',
            'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic']

    ...which is turned into a list of dicts...
        [{'url': 'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual',
        'make': 'Vauxhall',
        'model': 'Astra',
        'transmission': 'Manual'},
        {'url': 'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic',
        'make': 'Ford',
        'model': 'Fiesta',
        'transmission': 'Automatic'}]

    ...and from that list a list of dataframes per url and associated partition steps, such as:
        [df1, df2, ..., dfn]
    """

    def load_dataframe(url:str=None, partition_steps:dict={}, file_format:str=None, df:pyspark.sql.dataframe.DataFrame=None) -> pyspark.sql.dataframe.DataFrame:
        """
        Recursively load a dataframe and apply the partition steps via withColumn
        """
        if file_format is None or len(file_format) == 0:
            raise ValueError('file_format must not be None; the URL must end in the file format (.parquet, .csv, etc.)')

        # if there is a url and non empty partition steps without a df loaded then load the dataframe
        if (url is not None and len(partition_steps.keys()) > 0 and df is None):
            df = spark.read.format(file_format).load(url)

            # df is loaded so do not pass a url indicating it is loaded
            return load_dataframe(url=None, partition_steps=partition_steps, file_format=file_format, df=df)

        # if here then the df is loaded and proceed to apply withColumn
        if (url is None and df is not None and len(partition_steps.keys()) > 0):

            # load the first item in the partition steps dict
            key = list(partition_steps.keys())[0]
            value = list(partition_steps.values())[0]

            # remove the first item from the partition steps dict
            partition_steps.pop(key)

            # load the dataframe with the new partition step
            df = df.withColumn(key, F.lit(value))
            return load_dataframe(url=None, partition_steps=partition_steps, file_format=file_format, df=df)

        # if it makes it here then the dataframe is loaded and the partition steps are applied
        return df

    # list of dataframe dict values of url and partition steps
    list_df_dicts = list()

    if not isinstance(dir_urls, list):
        raise TypeError('dir_urls must be a list of string values')

    # iterate over all urls and generate dict of partition values
    for url in dir_urls:

        # dict to store url and partition steps
        d_dict = dict()
        d_dict['url'] = url

        # get the format from the last part of the url
        file_format = url.split('.')[-1]
        d_dict['file_format'] = file_format

        # split the url keeping only partition steps (ex. make=Vauxhall)
        url_split = [u for u in d_dict['url'].split('/') if '=' in u]

        if len(url_split) == 0:
            raise ValueError('The list of URLs must contain the partition steps, ex. make=ford')

        # turn the partition=item into a key:value
        partition_items = [u.split('=') for u in url_split]

        # iterate over every item in partition_items=[['key', 'value']] and set dict[key] = value
        for item in partition_items:
            key = item[0]
            value = item[1]
            d_dict[key] = value

        list_df_dicts.append(d_dict)

    # iterate over all the dicts and load the dataframes to a list with their partition steps in place
    list_dfs = list()
    for d_dict in list_df_dicts:

        # get the url from the d_dict
        url = d_dict['url']

        # get the format
        file_format = d_dict['file_format']

        # remove the url and file format from the d_dict so only partition steps remain
        d_dict.pop('url')
        d_dict.pop('file_format')
        df = load_dataframe(url=url, partition_steps=d_dict, file_format=file_format)
        list_dfs.append(df)

    # return the list of dataframes
    return list_dfs
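
A short usage sketch (the directory URLs are the hypothetical ones from the docstring, with hypothetical file names appended because the function expects each URL to end in the file format; the union assumes the parquet files share a schema):

from functools import reduce

dir_urls = [
    "abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual/part-0000.parquet",
    "abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic/part-0000.parquet",
]

dfs = load_dataframes_with_partition_steps(dir_urls)

# each dataframe now carries its partition values as literal columns;
# union them if a single dataframe is needed
combined = reduce(lambda a, b: a.unionByName(b), dfs)
combined.select("make", "model", "transmission").distinct().show()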
Paul Wilson