
I have created a PySpark RDD (converted from XML to CSV) that does not have headers. I need to convert it to a DataFrame with headers to perform some SparkSQL queries on it. I cannot seem to find a simple way to add headers. Most examples start with a dataset that already has headers.

    df = spark.read.csv('some.csv', header=True, schema=schema)

However, my data has no header row; I only have the column names I want to apply:

    headers = ['a', 'b', 'c', 'd']

This seems like a trivial problem; I am not sure why I cannot find a working solution. Thank you.

Annabanana
  • If the headers are not there, you can specify schema which will have col name, datatype, nullable. Then you can use SparkSQL. – Ranga Vure May 11 '19 at 07:44
  • Great, thank you. I will try that. I'm very new to Spark, and sometimes it's the trivial syntax type things that I get stuck on. – Annabanana May 11 '19 at 17:30
  • @Annabanana This should work: `df = spark.read.csv(filename).toDF("col1","col2","col3")` [Ref](https://stackoverflow.com/a/46921012/1232087) – nam Jun 12 '22 at 00:52

2 Answers


Like this ... you need to specify a schema and `.option("header", "false")` if your CSV does not contain a header row:

spark.version
'2.3.2'

! cat sample.csv

1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

PATH = "sample.csv"

from pyspark.sql.functions import *
from pyspark.sql.types import *

schema = StructType([\
    StructField("col1", IntegerType(), True),\
    StructField("col2", FloatType(), True),\
    StructField("col3", StringType(), True)])

csvFile = spark.read.format("csv")\
.option("header", "false")\
.schema(schema)\
.load(PATH)

csvFile.show()

+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|          hello|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

# if you have rdd and want to convert straight to df
rdd = sc.textFile(PATH)

# just showing rows
for i in rdd.collect(): print(i)
1, 2.0,"hello"
3, 4.0, "there"
5, 6.0, "how are you?"

# use Row to construct a schema from rdd
from pyspark.sql import Row

csvDF = rdd\
    .map(lambda x: Row(col1 = int(x.split(",")[0]),\
                       col2 = float(x.split(",")[1]),\
                       col3 = str(x.split(",")[2]))).toDF()

csvDF.show()
+----+----+---------------+
|col1|col2|           col3|
+----+----+---------------+
|   1| 2.0|        "hello"|
|   3| 4.0|        "there"|
|   5| 6.0| "how are you?"|
+----+----+---------------+

csvDF.printSchema()
root
 |-- col1: long (nullable = true)
 |-- col2: double (nullable = true)
 |-- col3: string (nullable = true)
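
# an alternative, shown only as a minimal sketch reusing the `rdd` and `schema`
# objects defined above: split each line into a typed tuple and hand the
# explicit schema to spark.createDataFrame
parsed = rdd.map(lambda line: line.split(","))\
    .map(lambda p: (int(p[0]), float(p[1]), p[2].strip()))

spark.createDataFrame(parsed, schema).show()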

thePurplePython
  • Thank you @thePurplePython. What I have is an RDD that is a comma delimited text file without headers. When I save it to my hard drive it is separated into 100 partitions. I want to skip the saving part and create a DF from a comma delimited RDD. So I need to add headers and convert an RDD to a DF. How would I do it? – Annabanana May 11 '19 at 20:27
  • What I have is an RDD of comma delimited text files: "1,2.0,hello/3,4.0,there/5,6.0,how are you?" and I need to convert it to DF as you kindly show above. Any suggestions are appreciated. Thank you. – Annabanana May 11 '19 at 20:34
  • Thank you. I tried but it gave me an error. I checked my RDD type and type(RDD) = pyspark.rdd.PipelinedRDD. How do I convert a pipeline RDD to a dataframe? – Annabanana May 11 '19 at 23:34
  • I am not sure what your rdd looks like ... try this => https://stackoverflow.com/questions/48111066/how-to-convert-pyspark-rdd-pipelinedrdd-to-data-frame-with-out-using-collect-m – thePurplePython May 12 '19 at 00:16

rdd.toDF(schema=['a', 'b', 'c', 'd'])
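
Note that `toDF` expects an RDD of structured records (tuples or `Row`s), not raw text lines. A minimal sketch, assuming a comma-delimited text RDD like the one in the question (the view name `t` is just an example):

headers = ['a', 'b', 'c', 'd']

# split each text line into a tuple so toDF can name the columns
# (assumes each line splits into exactly len(headers) fields, all kept as strings)
df = rdd.map(lambda line: tuple(line.split(","))).toDF(headers)

df.createOrReplaceTempView("t")
spark.sql("SELECT a, b FROM t").show()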

kuldeep mishra