
It would be great if there were a way to get some kind of return object from a Kiba ETL run, so that I could use the data in it to report on how well the pipeline ran.

We have a job that runs every 10 minutes and processes 20k to 50k records on average, condensing them into summary records, some of which are created and some of which are updated. The problem is that it's difficult to know what happened without trawling through reams of log files, and obviously logs aren't useful to end users either.

Is there a way to populate some kind of outcome object with arbitrary data as the pipeline runs? e.g.:

  • 25.7k rows found in source
  • 782 records dropped by this transformer
  • 100 records inserted
  • 150 records updated
  • 20 records had errors (and here they are)
  • This record had the highest statistic
  • 1200 records belonged to this VIP customer
  • etc.

And then at the end, use that data to send an email summary, populate a web page, render some console output, etc.

Currently, the only way I can see this working is to pass an object in during setup and mutate it as rows flow through the sources, transforms, and destinations. Once the run is complete, inspect that object and do something with the data that's now in it.
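The closure mechanics behind that idea can be sketched in plain Ruby, without any Kiba involvement (the rows, counter names, and drop rule below are made up purely for illustration):

```ruby
# A stats hash created before the run; the transform closes over it
# and mutates it as rows flow through.
stats = Hash.new(0)

rows = [
  { id: 1, amount: 10 },
  { id: 2, amount: 0 },
  { id: 3, amount: 25 }
]

# A transform that drops zero-amount rows, recording what it did.
# Returning nil drops the row, mirroring Kiba's transform semantics.
drop_empty = lambda do |row|
  stats[:rows_read] += 1
  if row[:amount].zero?
    stats[:rows_dropped] += 1
    nil
  else
    row
  end
end

kept = rows.map(&drop_empty).compact
stats[:rows_written] = kept.size

stats # => {:rows_read=>3, :rows_dropped=>1, :rows_written=>2}
```

After the run completes, `stats` holds the outcome and the caller decides what to do with it (email, web page, console output, etc.).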

Is this how it should be done, or is there a better way?

EDIT

Just want to add that I don't want to handle this in the post_process block, because the pipeline is used via a number of different mediums, and I'd want each use case to handle its own feedback mechanism. It's also cleaner (IMO) for an ETL pipeline not to have to worry about where it's used and what that usage scenario's feedback expectations are.

1 Answer

The answer is highly dependent on the context, but here are a few guidelines.

If the outcome object is not too large, then yes, I recommend passing in an empty outcome object (typically a Hash) and populating it during the run (you could also use some form of middleware to track exceptions as well).

How you fill it will depend on the context and your actual needs, but this can be done in a fairly job-agnostic fashion (using DSL extensions, see https://github.com/thbar/kiba/wiki/How-to-extend-the-Kiba-DSL, you can build fairly high-level helpers that register the required transforms or blocks for you).

The object can be used as is, or serialised as JSON (or similar), or even stored in a database if you need to provide some rich output later (or you could use it to prepare something else).
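For example, if the outcome object is a plain Hash of counters, a JSON round-trip is just standard-library work (the key names here are illustrative, not anything Kiba prescribes):

```ruby
require 'json'

# A hypothetical outcome hash, as populated by a finished run.
outcome = {
  rows_read: 25_700,
  records_inserted: 100,
  records_updated: 150,
  records_errored: 20
}

# Serialise; the payload could go to a file, a DB column, an API, etc.
payload = JSON.generate(outcome)

# Reload it later to render a report, email, web page, etc.
reloaded = JSON.parse(payload, symbolize_names: true)
```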

If needed, you could even store something fairly structured in a dedicated database for that purpose (if you need an easy way to expose the results to customers, for instance).

Note that you could programmatically define a post_process without the job itself being aware of it (avoiding that coupling). Here is a very simple example:

module ETL
  module DSLExtensions
    module EmailReport
      def setup_email_report
        pre_process do
          @email_report_stats = Hash.new(0)
        end

        post_process do
          # Do the actual email sending, using @email_report_stats
        end
      end

      # Registers a block transform at this point of the pipeline, so the
      # counter is incremented for every row flowing through. (Incrementing
      # the hash directly here would run once at parse time instead, before
      # pre_process has even initialized it.)
      def track_event!(event:)
        transform do |row|
          @email_report_stats[event] += 1
          row
        end
      end
    end
  end
end

Kiba.parse do
  extend ETL::DSLExtensions::EmailReport

  # this will register the pre/post process
  setup_email_report

  source ...

  track_event!(event: 'row_read')

  transform
  transform
  transform

  track_event!(event: 'row_written')

  destination ...
end

If you do this, make sure to use well-namespaced instance variables to avoid any conflicts.

Note that as discussed before, this doesn't cover the case of failures, but you get the idea!

Thibaut Barrère
  • Thanks, this was helpful. I eventually settled on creating a context hash outside of the pipeline, sending it in, and adding to it as it went through each pipeline, as I needed to keep track of certain records being discarded during transforms and their reasons. It would've been impossible to figure that out without having access to some state inside the transformer at run time. – Gabriel Fortuna Jul 06 '20 at 13:25