
I am just learning Spark: I started with RDDs and am now moving on to DataFrames. In my current PySpark project, I am reading an S3 file into an RDD and running some simple transformations on it. Here is the code.

segmentsRDD = sc.textFile(fileLocation). \
    filter(lambda line: line.split(",")[6] in INCLUDE_SITES). \
    filter(lambda line: line.split(",")[2] not in EXCLUDE_MARKETS). \
    filter(lambda line: "null" not in line). \
    map(splitComma). \
    filter(lambda line: line.split(",")[5] == '1')

splitComma is a function that does some date calculations on the row data and returns 10 comma-delimited fields (a simplified sketch is below). Once I have that, I run the last filter as shown to only pick up rows where the value in field [5] is '1'. So far everything is fine.
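For context, here is a stripped-down sketch of what splitComma does (the real function has more date logic; the field handling below is illustrative only):

def splitComma(line):
    # Simplified placeholder: the real function performs date calculations
    # on some of the fields before re-emitting them.
    fields = line.split(",")[:10]
    return ",".join(fields)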

Next, I would like to convert segmentsRDD to a DF with the schema shown below.

interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF("itemid","market","itemkey","start_offset","end_offset","time_shifted","day_shifted","tmsmarketid","caption","itemstarttime")

But I get an error about being unable to convert a "pyspark.rdd.PipelinedRDD" to a DataFrame. Can you please explain the difference between a "pyspark.rdd.PipelinedRDD" and a "row RDD"? I am attempting to convert to a DF with the schema shown above. What am I missing here?

Thanks

NetRocks

1 Answer


You have to add the following lines to your code:

from pyspark.sql import SparkSession
spark = SparkSession(sc)

The method .toDF() is not an original method of the RDD. If you take a look at the Spark source code, you will see that .toDF() is a monkey patch.

So, with the SparkSession initialization you make this monkey-patched method available; in other words, when you run rdd.toDF() you are really calling the .toDF() method from the DataFrame API.
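Putting it together, a minimal sketch (the sample data and column names are placeholders, not your real fields; note that in PySpark you pass the column names to .toDF() as a single list):

from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)  # creating the session is what makes rdd.toDF() available

rdd = sc.parallelize(["a,1,x", "b,2,y"])  # stand-in for your segmentsRDD
df = rdd.map(lambda line: line.split(",")) \
        .toDF(["itemid", "market", "itemkey"])  # column names as one list
df.show()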

ggeop
  • I am confused. I have been doing the following (deriving the spark context from the spark session): from pyspark.sql import SparkSession; from pyspark.conf import SparkConf; spark = SparkSession.builder.appName('SampleApp').master('local[2]').getOrCreate(); sc = spark.sparkContext; segmentsRDD = sc.textFile(fileLocation) .... .... segmentsRDD.repartition(1).saveAsTextFile('s3://....'). This has been working fine. Your code looks like the opposite - deriving the spark session from the spark context. – NetRocks Jan 29 '20 at 19:27
  • Yes, exactly - when I initialize the spark session I create a spark context, so it works (see the sketch after these comments). Take a look at the [Spark session](https://github.com/apache/spark/blob/7c7d7f6a878b02ece881266ee538f3e1443aa8c1/python/pyspark/sql/session.py#L43-L59) source, lines 211-220 – ggeop Jan 29 '20 at 19:35
  • @NetRocks has the question been answered? – ggeop Jan 29 '20 at 19:36
  • After making the changes you suggested, I am now getting "'PipelinedRDD' object has no attribute 'toDF'". The return line from the splitComma function looks like this: return "{},{},{},{},{},{},{},{},{},{}".format(a,b,c,d,e,f,g,h,i,j). – NetRocks Jan 29 '20 at 19:51
  • If you add my 2 lines of code at the beginning, you will not get this error, because my initialization fixes exactly this error. – ggeop Jan 29 '20 at 20:01
  • I don't know about your return, because I don't have your input file or the splitComma function. These transformations are not relevant to your main question, "How to convert the RDD to a DataFrame". If you have a problem with the transformations, you can create another question. – ggeop Jan 29 '20 at 20:06
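For reference, a minimal sketch of the two equivalent initialization styles discussed in the comments (the app name and master are just the values NetRocks quoted):

from pyspark.sql import SparkSession

# Option A (what NetRocks describes): build the session first, then derive the context.
spark = SparkSession.builder.appName('SampleApp').master('local[2]').getOrCreate()
sc = spark.sparkContext

# Option B (what the answer shows): start from an existing SparkContext and wrap it.
# from pyspark import SparkContext
# sc = SparkContext.getOrCreate()
# spark = SparkSession(sc)

# Either way, once a SparkSession exists, rdd.toDF() becomes available.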