EDIT: it's 2020, and Kiba ETL v3 includes a much better way to do this. Check out this article https://thibautbarrere.com/2020/03/05/new-in-kiba-etl-v3 for all the relevant information.
Kiba author here! You can achieve that in many different ways, depending mainly on the data size and your actual needs. Here are a couple of possibilities.
Aggregating using a variable in your Kiba script
```ruby
require 'awesome_print'
require 'bigdecimal'

# convert the amount column into an actual number
transform do |r|
  r[:amount] = BigDecimal(r[:amount])
  r
end

total_amounts = Hash.new(0)

transform do |r|
  total_amounts[r[:name]] += r[:amount]
  r
end

post_process do
  # pretty print here, but you could save to a CSV too
  ap total_amounts
end
```
This is the simplest approach, and it remains quite flexible.
It does keep the aggregates in memory, though, which may or may not be acceptable depending on your data size. Note that Kiba is currently mono-threaded (although "Kiba Pro" will be multi-threaded), so there is no need to add a lock or use a thread-safe structure for the aggregate, for now.
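To make the pattern concrete outside of Kiba's DSL, here is the same two-step aggregation in plain Ruby (the sample rows are made up for illustration):

```ruby
require 'bigdecimal'

# sample rows, as a Kiba source would yield them
rows = [
  { name: 'alice', amount: '10.50' },
  { name: 'bob',   amount: '3.25'  },
  { name: 'alice', amount: '2.00'  }
]

# first transform: parse the amounts into BigDecimal
rows.each { |r| r[:amount] = BigDecimal(r[:amount]) }

# second transform: accumulate into a hash defaulting to 0
total_amounts = Hash.new(0)
rows.each { |r| total_amounts[r[:name]] += r[:amount] }

total_amounts.each { |name, total| puts "#{name}: #{'%0.2f' % total}" }
```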
Calling TextQL from post_process blocks
Another quick and easy way to aggregate is to generate a non-aggregated CSV file first, then leverage TextQL to do the actual aggregation, like this:
```ruby
destination CsvDestination, 'non-aggregated-output.csv', [:name, :amount]

post_process do
  query = <<~SQL
    select
      name,
      /* apparently sqlite has reduced precision, round to 2 for now */
      round(sum(amount), 2) as total_amount
    from tbl
    group by name
  SQL
  textql('non-aggregated-output.csv', query, 'aggregated-output.csv')
end
```
With the following helpers defined:
```ruby
def system!(command)
  raise "Failed to run command #{command}" unless system(command)
end

def textql(source_file, query, output_file)
  system! "cat #{source_file} | textql -header -output-header=true -sql \"#{query}\" > #{output_file}"
  # this one uses csvfix to pretty print the resulting table
  system! "cat #{output_file} | csvfix ascii_table"
end
```
Be careful with precision, though, when doing computations this way.
Writing an in-memory aggregating destination
A useful trick that can work here is to wrap a given destination with a class that does the aggregation. Here is how it could look:
```ruby
class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    # this relies a bit on the internals of Kiba, but not too much
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    # do not write, but accumulate here instead
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    # use close to actually do the writing
    @aggregate.each do |k, v|
      # reformat the BigDecimal sums here
      value = '%0.2f' % v
      @destination.write(@group_by => k, @sum => value)
    end
    @destination.close
  end
end
```
which you can use this way:
```ruby
# convert your string into an actual number
transform do |r|
  r[:amount] = BigDecimal(r[:amount])
  r
end

destination CsvDestination, 'non-aggregated.csv', [:name, :amount]

destination InMemoryAggregate,
  sum: :amount, group_by: :name,
  destination: [
    CsvDestination, 'aggregated.csv', [:name, :amount]
  ]

post_process do
  system!("cat aggregated.csv | csvfix ascii_table")
end
```
The nice thing about this version is that you can reuse your aggregator with different destinations (like a database one, or anything else).
Note though that this will keep all the aggregates in memory, like the first version.
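As a quick sanity check outside of a full Kiba pipeline, the wrapper can be exercised with a tiny hand-written destination (ArrayDestination below is a made-up stand-in, not part of Kiba):

```ruby
require 'bigdecimal'

# same wrapper as above
class InMemoryAggregate
  def initialize(sum:, group_by:, destination:)
    @aggregate = Hash.new(0)
    @sum = sum
    @group_by = group_by
    @destination = destination.shift.new(*destination)
  end

  def write(row)
    @aggregate[row[@group_by]] += row[@sum]
  end

  def close
    @aggregate.each do |k, v|
      @destination.write(@group_by => k, @sum => '%0.2f' % v)
    end
    @destination.close
  end
end

# minimal stand-in destination that just collects rows into an array
class ArrayDestination
  def initialize(rows)
    @rows = rows
  end

  def write(row)
    @rows << row
  end

  def close; end
end

rows = []
aggregate = InMemoryAggregate.new(
  sum: :amount, group_by: :name,
  destination: [ArrayDestination, rows]
)
aggregate.write(name: 'alice', amount: BigDecimal('10.50'))
aggregate.write(name: 'bob',   amount: BigDecimal('3.25'))
aggregate.write(name: 'alice', amount: BigDecimal('2.00'))
aggregate.close
```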
Inserting into a store with aggregating capabilities
Another way (especially useful if you have very large volumes) is to send the resulting data into something that will be able to aggregate the data for you. It could be a regular SQL database, Redis, or anything more fancy, which you would then be able to query as needed.
So as I said, the implementation will largely depend on your actual needs. Hope you will find something that works for you here!