Basically, I need to create output files based on the DataPartition column, which is the last column in the DataFrame.

So the first row and the last row should be saved in Fundamental.Fundamental.Fundamental.Japan.1.2018-09-24-0937.Full.txt, and the middle row should be saved in Fundamental.Fundamental.Fundamental.ThirdParty.1.2018-09-24-0937.Full.txt.

+--------------------------------+--------------+---------------------------+-------------------------+--------+------------------+--------+-----------------+---------------+--------------------------+---------------------------+---------------+-------------------------+-----------------------------+-----------------------------------+-----------------------+----------------------------+----------------------------------+--------------------+----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
|Fundamental_uniqueFundamentalSet|OrganizationId|OrganizationId_objectTypeId|OrganizationId_objectType|GaapCode|ConsolidationBasis|IsFiling|NonFilingDateTime|NonFilingReason|PrimaryReportingEntityCode|TotalPrimaryReportingShares|LocalLanguageId|Fundamental_effectiveFrom|Fundamental_effectiveFromPlus|Fundamental_effectiveFromPlusNaCode|Fundamental_effectiveTo|Fundamental_effectiveToMinus|Fundamental_effectiveToMinusNACode|ConsolidationBasisId|GaapCodeId|FFAction|!||DataPartition                                                                                                                                   |
+--------------------------------+--------------+---------------------------+-------------------------+--------+------------------+--------+-----------------+---------------+--------------------------+---------------------------+---------------+-------------------------+-----------------------------+-----------------------------------+-----------------------+----------------------------+----------------------------------+--------------------+----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+
|192730241374                    |4295877894    |404010                     |Organization             |JPG     |Consolidated      |true    |                 |               |A51EF                     |117588807.00000            |505126         |2013-06-29T00:55:15Z     |                             |                                   |9999-12-31T00:00:00Z   |                            |                                  |3013598             |3011577   |I|!|       |file:/C:/Users/u6034690/Desktop/SPARK/trfsmallfffile/Fundamental/FINALSPARK/Fundamental.Fundamental.Fundamental.Japan.1.2018-09-24-0937.Full.txt|
|192730391384                    |4295877894    |404010                     |Organization             |AOG     |Consolidated      |true    |                 |               |A51EF                     |117588807.00000            |505126         |2018-09-19T09:51:46Z     |                             |                                   |9999-12-31T00:00:00Z   |                            |                                  |3013598             |1003042842|I|!|       |file:/C:/Users/u6034690/Desktop/SPARK/trfsmallfffile/Fundamental/FINALSPARK/Fundamental.Fundamental.Fundamental.ThirdParty.1.2018-09-24-0937.Full.txt|
|192730241373                    |4295877894    |404010                     |Organization             |JPG     |Parent            |true    |                 |               |A51EF                     |117588807.00000            |505126         |2013-06-29T00:55:15Z     |                             |                                   |9999-12-31T00:00:00Z   |                            |                                  |3013599             |3011577   |I|!|       |file:/C:/Users/u6034690/Desktop/SPARK/trfsmallfffile/Fundamental/FINALSPARK/Fundamental.Fundamental.Fundamental.Japan.1.2018-09-24-0937.Full.txt|
+--------------------------------+--------------+---------------------------+-------------------------+--------+------------------+--------+-----------------+---------------+--------------------------+---------------------------+---------------+-------------------------+-----------------------------+-----------------------------------+-----------------------+----------------------------+----------------------------------+--------------------+----------+-----------+------------------------------------------------------------------------------------------------------------------------------------------------+

I am looking for something like the following, but it is not working:

import org.apache.hadoop.io.NullWritable
import org.apache.spark.HashPartitioner
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

class RddMultiTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = key.asInstanceOf[String]
}

dataframe.partitionBy(new HashPartitioner(noOfHashPartitioner)).saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat], classOf[GzipCodec])

Expected output:

Fundamental.uniqueFundamentalSet|^|OrganizationId|^|OrganizationId.objectTypeId|^|OrganizationId.objectType|^|GaapCode|^|ConsolidationBasis|^|IsFiling|^|NonFilingDateTime|^|NonFilingReason|^|PrimaryReportingEntityCode|^|TotalPrimaryReportingShares|^|LocalLanguageId|^|Fundamental.effectiveFrom|^|Fundamental.effectiveFromPlus|^|Fundamental.effectiveFromPlusNaCode|^|Fundamental.effectiveTo|^|Fundamental.effectiveToMinus|^|Fundamental.effectiveToMinusNACode|^|ConsolidationBasisId|^|GaapCodeId|^|FFAction|!|
192730241373|^|4295877894|^|404010|^|Organization|^|JPG|^|Parent|^|True|^||^||^|A51EF|^|117588807.00000|^|505126|^|2013-06-29T00:55:15Z|^||^||^|9999-12-31T00:00:00Z|^||^||^|3013599|^|3011577|^|I|!|
192730241374|^|4295877894|^|404010|^|Organization|^|JPG|^|Consolidated|^|True|^||^||^|A51EF|^|117588807.00000|^|505126|^|2013-06-29T00:55:15Z|^||^||^|9999-12-31T00:00:00Z|^||^||^|3013598|^|3011577|^|I|!|
Sudarshan kumar

1 Answer

You need to create a pair RDD in which the key is your output file name and the value is the record. You can then call saveAsHadoopFile() on it to save the files the way you are looking for.

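import org.apache.spark.HashPartitioner
import org.json.JSONObject

val dataframe = .... // this is the DataFrame that you want to save

// Key each JSON-encoded row by its DataPartition value, which is used as the output file name
val pairedRDD = dataframe.toJSON.rdd.map(row => {
    val record = new JSONObject(row)
    val key = record.getString("DataPartition")

    (key, row)
})

// RddMultiTextOutputFormat and output are the class and target directory from the question
pairedRDD.partitionBy(new HashPartitioner(noOfHashPartitioner))
    .saveAsHadoopFile(output, classOf[String], classOf[String], classOf[RddMultiTextOutputFormat])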

This should give you the desired layout: one output file per distinct DataPartition value.
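Two details may still need handling to match the expected output in the question. The DataPartition values shown there are full `file:/C:/...` URIs, while the name returned by `generateFileNameForKeyValue` is normally treated as a path under the output directory, so a bare file name is probably the safer key. Also, the expected lines are `|^|`-delimited and end with `|!|`, rather than JSON. Here is a minimal sketch of two plain Scala helpers for this. The object and method names (`PartitionOutput`, `fileNameOf`, `formatLine`) are hypothetical and not part of any library, and the terminator is assumed to be missing from the data. If the last column already carries the `|!|` suffix, as the `show()` output suggests, pass an empty terminator instead.

```scala
// Hypothetical helpers for illustration only; the names are assumptions,
// not part of Spark, Hadoop, or the original answer.
object PartitionOutput {
  val FieldDelimiter = "|^|"
  val RowTerminator = "|!|"

  /** Returns the last path segment of a DataPartition value (the bare file name). */
  def fileNameOf(dataPartition: String): String = {
    // Normalise Windows separators so both "/" and "\" styles are handled
    val normalized = dataPartition.replace('\\', '/')
    // lastIndexOf returns -1 when there is no "/", so the whole string is kept
    normalized.substring(normalized.lastIndexOf('/') + 1)
  }

  /** Joins the field values with |^| and appends the terminator; null becomes an empty field. */
  def formatLine(fields: Seq[String], terminator: String = RowTerminator): String =
    fields.map(f => if (f == null) "" else f).mkString(FieldDelimiter) + terminator
}
```

In the Spark job, the pair could then be built as `(PartitionOutput.fileNameOf(key), PartitionOutput.formatLine(fields))` instead of `(key, row)`, where `fields` holds the row's column values except DataPartition, converted to strings.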

Prasad Khode