I am looking for logging capabilities in PySpark's DataFrameReader and DataFrameWriter. When DataFrames are read or written, the number of files, partitions, records/rows, raw bytes, etc. involved in the operation should be tracked and returned so it can be used afterwards.
I have checked the official Apache Spark documentation and searched with Google, but there seems to be no such capability implemented in PySpark right now. These are the ideas I came up with:
- Checking the same directories (used at read/write) with dbutils in Databricks, with cloud provider libraries (boto3 for AWS S3 or azure-storage-blob for Azure), or with Python's builtin os package (see the first sketch after this list)
- Interacting with the SparkContext and using something like the StatusTracker to monitor job/stage progress and return job information via jobId (-> not checked; see the second sketch after this list)
- Running Spark in some kind of debug/dry-run mode and extracting information from the loggers (-> not checked)
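For the directory-checking idea, a minimal sketch with the builtin os package could look like the following (file_stats and the local paths are illustrative; on S3 or Azure you would list objects with boto3 or azure-storage-blob instead and sum their sizes):

import os

def file_stats(path):
    # Walk the output directory and return (number_of_files, total_bytes)
    n_files, n_bytes = 0, 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            n_files += 1
            n_bytes += os.path.getsize(os.path.join(root, name))
    return n_files, n_bytes

df = spark.read.csv("inputfile.csv")
df.write.csv("outputfile.csv")
print("Write statistics: ", file_stats("outputfile.csv"))  # counts the part files plus the _SUCCESS marker

For the StatusTracker idea, SparkContext.statusTracker() does exist in PySpark, but as far as I can tell it only exposes job/stage/task counts and statuses, not bytes or records, so it covers only part of the question. A rough sketch, assuming a job group is set before triggering the action:

sc = spark.sparkContext
sc.setJobGroup("tracked-read", "read with status tracking")  # tag the jobs that follow

df = spark.read.csv("inputfile.csv")
df.count()  # trigger a job so the tracker has something to report

tracker = sc.statusTracker()
for job_id in tracker.getJobIdsForGroup("tracked-read"):
    job = tracker.getJobInfo(job_id)
    if job is None:
        continue
    for stage_id in job.stageIds:
        stage = tracker.getStageInfo(stage_id)
        if stage:
            print(stage.name, stage.numTasks, stage.numCompletedTasks)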
But what I am really looking for is an easy-to-use implementation, something like a fictional option("statistics", "true") setting:
df, read_statistics = spark.read.option("statistics", "true").csv("inputfile.csv")
write_statistics = df.write.option("statistics", "true").csv("outputfile.csv")
Thanks in advance for any insights on alternative implementations or the community's plans for future releases!
SOLUTION (partial):
An easy-to-use solution (for the number of bytes and records), based on collecting and aggregating Spark events, is the ContextSparkListener from the pyspark-spy package:
from pyspark_spy import ContextSparkListener, register_listener

# Register the listener on the active SparkContext so it receives stage events
listener = ContextSparkListener()
register_listener(spark.sparkContext, listener)

# Only Spark events fired inside this block are collected
with listener as events:
    df = spark.read.csv("inputfile.csv")
    df.write.csv("outputfile.csv")

# Aggregate the collected stage-level metrics
print("Read statistics: ", listener.stage_input_metrics_aggregate())
print("Write statistics: ", listener.stage_output_metrics_aggregate())
>>> Read statistics: InputMetrics(bytesRead=140129708, recordsRead=271502)
>>> Write statistics: OutputMetrics(bytesWritten=136828555, recordsWritten=265106)
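For comparison, the same stage-level metrics are also exposed outside of pyspark-spy through Spark's monitoring REST API on the driver UI. A hedged sketch, assuming a locally running driver on the default port 4040 and the documented StageData field names:

import requests

base = "http://localhost:4040/api/v1"  # driver UI; adjust host/port for your setup
app_id = requests.get(f"{base}/applications").json()[0]["id"]
stages = requests.get(f"{base}/applications/{app_id}/stages").json()

# Sum input/output metrics over all stages of the application
print("Read statistics: ",
      sum(s["inputBytes"] for s in stages),
      sum(s["inputRecords"] for s in stages))
print("Write statistics: ",
      sum(s["outputBytes"] for s in stages),
      sum(s["outputRecords"] for s in stages))

Note that this aggregates over the whole application, so you would have to filter by stage name or submission time if you only want the numbers for one particular read or write.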