Objective

I'm building a data lake. The general flow is NiFi -> Storage -> ETL -> Storage -> Data Warehouse.

The general rule for a data lake seems to be: no pre-processing at the ingestion stage. All subsequent processing should happen in ETL, so you keep provenance over both raw and processed data.

Issue

The source system sends corrupted CSV files. That is, besides the header and data, the first two lines are always free-format metadata that we'll never use. Only a single table is affected, and the corrupted CSV is used by a single Spark job at the moment (let's call it X).

Question

Is it a good approach to remove those two lines at the NiFi layer? See option 3 under "Workarounds".

Workarounds

  1. Handle the corrupted records inside Spark job X. IMHO, this is a bad approach, because we are going to use that file with other tools in the future (data governance schema crawlers, maybe some Athena/ADLA-like engines over ADLS/S3). That means the logic for handling corrupted records would have to be implemented in multiple places.
  2. Fix the corrupted files at the ETL layer and store them in a "fixed" layer. All downstream activities (ETL, data governance, MPP engines) would then work only with the "fixed" layer instead of the "raw" layer. Creating a new layer for a single CSV sounds like overhead to me.
  3. Fix the files (remove the first two lines from the CSV) at the NiFi layer. That means the "raw" storage layer will always contain readable data. IMHO, this is good because it's simple and the handling logic is implemented in one place.
Jesse
VB_

2 Answers

First of all, I think your question is excellent, and from the way you lay out your reasoning I can tell that you already have your answer.

As you mention

The general rule for Data Lake sounds like no pre-processing on the ingestion stage.

This is the philosophical bottom line, and all the hype has grown around this idea, which is easy to oversimplify.

Let's check how AWS defines a data lake:

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning to guide better decisions.

It is a basic definition, but let's use it as an "appeal to authority". They say clearly that you can store data "as-is".

  1. My first question is: does "you can" strictly mean "you should"? Note that they also say it allows you to "run different types of analytics", from dashboards and visualizations to big data processing, and so on.
  2. My second question is: if the data is known to be unusable for practically anything, is it legitimate to dump it there anyway?

In the same link, a bit further down, they also say:

The main challenge with a data lake architecture is that raw data is stored with no oversight of the contents. For a data lake to make data usable, it needs to have defined mechanisms to catalog, and secure data. Without these elements, data cannot be found, or trusted resulting in a “data swamp." Meeting the needs of wider audiences require data lakes to have governance, semantic consistency, and access controls.

My general view is that throwing everything in there to follow the rule of "no preprocessing" is an attempt to be more catholic than the pope, or perhaps a tendency to oversimplify the rules. I believe the idea of "as is", and the power of it, is more about not filtering or transforming data at ingestion: we don't really know all the possible future use cases, so having raw data is good and scalable. But that doesn't mean that keeping data we know is corrupted is good. I believe quality is always a requirement for data, and at every stage the data should at least be accessible.

This takes me to the next thought: one often-repeated idea is that a data lake allows schema-on-read (AWS, Intuit, IBM, O'Reilly). That being so, it makes sense to keep the data in a form that has some kind of schema as far as possible, if we don't want to overcomplicate life for everyone who might want to use it. Otherwise we could render it useless in some cases, because the overhead of using it can be discouraging. In fact, the O'Reilly article above, called "the death of schema on read", talks about exactly the complexity added by the lack of governance. So I would guess that removing some chaos will help the success of the data lake.

By now my position is very clear to me (it was much less so when I started writing this response), but I will wrap up with one last reference, an article I have read a few times. Published in the gartner.com press room as early as 2014, it is titled "Beware of the Data Lake Fallacy". The whole article is quite interesting, but I will highlight this part:

Data lakes, therefore, carry substantial risks. The most important is the inability to determine data quality or the lineage of findings by other analysts or users that have found value, previously, in using the same data in the lake. By its definition, a data lake accepts any data, without oversight or governance. Without descriptive metadata and a mechanism to maintain it, the data lake risks turning into a data swamp.

I agree with that. It is fun at the beginning. Save everything, see your S3 bucket populated, even run a few queries in Athena or Presto or some Spark jobs over lots of gzip files, and feel that we are living in a magical time. But then a small contamination arrives, and we accept it, and someday there are not 10 S3 buckets but 100, and not 2 small exceptions but 20, and there are too many things to keep in mind and everything gets messier and messier.

In the end this is opinion-based. But I would say usable data will make your future self happier.

That said, on to your options:

  1. Handle the corrupted records inside Spark job X. You said it yourself. That would be hating yourself and your team, condemning them to work that could be avoided.

  2. Fix corrupted files at the ETL layer and store them in a "fixed" layer. You said it yourself: too much overhead. You will be continually tempted to delete the first layer. In fact, I predict you would end up with a lifecycle policy that removes old objects automatically to save cost.

  3. Seems neat and honest. No one can tell you "that is crazy". The only thing you need to make sure of is that the data you delete is not business-related, and that there is no possible future use you cannot foresee now. Even so, I would take an approach that plays it safe:

    • Remove the first two lines from the CSV at the NiFi layer, and save the readable data in the "raw" storage layer.
    • To protect yourself against the "we didn't see this coming" case, keep a metadata bucket in which you save small files containing the two removed lines, so you can access them in the future if need be, and so you have an answer for anyone who later says "you shouldn't have deleted that". But I say this because I cannot imagine what those two lines are; maybe this is total overkill.
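
As a rough illustration of the two bullets above, here is a minimal sketch in plain Python (not NiFi configuration). The function name `split_preamble`, the sample content, and the idea of writing the two outputs to separate destinations are all assumptions made for illustration; it only shows the split itself, under the assumption that the preamble is always exactly two lines.

```python
import io
from itertools import islice
from typing import Iterator, List, TextIO, Tuple


def split_preamble(source: TextIO, n_meta: int = 2) -> Tuple[List[str], Iterator[str]]:
    """Split a text stream into (metadata lines, remaining CSV lines).

    Only the first n_meta lines are read eagerly; the rest stays lazy,
    so large files are not loaded into memory.
    """
    meta = list(islice(source, n_meta))
    if len(meta) < n_meta:
        raise ValueError(f"expected {n_meta} metadata lines, got {len(meta)}")
    # The stream is now positioned at the CSV header line.
    return meta, source


# Hypothetical sample file: two free-format lines, then header and data.
sample = io.StringIO(
    "exported by system A\n"
    "free-format note\n"
    "id,name\n"
    "1,alice\n"
    "2,bob\n"
)

meta_lines, payload = split_preamble(sample)
sidecar = "".join(meta_lines)  # would be stored in the metadata bucket
raw_csv = "".join(payload)     # would be stored in the "raw" layer
```

Keeping the removed lines in a sidecar means nothing is lost: the original file could be reconstructed by concatenating the two parts.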

Personally, I love data lakes, and I love the philosophy behind every system, but I also like to question everything case by case. I have lots of data in flat files, JSON, and CSV, and a lot of production workload based on it. But the most beautiful part of my data lake is not purely unprocessed data. We found it extremely powerful to do a first minimal cleanup and, when possible (for data that fundamentally has inserts and not updates), to also transform it to Parquet or ORC and even compress it with Snappy. And I can tell you that I really enjoy using that data, and even run queries on it directly. Raw data yes, but usable.

Carlos Robles

I like the philosophy offered in the accepted answer but I'd like to provide a more tactical answer...

  • Use the 'bad records' handling option (`badRecordsPath`, a Databricks feature) on the Spark read, e.g.:
spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .format("csv")
  .load("/input/csvFile.csv")

Reference "Handling bad records and files"

Reference "CSV files"

You can combine this with an explicit schema, `.schema(customSchema)`, to get a level of schema verification (and better performance) on the read side of your jobs.

  • To perform schema checks on write, take a look at the open source Delta Lake project, which offers schema enforcement on write and ACID transactions for more reliability.

  • Managed Delta Lake lets you bin-pack your small files with the `OPTIMIZE` command (reference "Databricks Delta Lake Optimize command").

    • Because of ACID transactions and bin packing, Spark Structured Streaming and Delta Lake work really well together to continue the streaming data acquisition Nifi is performing.
Douglas M
  • thanks for your suggestion! Looks like I have to create a "rejected" folder in any case. Your solution means that I still need to keep corrupted files in the first "raw" layer. But `badRecordsPath` is pretty simple, so it could be leveraged across multiple jobs. After thinking a while, I think the governance solution will just tag data; it shouldn't read it (not sure yet). Moreover, all data exploration may be done through Databricks, so it's very unlikely that we'll use any other MPP engines on top of raw data. So I like your approach. – VB_ May 31 '20 at 21:33
  • the only concern about your approach is that `badRecordsPath` requires `PERMISSIVE` mode, as far as I understand. With a data lake and schema-on-read I feel safer with `FAILFAST` mode, whereas `badRecordsPath` may put half of the dataframe into the rejected folder and still let the processing pipeline succeed. Are you suggesting the use of `PERMISSIVE` mode + monitoring/alerts instead of `FAILFAST`? – VB_ May 31 '20 at 21:42
  • I thought about having jobs check whether the jobs they depend on produced zero corrupted records. But it's a bad approach because: 1) sources with CSV+metadata will always be corrupted (always more than zero rows) 2) `badRecordsPath` uses non-configurable date partitioning, so I may need to write code to find the right partition 3) the approach (checking for corrupted dependencies) may get cumbersome when jobs have multiple dependencies – VB_ May 31 '20 at 22:10
  • Imagine a source system delivered JSON as a CSV file, with one column and header record. (This happened 2 weeks ago) The first 'header' record was put into the 'corrupted data' column, the JSON was properly parsed. That said, I like the data quality tagging approach mentioned. Both strategies are compatible. There is data that every downstream consumer considers 'bad' and should be set aside. There are other data quality rules that are a point of view, tags are great there. – Douglas M May 31 '20 at 23:59
  • With 'Big Data' you typically don't have time to stop the world using `FAILFAST` but have to keep going or you run out of time in your operational window. Getting 99% right the first time gives the re-run process more time to complete. It's a challenge all around, great ideas here. – Douglas M Jun 01 '20 at 00:01
  • agree about 99% now. It's hard to get more, especially because of late arrivals. On the other hand, ETL should usually be as simple as possible. We only have daily processing that builds data for the DWH (no online users). Still, I'm seriously considering switching to `PERMISSIVE`. *Could you please clarify what you meant by tags?* – VB_ Jun 01 '20 at 06:57
  • you see, your approach works very well when data gets corrupted from time to time. But in my case, it is corrupted in 100% of cases (because of the 2 metadata lines). So currently I have, let's say, two layers: `raw => processed`. At the `raw` layer the dataset has 500 columns in CSV, while at `processed` it has < 100 columns. Just to make sure we're talking about the same thing: *are you suggesting creating a separate layer like `raw (500 cols) => fixed (500 cols) => processed (100 cols)`? Or are you suggesting fixing the raw data in every notebook where it is going to be used?* – VB_ Jun 01 '20 at 07:00
  • one more quick question: *would you stay with `FAILFAST` (as the simpler option) until you must switch to `PERMISSIVE`+monitoring/alerts? Or do you always choose the second option over `FAILFAST`*? – VB_ Jun 01 '20 at 07:06
  • To me corrupted implies some kind of mangling that isn't anticipated. I think in your case, you expect the first records to be non-compliant, non-parsable. You have to build that into your raw => processed job, just part of the work. At 100% level, it's an expectation rather than an exception. Only do `raw (500 cols) => fixed (500 cols)` if there's a reasonable expectation that the other 400 fields would be used. It's quite reasonable for `raw` to have 10-100x more data fields than `processed` in a data lake. – Douglas M Jun 01 '20 at 16:07
  • On 3rd question on `FAILFAST`, start with what's easiest between you and your users. There's no technical answer to this. – Douglas M Jun 01 '20 at 16:11
  • for us `FAILFAST` is simpler and the business seems to be fine with it. Okay, it looks like I was wrong and I have to remove the metadata at the ETL layer. *Do you have any working solution for removing the 2 metadata rows while still using `FAILFAST` for unanticipated corrupted records?* – VB_ Jun 01 '20 at 19:07
  • You can tell the spark CSV reader that you have two header records to ignore. – Douglas M Jun 03 '20 at 17:00
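
The idea the thread converges on (treat the two leading lines as an expected preamble, then stay strict about everything else) can be sketched in plain Python. This is not Spark code and does not use any Spark reader option; `read_strict` is a hypothetical helper built on the standard `csv` module, and it assumes the preamble is always exactly two lines and that every data row must have as many fields as the header.

```python
import csv
import io
from itertools import islice
from typing import Dict, List, TextIO


def read_strict(source: TextIO, n_meta: int = 2) -> List[Dict[str, str]]:
    """Skip the expected preamble, then fail on the first malformed row."""
    skipped = list(islice(source, n_meta))
    if len(skipped) < n_meta:
        raise ValueError("file is shorter than the expected preamble")

    reader = csv.reader(source)
    header = next(reader, None)
    if header is None:
        raise ValueError("missing header row")

    rows = []
    for row in reader:
        # Any row whose field count differs from the header (including a
        # blank line) is treated as unanticipated corruption.
        if len(row) != len(header):
            file_line = reader.line_num + n_meta
            raise ValueError(
                f"line {file_line}: expected {len(header)} fields, got {len(row)}"
            )
        rows.append(dict(zip(header, row)))
    return rows


# Hypothetical inputs: one clean file and one with an unexpected extra field.
good = io.StringIO("meta 1\nmeta 2\nid,name\n1,alice\n2,bob\n")
bad = io.StringIO("meta 1\nmeta 2\nid,name\n1,alice,extra\n")

good_rows = read_strict(good)
try:
    read_strict(bad)
    bad_error = None
except ValueError as exc:
    bad_error = str(exc)
```

The anticipated preamble never counts as an error here, so a strict, fail-fast policy can still be applied to the rest of the file.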