7

I'm quite new to pyspark and am trying to use it to process a large dataset which is saved as a csv file. I'd like to read CSV file into spark dataframe, drop some columns, and add new columns. How should I do that?

I am having trouble getting this data into a dataframe. This is a stripped down version of what I have so far:

def make_dataframe(data_portion, schema, sql):
    fields = data_portion.split(",")
    return sql.createDateFrame([(fields[0], fields[1])], schema=schema)

if __name__ == "__main__":
    sc = SparkContext(appName="Test")
    sql = SQLContext(sc)

    ...

    big_frame = data.flatMap(lambda line: make_dataframe(line, schema, sql))
                .reduce(lambda a, b: a.union(b))

    big_frame.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://<...>") \
        .option("dbtable", "my_table_copy") \
        .option("tempdir", "s3n://path/for/temp/data") \
        .mode("append") \
        .save()

    sc.stop()

This produces an error TypeError: 'JavaPackage' object is not callable at the reduce step.

Is it possible to do this? The idea with reducing to a dataframe is to be able to write the resulting data to a database (Redshift, using the spark-redshift package).

I have also tried using unionAll(), and map() with partial() but can't get it to work.

I am running this on Amazon's EMR, with spark-redshift_2.10:2.0.0, and Amazon's JDBC driver RedshiftJDBC41-1.1.17.1017.jar.

ZygD
  • 22,092
  • 39
  • 79
  • 102
Tim B
  • 3,033
  • 1
  • 23
  • 28
  • What is the input (CSV fields), and what should be the output? You might need to redesign your code. It seems to me that you can avoid the need to create data-frames and union them.... – Yaron Oct 30 '16 at 10:55
  • @Yaron The csv is just numbers that can be arranged into the schema for the dataframe. I know that I can easily save this as an RDD nice and efficiently, but if I do that I can't write it to a redshift database (as far as I know), which is the ultimate goal. – Tim B Oct 30 '16 at 11:19
  • what I tried to hint - I guess that you can solve it using one spark data-frame, without the need for several data-frames + union of them. again - what algorithm are you trying to use? what is the expected output? – Yaron Oct 30 '16 at 11:27
  • @Yaron I've added the spark-redshift write function I would use with the dataframe. This would append the data to an existing table. You could be right, I don't know if there is a better way to return the data from the map function (make_dataframe) to allow me to create a frame? – Tim B Oct 30 '16 at 11:43

1 Answers1

11

Update - answering also your question in comments:

Read data from CSV to dataframe: It seems that you only try to read CSV file into a spark dataframe.

If so - my answer here: https://stackoverflow.com/a/37640154/5088142 cover this.

The following code should read CSV into a spark-data-frame

import pyspark
sc = pyspark.SparkContext()
sql = SQLContext(sc)

df = (sql.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("/path/to_csv.csv"))

// these lines are equivalent in Spark 2.0 - using [SparkSession][1]
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") 
spark.read.option("header", "true").csv("/path/to_csv.csv")

drop column

you can drop column using "drop(col)" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

drop(col)

Returns a new DataFrame that drops the specified column.
Parameters: col – a string name of the column to drop, or a Column to drop.

>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.drop(df.age).collect()
[Row(name=u'Alice'), Row(name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
[Row(age=5, height=85, name=u'Bob')]

>>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
[Row(age=5, name=u'Bob', height=85)]

add column You can use "withColumn" https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html

withColumn(colName, col)

Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
Parameters: 

    colName – string, name of the new column.
    col – a Column expression for the new column.

>>> df.withColumn('age2', df.age + 2).collect()
[Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]

Note: spark has a lot of other functions which can be used (e.g. you can use "select" instead of "drop")

Yaron
  • 10,166
  • 9
  • 45
  • 65
  • Thanks for this. As part of the processing I need to remove a column from the data frame (which is included in the CSV file being read), and add a new column to the frame containing an ID before I write the data to redshift. Could I use the method you have here to read the data, and then perform this processing on the dataframe? – Tim B Oct 30 '16 at 12:56
  • I've updated my answer , to answer you questions in your comment – Yaron Oct 30 '16 at 13:09
  • @TimB - if I answered your question, please accept it. – Yaron Oct 30 '16 at 13:19
  • This is great, thanks. Do I have to call collect()? I would like to avoid bringing all of the data back to the driver if possible. I was wondering if I could just append the write() command to, for example, the withColumn command? – Tim B Oct 30 '16 at 13:21
  • @TimB - I don't see any need for using collect(). As you suggested, you can execute the "write()" command on the last dataframe – Yaron Oct 30 '16 at 13:36
  • @Yaron I'm working with Spark 2.3.0 and pyspark, but these lines do not work: ` `spark.read.format("csv").option("header", "true").load("/path/to_csv.csv") spark.read.option("header", "true").csv("/path/to_csv.csv")`: `name 'spark' is not defined` – clstaudt Apr 11 '18 at 08:40