
I want to read a large text file with Spark and change the textinputformat.record.delimiter to H. This code does NOT work:

appName = "My Test"
fname="myfile.txt"

from pyspark import SparkContext, SparkConf

if __name__=="__main__":

    conf = SparkConf().setAppName(appName)
    conf.set("textinputformat.record.delimiter", "H")
    sc   = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    rdd1 = sc.textFile(fname)
    print("normal:",rdd1.collect())

It appears that the only way to do this is to use the sc.newAPIHadoopFile API call. However, I can't figure out how to do that from Python. This doesn't work:

    conf2 = SparkConf()
    conf2.set("textinputformat.record.delimiter", "H")
    rdd2 = sc.newAPIHadoopFile(fname, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text", conf2)
    print("improved:",rdd2.collect())

I get this error:

AttributeError: 'SparkConf' object has no attribute '_get_object_id'

Although this uses the same API as the question "creating spark data structure from multiline record", it is a different use of the API. In that question, a function is created that allows multiple lines to be read at once; here, we change textinputformat.record.delimiter itself.


1 Answer


Below is the proper syntax for setting the configuration and calling newAPIHadoopFile. The conf argument must be a plain Python dict of Hadoop configuration strings, passed as a keyword argument (positionally, the fifth parameter is keyConverter). Passing a SparkConf does not work because py4j cannot hand a Python SparkConf object to the JVM, which is what produces the _get_object_id error:

appName = "My Test"
fname="myfile.txt"

from pyspark import SparkContext, SparkConf

if __name__=="__main__":
    conf = SparkConf().setAppName(appName)
    conf.set("textinputformat.record.delimiter", "H")
    sc   = SparkContext(conf=conf)
    sc.setLogLevel("ERROR")
    rdd1 = sc.textFile(fname)
    print("normal:",rdd1.collect())

    rconf = {"textinputformat.record.delimiter": "H"}
    rdd2 = sc.newAPIHadoopFile(fname,
                               "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", # inputFormatClass
                               "org.apache.hadoop.io.LongWritable", # keyClass: byte offset of each record
                               "org.apache.hadoop.io.Text",         # valueClass: the record contents
                               conf=rconf).map(lambda a: 'H' + a[1]) # a[1] is the record text; re-attach the delimiter
    print("improved:",rdd2.collect())