4

My sample code for reading a text file is:

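import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD

val text = sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
val rddwithPath = text.asInstanceOf[HadoopRDD[LongWritable, Text]].mapPartitionsWithInputSplit { (inputSplit, iterator) =>
  val file = inputSplit.asInstanceOf[FileSplit]
  // Pair every line with the path of the file it came from.
  iterator.map { tpl => (file.getPath.toString, tpl._2.toString) }
}.reduceByKey((a, b) => a)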

How can I read PDF and XML files in the same way?

Ram Ghadiyaram
AkhilaV

3 Answers

9

PDF and XML files can be parsed using Apache Tika, a content analysis toolkit. See:

- https://tika.apache.org/1.9/api/org/apache/tika/parser/xml/
- http://tika.apache.org/0.7/api/org/apache/tika/parser/pdf/PDFParser.html
- https://tika.apache.org/1.9/api/org/apache/tika/parser/AutoDetectParser.html

Below is an example integration of Spark with Tika:

import java.io.{File, FileInputStream, InputStream}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream
import org.apache.tika.metadata.Metadata
import org.apache.tika.parser.{AutoDetectParser, ParseContext}
import org.apache.tika.sax.WriteOutContentHandler

object TikaFileParser {

  // Parses one file with Tika and prints the extracted text.
  def tikaFunc(a: (String, PortableDataStream)): Unit = {
    // a._1 is a URI such as "file:/home/user/documents/x.pdf"; drop(5) strips "file:".
    // This only works for local files. For HDFS paths, read from a._2.open() instead.
    val file = new File(a._1.drop(5))
    val parser = new AutoDetectParser()
    val stream: InputStream = new FileInputStream(file)
    val handler = new WriteOutContentHandler(-1) // -1 disables the write limit
    val metadata = new Metadata()
    val context = new ParseContext()

    // Close the stream even if parsing fails.
    try parser.parse(stream, handler, metadata, context)
    finally stream.close()

    println(handler.toString)
    println("------------------------------------------------")
  }

  def main(args: Array[String]): Unit = {
    val filesPath = "/home/user/documents/*"
    val conf = new SparkConf().setAppName("TikaFileParser")
    val sc = new SparkContext(conf)
    // binaryFiles yields one (path, PortableDataStream) pair per file.
    val fileData = sc.binaryFiles(filesPath)
    fileData.foreach(x => tikaFunc(x))
  }
}
Ram Ghadiyaram
  • Yes, thank you so much. One more doubt: will it work if I give an HDFS path? For example: path = "hdfs://namenode2.aibl.net:8020/ABDF/akhilaajith/PF_knnmodel_1231480046927236/visualise1/model_points". It shows an error message because the double // is removed from that path. How can I overcome this? – AkhilaV Feb 14 '17 at 07:11
  • If I want the corresponding filename with each record, how can I modify this code? Like this: doc1.txt,first record doc1.txt,secondrecord doc2.txt,firstrecord – AkhilaV Feb 21 '17 at 09:45
  • Please help me figure this out. – AkhilaV Feb 21 '17 at 09:47
  • You can use `SparkContext.wholeTextFiles`, which returns an `RDD` of (filename, content). Later you can convert the content to a binary type and pass it to the Tika function. Note that with wholeTextFiles the files should not be too big; check the docs for this. – Ram Ghadiyaram Feb 21 '17 at 09:53
  • binaryFiles also has the same signature; I just checked: `def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)]` Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file (useful for binary data). For example, if you have the following files: hdfs://a-hdfs-path/part-00000 hdfs://a-hdfs-path/part-00001 ... hdfs://a-hdfs-path/part-nnnnn Do `val rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`, then rdd contains (a-hdfs-path/part-00000, its content) ... (a-hdfs-path/part-nnnnn, its content) – Ram Ghadiyaram Feb 21 '17 at 09:57
  • see this [link](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext@binaryFiles(String,Int):RDD[(String,String)]) – Ram Ghadiyaram Feb 21 '17 at 10:02
  • Yes, I got that answer, but my question is whether I can print each record along with its filename, as I mentioned earlier. I want the data in that format, not like doc1.txt,fullcontent doc2.txt,fullcontent – AkhilaV Feb 21 '17 at 10:32
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/136234/discussion-between-ram-ghadiyaram-and-akhilav). – Ram Ghadiyaram Feb 21 '17 at 11:07
  • Does this work for non-English characters, tables, mathematical formulas, and Hindi or any other language in a PDF? – The PowerHouse Jan 17 '18 at 10:22
  • @ThePowerHouse refer to [this](https://stackoverflow.com/a/18869104/647053). I haven't tried foreign characters; you need to do a small POC on this. – Ram Ghadiyaram Jan 17 '18 at 12:37
  • Has anyone figured out how to read from HDFS? I'm having an issue while submitting the RDD to the Tika function. – RData Sep 11 '18 at 14:06
  • It seems the Spark job fails with FileNotFoundException: `def pdfRead(fileNameFromRDD: (String, PortableDataStream), sparkSession: SparkSession) = { val file: File = new File(fileNameFromRDD._1.drop(5)) val document = PDDocument.load(file); //this line throws error java.io.FileNotFoundException` How do I use an HDFS file with org.apache.pdfbox.pdmodel.PDDocument? – Sam Sep 27 '18 at 20:24
  • @RamGhadiyaram The hdfs location is correct. Something wrong is happening when I'm trying to load the file as a PDDocument. – Sam Sep 27 '18 at 23:01
  • @RamGhadiyaram When I run this job in local it works but not when submitted on cluster. Did your code work when submitted on the cluster? I've submitted a question in so with details: https://stackoverflow.com/questions/52546241/text-from-pdf-in-spark – Sam Sep 28 '18 at 15:31
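
The filename-per-record format asked for in the comments above (doc1.txt,first record doc1.txt,secondrecord ...) could be sketched in PySpark roughly as follows. This is only an illustration: the function name `records_with_filename` is hypothetical, and it assumes one record per non-empty line and (path, content) pairs like those returned by `wholeTextFiles`.

```python
def records_with_filename(path_and_content):
    """Turn one (path, content) pair into (filename, record) pairs, one per line."""
    path, content = path_and_content
    # Keep only the last path component, e.g. "doc1.txt".
    filename = path.rsplit("/", 1)[-1]
    # Assumption: each non-empty line of the file is one record.
    return [(filename, line) for line in content.splitlines() if line.strip()]


if __name__ == "__main__":
    # Stand-in for a single element of the RDD, so the sketch runs without Spark.
    sample = ("hdfs://a-hdfs-path/doc1.txt", "first record\nsecond record\n")
    for pair in records_with_filename(sample):
        print(pair)
```

On a cluster this would presumably be applied as `sc.wholeTextFiles(path).flatMap(records_with_filename)`, so that each file contributes one output pair per record instead of a single (filename, full content) pair.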
4

A PDF can be parsed in PySpark as follows:

If the PDFs are stored in HDFS, read them with sc.binaryFiles(), since PDF is a binary format. The binary content can then be sent to pdfminer for parsing.

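import io

from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument

def return_device_content(cont):
    # Wrap the raw bytes in a file-like object, which is what PDFParser expects.
    fp = io.BytesIO(cont)
    parser = PDFParser(fp)
    document = PDFDocument(parser)
    # Return the parsed document so later steps can work with it.
    return document

filesPath = "/user/root/*.pdf"
# binaryFiles yields (path, content) pairs, where content is the file's raw bytes.
fileData = sc.binaryFiles(filesPath)
file_content = fileData.map(lambda content: content[1])
file_content1 = file_content.map(return_device_content)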

Further parsing can be done using the functionality provided by pdfminer.
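
As a minimal sketch of that further parsing, assuming the maintained pdfminer.six fork is installed on every executor (its `pdfminer.high_level.extract_text` helper accepts a file-like object), the raw bytes could be turned into plain text as below. The function name `pdf_bytes_to_text` and its `extractor` parameter are hypothetical; the parameter exists only so the function can be exercised without pdfminer or a real PDF.

```python
import io


def pdf_bytes_to_text(content, extractor=None):
    """Return the text of one PDF, given its raw bytes."""
    if extractor is None:
        # Assumption: pdfminer.six is available; imported lazily so the
        # import happens on the worker that actually runs the function.
        from pdfminer.high_level import extract_text
        extractor = extract_text
    # Wrap the bytes in a file-like object before handing them over.
    return extractor(io.BytesIO(content))


if __name__ == "__main__":
    # Fake extractor that just decodes the bytes, to show the data flow.
    print(pdf_bytes_to_text(b"hello", extractor=lambda fp: fp.read().decode("ascii")))
```

With the RDD from `sc.binaryFiles(filesPath)`, something like `fileData.mapValues(pdf_bytes_to_text)` should then give (path, text) pairs, keeping each file's name next to its extracted text.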

0

You can simply use spark-shell with Tika and run the code below, either sequentially or in a distributed manner, depending on your use case:

spark-shell --jars tika-app-1.8.jar
val binRDD = sc.binaryFiles("/data/")
val textRDD = binRDD.map(file => new org.apache.tika.Tika().parseToString(file._2.open()))
textRDD.saveAsTextFile("/output/")
System.exit(0)
pissall
cognitive