After resolving this issue ("How to limit FPGrowth itemesets to just 2 or 3"), I am trying to export the association rule output of FPGrowth from PySpark to a .csv file. After running for almost 8-10 hours it fails with an error. My machine has enough disk space and memory.

The association rule output looks like this:

    Antecedent           Consequent      Lift
    ['A','B']              ['C']           1

The code is in the linked question ("How to limit FPGrowth itemesets to just 2 or 3"); I just added the following:

    ar = ar.coalesce(24)
    ar.write.csv('/output', header=True)

Configuration used:

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession

    conf = SparkConf().setAppName("App")
    conf = (conf.setMaster('local[*]')
            .set('spark.executor.memory', '200G')
            .set('spark.driver.memory', '700G')
            .set('spark.driver.maxResultSize', '400G'))  # 8,45,10
    sc = SparkContext.getOrCreate(conf=conf)
    spark = SparkSession(sc)

This keeps running and has consumed 1000 GB of my C: drive.

Is there an efficient way to save the output in .csv or .xlsx format?

The error is:

Py4JJavaError: An error occurred while calling o207.csv.
org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 9.0 failed 1 times, most recent failure: Lost task 10.0 in stage 9.0 (TID 226, localhost, executor driver): java.io.IOException: There is not enough space on the disk
at java.io.FileOutputStream.writeBytes(Native Method)



The progress:

     19/07/15 14:12:32 WARN TaskSetManager: Stage 1 contains a task of very large size (26033 KB). The maximum recommended task size is 100 KB.
     19/07/15 14:12:33 WARN TaskSetManager: Stage 2 contains a task of very large size (26033 KB). The maximum recommended task size is 100 KB.
     19/07/15 14:12:38 WARN TaskSetManager: Stage 4 contains a task of very large size (26033 KB). The maximum recommended task size is 100 KB.
     [Stage 5:>                (0 + 24) / 24][Stage 6:>                 (0 + 0) / 24][I 14:14:02.723 NotebookApp] Saving file at /app1.ipynb
     [Stage 5:==>              (4 + 20) / 24][Stage 6:===>              (4 + 4) / 24]

Shubham Bajaj
  • You shouldn't use pandas to create a CSV file. Just use the PySpark DataFrameWriter, e.g. `ar.write.csv('mycsv.csv')`. This will create several CSV files. You can control the number with [ar.coalesce()](http://spark.apache.org/docs/2.2.1/api/python/pyspark.sql.html#pyspark.sql.DataFrame.coalesce) – cronoik Jul 02 '19 at 05:42
  • @cronoik can you say more about it? What should go inside ar.coalesce()? – Shubham Bajaj Jul 02 '19 at 08:11
  • @cronoik it does not work since the output is in the form of a list. – Shubham Bajaj Jul 02 '19 at 17:46

1 Answer

As already stated in the comments, you should try to avoid toPandas(), as this function loads all your data to the driver. You can use PySpark's DataFrameWriter to write out your data, but you have to convert your array columns (antecedent and consequent) to a different type before you can write them to CSV, as arrays aren't supported. One way to convert the columns to a supported type such as string is concat_ws.

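    import pyspark.sql.functions as F
    from pyspark.ml.fpm import FPGrowth
    from pyspark.sql import SparkSession

    # Reuse the active session (already defined as `spark` in the pyspark shell)
    spark = SparkSession.builder.getOrCreate()

    df = spark.createDataFrame([
        (0, [1, 2, 5]),
        (1, [1, 2, 3, 5]),
        (2, [1, 2])
    ], ["id", "items"])

    fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
    model = fpGrowth.fit(df)

    # Arrays can't be written to CSV, so join each array into a '-'-separated string
    ar = (model.associationRules
          .withColumn('antecedent', F.concat_ws('-', F.col("antecedent").cast("array<string>")))
          .withColumn('consequent', F.concat_ws('-', F.col("consequent").cast("array<string>"))))
    ar.show()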

Output:

    +----------+----------+------------------+----+
    |antecedent|consequent|        confidence|lift|
    +----------+----------+------------------+----+
    |         5|         1|               1.0| 1.0|
    |         5|         2|               1.0| 1.0|
    |       1-2|         5|0.6666666666666666| 1.0|
    |       5-2|         1|               1.0| 1.0|
    |       5-1|         2|               1.0| 1.0|
    |         2|         1|               1.0| 1.0|
    |         2|         5|0.6666666666666666| 1.0|
    |         1|         2|               1.0| 1.0|
    |         1|         5|0.6666666666666666| 1.0|
    +----------+----------+------------------+----+

You can now write your data to CSV:

    ar.write.csv('/bla', header=True)

This will create one CSV file per partition. You can change the number of partitions (before you call ar.write) with:

    ar = ar.coalesce(1)

If Spark is not able to write the CSV file due to memory issues, try a different number of partitions (before you call ar.write) and concatenate the files with other tools if necessary.
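Concatenating the part files afterwards could be sketched in plain Python as follows. This is only a minimal sketch and not part of the Spark API: the function name `merge_csv_parts` and the paths in the usage note are hypothetical, and it assumes the files were written with header=True, so every non-empty part file starts with the same header line. Spark writes its output as `part-*.csv` files inside the output directory.

```python
import glob
import os
import shutil


def merge_csv_parts(parts_dir, merged_path):
    """Concatenate Spark's part-*.csv files into one CSV with a single header.

    Assumption: every non-empty part file begins with the same header line.
    Returns the number of part files that were read.
    """
    part_files = sorted(glob.glob(os.path.join(parts_dir, "part-*.csv")))
    header_written = False
    with open(merged_path, "w", newline="", encoding="utf-8") as out:
        for part in part_files:
            with open(part, "r", newline="", encoding="utf-8") as src:
                header = src.readline()
                if not header:
                    continue  # empty part file, nothing to copy
                if not header_written:
                    out.write(header)  # keep the header only once
                    header_written = True
                # Stream the remaining rows instead of loading the file into memory
                shutil.copyfileobj(src, out)
    return len(part_files)
```

A call such as `merge_csv_parts('/bla', 'rules.csv')` would then produce a single file. Because the rows are streamed, the merge itself needs very little memory; it does not reduce the disk space Spark uses while computing the rules.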

cronoik
  • Still getting the error as.... Py4JJavaError: An error occurred while calling o96.csv. : org.apache.spark.SparkException: Job aborted. at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159) – Shubham Bajaj Jul 04 '19 at 03:09
  • My data has 1,002,653 rows and 160 columns on which I am running FP growth. – Shubham Bajaj Jul 04 '19 at 04:15
  • Can you please add the full error message to your question? Have you tried different coalesce values? – cronoik Jul 04 '19 at 18:41
  • It failed at this line: ar.write.csv('/bla', header=True). So I could not apply different coalesce values. – Shubham Bajaj Jul 05 '19 at 14:26
  • You have to apply it before. – cronoik Jul 05 '19 at 14:29
  • Can you tell how to do that? – Shubham Bajaj Jul 05 '19 at 15:11
  • This will create one partition: `ar = ar.coalesce(1)`; after that, `ar.write.csv('/bla', header=True)` will write a single file. In case this raises an error, try different values for coalesce. It may make sense to check the initial value with `ar.rdd.getNumPartitions()` before you apply any coalesce(). In case you get an error message, please edit your original question and add the complete error message there. – cronoik Jul 05 '19 at 15:42
  • Sure. Give me some time, I will rerun and come back. Thank you. – Shubham Bajaj Jul 05 '19 at 16:24
  • ar.rdd.getNumPartitions() gave me 24, so I followed your steps with ar = ar.coalesce(24). It has been running since morning and is consuming my C: drive. Out of 1000 GB of disk space it has already consumed 850 GB. :( Still running. – Shubham Bajaj Jul 09 '19 at 14:17
  • Does calling `ar.count()` also lead to a crash? – cronoik Jul 10 '19 at 13:02
  • It has been 2 weeks and I have tried everything I can think of, but it's just not working. What is strange is that I have filtered for rules which have only 1 item in the antecedent, and it's still consuming a huge amount of space. – Shubham Bajaj Jul 10 '19 at 16:35
  • I have used these config: conf = SparkConf().setAppName("App") conf = (conf.setMaster('local[*]') .set('spark.executor.memory', '200G') .set('spark.driver.memory', '700G') .set('spark.driver.maxResultSize', '400G')) #8,45,10 sc = SparkContext.getOrCreate(conf=conf) spark = SparkSession(sc) – Shubham Bajaj Jul 10 '19 at 17:17
  • Yes, it also leads to crash. – Shubham Bajaj Jul 11 '19 at 06:40
  • Well, the occupied disk space is taken up by shuffle partitions, because Spark can't keep all the calculation data in memory. It doesn't crash while writing the CSV; it crashes earlier, during the calculation. How much RAM do you have? – cronoik Jul 11 '19 at 10:58
  • It's a beast of a machine. It has 224 GB RAM and is deployed on Azure. Any alternative? The C: drive has 1 TB of space. – Shubham Bajaj Jul 11 '19 at 13:42
  • That is indeed huge, but you could try a higher minSupport, or try to exclude columns which aren't needed for FPGrowth (join them back after the FPGrowth calculation has finished, if needed). – cronoik Jul 11 '19 at 15:18
  • I am always using minSupport > 99%. :-) – Shubham Bajaj Jul 12 '19 at 06:30
  • Can we dump it to a Hive table? If it's feasible, can you show me how to do it? – Shubham Bajaj Jul 12 '19 at 07:23
  • I don't think that will change anything, as the calculation is not completed. Could you please try to reduce the number of columns with `model = fpGrowth.fit(df.select('items'))`? I had this issue once in a Spark package where unused columns became part of the shuffle partitions, causing a crash for an otherwise simple calculation. I'm not sure if this also applies to FPGrowth, but I currently don't have the time to check the source code and you can simply try it out. – cronoik Jul 12 '19 at 18:21
  • Not working. Tried `model = fpGrowth.fit(df.select('items'))` – Shubham Bajaj Jul 15 '19 at 09:22
  • Does that mean the error message is exactly the same? If that is the case, then I'm out of ideas. – cronoik Jul 15 '19 at 14:02
  • Updated the progress above: – Shubham Bajaj Jul 15 '19 at 15:43
  • Hi @cronoik, I have posted another question. Could you have a look? https://stackoverflow.com/questions/57079588/writing-sql-query-subquery-for-pandas-multiple-dataframe – Shubham Bajaj Jul 17 '19 at 15:40