
I have a simple text file, which contains "transactions".

The first line holds the column names, e.g. "START_TIME", "END_TIME", "SIZE", about 100 names in total.

The column names in the file are without quotes.

I want to use Spark to convert this file to a DataFrame with column names, and then remove all columns except a few specific ones.

I'm having a bit of trouble converting the text file to data frame.

Here's my code so far:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

# Load relevant objects
sc = SparkContext('local')
log_txt = sc.textFile("/path/to/text/file.txt")
sqlContext = SQLContext(sc)

# Construct fields with names from the header, for creating a DataFrame
header = log_txt.first()
fields = [StructField(field_name, StringType(), True)
      for field_name in header.split(',')]

# Only columns/fields 2,3,13,92 are relevant. Set them to the relevant types
fields[2].dataType = TimestampType()    # START_TIME in yyyymmddhhmmss format
fields[3].dataType = TimestampType()    # END_TIME in yyyymmddhhmmss
fields[13].dataType = IntegerType()     # DOWNSTREAM_SIZE, in bytes
fields[92].dataType = BooleanType()     # IS_CELL_CONGESTED, 0 or 1
schema = StructType(fields)             # Create a schema object

# Build the DataFrame
log_txt = log_txt.filter(lambda line: line != header) # Remove header from the txt file
temp_var = log_txt.map(lambda k: k.split("\t"))

log_df = sqlContext.createDataFrame(temp_var, schema) # PROBLEMATIC LINE

The problem is with the last line; I fear I'm missing some steps before that final step.

Can you help me determine which steps are missing?

The last line of code produces a lot of errors. I will add them to the post if needed.

The file format is (header line plus two example data lines):

TRANSACTION_URL,RESPONSE_CODE,START_TIME,END_TIME,.... <more names>
http://www.google.com<\t separator>0<\t separator>20160609182001<\t separator>20160609182500.... <more values>
http://www.cnet.com<\t separator>0<\t separator>20160609192001<\t separator>20160609192500.... <more values>

Also, can someone please help me with removing unneeded columns from the DataFrame once it's built?

Thanks

ZygD
Adiel
  • The schema that you have made doesn't fit the data that you provide when you use `createDataFrame`. Might I suggest you create the DataFrame with all of the columns, then do a `log_df.select("columns","you","want")`? – James Tobin Dec 14 '16 at 14:51
  • @JamesTobin, not sure I understand: why doesn't the schema I've made fit the data that I provide? I'm creating the schema from the text file's header line. Regarding your suggestion, that is my intention: first create a DataFrame with all columns, then select only the relevant columns. The `fields[i].dataType =...` lines are only for the columns I'll need at the end, so I don't see any point in defining the dataType for ALL of the columns. – Adiel Dec 14 '16 at 16:40
  • Imagine you had a file with "a,b,c" as what you have as `temp_var`; if you then try to tell Spark to read "a,b,c" using `Schema(a:String,c:String)`, it won't know what it's supposed to do with 'b'. You can load the data with whatever defaults Spark picks as the dtypes of the columns, then filter the columns, then change the dtypes of the selected columns to the ones you want. – James Tobin Dec 14 '16 at 17:22
  • How do I load the data with `whatever defaults spark picks as the dtypes`? And by that, do you mean loading the text file without removing the header line? – Adiel Dec 14 '16 at 17:30

1 Answer


I think you're overthinking it a little bit. Imagine we have something less complex, like the example below:

`cat sample_data.txt`
field1\tfield2\tfield3\tfield4
0\tdog\t20160906182001\tgoogle.com
1\tcat\t20151231120504\tamazon.com

open pyspark

sc.setLogLevel("WARN")
#setup the same way you have it
log_txt=sc.textFile("/path/to/data/sample_data.txt")
header = log_txt.first()

#filter out the header, make sure the rest looks correct
log_txt = log_txt.filter(lambda line: line != header)
log_txt.take(10)
  [u'0\\tdog\\t20160906182001\\tgoogle.com', u'1\\tcat\\t20151231120504\\tamazon.com']

temp_var = log_txt.map(lambda k: k.split("\\t"))

#here's where the changes take place
#this creates a dataframe using whatever pyspark feels like using (I think string is the default). the header.split is providing the names of the columns
log_df=temp_var.toDF(header.split("\\t"))
log_df.show()
+------+------+--------------+----------+
|field1|field2|        field3|    field4|
+------+------+--------------+----------+
|     0|   dog|20160906182001|google.com|
|     1|   cat|20151231120504|amazon.com|
+------+------+--------------+----------+
#note log_df.schema
#StructType(List(StructField(field1,StringType,true),StructField(field2,StringType,true),StructField(field3,StringType,true),StructField(field4,StringType,true)))

# now let's cast the columns that we actually care about to the dtypes we want
from pyspark.sql.types import IntegerType, TimestampType
log_df = log_df.withColumn("field1Int", log_df["field1"].cast(IntegerType()))
log_df = log_df.withColumn("field3TimeStamp", log_df["field3"].cast(TimestampType()))

log_df.show()
+------+------+--------------+----------+---------+---------------+
|field1|field2|        field3|    field4|field1Int|field3TimeStamp|
+------+------+--------------+----------+---------+---------------+
|     0|   dog|20160906182001|google.com|        0|           null|
|     1|   cat|20151231120504|amazon.com|        1|           null|
+------+------+--------------+----------+---------+---------------+
log_df.schema
StructType(List(StructField(field1,StringType,true),StructField(field2,StringType,true),StructField(field3,StringType,true),StructField(field4,StringType,true),StructField(field1Int,IntegerType,true),StructField(field3TimeStamp,TimestampType,true)))

#now let's select only the columns we want
log_df.select(["field1Int","field3TimeStamp","field4"]).show()
+---------+---------------+----------+
|field1Int|field3TimeStamp|    field4|
+---------+---------------+----------+
|        0|           null|google.com|
|        1|           null|amazon.com|
+---------+---------------+----------+

A DataFrame needs to have a type for every field that it comes across; whether you actually use that field or not is up to you. You'll have to use one of the functions in `pyspark.sql.functions` to convert the string dates into actual timestamps (the plain cast above only gives null), but that shouldn't be too tough.
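If you would rather keep an explicit schema with `createDataFrame`, one step that may be missing is converting the values first: everything coming out of `split` is still a string, while the schema declares integers and timestamps. Here is a minimal sketch in plain Python, so it can be tried without Spark. `parse_row` is a hypothetical helper, and the column positions are assumptions taken from the sample file above:

```python
from datetime import datetime

# Hypothetical helper (not part of Spark): converts one split line of the
# sample file (field1, field2, field3, field4) from strings to typed values.
def parse_row(fields):
    return (
        int(fields[0]),                                 # field1 -> int
        fields[1],                                      # field2 stays a string
        datetime.strptime(fields[2], "%Y%m%d%H%M%S"),   # field3 -> datetime
        fields[3],                                      # field4 stays a string
    )

row = parse_row(["0", "dog", "20160906182001", "google.com"])
print(row[2])  # 2016-09-06 18:20:01
```

Something like `temp_var.map(parse_row)` should then give rows whose Python types (`int`, `datetime`) line up with `IntegerType` and `TimestampType` in a schema, though I have not run it against your file.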

Hope this helps

PS: for your specific case, to make the initial DataFrame, try: log_df = temp_var.toDF(header.split(','))
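The reason for the PS: going by your example, the header is comma-separated while the data lines are tab-separated, so each needs its own delimiter. Below is a quick plain-Python sanity check that both give the same number of columns. It uses shortened, made-up lines modelled on your example, and `split_counts` is a hypothetical helper:

```python
# Hypothetical helper: counts column names in the header and values in a row.
def split_counts(header_line, data_line):
    names = header_line.split(",")     # the header uses commas
    values = data_line.split("\t")     # the data rows use tabs
    return len(names), len(values)

# Shortened stand-ins for the real ~100-column lines
header_line = "TRANSACTION_URL,RESPONSE_CODE,START_TIME,END_TIME"
data_line = "http://www.google.com\t0\t20160609182001\t20160609182500"

print(split_counts(header_line, data_line))  # (4, 4)
```

If the two counts differ on your real file, `toDF` will not be able to match names to values, so it is worth checking on the first few lines.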

James Tobin
  • I ended up using spark-csv, which I didn't know existed, but your answer is great and also works, so I'm selecting it as the accepted answer :) I'm having trouble converting the string timestamp `yyyymmddhhmmss` into an actual timestamp such as `yyyy-mm-dd hh`. Can I please contact you in private for help with that? I don't want it to take the focus away from this thread. – Adiel Dec 15 '16 at 19:18
  • check out http://stackoverflow.com/questions/29844144/better-way-to-convert-a-string-field-into-timestamp-in-spark – James Tobin Dec 15 '16 at 19:34