
I have one large dataframe that is created by importing a CSV file (Spark CSV reader). This dataframe has many rows of daily data. The data is identified by date, region, service_offered and count.

I filter by region and service_offered, aggregate the count (sum), and roll that up to month. Each time the filter runs in the loop, it selects one region, then one service_offered, and aggregates that.

If I append that to the dataframe over and over, the big O starts to kick in and it becomes very slow. There are 360 offices and about 5-10 services per office. How do I save each select/filter result to a list first and append those before making the final dataframe?

I saw this post, Using pandas .append within for loop, but it only shows list.a and list.b. What about 360 lists?

Here is my code that loops over and aggregates the data:

# imports (missing from the snippet as posted)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# spark session
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# spark schema
schema = StructType([
    StructField('office', StringType(), True),
    StructField('service', StringType(), True),
    StructField('date', StringType(), True),
    StructField('count', IntegerType(), True)
])

# empty dataframe that the loop results get appended to
office_summary = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)
office_summary.printSchema()

x = 1
try:
    for office in office_lookup:
        office = office[0]
        print(office_locations_count - x, " office(s) left")
        x = x + 1
        for transaction in service_lookup:
            transaction = transaction[0]
            # filter to one office/service pair, then aggregate the counts
            monthly_counts = source_data.filter(
                (col("office").rlike(office)) & (col("service").rlike(transaction))
            ).groupby("office", "service", "date").sum()
            #office_summary = office_summary.unionAll(monthly_counts)
except Exception as e:
    print(e)

I know there are issues with rlike returning more results than expected, but that is not a problem with the current data. The first 30% of the process is very quick, and then it starts to slow down as expected.

How do I save each filter result to a list, append or join those over and over, and finally create the final dataframe? This code does finish, but it should not take 30 minutes to run.
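
Something like this is what I have in mind: an untested sketch of the list-then-single-union idea against the code above (exact equality instead of rlike, and functools.reduce is just one way to fold the list of results):

from functools import reduce
from pyspark.sql.functions import col

results = []
for office in office_lookup:
    for transaction in service_lookup:
        pair_counts = source_data.filter(
            (col("office") == office[0]) & (col("service") == transaction[0])
        ).groupby("office", "service", "date").sum()
        results.append(pair_counts)  # cheap: this only records the plan, nothing runs yet

# one union at the very end instead of hundreds of incremental unions
office_summary = reduce(lambda a, b: a.unionByName(b), results)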

Thanks!

GIZNAJ
    Why are you doing this in nested loops? – Andrew Oct 16 '20 at 16:43
  • @Andrew - what you're asking me is probably in the answer I'm looking for. The nested loop is so that I can do the combination of 'office' with each 'service' type. Is there another way to aggregate at that part? – GIZNAJ Oct 17 '20 at 22:49
  • Can you use 1 or more groupby() calls together? – GIZNAJ Oct 17 '20 at 22:50
  • @Andrew - now using Spark, there is no loop needed like you mentioned. Thanks for the input. I will write a more elaborate post for others to use as reference – GIZNAJ Jan 15 '21 at 16:11

1 Answer


With help from @Andrew, I was able to keep using PySpark DataFrames to accomplish this. The command, scrubbed, looks like this:

from pyspark.sql.functions import sum  # note: this shadows Python's built-in sum

df2 = df1.groupby("Column1", "Column2", "Column3").agg(sum('COUNT'))

This allowed me to create a new dataframe based on df1, with the grouping and aggregation satisfied on one line. This command takes about 0.5 seconds to execute, compared to the looping approach in the initial post.

The constant creation of new dataframes, while retaining all the old ones in memory, was the problem. This is the most efficient way to do it.
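
For completeness, a rough sketch of that single-pass aggregation applied to the question's columns, with the monthly roll-up added (treating the string date column with to_date/trunc is an assumption about the data):

from pyspark.sql import functions as F

# every office/service combination in one pass, rolled up to month
# (assumes the "date" strings parse with the default to_date format)
office_summary = (
    source_data
    .withColumn("month", F.trunc(F.to_date("date"), "month"))
    .groupBy("office", "service", "month")
    .agg(F.sum("count").alias("monthly_count"))
)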
