
I have a "generic" Spark Structured Streaming job which monitors a top-level folder (an umbrella) and goes through all the subfolders (Kafka topic data), then writes each of those Kafka topic data folders as Delta to a separate output container. Each Kafka topic data folder gets its own output folder. On top of that, compaction is done and external tables are created. The aim is to keep this as generic as possible, so that one does not need to write a separate Spark Structured Streaming job for every Kafka topic data folder.

Now the problem, of course. For historical reasons, some of the Kafka topic data folders contain Avro files, whereas for others it is Parquet. We are now, for certain reasons, shifting towards Avro as the common format in the landing zone, but that still leaves the topics where the data is in Parquet. Point to note: one Kafka topic data folder only ever has one format (either Avro or Parquet). The trouble comes from the fact that I am trying to write this generic Spark streaming job by pointing it at the top-level folder.

Specifying the format as either Avro or Parquet throws an exception during readStream, and it is a very generic AnalysisException, which can also result from other causes (not just problems with inferring the schema). Right now I am doing blanket exception handling, but that hides the real errors, which is not good. How do I completely ignore the Avro topic folders when running a Spark streaming job with the format set to parquet? I have tried both format and pathGlobFilter, but when the job gets to an Avro folder it filters all the Avro files away, ends up with an empty folder, and that again throws an exception.
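
To make that concrete, the pathGlobFilter attempt was essentially the options dict from the snippet further down with a glob added (reconstructed roughly here, not the exact code I ran):

# roughly what the pathGlobFilter attempt looked like; the rest of the job is unchanged
cloudfile = {
    "cloudFiles.format": "parquet",
    "pathGlobFilter": "*.parquet",   # filters the Avro files away, leaving an "empty" folder, which again throws
    # ... remaining options identical to the snippet below ...
}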

Please note there is no pattern in the names that can be used to distinguish the subfolders containing Avro from those containing Parquet. The folder structure is pictured below:

[Image: a top-level folder ("toplevelfolder") containing one subfolder per Kafka topic]

Enough said; here is a code snippet depicting the problem. Check the last lines, especially the try/except, which is where I am asking for help.

top_level_folder_path = f"abfss://{sourcecontainer}@{datalakename}.dfs.core.windows.net/toplevelfolder"

for datahub_domain in dbutils.fs.ls(top_level_folder_path):
    for datahub_topic in topicsearchpath:
        ....derive some variables here.....
        ...............................
        # some config
        cloudfile = {
            "cloudFiles.format": "parquet",
            "cloudFiles.includeExistingFiles": "true",
            "cloudFiles.inferColumnTypes": "true",
            "cloudFiles.schemaLocation": f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/autoloader/schemas/{originaltopicname}/",
            "cloudFiles.schemaEvolutionMode": "addNewColumns",
            "cloudFiles.allowOverwrites": "true",
            "ignoreCorruptFiles": "true",
            "ignoreMissingFiles": "true",
        }
        try:
            df = (
                spark.readStream.format("cloudFiles")
                .options(**cloudfile)
                .load(datahub_topic.path)
            )
            dstreamQuery = (
                df.writeStream.format("delta")
                .outputMode("append")
                .queryName(f"{schema_name}_raw_{table_name}")
                .option(
                    "checkpointLocation",
                    f"abfss://raw@{datalakename}.dfs.core.windows.net/autoloader/checkpoint/{originaldomainname}/{originaltopicname}/",
                )
                .option("mergeSchema", "true")
                .partitionBy("Year", "Month", "Day")
                .trigger(availableNow=True)
                .start(
                    f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/delta/{originaltopicname}"
                )
            )
            while len(spark.streams.active) > 0:
                spark.streams.awaitAnyTermination()
        except Exception as e:  # I DO NOT WANT TO DO THIS
            logger.warning(f"Error reading stream: {str(e)}")  #### VERY BAD

EDIT 1:

In response to one of the comments about the variables used in the loops, here is more info:

for datahub_domain in dbutils.fs.ls(filePath):
    if datahub_domain.size == 0 and 'datahub' in datahub_domain.name:
        topicsearchpath = dbutils.fs.ls(datahub_domain.path)
        for datahub_topic in topicsearchpath:
            ...rest of the code, which I already shared

where:

  • filePath is the top-level folder. Please note that there can be multiple top-level folders, each with a structure similar to the one in my picture.
  • datahub_domain is one top-level folder, as returned by dbutils.fs.ls. In the example illustrated in the attached picture, it would be "toplevelfolder".
  • topicsearchpath holds the list of subfolders (the actual topic folders). This corresponds to "topic 1" and "topic 2" in the attached picture. Please note that readStream actually gets the path of each individual topic folder, therefore it starts one stream per topic folder, i.e. one stream for topic 1, one stream for topic 2, and so on.

Also, the multiple streams kicked off during a run of the job can be seen below (these are the streams that got started because the topic folders had Parquet data; to reiterate, one topic folder, which is what gets passed to readStream, has ONLY one kind of file, either Avro or Parquet).

Also please note that this job works very well and we get different Delta tables for the different topics; I am only looking for a way to gracefully ignore the topic folders that contain Avro. I have thought of calling a method that checks for a specific file extension before doing readStream, but that would mean doing a file listing, and since this is driven by the checkpoint rather than by dates, I would also have to figure out which partition to check (the data is partitioned by year, month and day).

[Screenshot: the list of active streams started during the run, one per Parquet topic folder]
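
Just to illustrate the extension pre-check idea mentioned above (which I would rather avoid because of the extra listing): a hypothetical helper, not part of my current job, that drills down from a topic folder to the first data file and returns its extension, so the loop could skip the Avro topics:

# Hypothetical helper, only to illustrate the pre-check idea; names are illustrative.
# It descends through the Year/Month/Day partition folders until it finds a data file,
# so it avoids a full recursive listing, but it is still extra listing work per topic.
def detect_topic_format(topic_path: str) -> str:
    entries = dbutils.fs.ls(topic_path)
    while entries:
        data_files = [e for e in entries
                      if not e.name.endswith("/") and not e.name.startswith(("_", "."))]
        if data_files:
            return data_files[0].name.rsplit(".", 1)[-1].lower()   # e.g. "avro" or "parquet"
        subdirs = [e for e in entries if e.name.endswith("/")]
        if not subdirs:
            break
        entries = dbutils.fs.ls(subdirs[0].path)   # descend into the first partition folder
    return "unknown"

# usage inside the loop, before readStream:
# if detect_topic_format(datahub_topic.path) != "parquet":
#     continue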

  • Doesn't make sense to create a single stream out of different file formats, let alone different topics (which c/should have diff schemas). Either topics/partitions are designed badly or the consumers. Do one of these: 1. Pass path to topic sub-folder to `load()` while reading E.g. `f"{datahub_topic.path}/Topic1"`. 2. You say you tried `pathGlobFilter`, post that code and the full exception message. Also note *"throws a very generic exception as AnalysisException"* -- spark usually provides a very useful message "suited for dummies" in `AnalysisException`. I won't call it "generic". – Kashyap Mar 23 '23 at 02:49
  • Also values of `topicsearchpath`, `dbutils.fs.ls(top_level_folder_path)` and folder structure etc are not clear from post. Add some debug logs and post as much details as possible, at least stuff I mentioned. – Kashyap Mar 23 '23 at 02:54
  • Hey. Thanks for taking the time out to respond. I however do not think you read the question in its entirety. First, there is no "single stream". There are "multiple streams" each arising out of "one Kafka topic folder" and writing to its own destination. So, each Kafka topic subfolder is one stream. This is illustrated in the "2nd for loop". Also, if you see the read stream it points to "datahub_topic.path". So, the loops are only to start as many streams as the number of topic folders and writing them to their "separate" destinations. – Saugat Mukherjee Mar 23 '23 at 08:13
  • Anyways, I think I am going to follow this https://stackoverflow.com/questions/58240991/how-to-handle-an-analysisexception-on-spark-sql (the answer by WagnerAlbJr) i.e. in addition to the Exception , also check the error message for "keywords" – Saugat Mukherjee Mar 23 '23 at 08:16
  • Regarding the topicsearch and other variables used, I think that is a fair ask, so for clarity I have updated my question under the section EDIT 1 (with some explanation) – Saugat Mukherjee Mar 23 '23 at 08:29

1 Answer


First option:

  1. It would be good in your case to have a table with basically two columns: topic_name and format (which may be avro or parquet). Once you have that table, you can dynamically create your streams, since the format will be known to you.
  2. So, as a first step (before you start the streams), you need to populate this table. This can be done easily as a union of two Auto Loader streams:
# lit and input_file_name come from pyspark.sql.functions;
# some_function_to_extract_topic_name_from is a placeholder for your own parsing logic,
# and "<top_level_folder_path>" stands for the umbrella folder
# (cloudFiles.schemaLocation and the writeStream into the topic/format table are omitted for brevity)
(
  spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "parquet")
  .load("<top_level_folder_path>")
  .select(
    lit("parquet").alias("format"),
    some_function_to_extract_topic_name_from(input_file_name()).alias("topic")
  )
  .dropDuplicates()
)
(
  spark
  .readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "avro")
  .load("<top_level_folder_path>")
  .select(
    lit("avro").alias("format"),
    some_function_to_extract_topic_name_from(input_file_name()).alias("topic")
  )
  .dropDuplicates()
)
  3. Then you iterate through this table and create your streams from it.
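
A minimal sketch of that driver loop (assuming the mapping has been persisted as a Delta table called topic_format_map with columns topic and format; the table name and paths are illustrative, the Auto Loader options are the ones from the question):

# Sketch only: topic_format_map, base_path and raw_base are illustrative names
base_path = "abfss://<sourcecontainer>@<datalakename>.dfs.core.windows.net/toplevelfolder"
raw_base = "abfss://raw@<datalakename>.dfs.core.windows.net"

for row in spark.table("topic_format_map").collect():
    topic, fmt = row["topic"], row["format"]          # fmt is "avro" or "parquet"
    (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", fmt)             # per-topic format, no more guessing
        .option("cloudFiles.schemaLocation", f"{raw_base}/autoloader/schemas/{topic}/")
        .load(f"{base_path}/{topic}")
        .writeStream.format("delta")
        .option("checkpointLocation", f"{raw_base}/autoloader/checkpoint/{topic}/")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .start(f"{raw_base}/delta/{topic}")
    )

# wait for all per-topic streams to finish, same pattern as in the question
while len(spark.streams.active) > 0:
    spark.streams.awaitAnyTermination()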

Second option:

Load the file names using a batch read with the binaryFile format (just to collect the file names):

# extract_extension / extract_topic are placeholders for your own parsing logic;
# note that the binaryFile source also exposes a ready-made `path` column,
# which you can parse instead of calling input_file_name()
(
  spark
  .read
  .format("binaryFile")
  .option("pathGlobFilter", "*.{parquet,avro}")
  .load("<path_with_pattern_for_date>")
  .select(
    extract_extension(input_file_name()).alias("format"),
    extract_topic(input_file_name()).alias("topic")
  )
)
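
One possible way to fill in those placeholders, as a sketch: parse the path column that binaryFile provides with ordinary string functions and collect the result into a small topic-to-format map (the split index for the topic name is an assumption about the folder layout and needs to be adjusted):

from pyspark.sql import functions as F

files = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.{parquet,avro}")
    .load("<path_with_pattern_for_date>")
    .select("path")
)

mapping = (
    files
    .withColumn("format", F.element_at(F.split("path", r"\."), -1))   # file extension
    .withColumn("topic", F.element_at(F.split("path", "/"), 5))       # illustrative index, depends on layout
    .select("topic", "format")
    .dropDuplicates()
)

# collect into a plain dict {topic: format} to drive the per-topic streams
topic_format = {r["topic"]: r["format"] for r in mapping.collect()}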
partlov
  • Thanks for taking the time out to respond and posting a probable answer. This does require a one-time effort/job (outside the autoloader) to create a table, which takes a random date (where we know there will be data for all folders/topics, as the topics are partitioned by year, month and day) and scans those folders to determine the type. But I guess that is the most elegant way at this point. Thanks! I will hold off marking this as the answer to see if there are even better alternatives, but if none within a day, I will mark this one. Upvoted. – Saugat Mukherjee Mar 23 '23 at 13:38
  • I have already had a couple of problems that would be solved by a stream reader which takes a folder and loads not the content of the files, but just the file names. This problem would easily be solved by that as well. Also, if Auto Loader had something like format("binary"), you could stream all files and use only their names, loading the content later in the proper format. But so far I haven't found that kind of reader; it may be a good suggestion for a small open source project. – partlov Mar 24 '23 at 10:26
  • It actually exists as a batch reader: https://spark.apache.org/docs/latest/sql-data-sources-binaryFile.html, so maybe you can execute this beforehand? – partlov Mar 24 '23 at 10:28