
How can I use SparkContext (to create SparkSession or Cassandra Sessions) on executors?

If I pass it as a parameter to the foreach or foreachPartition, then it will have a null value. Shall I create a new SparkContext in each executor?

What I'm trying to do is as follows:

Read a dump directory with millions of XML files:

dumpFiles = Directory.listFiles(dumpDirectory)
dumpFilesRDD = sparkContext.parallelize(dumpFiles, numOfSlices)
dumpFilesRDD.foreachPartition(dumpFilePath->parse(dumpFilePath))

In parse(), every XML file is validated, parsed and inserted into several tables using Spark SQL. Only valid XML files produce objects of the same type that can be saved. A portion of the data needs to be replaced by other keys before being inserted into one of the tables.

In order to do that, SparkContext is needed inside the parse function so that sparkContext.sql() can be called.

Erick Ramirez
fattah.safa

2 Answers


If I were to rephrase your question, what you want is to:

  1. Read a directory with millions of XML files
  2. Parse them
  3. Insert them into a database

That's a typical Extract, Transform and Load (ETL) process that is terribly easy in Spark SQL.

Loading XML files can be done using a separate package spark-xml:

spark-xml: A library for parsing and querying XML data with Apache Spark, for Spark SQL and DataFrames. The structure and test tools are mostly copied from CSV Data Source for Spark.

You can "install" the package using --packages command-line option:

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.11:0.4.1

Quoting spark-xml's Scala API (with some changes to use SparkSession instead):

// Step 1. Loading XML files
val path = "the/path/to/millions/files/*.xml"
val spark: SparkSession = ???
val files = spark.read
  .format("com.databricks.spark.xml")
  .option("rowTag", "book")
  .load(path)

That makes the first requirement almost a no-brainer. You've got your millions of XML files taken care of by Spark SQL.
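As a quick sanity check (not part of the original answer, just a hedged suggestion), you can inspect what spark-xml inferred from the files:

// Inspect the schema spark-xml inferred from the "book" elements
files.printSchema()
// Trigger a job that actually reads the files and count the rows
println(files.count())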

Step 2 is about parsing the lines (from the XML files) and marking rows to be saved to appropriate tables.

// Step 2. Transform them (using parse)
def parse(line: String) = ???
val parseDF = files.map { line => parse(line) }

Your parse function could return something (the main result) together with the name of the table that something should be saved to.
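One hedged way to flesh that out is sketched below; the Parsed case class, the title and price columns, and the table-routing rule are hypothetical placeholders, not something the original answer specifies:

// Sketch only: the fields and the routing rule are made-up examples.
case class Parsed(table: String, title: String, price: Double)

def parse(title: String, price: Double): Parsed = {
  // Illustrative rule: route expensive books to table1, everything else to table2
  val table = if (price > 100.0) "table1" else "table2"
  Parsed(table, title, price)
}

import spark.implicits._
val parseDF = files
  .select($"title", $"price")   // assumes these columns exist in the XML
  .as[(String, Double)]
  .map { case (title, price) => parse(title, price) }
  .toDF()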

With the table markers, you split the parseDF into DataFrames per table.

val table1DF = parseDF.filter($"table" === "table1")

And so on (per table).

// Step 3. Insert into DB    
table1DF.write.option(...).jdbc(...)
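Filled in with hypothetical connection details (the PostgreSQL URL, table name and credentials below are placeholders, not anything from the original answer), that write could look roughly like this:

import java.util.Properties

// All connection details below are placeholders for illustration only.
val jdbcUrl = "jdbc:postgresql://db-host:5432/mydb"
val connProps = new Properties()
connProps.setProperty("user", "spark_user")
connProps.setProperty("password", "secret")
connProps.setProperty("driver", "org.postgresql.Driver")

table1DF.write
  .mode("append")                        // add rows to an existing table
  .jdbc(jdbcUrl, "table1", connProps)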

That's just a sketch of what you may really be after, but that's the general pattern to follow. Decompose your pipeline into digestible chunks and tackle one chunk at a time.

Jacek Laskowski
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/145319/discussion-between-fattah-safa-and-jacek-laskowski). – fattah.safa May 28 '17 at 18:56

It is important to keep in mind that in Spark we are not supposed to program in terms of executors.

In the Spark programming model, your driver program is mostly a self-contained program where certain sections are automatically converted into a physical execution plan, ultimately a bunch of tasks distributed across workers/executors.

When you need to execute something for each partition, you can use something like mapPartitions(). Refer to Spark : DB connection per Spark RDD partition and do mapPartition for further details. Pay attention to how the dbConnection object is enclosed in the function body.
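As a hedged sketch of that pattern, assuming dumpFilesRDD from the question is an RDD[String] of file paths and using plain JDBC as a stand-in for whatever client you actually use (URL, credentials and SQL are placeholders):

import java.sql.DriverManager

val insertedCounts = dumpFilesRDD.mapPartitions { paths =>
  // The connection is created inside the task, so it is instantiated on the
  // executor (once per partition) and never serialized from the driver.
  val conn = DriverManager.getConnection(
    "jdbc:postgresql://db-host:5432/mydb", "spark_user", "secret")
  val stmt = conn.prepareStatement("INSERT INTO parsed_files(path) VALUES (?)")
  // Consume the iterator eagerly before closing the connection.
  val inserted = paths.map { path =>
    stmt.setString(1, path)
    stmt.executeUpdate()
  }.sum
  stmt.close()
  conn.close()
  Iterator(inserted)
}
insertedCounts.sum()  // action that triggers the job; total rows inserted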

It is not clear what you mean by a parameter. If it is just data (not a DB connection or similar), I think you need to use a broadcast variable.
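A minimal sketch of a broadcast variable for that case (the key-mapping data and names are hypothetical; spark is assumed to be your SparkSession):

// Ship read-only data to every executor once, instead of with every task.
val keyMapping = Map("oldKey1" -> "newKey1", "oldKey2" -> "newKey2")  // placeholder data
val keyMappingBc = spark.sparkContext.broadcast(keyMapping)

dumpFilesRDD.foreachPartition { paths =>
  val mapping = keyMappingBc.value  // read locally on the executor, no driver round-trip
  paths.foreach { path =>
    // ... use mapping to replace keys while parsing `path` ...
  }
}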

Tharaka