
I'm working through an ETL pipeline, and for this particular problem I need to take a table of data and turn each column into a set, that is, an array of unique values.

I'm struggling to wrap my head around how I would accomplish this within the Kiba framework.

Here's the essence of what I'm trying to achieve:

Source:

[
  { dairy: "Milk",   protein: "Steak",   carb: "Potatoes" },
  { dairy: "Milk",   protein: "Eggs",    carb: "Potatoes" },
  { dairy: "Cheese", protein: "Steak",   carb: "Potatoes" },
  { dairy: "Cream",  protein: "Chicken", carb: "Potatoes" },
  { dairy: "Milk",   protein: "Chicken", carb: "Pasta" },
]

Destination:

{
  dairy:   ["Milk", "Cheese", "Cream"],
  protein: ["Steak", "Eggs", "Chicken"],
  carb:    ["Potatoes", "Pasta"],
}

Is something like this a) doable in Kiba, and b) even advisable to do in Kiba?

Any help would be greatly appreciated.

Update - partially solved.

I've found a partial solution. This transformer class will transform a table of rows into a hash of sets, but I'm stuck on how to get that data out using an ETL Destination. I suspect I'm using Kiba in a way in which it's not intended to be used.

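require "set"

class ColumnSetTransformer
  # exposes the accumulated sets so they can be read after the run
  attr_reader :col_set

  def initialize
    # block form gives each column its own Set instead of one shared default
    @col_set = Hash.new { |hash, col| hash[col] = Set.new }
  end

  def process(row)
    row.each do |col, col_val|
      @col_set[col] << col_val
    end

    @col_set
  end
end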

2 Answers


Your solution will work just fine. Indeed, the reason for such a design in Kiba (mostly "Plain Old Ruby Objects") is to make it easy to call the components yourself, should you need to. This is also very useful for testing.

That said, here are a few extra possibilities.

What you are doing is a form of aggregation, which can be implemented in various ways.
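For reference, the aggregation itself is only a few lines of plain Ruby once you step outside Kiba. This is a minimal sketch, not a Kiba API: `column_sets` is a hypothetical helper name, and the rows are the sample data from the question.

```ruby
require "set"

# Hypothetical helper (not part of Kiba): collects the distinct values
# of every column, keeping first-seen order within each column.
def column_sets(rows)
  rows.each_with_object(Hash.new { |hash, col| hash[col] = Set.new }) do |row, sets|
    row.each { |col, col_val| sets[col] << col_val }
  end
end

rows = [
  { dairy: "Milk",   protein: "Steak",   carb: "Potatoes" },
  { dairy: "Milk",   protein: "Eggs",    carb: "Potatoes" },
  { dairy: "Cheese", protein: "Steak",   carb: "Potatoes" },
  { dairy: "Cream",  protein: "Chicken", carb: "Potatoes" },
  { dairy: "Milk",   protein: "Chicken", carb: "Pasta" },
]

# Convert each Set to an Array to get the shape shown in the question.
p column_sets(rows).transform_values(&:to_a)
```

The options that follow are different ways of placing this same accumulation inside (or next to) a Kiba pipeline.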

Buffering destination

Here the buffer would actually be a single row. Use code such as:

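class MyBufferingDestination
  attr_reader :single_output_row

  def initialize(config:)
    # a Hash (not an Array), so each column name maps to its accumulated values
    @single_output_row = Hash.new { |hash, col| hash[col] = [] }
  end

  def write(row)
    row.each do |col, col_val|
      # array union keeps only the first occurrence of each value
      single_output_row[col] |= [col_val]
    end
  end

  def close # will be called by Kiba at the end of the run
    # here you'd write your output
  end
end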
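Because a destination is a plain Ruby object, it can also be driven by hand, which makes it easy to read the buffered result back afterwards. The sketch below assumes a hypothetical `ColumnSetDestination` class with no constructor arguments; the `write` and `close` calls are made manually here, where Kiba would normally make them during a job.

```ruby
require "set"

# Hypothetical destination (names are illustrative, not from Kiba itself):
# buffers every incoming row into a single hash of sets.
class ColumnSetDestination
  attr_reader :column_sets

  def initialize
    @column_sets = Hash.new { |hash, col| hash[col] = Set.new }
  end

  # Called once per row.
  def write(row)
    row.each { |col, col_val| @column_sets[col] << col_val }
  end

  # Called once at the end of the run: freeze the buffer into plain arrays.
  def close
    @column_sets = @column_sets.transform_values(&:to_a)
  end
end

destination = ColumnSetDestination.new
[
  { dairy: "Milk",  carb: "Potatoes" },
  { dairy: "Cream", carb: "Potatoes" },
  { dairy: "Milk",  carb: "Pasta" },
].each { |row| destination.write(row) }
destination.close

p destination.column_sets
```

Exposing the buffer through `attr_reader` is a design choice for the case where the goal is an in-memory data structure rather than a file; a destination that writes to disk would do that work inside `close` instead.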

Using an instance variable to aggregate + post_process block

pre_process do
  @output_row = {}
end

transform do |row|
  row.each do |col, col_val|
    @output_row = # SNIP
  end      
  row
end

post_process do
  # convert @output_row to something
  # you can invoke a destination manually, or do something else
end

Soon possible: using a buffering transform

As described here, it will soon be possible to create buffering transforms, to better decouple the aggregating mechanism from the destination itself.

It will go like this:

class MyAggregatingTransform
  def process(row)
    @aggregate += xxx
    nil # remove the row from the pipeline
  end

  def close
    # not yet possible, but soon
    yield @aggregate
  end
end

This will be the best design, because then you'll be able to reuse existing destinations, without modifying them to support buffering, so they'll become more generic & reusable:

transform MyAggregatingTransform

destination MyJSONDestination, file: "some.json"

It will even be possible to have multiple rows in the destination, by detecting boundaries in the input dataset, & yielding accordingly.
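To make the intended decoupling concrete, here is a sketch that simulates the idea in plain Ruby. Everything in it is an assumption for illustration: `ColumnSetAggregator`, `CollectingDestination` and `run_pipeline` are hypothetical names, and `run_pipeline` is a hand-rolled stand-in, not Kiba's runner, so the eventual Kiba API may differ.

```ruby
require "set"

# Hypothetical aggregating transform: swallows rows, emits one row on close.
class ColumnSetAggregator
  def initialize
    @aggregate = Hash.new { |hash, col| hash[col] = Set.new }
  end

  def process(row)
    row.each { |col, col_val| @aggregate[col] << col_val }
    nil # remove the row from the pipeline
  end

  def close
    yield @aggregate.transform_values(&:to_a)
  end
end

# Hypothetical generic destination: it knows nothing about aggregation.
class CollectingDestination
  attr_reader :rows

  def initialize
    @rows = []
  end

  def write(row)
    @rows << row
  end
end

# Hand-rolled stand-in for a runner (an assumption, not Kiba code):
# forwards non-nil rows, then lets the transform flush on close.
def run_pipeline(rows, transform, destination)
  rows.each do |row|
    out = transform.process(row)
    destination.write(out) if out
  end
  transform.close { |row| destination.write(row) }
  destination
end

input = [
  { protein: "Steak",   carb: "Potatoes" },
  { protein: "Chicken", carb: "Pasta" },
  { protein: "Steak",   carb: "Pasta" },
]
p run_pipeline(input, ColumnSetAggregator.new, CollectingDestination.new).rows
```

The destination receives exactly one aggregated row here and never needs to know that buffering happened upstream, which is the reuse benefit described above.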

I will update the SO answer once this is possible.

Thibaut Barrère
  • Thanks Thibaut, I like this answer a lot... It should be noted that I am not attempting to write to JSON or anything, but rather just trying to get a data structure out of the ETL pipeline. I'm definitely going to try your approach once Kiba gets this feature. – Gabriel Fortuna Mar 22 '18 at 11:33
  • Got you! Yes, you can just "print" or do whatever you want, it will work equally. Thanks for your feedback! Please consider marking the answer as accepted if that's a good match! – Thibaut Barrère Mar 22 '18 at 12:55
  • I went with a mix of your first approach and my solution. The buffered destination coupled with running the classes outside of a Kiba.job context allowed me to pull out the final attr_reader in an easy manner. Again, your clean and simple interface really paid off big time. Thank you so much! – Gabriel Fortuna Mar 22 '18 at 13:42
  • Thanks for the feedback! Much appreciated & glad you found something that works nicely for you! – Thibaut Barrère Mar 22 '18 at 14:15

OK - So, using Kiba within a job context doesn't seem to be the way this tool was intended to be used. I wanted to use Kiba because I've already implemented a lot of related E, T, and L code for this project, and the reuse would be huge.

So, if I've got the code to reuse but can't use it within a Kiba job, I can just call it as if it were normal Ruby code. This is all thanks to Thibaut's excellently simple design!

Here's how I solved the problem:

source  = CSVOrXLSXSource.new("data.xlsx", document_config: { some: :settings })
xformer = ColumnSetTransformer.new

source.each do |row|
  xformer.process(row)
end

p xformer.col_set # col_set must be attr_reader on this class.

And now I have my data handily transformed :)