I have a simple text file that contains "transactions".
The first line holds the column names, e.g. START_TIME, END_TIME, SIZE; there are about 100 of them.
The column names in the file are not quoted.
I want to use Spark to load this file into a DataFrame with those column names,
and then drop every column except a few specific ones.
I'm having trouble converting the text file to a DataFrame.
Here's my code so far:
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# Load relevant objects
sc = SparkContext('local')
log_txt = sc.textFile("/path/to/text/file.txt")
sqlContext = SQLContext(sc)
# Construct fields with names from the header, for creating a DataFrame
header = log_txt.first()
fields = [StructField(field_name, StringType(), True)
          for field_name in header.split(',')]
# Only columns/fields 2, 3, 13 and 92 (0-based) are relevant; give them their proper types
fields[2].dataType = TimestampType() # START_TIME in yyyymmddhhmmss format
fields[3].dataType = TimestampType() # END_TIME in yyyymmddhhmmss
fields[13].dataType = IntegerType() # DOWNSTREAM_SIZE, in bytes
fields[92].dataType = BooleanType() # IS_CELL_CONGESTED, 0 or 1
schema = StructType(fields) # Create a schema object
# Build the DataFrame
log_txt = log_txt.filter(lambda line: line != header) # Remove header from the txt file
temp_var = log_txt.map(lambda k: k.split("\t"))
log_df = sqlContext.createDataFrame(temp_var, schema) # PROBLEMATIC LINE
The problem is with the last line; I suspect I'm missing some steps before that final step.
Can you help me determine which steps are missing?
The last line produces a lot of errors. I can add them to the post if needed.
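Without the error output this is a guess, but one likely cause: when a schema is passed, PySpark expects every value to already be of the matching Python type (`datetime.datetime` for `TimestampType`, `int` for `IntegerType`, `bool` for `BooleanType`), while `k.split("\t")` only ever produces strings. One way around that is to convert the four typed columns inside the `map` step, before `createDataFrame`. A minimal sketch, assuming the 0-based positions and the yyyymmddhhmmss format from the comments in the code above; `convert_row` and the index constants are hypothetical names:

```python
from datetime import datetime

# 0-based column positions, taken from the comments in the question's code.
START_TIME_IDX = 2
END_TIME_IDX = 3
DOWNSTREAM_SIZE_IDX = 13
IS_CELL_CONGESTED_IDX = 92

# START_TIME / END_TIME are stored as yyyymmddhhmmss, e.g. 20160609182001.
TIMESTAMP_FORMAT = "%Y%m%d%H%M%S"


def convert_row(values):
    """Turn the string values of one split line into the Python types the
    schema declares: datetime for TimestampType, int for IntegerType and
    bool for BooleanType. All other columns are left as strings."""
    row = list(values)  # work on a copy; the caller's list is not modified
    row[START_TIME_IDX] = datetime.strptime(row[START_TIME_IDX], TIMESTAMP_FORMAT)
    row[END_TIME_IDX] = datetime.strptime(row[END_TIME_IDX], TIMESTAMP_FORMAT)
    row[DOWNSTREAM_SIZE_IDX] = int(row[DOWNSTREAM_SIZE_IDX])
    # bool("0") is True in Python, so parse the digit first: "0" -> False, "1" -> True
    row[IS_CELL_CONGESTED_IDX] = bool(int(row[IS_CELL_CONGESTED_IDX]))
    return row
```

With it, the map step would become `temp_var = log_txt.map(lambda k: convert_row(k.split("\t")))`. Empty or malformed values would still raise, so real data may need extra handling. Also note that `IntegerType` is 32-bit, so if sizes can exceed 2147483647 bytes, `LongType` is the safer choice.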
The file format looks like this (header line plus two example data lines):
TRANSACTION_URL,RESPONSE_CODE,START_TIME,END_TIME,.... <more names>
http://www.google.com<\t separator>0<\t separator>20160609182001<\t separator>20160609182500.... <more values>
http://www.cnet.com<\t separator>0<\t separator>20160609192001<\t separator>20160609192500.... <more values>
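Given this format, splitting the header on `,` and the data lines on `\t` (as the code does) is consistent. It may still be worth checking that every data line yields exactly as many values as the header has names, because a row whose length differs from the number of schema fields is another thing `createDataFrame` rejects. A small sketch in plain Python; `parse_header` and `split_checked` are hypothetical helpers:

```python
def parse_header(header_line):
    """The header line separates column names with commas."""
    return header_line.split(",")


def split_checked(line, expected_width):
    """Split a tab-separated data line and verify how many values it has.

    str.split keeps empty fields, including trailing ones, so a line whose
    last value is empty still yields expected_width items.
    """
    values = line.split("\t")
    if len(values) != expected_width:
        raise ValueError(
            "expected %d values, got %d: %r" % (expected_width, len(values), line[:80])
        )
    return values
```

In the question's pipeline this could be used as `expected = len(parse_header(header))` followed by `log_txt.map(lambda k: split_checked(k, expected))`, so a malformed line shows up with its content in the error message.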
Also, could someone help me with removing the unneeded columns from the DataFrame once it's built?
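Once the DataFrame exists, `DataFrame.select` returns a new DataFrame containing only the named columns, e.g. `log_df.select("START_TIME", "END_TIME")`. Another option is to discard the unneeded values before the DataFrame is built, so the schema only ever holds the few relevant fields. The sketch below shows that second approach in plain Python; `indices_for` and `project` are hypothetical helpers, and looking columns up by name avoids hard-coding positions like 92 in a file with about 100 columns:

```python
def indices_for(names, wanted):
    """Return the 0-based position of each wanted column name in the header,
    in the order the names are requested."""
    position = {name: i for i, name in enumerate(names)}
    missing = [w for w in wanted if w not in position]
    if missing:
        raise KeyError("columns not in header: " + ", ".join(missing))
    return [position[w] for w in wanted]


def project(values, keep):
    """Keep only the items at the given positions. Works for a split data
    line as well as for the list of StructFields built from the header."""
    return [values[i] for i in keep]
```

Assuming the header really uses the names from the code comments, this could look like `keep = indices_for(header.split(','), ["START_TIME", "END_TIME", "DOWNSTREAM_SIZE", "IS_CELL_CONGESTED"])`, then `schema = StructType(project(fields, keep))` and `log_txt.map(lambda k: project(k.split("\t"), keep))`. The kept values still have to match the types declared for those fields.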
Thanks