
I am looking for logging capabilities in PySpark's DataFrameReader and DataFrameWriter. When DataFrames are read or written, the number of files, partitions, records/rows, raw bytes, etc. involved in the operation should be tracked and returned so that they can be used afterwards.

I have checked the official Apache Spark documentation and searched with Google, but there seems to be no such capability implemented in PySpark right now. These are the ideas I came up with:

  • Checking the directories used at read/write time with dbutils on Databricks, cloud provider libraries (boto3 for AWS S3 or azure-storage-blob for Azure), or Python's built-in os package (see the sketch after this list)
  • Interacting with the SparkContext and using something like StatusTracker to monitor job/stage progress and return job information via jobId (not checked yet)
  • Running Spark in some kind of debug/dry-run mode and extracting information from loggers (not checked yet)
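
A minimal sketch of the first idea, assuming the data sits on a file system reachable by Python's built-in os module (local disk or a mounted path such as DBFS); the helper name path_statistics is made up for illustration and only reports file-system-level numbers, not Spark partitions or record counts:

import os

def path_statistics(path):
    """Return the number of files and total size in bytes under a path."""
    if os.path.isfile(path):
        return {"files": 1, "bytes": os.path.getsize(path)}
    files, total_bytes = 0, 0
    for root, _, names in os.walk(path):
        for name in names:
            files += 1
            total_bytes += os.path.getsize(os.path.join(root, name))
    return {"files": files, "bytes": total_bytes}

# Hypothetical usage around a write:
# df.write.csv("outputfile.csv")
# print(path_statistics("outputfile.csv"))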

But I am rather looking for an easy-to-use implementation along the lines of a fictional option("statistics", "true") setting:

df, read_statistics = spark.read.option("statistics", "true").csv("inputfile.csv")

write_statistics = df.write.option("statistics", "true").csv("outputfile.csv")

Thanks in advance for any insights into alternative implementations or the community's future release plans!

SOLUTION (partial):

An easy-to-use solution (for the number of bytes and records), based on collecting and aggregating Spark events, is the ContextSparkListener from the pyspark-spy package:

from pyspark_spy import ContextSparkListener, register_listener

listener = ContextSparkListener()
register_listener(spark.sparkContext, listener)  # attach the listener to the active SparkContext

with listener as events:
    df = spark.read.csv("inputfile.csv")
    df.write.csv("outputfile.csv")

# Aggregated task metrics over all stages observed by the listener
print("Read statistics: ", listener.stage_input_metrics_aggregate())
print("Write statistics: ", listener.stage_output_metrics_aggregate())

>>> Read statistics:  InputMetrics(bytesRead=140129708, recordsRead=271502)
>>> Write statistics:  OutputMetrics(bytesWritten=136828555, recordsWritten=265106)
  • Spark provides a callback `SparkListenerInterface` and a default no-op implementation. – mazaneicha Jun 09 '21 at 13:25
  • @mazaneicha Thanks for the hint towards `SparkListenerInterface`. This only partly resolves the question, since only `bytes` and `records` are loggable when checking [OutputMetrics](https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/status/api/v1/OutputMetrics.html). So am I right that logging the number of `partitions` or `files` is not implemented in `SparkListener` itself? – choeh Jun 09 '21 at 15:41
  • Not entirely. You can find all kinds of info by interrogating the `RDDInfo`, `StageInfo`, etc. data objects. – mazaneicha Jun 09 '21 at 15:58
  • @mazaneicha Looks promising, thanks for pointing that out. When checking [RDDInfo](https://spark.apache.org/docs/latest/api/java/org/apache/spark/storage/RDDInfo.html), the number of `partitions` is easily available. Will check it soon and post my solution in `PySpark` (see the sketch below). – choeh Jun 09 '21 at 16:24
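
For the partition count specifically, a listener may not be needed at all; a minimal sketch (assuming a DataFrame df is already in scope) using only the public PySpark API:

# Number of partitions of the DataFrame's underlying RDD
num_partitions = df.rdd.getNumPartitions()
print("Partitions:", num_partitions)

# The number of output files after a write is simply the number of part files
# in the target directory, which the os-based helper sketched above
# (or dbutils.fs.ls on Databricks) can count.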
