I frequently find myself needing to generate parquet files for testing infrastructure components like Hive, Presto, Drill, etc.

There are surprisingly few sample parquet data sets online, and one of the only ones I've come across, https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet, is mock data for credit card numbers, incomes, etc. I don't like having that in my data lakes in case someone thinks it's real.

What is the best way to generate parquet data files when you need to test? I usually have Spark around and end up using that, and I'll post my solution as an answer since one doesn't seem to exist here. But I'm curious what better solutions people have, using Spark or other technologies.

John Humphreys
  • Have you heard about the spark-testing-base package by Holden Karau? Besides utilities for testing itself, it has several generators for RDDs and DataFrames. These generators are based on the ScalaCheck package and can produce fully random as well as semi-random datasets. It totally deserves your attention! – Pavel Filatov Jun 06 '19 at 20:54
  • That sounds awesome, and I’ve never heard of it! I’ll take a look :). You should make this an answer. – John Humphreys Jun 06 '19 at 21:07
  • Oh thank you! I will write some code tomorrow to demonstrate it in action) – Pavel Filatov Jun 06 '19 at 21:12

4 Answers

I guess the main goal is to generate data, not to write it in a certain format.

Let's start with a very simple example.

To generate an arbitrary DataFrame, the first thing you need is its schema. Hereafter I will use a very simple schema modelling some user transactions.

val transactionsSchema: StructType = new StructType()
    .add("user_id", IntegerType)
    .add("ts", TimestampType)
    .add("amount", DoubleType)

The package com.holdenkarau.spark.testing has an object DataframeGenerator. This object has two methods to generate DataFrames: .arbitraryDataFrame (fully random result) and .arbitraryDataFrameWithCustomFields (where you can set custom generators for given attributes; the others will be generated automatically).

The DataFrame generator takes a sqlContext and a schema as inputs.

val transactionsDFGenerator: Arbitrary[DataFrame] =
    DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema)

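And here is a function to get a random DataFrame. The generator is applied to ScalaCheck parameters and a fixed seed, so the result is reproducible.

def generateTransactionsDF(): DataFrame =
    transactionsDFGenerator
      .arbitrary(Gen.Parameters.default, Seed(100))
      .get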

And this is the resulting dataset:

+-----------+------------------------------+-----------------------+
|user_id    |ts                            |amount                 |
+-----------+------------------------------+-----------------------+
|-375726664 |1970-01-01 03:00:00.001       |-2.9945060451319086E271|
|0          |1970-01-01 02:59:59.999       |-4.774320614638788E-237|
|1          |215666-12-06 17:54:3333.972832|8.78381185978856E96    |
|-2147483648|1970-01-01 03:00:00.001       |1.6036825986813454E58  |
|568605722  |219978-07-03 23:47:3737.050592|6.632020739877623E-165 |
|-989197852 |1970-01-01 03:00:00.001       |8.92083260179676E233   |
|-2147483648|264209-01-26 00:54:2525.980256|-7.986228470636884E-216|
|0          |145365-06-27 03:25:5656.721168|-5.607570396263688E-45 |
|-1         |1970-01-01 02:59:59.999       |2.4723152616146036E-227|
|-2147483648|4961-05-03 05:19:42.439408    |1.9109576041021605E83  |
+-----------+------------------------------+-----------------------+

Full code:

import com.holdenkarau.spark.testing.DataframeGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalacheck.rng.Seed
import org.scalacheck.{Arbitrary, Gen}

object GenerateData {
  Logger.getLogger("org").setLevel(Level.OFF)
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").getOrCreate()
    val runner = new GenerateData(spark)
    runner.run()
  }
}

class GenerateData(spark: SparkSession) {

  def run(): Unit = {
    val df: DataFrame = generateTransactionsDF()
    df.show(10, false)
  }

  def generateTransactionsDF(): DataFrame =
    transactionsDFGenerator
      .arbitrary(Gen.Parameters.default, Seed(100))
      .get

  lazy val transactionsDFGenerator: Arbitrary[DataFrame] =
    DataframeGenerator.arbitraryDataFrame(spark.sqlContext, transactionsSchema, 10)

  lazy val transactionsSchema: StructType = new StructType()
    .add("user_id", IntegerType)
    .add("ts", TimestampType)
    .add("amount", DoubleType)
}
Pavel Filatov

The farsante library lets you generate fake PySpark / Pandas datasets that can easily be written out in the Parquet file format. Here's an example:

import farsante
from mimesis import Person
from mimesis import Address
from mimesis import Datetime

person = Person()
address = Address()
datetime = Datetime()
df = farsante.pyspark_df([person.full_name, person.email, address.city, address.state, datetime.datetime], 3)
df.write.mode('overwrite').parquet('./tmp/spark_fake_data')

It's easier to simply use Pandas to write out sample Parquet files. Spark isn't needed for a task like this.

df = farsante.pandas_df([person.full_name, person.email, address.city, address.state, datetime.datetime], 3)
df.to_parquet('./tmp/fake_data.parquet', index=False)

Looks like there is a Scala faker library but it doesn't look nearly as mature as the mimesis library. Go has good faker and Parquet libraries so that's another option for generating fake data.

Powers

My normal solution to this problem is to use Spark and a mutable list in Scala to build up some simple sample data. I introduce dates and various other data types as needed, but this is how I usually go about it.

Basically, I just turn the mutable list into a data frame and coalesce to the number of target files I need in the output and then save to parquet.

// Needed for SaveMode.Overwrite below.
import org.apache.spark.sql.SaveMode
import scala.collection.mutable.ListBuffer

// Build up a mutable list buffer in a loop.
val lb = ListBuffer[(Int, Int, String)]()
for (i <- 1 to 5000) {
  lb += ((i, i*i, "Number is " + i + "."))
}

// Convert it to a data frame.
import spark.implicits._
val df = lb.toDF("value", "square", "description")

// Coalesce to the target number of output files, then save as parquet.
df.coalesce(5).write.mode(SaveMode.Overwrite).parquet("<your-hdfs-path>/name.parquet")

It would be very nice to have a way of doing this without Spark though. Also, if I wanted much larger data sets I'd have to modify this to avoid generating all the records in the driver; this is more for small to mid-size data sets.

John Humphreys

The pyarrow library for Python allows you to write parquet from a pandas DataFrame with just a few lines of code.

https://arrow.apache.org/docs/python/parquet.html
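
As a rough illustration of that approach, here is a minimal sketch that builds some obviously synthetic records and writes them out with pandas and pyarrow. The helper names make_rows and write_parquet, the column names, and the value ranges are all made up for this example; adjust them to whatever schema you need to test.

```python
import random
from datetime import datetime, timedelta


def make_rows(n, seed=42):
    """Build n obviously synthetic transaction records (hypothetical schema)."""
    rng = random.Random(seed)  # fixed seed keeps the test data reproducible
    start = datetime(2020, 1, 1)
    return [
        {
            "user_id": rng.randint(1, 100),
            "ts": start + timedelta(minutes=i),
            "amount": round(rng.uniform(0.0, 500.0), 2),
        }
        for i in range(n)
    ]


def write_parquet(rows, path):
    """Write the records to a single Parquet file via pandas and pyarrow."""
    # Imported here so make_rows stays usable without pandas/pyarrow installed.
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.Table.from_pandas(pd.DataFrame(rows), preserve_index=False)
    pq.write_table(table, path)
```

Calling write_parquet(make_rows(1000), "sample.parquet") should then produce one file that you can register as an external table. Everything happens in a single Python process, so this is better suited to small and mid-size test sets than to very large ones.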

Bryan Johnson