6

I'm trying to convert data stored in S3 as JSON-per-line textfiles to a structured, columnar format like ORC or Parquet on S3.

The source files contain data of multiple schemes (eg. HTTP request, HTTP response, ...), which need to be parsed into different Spark Dataframes of the correct type.

Example schemas:

  val Request = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("requestMethod", StringType),
    StructField("scheme", StringType),
    StructField("host", StringType),
    StructField("headers", MapType(StringType, StringType, valueContainsNull=false)),
    StructField("path", StringType),
    StructField("sessionId", StringType),
    StructField("userAgent", StringType)
  ))

  val Response = StructType(Seq(
    StructField("timestamp", TimestampType, nullable=false),
    StructField("requestId", LongType),
    StructField("contentType", StringType),
    StructField("contentLength", IntegerType),
    StructField("statusCode", StringType),
    StructField("headers", MapType(keyType=StringType, valueType=StringType, valueContainsNull=false)),
    StructField("responseDuration", DoubleType),
    StructField("sessionId", StringType)
  ))

I got that part working fine, however trying to write out the data back to S3 as efficiently as possible seems to be an issue atm.

I tried 3 approaches:

  1. muxPartitions from the silex project
  2. caching the parsed S3 input and looping over it multiple times
  3. making each scheme type a separate partition of the RDD

In the first case, the JVM ran out of memory and in the second one the machine ran out of disk space.

The third I haven't thoroughly tested yet, but this does not seem an efficient use of processing power (as only one node of the cluster (the one on which this particular partition is) would actually be writing the data back out to S3).

Relevant code:

val allSchemes = Schemes.all().keys.toArray

if (false) {
  import com.realo.warehouse.multiplex.implicits._

  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .flatMuxPartitions(allSchemes.length, data => {
      val buffers = Vector.tabulate(allSchemes.length) { j => ArrayBuffer.empty[Row] }
      data.foreach {
        logItem => {
          val schemeIndex = allSchemes.indexOf(logItem.logType)
          if (schemeIndex > -1) {
            buffers(schemeIndex).append(logItem.row)
          }
        }
      }
      buffers
    })

  allSchemes.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = input(index)

      writeColumnarToS3(rdd, schemeName)
  }
} else if (false) {
  // Naive approach
  val input = readRawFromS3(inputPrefix) // returns RDD[Row]
    .persist(StorageLevel.MEMORY_AND_DISK)

  allSchemes.foreach {
    schemeName =>
      val rdd = input
        .filter(x => x.logType == schemeName)
        .map(x => x.row)

      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
} else {
  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = allSchemes.length
    override def getPartition(key: Any): Int = allSchemes.indexOf(key.asInstanceOf[String])
  }

    val input = readRawFromS3(inputPrefix)
      .map(x => (x.logType, x.row))
      .partitionBy(new CustomPartitioner())
      .map { case (logType, row) => row }
      .persist(StorageLevel.MEMORY_AND_DISK)

    allSchemes.zipWithIndex.foreach {
      case (schemeName, index) =>
        val rdd = input
          .mapPartitionsWithIndex(
            (i, iter) => if (i == index) iter else Iterator.empty,
            preservesPartitioning = true
          )

        writeColumnarToS3(rdd, schemeName)
    }

    input.unpersist()
}

Conceptually, I think the code should have 1 output DStream per scheme type and the input RDD should pick 'n place each processed item onto the correct DStream (with batching for better throughput).

Does anyone have any pointers as to how to implement this? And/or is there a better way of tackling this problem?

Community
  • 1
  • 1
mcuelenaere
  • 2,109
  • 1
  • 13
  • 10

2 Answers2

0

Given that the input is a json, you can read it into a dataframe of strings (each line is a single string). Then you can extract the type from each json (either by using a UDF or by using a function such as get_json_object or json_tuple).

Now you have two columns: The type and the original json. You can now use partitionBy dataframe option when writing the dataframe. This would result in a directory for each type and the content of the directory would include the original jsons.

Now you can read each type with its own schema.

You can also do a similar thing with RDD using a map which turns the input rdd into a pair rdd with the key being the type and the value being the json converted to the target schema. Then you can use partitionBy and map partition to save each partition to a file or you can use reduce by key to write to different files (e.g. by using the key to set the filename).

You could also take a look at Write to multiple outputs by key Spark - one Spark job

Note that I assumed here that the goal is to split to file. Depending on your specific use case, other options might be viable. For example, if your different schemas are close enough, you can create a super schema which encompasses all of them and create the dataframe directly from that. Then you can either work on the dataframe directly or use the dataframe partitionBy to write the different subtypes to different directories (but this time already saved to parquet).

Community
  • 1
  • 1
Assaf Mendelson
  • 12,701
  • 5
  • 47
  • 56
  • Using some kind of unified "super-scheme" crossed my mind as well. But in the end, this seems like a hack. Also, SparkSQL does not natively seem to support an UnionType or something similar. Saving the data as JSON is not a solution, that's what the source data is for. The goal is to both split the source files into multiple separate columnar files and to make sure the data is strongly typed (allowing easy querying with eg Presto). – mcuelenaere Dec 19 '16 at 13:41
  • Then doing map which converts to a key pair with key being the type and value being the object is your best bet. You can then either partition by and map partition or use reduce by key to write directly to the target file – Assaf Mendelson Dec 19 '16 at 14:16
0

This is what I came up with eventually:

I use a custom partitioner to partition the data based on their scheme plus the hashcode of the row.

The reasoning here is that we want to be able to only process certain partitions, yet still allow all nodes to participate (for performance reasons). So we don't spread the data over just 1 partition, but over X partitions (with X being the number of nodes times 2, in this example).

Then for each scheme, we prune the partitions we don't need and thus we will only process the ones we do.

Code example:

def process(date : ReadableInstant, schemesToProcess : Array[String]) = {
  // Tweak this based on your use case
  val DefaultNumberOfStoragePartitions = spark.sparkContext.defaultParallelism * 2

  class CustomPartitioner extends Partitioner {
    override def numPartitions: Int = schemesToProcess.length * DefaultNumberOfStoragePartitions
    override def getPartition(key: Any): Int = {
      // This is tightly coupled with how `input` gets transformed below
      val (logType, rowHashCode) = key.asInstanceOf[(String, Int)]
      (schemesToProcess.indexOf(logType) * DefaultNumberOfStoragePartitions) + Utils.nonNegativeMod(rowHashCode, DefaultNumberOfStoragePartitions)
    }

    /**
      * Internal helper function to retrieve all partition indices for the given key
      * @param key input key
      * @return
      */
    private def getPartitions(key: String): Seq[Int] = {
      val index = schemesToProcess.indexOf(key) * DefaultNumberOfStoragePartitions
      index until (index + DefaultNumberOfStoragePartitions)
    }

    /**
      * Returns an RDD which only traverses the partitions for the given key
      * @param rdd base RDD
      * @param key input key
      * @return
      */
    def filterRDDForKey[T](rdd: RDD[T], key: String): RDD[T] = {
      val partitions = getPartitions(key).toSet
      PartitionPruningRDD.create(rdd, x => partitions.contains(x))
    }
  }

  val partitioner = new CustomPartitioner()
  val input = readRawFromS3(date)
    .map(x => ((x.logType, x.row.hashCode), x.row))
    .partitionBy(partitioner)
    .persist(StorageLevel.MEMORY_AND_DISK_SER)

  // Initial stage: caches the processed data + gets an enumeration of all schemes in this RDD
  val schemesInRdd = input
    .map(_._1._1)
    .distinct()
    .collect()

  // Remaining stages: for each scheme, write it out to S3 as ORC
  schemesInRdd.zipWithIndex.foreach {
    case (schemeName, index) =>
      val rdd = partitioner.filterRDDForKey(input, schemeName)
        .map(_._2)
        .coalesce(DefaultNumberOfStoragePartitions)

      writeColumnarToS3(rdd, schemeName)
  }

  input.unpersist()
}
mcuelenaere
  • 2,109
  • 1
  • 13
  • 10