
My data runs through a pipeline, and I want to make sure the input matches the output, i.e. that nothing in the pipeline is changing the data. To do this I am using Dask to compare the dataframes, since the source contains over 2 million rows.

The code I am using to compare the dataframes is this:

import dask.dataframe as dd

# Read both tables lazily, partitioned on their index columns
input_df = dd.read_sql_table(input_table, con=input_engine.url, index_col=input_index_col_name, npartitions=1000)
output_df = dd.read_sql_table(output_table, con=output_engine.url, index_col=output_index_col_name, npartitions=1000)

# Compare the tables one partition at a time
for part_one, part_two in zip(input_df.partitions, output_df.partitions):
    input_part_df = part_one.fillna("NULL")
    output_part_df = part_two.fillna("NULL")
    print(input_part_df.eq(output_part_df).all().compute())

I am wondering if there is a way to process the input_df.fillna and output_df.fillna calls in tandem so that it runs faster? I have to fill the NAs, otherwise rows where both values are null won't compare as equal.

I looked into using Dask's delayed, but I don't think it applies here, since the data has to be pulled in before the comparison can be done. As you can imagine, the whole thing runs rather slowly due to the number of records it is pulling.

  • I think you'd be better off doing this in SQL since you clearly have an SQL database..? – AKX Mar 17 '22 at 17:34
  • but Dask allows me to chunk the data rather than trying to pull it all in at once, which is what would happen with SQL? The input source is on one database and the output source is on another; it's passing from one to the other through a pipeline – BigBlueMonster Mar 17 '22 at 17:37
  • How can they be different databases if `engine.url` and `table` are the same? Or did you maybe oversimplify your code for SO? :-) – AKX Mar 17 '22 at 17:38
  • Correct, I did! Edited now – BigBlueMonster Mar 17 '22 at 17:40
  • so you're just using this to check the contents of the data in the two tables against one another to make sure they're identical? couldn't you hash the contents or something? – Michael Delgado Mar 17 '22 at 20:00
  • something like this... https://stackoverflow.com/questions/10799509/find-variations-between-two-database-tables-efficiently – Michael Delgado Mar 17 '22 at 20:02
  • but the short answer to your question is - don't loop over the partitions. You're defeating the purpose of dask by doing this. Just do `input_df.fillna("NULL").eq(output_df.fillna("NULL")).all().compute()`. This will run in parallel by default. – Michael Delgado Mar 17 '22 at 20:06
  • btw - is your data a string type? if not, you could be really slowing things down by coercing the data to `object`. Try `((input_df == output_df) | (input_df.isnull() & output_df.isnull())).all().compute()` – Michael Delgado Mar 17 '22 at 20:08
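
For reference, a minimal sketch of the null-safe comparison suggested in the last comment, assuming input_df and output_df have been read with read_sql_table exactly as in the question:

# Null-safe equality: values match if they are equal or both missing,
# avoiding fillna so numeric columns are not coerced to object dtype.
matches = (input_df == output_df) | (input_df.isnull() & output_df.isnull())

# One lazy task graph; Dask evaluates all partitions in parallel on compute().
print(matches.all().compute())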

1 Answer


Though I agree with the above comments that there are likely better ways to make this comparison, one way to improve the snippet you've shared is to call fillna on the whole Dask DataFrame instead of inside a loop over partitions. For example:

import dask.dataframe as dd
import pandas as pd

# Small example frame with missing values, split into two partitions
ddf = dd.from_pandas(
    pd.DataFrame({
        'a': [None, 1, 3],
        'b': [1, 'a', None]
    }), npartitions=2)

# fillna is applied lazily across every partition at once
ddf = ddf.fillna('NULL')
ddf.compute()
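
Applied to the two tables from your question, the same idea removes the partition loop entirely. A sketch, assuming input_df and output_df are read with read_sql_table as in your snippet:

# fillna and eq build a single lazy task graph over all partitions;
# compute() then evaluates the partitions in parallel rather than one at a time.
print(input_df.fillna("NULL").eq(output_df.fillna("NULL")).all().compute())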
scj13
  • Maybe I am not grasping how Dask works, but if I do the above on the whole dataframe rather than a partition, won't it try to pull down all 2 million records? Luckily it fits within memory, but if it didn't, surely looping through the individual partitions and performing my fillna there would be better? I may have to read up on this more – BigBlueMonster Mar 18 '22 at 08:24
  • The great thing about Dask is it will not do that! A Dask DataFrame is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames, so there is no need to write a for loop operating on the individual partitions. You can learn more from the docs [here](https://docs.dask.org/en/stable/dataframe.html) – scj13 Mar 18 '22 at 23:19
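
As a quick illustration of that laziness, a sketch building the same toy DataFrame as the answer above:

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame({'a': [None, 1, 3], 'b': [1, 'a', None]}), npartitions=2)
filled = ddf.fillna('NULL')  # lazy: builds a task graph, nothing is computed yet
print(ddf.npartitions)       # number of underlying pandas partitions (2 here)
print(filled.compute())      # only now does Dask process the partitions, in parallel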