I am reading (using Polars/PyArrow) a directory full of Parquet files from Azure Blob Storage, totalling 60 GB, on a Standard_E8_v3 (8 cores, 64 GB RAM, 200 GB disk) compute instance.

After reading the data I want to group it and collect the result; however, on collecting I receive this error:

[screenshot of the error raised on collect]

I don't really understand what it is telling me -

Is it saying that it can't process my data because it is too big for the machine I am on?

Is there an error with the code?

Is there an error with the data I need to handle?

If anyone could highlight the issue that would be much appreciated - it is important that any solution is a Polars-based one :)

Code below:

import pyarrow.dataset as ds
from azureml.fsspec import AzureMachineLearningFileSystem
import polars as pl
from azureml.core import Workspace

ws = Workspace.from_config()

# Azure Machine Learning workspace details:
subscription = ws.subscription_id
resource_group = ws.resource_group
workspace = ws.name
datastore_name = 'datastore_name'
path_on_datastore = 'path_to_data'

# long-form Datastore uri format:
uri = f'azureml://subscriptions/{subscription}/resourcegroups/{resource_group}/workspaces/{workspace}/datastores/{datastore_name}'

aml_fs = AzureMachineLearningFileSystem(uri)
files = aml_fs.glob(f'{path_on_datastore}/*.parquet')  # list the parquet files (not used below, just a sanity check)

myds = ds.dataset(path_on_datastore, filesystem=aml_fs, format="parquet")
df = (
    pl.scan_pyarrow_dataset(myds)
    .select([
        'COLUMN_LIST'
    ])
    #.with_columns(pl.col('turnovervalue').cast(pl.Float64, strict=False))
    .filter((pl.col('col1') > 0) & (pl.col('col2') >= 2022))
)

# df is already a LazyFrame (scan_pyarrow_dataset is lazy), so no extra .lazy() is needed
grouped = (df
    .groupby(['colA', 'colB'])
    .agg(
        [
            pl.n_unique('colC').alias('Blah'),
            pl.sum('colD').alias("BlahBlah"),
            pl.n_unique('colE').alias('BlahBlahBlah'),
            # note: .count() counts the non-null values of colF in each group,
            # not how many of them equal "C"
            (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
        ]
    )
).collect()
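
On the first question above (whether the data is simply too big for the machine): I understand Polars has an out-of-core streaming engine that can be switched on at collect time, and I am unsure whether that is what I should be reaching for here. A minimal sketch of what I mean, reusing the lazy query above - streaming=True at collect is the only change, and whether that flag is available depends on the Polars version:

# Minimal sketch (assumes a Polars version where collect() accepts
# streaming=True): run the same kind of group-by, but let Polars process
# the data in batches instead of materialising everything in RAM at once.
grouped = (df
    .groupby(['colA', 'colB'])
    .agg([
        pl.n_unique('colC').alias('Blah'),
        pl.sum('colD').alias("BlahBlah"),
    ])
    .collect(streaming=True)
)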

EDIT:

I checked the schema of my Polars dataframe and it outputs a sensible result, so I assume my connection to Azure is correct. I then went upstream to check whether what Polars is reading from PyArrow works, and it looks like it is a PyArrow issue rather than a Polars one. The snip below is from me just checking the head of the PyArrow dataset that I have got from Azure.

[screenshot of the error raised when checking the head of the PyArrow dataset]

I assume that the data type it has inferred isn't the data type it is actually receiving when reading the data in, however I am unsure what the data at position 4 (in the whole table) is, or how I am going to figure that out.
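
For clarity, this is roughly the check I am running - a minimal sketch against the PyArrow dataset defined in the question (myds), which is where the error appears:

# Sketch of the check: print the schema PyArrow has inferred from the
# parquet files, then try to materialise the first few rows. The error
# in the screenshot appears when reading the head, which is why I think
# this is a PyArrow read issue rather than a Polars one.
print(myds.schema)     # inferred schema
print(myds.head(5))    # read the first 5 rows into a pyarrow.Table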

I am going to change some of the tags and the title to hopefully get the new issue in front of the right people who can help.


1 Answer


I have got my code working by changing how I access the data within Azure, so I am assuming that was the overarching issue.

Instead of using AzureMachineLearningFileSystem, I have turned to adlfs.AzureBlobFileSystem.

There is a bit more code involved to fetch all the correct credentials etc., but it isn't too verbose - and ultimately it is working :)

import pyarrow.dataset as ds
import polars as pl
import adlfs
from azureml.core import Workspace, Datastore
from azure.mgmt.storage import StorageManagementClient
from azure.identity import DefaultAzureCredential

# Acquire a credential object
credential = DefaultAzureCredential()
# Get Workspace
ws = Workspace.from_config()
# Get specific datastore
datastore = Datastore.get(ws,'datastore_name')

# Azure Machine Learning workspace and storage details:
subscription = ws.subscription_id
resource_group = ws.resource_group
account_name = datastore.account_name
container_name = datastore.container_name
path_on_datastore = f'{container_name}/path/to/data'

# Provision the storage account, starting with a management object.
storage_client = StorageManagementClient(credential, subscription)

# Retrieve the account's primary access key
keys = storage_client.storage_accounts.list_keys(resource_group, account_name)
key_to_access = keys.keys[0].value

# Configure the adlfs filesystem with the storage account name and key
fs = adlfs.AzureBlobFileSystem(account_name=account_name, account_key=key_to_access)

dd = ds.dataset(path_on_datastore, filesystem=fs)

df = (
    pl.scan_pyarrow_dataset(dd)
    .select([
        'COLUMN_LIST'
    ])
    .filter((pl.col('col1') > 0) & (pl.col('col2') >= 2022))
)

# df is already a LazyFrame (scan_pyarrow_dataset is lazy), so no extra .lazy() is needed
grouped = (df
    .groupby(['colA', 'colB'])
    .agg(
        [
            pl.n_unique('colC').alias('Blah'),
            pl.sum('colD').alias("BlahBlah"),
            pl.n_unique('colE').alias('BlahBlahBlah'),
            # note: .count() counts the non-null values of colF in each group,
            # not how many of them equal "C"
            (pl.col('colF') == "C").count().alias('BlahBlahBlahBlah')
        ]
    )
).collect()

References for help to others:

Pyarrow connection to Azure Blob - https://arrow.apache.org/docs/python/filesystems.html#using-fsspec-compatible-filesystems-with-arrow

adlfs docs - https://github.com/fsspec/adlfs

Programmatically get blob connection string - How to programmatically retrieve the connection string from an Azure storage account in Python
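
On that last link - adlfs will also accept a connection string instead of an account name/key pair, so if you retrieve the connection string programmatically you can skip the key lookup. A rough sketch (the connection string value is a placeholder for whatever you pull back for your storage account; I have stuck with the account-key route above):

import adlfs
import pyarrow.dataset as ds

# Sketch of the connection-string variant: build the filesystem from a
# connection string retrieved programmatically (see the link above)
# rather than from account_name + account_key.
connection_string = "<storage account connection string>"
fs = adlfs.AzureBlobFileSystem(connection_string=connection_string)
dd = ds.dataset('container_name/path/to/data', filesystem=fs, format="parquet")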

I will accept my own answer for now, but if there is a better way of doing this then please feel free to post it and I will change the accepted answer.
