I have a PySpark script that loads data from a TSV file, saves it as a Parquet file, and also saves it as a persistent SQL table.

When I run it line by line through the pyspark CLI, it works exactly as expected. When I run it as an application using spark-submit, it runs without any errors, but I get strange results: 1. the data is overwritten instead of appended, and 2. SQL queries against the table return no data, even though the Parquet files are several gigabytes in size (which is what I expect). Any suggestions?

Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
          StructField('subscriberId', StringType(), True),
          StructField('sourceIPv4Address', StringType(), True),
          StructField('destinationIPv4Address', StringType(), True),
          StructField('service', StringType(), True),
          StructField('baseService', StringType(), True),
          StructField('serverHostname', StringType(), True),
          StructField('rat', StringType(), True),
          StructField('userAgent', StringType(), True),
          StructField('accessPoint', StringType(), True),
          StructField('station', StringType(), True),
          StructField('device', StringType(), True),
          StructField('contentCategories', StringType(), True),
          StructField('incomingOctets', LongType(), True),
          StructField('outgoingOctets', LongType(), True),
          StructField('incomingShapingDrops', IntegerType(), True),
          StructField('outgoingShapingDrops', IntegerType(), True),
          StructField('qoeIncomingInternal', DoubleType(), True),
          StructField('qoeIncomingExternal', DoubleType(), True),
          StructField('qoeOutgoingInternal', DoubleType(), True),
          StructField('qoeOutgoingExternal', DoubleType(), True),
          StructField('incomingShapingLatency', DoubleType(), True),
          StructField('outgoingShapingLatency', DoubleType(), True),
          StructField('internalRtt', DoubleType(), True),
          StructField('externalRtt', DoubleType(), True),
          StructField('HttpUrl', StringType(), True)]

schema = StructType(fields)
# Read the TSV using the explicit schema defined above
df = spark.read.load(csv_file, format='csv', sep='\t', header=True, schema=schema,
                     timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date', to_date('time_stamp'))
# Append to a persistent table backed by Parquet files at parquet_dir, partitioned by date
df.write.saveAsTable('test2', mode='append', partitionBy='date', path=parquet_dir)
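
A minimal illustration of the kind of query that comes back empty under spark-submit (the table name comes from the code above; the exact queries are not shown here, so this is just a sketch):

# Illustrative only: simple lookups against the table written above.
# Per the description, these return no data when the script is run via spark-submit.
spark.sql('SELECT COUNT(*) FROM test2').show()
spark.sql('SELECT * FROM test2 LIMIT 10').show()
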
  • When you run spark-submit, you delegate the execution to your cluster (Spark? YARN?), on which you may be missing dependencies. See also this discussion: https://stackoverflow.com/questions/36910014/spark-pyspark-vs-spark-submit – Harold May 25 '18 at 14:08
    Looks like a duplicate of [Spark can access Hive table from pyspark but not from spark-submit](https://stackoverflow.com/q/36359812) – Alper t. Turker May 26 '18 at 21:59
  • Try to insert time.sleep(600) at the end of code, go to Spark UI and check out logs. – Eugene Lopatkin May 29 '18 at 22:44
  • @user8371915 Yes, that was the same issue! Although in my case it wasn't throwing any errors, so it was hard to understand what the root cause was. Thanks! – Mikhail Venkov May 30 '18 at 17:29

1 Answer

As @user8371915 suggested, it is similar to this:

[Spark can access Hive table from pyspark but not from spark-submit](https://stackoverflow.com/q/36359812)

I needed to replace

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

with

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

This resolved the issue.
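
As a side note, on Spark 2.x and later the idiomatic equivalent of SparkContext + HiveContext is a single SparkSession built with Hive support enabled. A minimal sketch under that assumption, reusing csv_file, parquet_dir and schema from the question:

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date

# SparkSession with Hive support replaces SparkContext + HiveContext in Spark 2.x+
spark = SparkSession.builder \
    .appName('import-ipfixminute') \
    .enableHiveSupport() \
    .getOrCreate()

# csv_file, parquet_dir and schema as defined in the question
df = spark.read.csv(csv_file, sep='\t', header=True, schema=schema,
                    timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.withColumn('date', to_date('time_stamp'))
df.write.saveAsTable('test2', mode='append', partitionBy='date', path=parquet_dir)

With Hive support enabled, the same session (or a later spark-submit job) can read the table back with spark.sql('SELECT ... FROM test2').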
