14

I'm using Apache Spark 1.0.1. I have many files whose records are delimited by the character \u0001 (UTF-8 encoded) rather than the usual newline \n. How can I read such files in Spark? That is, the default record delimiter of sc.textFile("hdfs:///myproject/*") is \n, and I want to change it to \u0001.

Giulio Vian
dotan

5 Answers

10

You can use textinputformat.record.delimiter to set the delimiter for TextInputFormat, e.g.:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

val conf = new Configuration(sc.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "X")
val input = sc.newAPIHadoopFile("file_path", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
val lines = input.map { case (_, text) => text.toString}
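// println on an Array prints only its reference, so format the elements explicitly
println(lines.collect().mkString("Array(", ", ", ")"))

For example, if my input file contains the single line aXbXcXd, the code above outputs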

Array(a, b, c, d)
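To check locally which records a given delimiter should produce before running the Spark job, the splitting can be imitated in a few lines of plain Python. This is only a rough sketch of the idea, not Hadoop's actual reader: the function name iter_records and the chunk_size parameter are made up for the illustration, and the treatment of a trailing delimiter (no empty final record) is an assumption.

```python
import io


def iter_records(stream, delimiter, chunk_size=4096):
    """Yield the records of a binary stream, split on a custom delimiter.

    Hypothetical helper for local experiments; it only imitates the idea
    behind textinputformat.record.delimiter.
    """
    buffer = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        buffer += chunk
        # Everything before the last delimiter is complete; the tail may
        # still be continued by the next chunk, so keep it in the buffer.
        *complete, buffer = buffer.split(delimiter)
        for record in complete:
            yield record.decode("utf-8")
    if buffer:
        # A final record without a trailing delimiter is still emitted.
        yield buffer.decode("utf-8")


# Same sample as above; a tiny chunk size forces records to span chunks.
sample = io.BytesIO("aXbXcXd".encode("utf-8"))
records = list(iter_records(sample, b"X", chunk_size=3))
print(records)  # ['a', 'b', 'c', 'd']
```

Passing b"\x01" as the delimiter gives the behaviour asked for in the question.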
zsxwing
  • 1
    When I run the above code in spark-shell, I get the following error: scala> val job = new Job(sc.hadoopConfiguration) warning: there were 1 deprecation warning(s); re-run with -deprecation for details java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:283) How can I fix this "java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING" problem? – Leo Nov 27 '14 at 02:19
  • Could you paste the full stack trace somewhere and provide a link? – zsxwing Nov 27 '14 at 02:32
7

In the Spark shell, I extracted the data following "Setting textinputformat.record.delimiter in spark":

$ spark-shell
...
scala> import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.LongWritable

scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.conf.Configuration

scala> import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

scala> val conf = new Configuration
conf: org.apache.hadoop.conf.Configuration = Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml

scala> conf.set("textinputformat.record.delimiter", "\u0001")

scala> val data = sc.newAPIHadoopFile("mydata.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)
data: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text)] = NewHadoopRDD[0] at newAPIHadoopFile at <console>:19

sc.newAPIHadoopFile("mydata.txt", ...) is an RDD[(LongWritable, Text)], where the first element of each pair is the byte offset at which the record starts and the second is the actual text delimited by "\u0001". Append .map(_._2.toString) to keep only the text as an RDD[String].
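To make the shape of those pairs concrete, here is a small plain-Python sketch that computes (offset, text) pairs for an in-memory byte string. As far as I know the LongWritable key is a byte offset rather than a character count, which only makes a difference for non-ASCII data. The function name records_with_offsets is hypothetical, and dropping the empty piece after a trailing delimiter is an assumption.

```python
def records_with_offsets(data, delimiter):
    """Return (byte_offset, text) pairs, similar in shape to the
    (LongWritable, Text) pairs of the RDD. Illustration only."""
    parts = data.split(delimiter)
    if parts and parts[-1] == b"":
        # Assumption: a trailing delimiter does not create an empty record.
        parts.pop()
    pairs = []
    offset = 0
    for part in parts:
        pairs.append((offset, part.decode("utf-8")))
        # The next record starts after this record and its delimiter.
        offset += len(part) + len(delimiter)
    return pairs


# "é" takes two bytes in UTF-8, so the second record starts at byte 3.
pairs = records_with_offsets("é\u0001b\u0001c".encode("utf-8"), b"\x01")
print(pairs)  # [(0, 'é'), (3, 'b'), (5, 'c')]
```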

Community
Leo
7

In Python, this can be achieved with:

rdd = sc.newAPIHadoopFile(YOUR_FILE, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
            "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
            conf={"textinputformat.record.delimiter": YOUR_DELIMITER}).map(lambda l:l[1])
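If this call is needed in several places, it can be wrapped in a small helper. The sketch below assumes an existing SparkContext is passed in as sc; the name read_delimited and the default delimiter are my own choices, not part of PySpark.

```python
TEXT_INPUT_FORMAT = "org.apache.hadoop.mapreduce.lib.input.TextInputFormat"
LONG_WRITABLE = "org.apache.hadoop.io.LongWritable"
TEXT = "org.apache.hadoop.io.Text"


def read_delimited(sc, path, delimiter="\u0001"):
    """Read `path` as an RDD of strings split on `delimiter`.

    Hypothetical helper: `sc` is assumed to be an existing SparkContext.
    """
    pairs = sc.newAPIHadoopFile(
        path,
        TEXT_INPUT_FORMAT,
        LONG_WRITABLE,
        TEXT,
        conf={"textinputformat.record.delimiter": delimiter},
    )
    # Each element is an (offset, text) pair; keep only the text.
    return pairs.map(lambda pair: pair[1])


# Usage, given a running SparkContext `sc`:
# lines = read_delimited(sc, "hdfs:///myproject/*")
```

Passing the delimiter through conf keeps the setting local to this one read instead of changing the context's shared Hadoop configuration.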
singer
1

Here is a ready-to-use version of Chad's and @zsxwing's answers for Scala users, which can be used this way:

sc.textFile("some/path.txt", "\u0001")

The following snippet uses an implicit class to add an overloaded textFile method to SparkContext, mirroring SparkContext's default textFile method:

package com.whatever

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

object Spark {

  implicit class ContextExtensions(val sc: SparkContext) extends AnyVal {

    def textFile(
        path: String,
        delimiter: String,
        maxRecordLength: String = "1000000"
    ): RDD[String] = {

      val conf = new Configuration(sc.hadoopConfiguration)

      // This configuration sets the record delimiter:
      conf.set("textinputformat.record.delimiter", delimiter)
      // and this one limits the size of one record:
      conf.set("mapreduce.input.linerecordreader.line.maxlength", maxRecordLength)

      sc.newAPIHadoopFile(
          path,
          classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          conf
        )
        .map { case (_, text) => text.toString }
    }
  }
}

After importing the implicit class, call it like this:

import com.whatever.Spark.ContextExtensions
sc.textFile("some/path.txt", "\u0001")

Note the additional setting mapreduce.input.linerecordreader.line.maxlength, which limits the maximum size of a record. This comes in handy when reading a corrupted file in which a record could be too long to fit in memory (this is more likely to happen when you change the record delimiter).

With this setting, reading a corrupted file throws an exception (java.io.IOException, and therefore catchable) instead of ending in a messy out-of-memory error that stops the SparkContext.

Xavier Guihot
1

If you are using a SparkContext, the following worked for me: sc.hadoopConfiguration.set("textinputformat.record.delimiter", "delimiter")

con
sravans
  • In PySpark with Spark >= 2.0, use `spark.sparkContext._jsc.hadoopConfiguration().set("textinputformat.record.delimiter", "delimiter")` – noleto Dec 22 '21 at 21:10