I have a "generic" spark structured stream job, which monitors a top level folder (an umbrella) and goes through all the subfolders (kafka topic data) and then writes each of those Kafka topic data folders as delta in separate output container. Each Kafka topic data folder will have its own output folder. Furthermore, compaction is done and also creation of external tables. The aim is to have this as generic as possible, so that for every Kakfka topic data folder, one does not need to write a separate spark structured streaming job.
Now the problem, of course. For historical reasons, some of the Kafka topic data folders contain avro files whereas others contain parquet. We are, for certain reasons, shifting towards avro as the common format in the landing zone, but the topics whose data is still in parquet remain a problem. Point to note: one Kafka topic data folder has one format only (either avro or parquet). The difficulty comes from the fact that I am trying to write this generic Spark streaming job by looking at the top-level folder.
Specifying the format as either avro or parquet throws an exception during readStream, and it is a very generic AnalysisException, which can result from other reasons as well (not just problems with inferring the schema). Right now I am doing a blanket exception handling, but that hides the real errors, which is not good. How do I completely ignore the avro topic folders when running a Spark streaming job with the format set to parquet? I have tried both format and pathGlobFilter, but when the stream gets to an avro folder, the filter removes all the avro files and the resulting empty folder again throws an exception.
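For reference, this is roughly how I tried pathGlobFilter (a minimal sketch; the remaining options are identical to the full snippet further down, and the exact combination with the Auto Loader options is simplified here):

# Sketch of the pathGlobFilter attempt: with a parquet-only glob, an avro topic
# folder has no matching files left at all, and the stream fails instead of
# simply being skipped.
df = (
    spark.readStream.format("cloudFiles")
    .options(**cloudfile)                    # cloudFiles.format = "parquet"
    .option("pathGlobFilter", "*.parquet")   # drops the avro files ...
    .load(datahub_topic.path)                # ... leaving an "empty" folder
)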
Please note there is no pattern in the names that could be used to distinguish subfolders containing avro from those containing parquet. Pictorially explained below.
Enough said; here is a code snippet depicting the problem. Check the last lines, and especially the try/except, which is where I am asking for help.
top_level_folder_path = f"abfss://{sourcecontainer}@{datalakename}.dfs.core.windows.net/toplevelfolder"

for datahub_domain in dbutils.fs.ls(top_level_folder_path):
    for datahub_topic in topicsearchpath:   # topicsearchpath is derived from datahub_domain, see EDIT 1 below
        # ....derive some variables here.....
        # ...............................

        # some config (Auto Loader / cloudFiles options)
        cloudfile = {
            "cloudFiles.format": "parquet",
            "cloudFiles.includeExistingFiles": "true",
            "cloudFiles.inferColumnTypes": "true",
            "cloudFiles.schemaLocation": f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/autoloader/schemas/{originaltopicname}/",
            "cloudFiles.schemaEvolutionMode": "addNewColumns",
            "cloudFiles.allowOverwrites": "true",
            "ignoreCorruptFiles": "true",
            "ignoreMissingFiles": "true",
        }

        try:
            # one stream per topic folder
            df = (
                spark.readStream.format("cloudFiles")
                .options(**cloudfile)
                .load(datahub_topic.path)
            )
            dstreamQuery = (
                df.writeStream.format("delta")
                .outputMode("append")
                .queryName(f"{schema_name}_raw_{table_name}")
                .option(
                    "checkpointLocation",
                    f"abfss://raw@{datalakename}.dfs.core.windows.net/autoloader/checkpoint/{originaldomainname}/{originaltopicname}/",
                )
                .option("mergeSchema", "true")
                .partitionBy("Year", "Month", "Day")
                .trigger(availableNow=True)
                .start(
                    f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/delta/{originaltopicname}"
                )
            )
            while len(spark.streams.active) > 0:
                spark.streams.awaitAnyTermination()
        except Exception as e:  # I DO NOT WANT TO DO THIS
            logger.warning(f"Error reading stream: {str(e)}")  #### VERY BAD
EDIT 1:
In response to one of the comments about the variables used in the loops, here is more info:
for datahub_domain in dbutils.fs.ls(filePath):
    if datahub_domain.size == 0 and 'datahub' in datahub_domain.name:
        topicsearchpath = dbutils.fs.ls(datahub_domain.path)
        for datahub_topic in topicsearchpath:
            # ...rest of the code, which I already shared here
- filePath is the top-level folder. Please note that there can be multiple top-level folders, each with a structure similar to the one in my picture.
- datahub_domain: one top-level folder, as returned by dbutils.fs.ls. In the example illustrated in the attached picture, it would be "toplevelfolder".
- topicsearchpath: holds the list of subfolders (the actual topic folders). This corresponds to "topic 1" and "topic 2" in the attached picture. Please note that readStream gets the path of each individual topic folder, so it starts one stream per topic folder, i.e. one stream for topic 1, one stream for topic 2, and so on.
Also, the multiple streams kicked off during a run of the job can be seen below (these are the streams that got started because their topic folders hold parquet data). To reiterate: one topic folder (which is what gets passed to readStream) has ONLY one kind of file, either avro or parquet.
Also please note that this job works very well and we get different Delta tables for the different topics; I am only looking for a way to gracefully ignore the topic folders that contain avro. I have thought of calling a method that checks for a specific file extension before doing readStream, but that would mean doing a file listing, and since this job is not driven by dates (but by the checkpoint), I would also have to figure out which partition to check (the data is partitioned by year, month and day). A rough sketch of that idea follows.
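This is the kind of pre-check I had in mind (only a sketch; the helper name and the max_files cap are my own invention). It descends through the partition folders with dbutils.fs.ls until it finds a data file, which is exactly the listing cost I would like to avoid:

def first_data_file_format(path, max_files=1000):
    """Descend into the partition folders under `path` until a data file is
    found and return 'parquet', 'avro', or None. Sketch only: this does the
    very listing I would rather not pay for on every run."""
    stack = [path]
    seen = 0
    while stack and seen < max_files:
        for entry in dbutils.fs.ls(stack.pop()):
            seen += 1
            if entry.name.endswith("/"):          # a (partition) sub-folder
                stack.append(entry.path)
            elif entry.name.endswith(".parquet"):
                return "parquet"
            elif entry.name.endswith(".avro"):
                return "avro"
    return None

# Inside the existing topic loop, the idea would then be:
#     if first_data_file_format(datahub_topic.path) != "parquet":
#         continue   # skip the avro topics before even building the stream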
Also please note that this job works very well and we get different delta tables for different topics and I am only looking at a way to gracefully ignore the topic folders where there is avro. I have thought of using a method that is called to check for a specific extension before doing readStream but that would mean doing a file listing and since this is not based on dates (but rather the checkpoint), I would have to also, figure out which partition to check for (since the data is partitioned by year, month and day).