
I am about to start some signal analysis with Hadoop/Spark and I need help structuring the whole process.

The signals are currently stored in a database; we will read them with Sqoop and transform them into files on HDFS, with a schema similar to:

<Measure ID> <Source ID> <Measure timestamp> <Signal values>

where the signal values are just strings of comma-separated floating point numbers.

000123  S001  2015/04/22T10:00:00.000Z  0.0,1.0,200.0,30.0 ... 100.0
000124  S001  2015/04/22T10:05:23.245Z  0.0,4.0,250.0,35.0 ... 10.0
...
000126  S003  2015/04/22T16:00:00.034Z  0.0,0.0,200.0,00.0 ... 600.0

We would like to write interactive/batch queries to:

apply aggregation functions over signal values

SELECT *
FROM SIGNALS
WHERE MAX(VALUES) > 1000.0

To select signals that had a peak over 1000.0.

apply aggregation over aggregation

SELECT SOURCEID, MAX(VALUES) 
FROM SIGNALS
GROUP BY SOURCEID
HAVING MAX(MAX(VALUES)) > 1500.0

To select sources having at least a single signal that exceeded 1500.0.

apply user defined functions over samples

SELECT *
FROM SIGNALS
WHERE MAX(LOW_BAND_FILTER("5.0 KHz", VALUES)) > 100.0

To select signals that, after being filtered at 5.0 kHz, have at least one value over 100.0.

We need some help in order to:

  1. find the correct file format to write the signal data to HDFS. I was thinking of Apache Parquet. How would you structure the data?
  2. understand the proper approach to data analysis: is it better to create different datasets (e.g. processing the data with Spark and persisting the results on HDFS), or to try to do everything at query time on the original dataset?
  3. is Hive a good tool for queries such as the ones I wrote? We are running on Cloudera Enterprise Hadoop, so we can also use Impala.
  4. in case we produce different derived datasets from the original one, how can we keep track of the data lineage, i.e. know how the data was generated from the original version?

Thank you very much!

Ameba Spugnosa

2 Answers


1) Parquet, as a columnar format, is good for OLAP. Spark's Parquet support is mature enough for production use. I suggest parsing the string representing the signal values into the following data structure (simplified):

case class Data(id: Long, signals: Array[Double])
val df = sqlContext.createDataFrame(Seq(
  Data(1L, Array(1.0, 1.0, 2.0)),
  Data(2L, Array(3.0, 5.0)),
  Data(2L, Array(1.5, 7.0, 8.0))
))
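
For completeness, here is a minimal sketch of how the raw HDFS lines from the question could be parsed into such a structure and persisted as Parquet. It assumes tab-separated fields in the order shown in the question and hypothetical paths; adapt both to your actual Sqoop export:

// Sketch only: assumes lines shaped like
//   <Measure ID> <Source ID> <Measure timestamp> <comma-separated values>
import sqlContext.implicits._

case class Signal(measureId: String, sourceId: String, ts: String, signals: Array[Double])

val signalsDF = sc.textFile("/data/raw/signals")                  // hypothetical input path
  .map(_.split("\t"))
  .map(f => Signal(f(0), f(1), f(2), f(3).split(",").map(_.toDouble)))
  .toDF()

signalsDF.write.parquet("/data/parquet/signals")                  // columnar copy for querying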

Keeping an array of doubles allows you to define and use UDFs like this:

import scala.collection.mutable

// UDF returning the maximum value of a signal array
def maxV(arr: mutable.WrappedArray[Double]) = arr.max
sqlContext.udf.register("maxVal", maxV _)
df.registerTempTable("table")

sqlContext.sql("select * from table where maxVal(signals) > 2.1").show()
+---+---------------+
| id|        signals|
+---+---------------+
|  2|     [3.0, 5.0]|
|  2|[1.5, 7.0, 8.0]|
+---+---------------+

sqlContext.sql("select id, max(maxVal(signals)) as maxSignal from table group by id having maxSignal > 1.5").show()
+---+---------+
| id|maxSignal|
+---+---------+
|  1|      2.0|
|  2|      8.0|
+---+---------+

Or, if you want some type safety, use the Scala DSL:

import org.apache.spark.sql.functions._
import sqlContext.implicits._ // needed for the $"column" syntax

val maxVal = udf(maxV _)
df.select("*").where(maxVal($"signals") > 2.1).show()
df.select($"id", maxVal($"signals") as "maxSignal").groupBy($"id").agg(max($"maxSignal")).where(max($"maxSignal") > 2.1).show()
+---+--------------+
| id|max(maxSignal)|
+---+--------------+
|  2|           8.0|
+---+--------------+
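
The user-defined filtering from the question's third query fits the same pattern. This is only a sketch: lowBandFilter below is a naive moving-average stand-in whose name and behaviour are made up for illustration, not a real 5.0 kHz low-pass filter:

// Hypothetical UDF: swap the moving average for your actual filter implementation.
def lowBandFilter(arr: mutable.WrappedArray[Double]): Array[Double] =
  arr.sliding(3).map(w => w.sum / w.size).toArray

sqlContext.udf.register("lowBandFilter", lowBandFilter _)

// "signals that, after being filtered, have at least one value over 100.0"
sqlContext.sql("select * from table where maxVal(lowBandFilter(signals)) > 100.0").show()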

2) It depends: if the size of your data lets you do all the processing at query time with reasonable latency, go for it. You can start with this approach and build optimized structures for slow/popular queries later.
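
As a minimal sketch of that second option, reusing the df, maxVal and implicits definitions above (the output path is hypothetical): precompute per-signal aggregates once in batch, so interactive queries hit a small derived dataset instead of the raw arrays:

// Batch step: persist per-signal aggregates as their own Parquet dataset.
val signalStats = df.select($"id", maxVal($"signals") as "maxSignal")
signalStats.write.parquet("/data/parquet/signal_stats")

// Interactive step: query the derived dataset instead of the raw arrays.
sqlContext.read.parquet("/data/parquet/signal_stats")
  .where($"maxSignal" > 1000.0)
  .show()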

3) Hive is slow; it has been superseded by Impala and Spark SQL. The choice is not always easy, so we use a rule of thumb: Impala is good for queries without joins if all your data is stored in HDFS/Hive; Spark has higher latency, but its joins are reliable, it supports more data sources, and it has rich non-SQL processing capabilities (like MLlib and GraphX).

4) Keep it simple: store your raw data (master dataset) de-duplicated and partitioned (we use time-based partitions). If new data arrives in a partition and you already have downstream datasets generated, restart your pipeline for that partition.
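
A minimal sketch of that layout, assuming the signalsDF from the parsing sketch above and deriving a day column from the measure timestamp (column names and paths are illustrative only):

import org.apache.spark.sql.functions.{regexp_replace, substring}

// Master dataset: de-duplicated on the measure id and partitioned by day,
// so a late-arriving partition can be reprocessed in isolation.
signalsDF
  .withColumn("day", regexp_replace(substring($"ts", 1, 10), "/", "-"))  // "2015/04/22" -> "2015-04-22"
  .dropDuplicates(Seq("measureId"))
  .write
  .partitionBy("day")
  .parquet("/data/master/signals")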

Hope this helps

Vitalii Kotliarenko
  • Thank you, I am going to try your suggested approaches. Starting from scratch **today**, would you suggest skipping Hive in favour of Spark SQL? – Ameba Spugnosa Apr 27 '16 at 09:15
  • Definitely, I would choose from Spark SQL, Impala and just released [Hive-on-Spark](https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started). You can mix technologies: use Spark for batch processing preparing your data, and then query prepared data with Impala or [Presto](https://prestodb.io/) – Vitalii Kotliarenko Apr 27 '16 at 09:24

First, I believe Vitaliy's approach is very good in every aspect (and I'm all for Spark).

I'd like to propose another approach, though. The reasons are:

  1. We want to do interactive queries (+ we have CDH)
  2. The data is already structured
  3. The need is to 'analyze' rather than 'process' the data. Spark could be overkill because (a) with the data already structured, we can form SQL queries faster, and (b) we don't want to write a program every time we want to run a query

Here are the steps I'd like to go with:

  1. Ingestion into HDFS using Sqoop: [optionally] use --as-parquetfile
  2. Create an external Impala table or an internal table, as you wish. If you have not transferred the file as a Parquet file, you can do that during this step. Partition, preferably by Source ID, since our groupings are going to happen on that column.

So, basically, once we've got the data transferred, all we need to do is create an Impala table, preferably in Parquet format and partitioned by the column we're going to use for grouping. Remember to run COMPUTE STATS after loading to help Impala run queries faster.
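
For illustration, here is a rough sketch of the kind of table meant. The table name, columns and location are assumptions, and the DDL is issued through Spark's HiveContext only to stay consistent with the code style above; the same statement can be pasted into impala-shell:

// Sketch only: adjust names, types and LOCATION to your actual layout.
import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
hiveContext.sql("""
  CREATE EXTERNAL TABLE IF NOT EXISTS signals (
    measure_id STRING,
    measure_ts STRING,
    signal_values STRING
  )
  PARTITIONED BY (source_id STRING)
  STORED AS PARQUET
  LOCATION '/data/signals'
""")
// Partitions written outside Impala still have to be registered
// (e.g. ALTER TABLE signals ADD PARTITION ...); then, in impala-shell, run
// COMPUTE STATS signals, plus INVALIDATE METADATA if the table was created
// or loaded outside Impala.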

Moving data:

- If we need to generate a feed out of the results, create a separate file.
- If another system is going to update the existing data, then move the data to a different location while creating and loading the table.
- If it's only about queries, analysis and reports (i.e. external tables suffice), we don't need to move the data unnecessarily: we can create an external Hive table on top of the same data. If we need to run long-running batch queries, we can use Hive; it's a no-no for interactive queries, though. If we create any derived tables from queries and want to use them through Impala, remember to run INVALIDATE METADATA before running Impala queries on the Hive-generated tables.

Lineage - I have not gone deeper into it; here's a link on Impala lineage using Cloudera Navigator.

Partha Mishra