
We need to convert text data into Parquet/Avro on a daily basis. The input comes from multiple sources and has different structures, and we would like Spark SQL based Scala code to achieve this irrespective of the delimiter, the number of columns, or the structure.

  • Multiple sources? JDBC, S3, or a data lake? Are you dumping all data into one particular location, or can it be dynamic? – maogautam Oct 04 '19 at 19:07
  • We are getting text data into HDFS; each source has a different structure and the data is delimited. As part of our process we are converting the delimited text to Parquet. To do that we are using Spark SQL with StructType. – sangam.gavini Oct 04 '19 at 19:11
  • How are you maintaining a StructType for the various sources? Does your existing data have a header in it? – maogautam Oct 04 '19 at 19:14
  • Through a Spark Scala program we are defining the structure for each source. – sangam.gavini Oct 04 '19 at 19:27

2 Answers


After analyzing your problem statement, I'm making the following assumptions:

1. The data source can be anything, but is primarily HDFS.
2. The delimiter can be anything.
3. You're maintaining a structure definition for each source.
4. The files do not contain a header.

Suggestion: the problem here is that you have to generate a StructType because your data doesn't contain a header. Come up with some structure, for example a JSON structure, to define each data source, then load and parse that JSON with Jackson in Scala (a parsing sketch follows the example below). Or simply pass a column_map to your program.

Example: 
{
    "inputLocation": "",
    "delimiter" : ",",
    "column_map" : "col1, datatype; col12, datatype;col1, datatype; col12, datatype",
    "outputLocation": ""
}
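
A minimal sketch (not part of the original answer; it assumes jackson-module-scala is on the classpath, and the names and values are placeholders) of how such a JSON config could be loaded with Jackson:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Case class mirroring the JSON config above (hypothetical names)
case class SourceConfig(inputLocation: String, delimiter: String, column_map: String, outputLocation: String)

object ParseSourceConfig {
  def main(args: Array[String]): Unit = {
    val json =
      """{"inputLocation":"/data/in","delimiter":",","column_map":"col1, String; col2, Bool","outputLocation":"/data/out"}"""

    // Register the Scala module so Jackson can bind JSON to the case class
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    val config = mapper.readValue(json, classOf[SourceConfig])
    println(config.column_map)
  }
}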

Now, use the column_map to generate your StructType dynamically.

object GenerateStructType {

  import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}

  //Builds a StructType from entries of the form "columnName, columnType"
  def generateStructType(columnsList: Seq[String]): StructType = {

    val fields = columnsList.map { columnDetail =>
      val columnName = columnDetail.split(",")(0).trim
      val columnType = columnDetail.split(",")(1).trim

      columnType match {
        case "String" => StructField(columnName, StringType, nullable = true)
        case "Bool"   => StructField(columnName, BooleanType, nullable = true)
        case _        => StructField(columnName, StringType, nullable = true) //default to StringType
      }
    }
    StructType(fields)
  }

  def main(args: Array[String]): Unit = {
    val columnMap = "col1, datatype; col12, datatype;col1, datatype; col12, datatype"

    val result = GenerateStructType.generateStructType(columnMap.split(";"))
    println(result)
  }

}

Dynamically generated StructType:

StructType(StructField(col1,StringType,true), StructField(col12,StringType,true), StructField(col1,StringType,true), StructField(col12,StringType,true))

Use the StructType while loading the data (see the sketch below).
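
A minimal sketch (not from the original answer) of applying the generated schema while reading the delimited text; the columns, delimiter, and paths are placeholders that would come from the JSON config above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("TextToParquet").master("local[*]").getOrCreate()

//Generate the schema from a column_map string (placeholder columns)
val schema = GenerateStructType.generateStructType("id, String; active, Bool".split(";"))

//Apply the schema while reading the header-less delimited file
val df = spark.read
  .option("delimiter", ",")    //delimiter from the config
  .option("header", "false")   //assumption 4: files have no header
  .schema(schema)
  .csv("/path/to/input")       //inputLocation from the config

df.write.mode("overwrite").parquet("/path/to/output") //outputLocation from the config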

Hope this helps ....

maogautam
  • Thanks for your time and effort to bring a solution for this. With this we are able to generate the StructType, but we still have an issue generating a row RDD to create a DataFrame. I have found a solution and posted it as an answer; please have a look at it and let me know if you find anything. Thanks again! – sangam.gavini Oct 05 '19 at 06:04

I have written this code in Spark 2.1.0 (Spark SQL).

Input used

1238769|Michael|Hoffman|50000|New York
1238769|Michael1|Hoffman1|50000|New York1
1238770|Michael2|Hoffman2|50000|New York2
1238771|Michael3|Hoffman3|50000|New York3
1238772|Michael4|Hoffman4|50000|New York4
1238773|Michael5|Hoffman5|50000|New York5
1238774|Michael6|Hoffman6|50000|New York6
1238775|Michael7|Hoffman7|50000|New York7
1238776|Michael8|Hoffman8|50000|New York8
1238777|Michael9|Hoffman9|50000|New York9

In this example I am going to convert a pipe ("|") delimited text file into Parquet.

Step #1: Reading the input variables

//Imports needed across the steps below
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.rdd.RDD

//creating spark session
val spark = SparkSession.builder().appName("Text to Parquet").master("local[*]").getOrCreate()
import spark.implicits._

//Assigning values to the variables
val input_location = args(0).trim
val delimiter = "\\|" //You can make it dynamic by passing it as an argument
val selectColString_location = args(1).trim
val output_location = args(2).trim

Step #2: Reading input text data and splitting as per the delimiter

//Reading data from text file
val input_rdd = spark.sparkContext.textFile(input_location)

//Split the input data using the delimiter (we are using pipe (\\|) as the delimiter for this example)
val input_array_rdd:RDD[Array[String]] = input_rdd.map(x => x.split(delimiter, -1))

Step #3: Converting the RDD created in step #2 into a DataFrame using toDF, with only a single column - col, which will be an array column

//Converting input_array_rdd into dataframe with only one column - col
val input_df:DataFrame = input_array_rdd.toDF("col")

//Creating temp table on top of input_df with the name TABLE1
input_df.createOrReplaceTempView("TABLE1")

Step #4: Preparing a select statement as per the input structure, using the temp table TABLE1 and the array column col, and saving it in a text file as a single row

select cast(col[0] as bigint) as cust_id, col[1] as first_name, col[2] as last_name, cast(col[3] as decimal(18,6)) as amount, col[4] as city from table1

Step #5: Reading the select statement from the file and executing it to generate output

//Reading the selectColString, remember we are reading only the first row from the file
//Select SQL should be only one row in the selectColString.txt file
val sqlColString = spark.sparkContext.textFile(selectColString_location).first().toString()
//Generating the output using the colString
val output_df = spark.sql(sqlColString)

Step #6: Writing the output as parquet

output_df.write.mode(SaveMode.Overwrite).parquet(output_location)
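
A quick check (not part of the original answer): read the Parquet output back and print its schema; it should match the schema shown below.

//Optional verification: read back the Parquet files and inspect the schema
val verify_df = spark.read.parquet(output_location)
verify_df.printSchema()
verify_df.show(5, false)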

Output parquet schema

root
 |-- cust_id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- amount: decimal(18,6) (nullable = true)
 |-- city: string (nullable = true)

With this single program we are able to convert all our text files to Parquet by just modifying the selectColString file as per the input text structure.

Github Code Link: https://github.com/sangamgavini/ReusableCodes/tree/master/src/main/scala/com/sangam/TexttoParquet

  • Instead of loading the file as a text file, load it as a CSV and provide the delimiter. See https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe. – maogautam Oct 05 '19 at 06:14
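
A minimal sketch of that suggestion (not part of the thread; it reuses spark, input_location, and output_location from Step #1, and the schema columns are taken from the answer's select statement):

//Let the CSV reader split the columns instead of using RDD.map + split
import org.apache.spark.sql.types._

val csv_schema = StructType(Seq(
  StructField("cust_id", LongType, true),
  StructField("first_name", StringType, true),
  StructField("last_name", StringType, true),
  StructField("amount", DecimalType(18, 6), true),
  StructField("city", StringType, true)
))

val csv_df = spark.read
  .option("delimiter", "|")
  .option("header", "false")
  .schema(csv_schema)
  .csv(input_location)

csv_df.write.mode("overwrite").parquet(output_location)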