I am currently reading all of the text files in a public AWS bucket that contains hundreds of CSV files. I read all of the CSV files at once, turn them into an RDD, and then massage the data so that it can be stored in Cassandra. Processing all of the text files takes over two and a half hours, which is too long for just 100GB of data. Is there anything I can do to my code below to make it faster?

I appreciate any suggestions. I've also tried reading https://robertovitillo.com/2015/06/30/spark-best-practices/ but I'm confused about how to implement some of the things it mentions, like "Using the right level of parallelism." I also tried caching my RDD by calling rdd.cache(), but the job still took over two hours.

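from operator import itemgetter

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

from abbreviations_dict import tofullname, toevent

# SPARK_IP and SPARK_PORT are assumed to be defined elsewhere.
conf = SparkConf() \
   .setMaster("spark://%s:%s" % (SPARK_IP, SPARK_PORT))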

sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
rdd = sc.textFile("s3a://gdelt-open-data/events/*")

def rddCleaning(rd, timeframe):

    def check_valid(tup):
        # Keep only rows whose numeric fields parse.
        try:
            int(tup[1])
            int(tup[4])
            float(tup[5])
            float(tup[6])
            return True
        except ValueError:
            return False

    def fillin(tup):
        # Use actor 2's type code when actor 1's is empty.
        if tup[2] == "" and tup[3] != "":
            return (tup[0], tup[1], tup[3], tup[4], tup[5], tup[6])
        else:
            return (tup[0], tup[1], tup[2], tup[4], tup[5], tup[6])

    def popular_avg(curr_tup):
        # Keep the averages only for the event types listed in lst.
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def merge_info(tup):
        # Merge the per-event averages into the per-event mention counts.
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def sum_allevents(tup):
        # Total article mentions across all event types for the key.
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return (tup[0], type_lst, tup[2], total_mentions)

    # Column positions in the tab-separated event records.
    actionGeo_CountryCode = 51
    time = 0
    actor1Type1Code = 12
    actor2Type1Code = 22
    numArticles = 33
    goldsteinScale = 30
    avgTone = 34

    if timeframe == "SQLDATE":
        time = 1
    elif timeframe == "MonthYear":
        time = 2
    else:
        time = 3

    rdd_reduce = rd.map(lambda x: x.split('\t')) \
        .map(lambda y: (y[actionGeo_CountryCode],
                        y[time],
                        y[actor1Type1Code],
                        y[actor2Type1Code],
                        y[numArticles],
                        y[goldsteinScale],
                        y[avgTone])) \
        .filter(check_valid) \
        .map(lambda c: (c[0], int(c[1]), c[2], c[3], int(c[4]), int(float(c[5])), int(float(c[6])))) \
        .map(fillin) \
        .filter(lambda r: r[0] in tofullname and r[2] in toevent and r[2] != "" and r[0] != "") \
        .map(lambda t: (tofullname[t[0]], t[1], toevent[t[2]], t[3], t[4], t[5])) \
        .map(lambda f: ((f[0], f[1], f[2]), (f[3], f[4], f[5], 1))) \
        .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1], a[2]+b[2], a[3]+b[3])) \
        .map(lambda s: (s[0], (s[1][0], s[1][1]/s[1][3], s[1][2]/s[1][3])))

    rdd_format = rdd_reduce.map(lambda t: ((t[0][0], t[0][1]),
                                           ([(t[0][2], t[1][0])],
                                            [(t[0][2], {"GoldsteinScaleAvg": t[1][1],
                                                        "ToneAvg": t[1][2]})]))) \
        .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
        .map(lambda v: (v[0],
                        sorted(v[1][0], key=itemgetter(1), reverse=True),
                        v[1][1])) \
        .map(sum_allevents) \
        .map(lambda f: (f[0], f[1][:5], dict(f[2]), f[3])) \
        .map(popular_avg) \
        .map(event_todict) \
        .map(merge_info) \
        .map(lambda d: (d[0][0], d[0][1], d[1]))

    return rdd_format




daily_rdd = rddCleaning(rdd, "SQLDATE")
print(daily_rdd.take(6))
monthly_rdd = rddCleaning(rdd, "MonthYear")
print(monthly_rdd.take(6))
yearly_rdd = rddCleaning(rdd, "Year")
print(yearly_rdd.take(6))

Here is a picture of my PySpark job running: [image not included]

Edits made after suggestions: I made the following changes to my code. They improved the performance, but the job still takes a long time. Is this happening because every time I use df, all of the files are read from my S3 bucket again? Should I cache some of my DataFrames and temporary tables? Here is my code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import udf, col
from abbreviations_dict import tofullname, toevent
from operator import itemgetter
import pyspark_cassandra

sc = SparkContext()
sqlContext = SQLContext(sc)

customSchema = schema = StructType([
        StructField('GLOBALEVENTID',StringType(),True),
        StructField('SQLDATE',StringType(),True),
        StructField('MonthYear',StringType(),True),
        StructField('Year',StringType(),True),
        StructField('FractionDate',StringType(),True),
        StructField('Actor1Code',StringType(),True),
        StructField('Actor1Name',StringType(),True),
        StructField('Actor1CountryCode',StringType(),True),
        StructField('Actor1KnownGroupCode',StringType(),True),
        StructField('Actor1EthnicCode',StringType(),True),
        StructField('Actor1Religion1Code',StringType(),True),
        StructField('Actor1Religion2Code',StringType(),True),
        StructField('Actor1Type1Code',StringType(),True),
        StructField('Actor1Type2Code',StringType(),True),
        StructField('Actor1Type3Code',StringType(),True),
        StructField('Actor2Code',StringType(),True),
        StructField('Actor2Name',StringType(),True),
        StructField('Actor2CountryCode',StringType(),True),
        StructField('Actor2KnownGroupCode',StringType(),True),
        StructField('Actor2EthnicCode',StringType(),True),
        StructField('Actor2Religion1Code',StringType(),True),
        StructField('Actor2Religion2Code',StringType(),True),
        StructField('Actor2Type1Code',StringType(),True),
        StructField('Actor2Type2Code',StringType(),True),
        StructField('Actor2Type3Code',StringType(),True),
        StructField('IsRootEvent',StringType(),True),
        StructField('EventCode',StringType(),True),
        StructField('EventBaseCode',StringType(),True),
        StructField('EventRootCode',StringType(),True),
        StructField('QuadClass',StringType(),True),
        StructField('GoldsteinScale',StringType(),True),
        StructField('NumMentions',StringType(),True),
        StructField('NumSources',StringType(),True),
        StructField('NumArticles',StringType(),True),
        StructField('AvgTone',StringType(),True),
        StructField('Actor1Geo_Type',StringType(),True),
        StructField('Actor1Geo_FullName',StringType(),True),
        StructField('Actor1Geo_CountryCode',StringType(),True),
        StructField('Actor1Geo_ADM1Code',StringType(),True),
        StructField('Actor1Geo_Lat',StringType(),True),
        StructField('Actor1Geo_Long',StringType(),True),
        StructField('Actor1Geo_FeatureID',StringType(),True),
        StructField('Actor2Geo_Type',StringType(),True),
        StructField('Actor2Geo_FullName',StringType(),True),
        StructField('Actor2Geo_CountryCode',StringType(),True),
        StructField('Actor2Geo_ADM1Code',StringType(),True),
        StructField('Actor2Geo_Lat',StringType(),True),
        StructField('Actor2Geo_Long',StringType(),True),
        StructField('Actor2Geo_FeatureID',StringType(),True),
        StructField('ActionGeo_Type',StringType(),True),
        StructField('ActionGeo_FullName',StringType(),True),
        StructField('ActionGeo_CountryCode',StringType(),True),
        StructField('ActionGeo_ADM1Code',StringType(),True),
        StructField('ActionGeo_Lat',StringType(),True),
        StructField('ActionGeo_Long',StringType(),True),
        StructField('ActionGeo_FeatureID',StringType(),True),
        StructField('DATEADDED',StringType(),True),
        StructField('SOURCEURL',StringType(),True)])

df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='false') \
    .options(delimiter="\t") \
    .load('s3a://gdelt-open-data/events/*', schema = customSchema)

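def modify_values(r, y):
    # Use the second value when the first is empty.
    if r == '' and y != '':
        return y
    else:
        return r

def country_exists(r):
    # Map a country code to its full name, or '' when unknown.
    if r in tofullname:
        return tofullname[r]
    else:
        return ''

def event_exists(r):
    # Map an actor type code to its event name, or '' when unknown.
    if r in toevent:
        return toevent[r]
    else:
        return ''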


modify_val = udf(modify_values, StringType())
c_exists = udf(country_exists,StringType())
e_exists = udf(event_exists,StringType())
dfsub1 = df.withColumn("Actor1Type1Code",modify_val(col("Actor1Type1Code"),col("Actor2Type1Code"))) \
           .withColumn("ActionGeo_CountryCode",c_exists(col("ActionGeo_CountryCode"))) \
           .withColumn("Actor1Type1Code",e_exists(col("Actor1Type1Code")))

sqlContext.registerDataFrameAsTable(dfsub1, 'temp')
df2 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                               SQLDATE, MonthYear, Year,
                               Actor1Type1Code,
                               NumArticles,
                               GoldsteinScale,
                               AvgTone
                          FROM temp
                         WHERE ActionGeo_CountryCode <> ''
                            AND Actor1Type1Code <> ''
                            AND NumArticles <> ''
                            AND GoldsteinScale <> ''
                            AND AvgTone <> ''""")

sqlContext.registerDataFrameAsTable(df2, 'temp2')
df3 = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                               CAST(SQLDATE AS INTEGER), CAST(MonthYear AS INTEGER), CAST(Year AS INTEGER),
                               Actor1Type1Code,
                               CAST(NumArticles AS INTEGER),
                               CAST(GoldsteinScale AS INTEGER),
                               CAST(AvgTone AS INTEGER)
                          FROM temp2""")

sqlContext.registerDataFrameAsTable(df3, 'temp3')
sqlContext.cacheTable('temp3')

dfdaily = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                   SQLDATE,
                                   Actor1Type1Code,
                                   SUM(NumArticles) AS NumArticles,
                                   ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                   ROUND(AVG(AvgTone),0) AS AvgTone
                               FROM temp3
                              GROUP BY ActionGeo_CountryCode,
                                      SQLDATE,
                                      Actor1Type1Code""")

dfmonthly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                     MonthYear,
                                     Actor1Type1Code,
                                     SUM(NumArticles) AS NumArticles,
                                     ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                     ROUND(AVG(AvgTone),0) as AvgTone
                                FROM temp3
                                   GROUP BY ActionGeo_CountryCode,
                                    MonthYear,
                                    Actor1Type1Code""")

dfyearly = sqlContext.sql("""SELECT ActionGeo_CountryCode,
                                    Year,
                                    Actor1Type1Code,
                                    SUM(NumArticles) AS NumArticles,
                                    ROUND(AVG(GoldsteinScale),0) AS GoldsteinScale,
                                    ROUND(AVG(AvgTone),0) as AvgTone
                               FROM temp3
                              GROUP BY ActionGeo_CountryCode,
                                       Year,
                                       Actor1Type1Code""")

def rddCleaning(rd, timeframe):

    def popular_avg(curr_tup):
        # Keep the averages only for the event types listed in lst.
        lst = curr_tup[1]
        dictionary = curr_tup[2]
        dict_matches = {}
        for tup in lst:
            event_type = tup[0]
            dict_matches[event_type] = dictionary[event_type]
        return (curr_tup[0], lst, dict_matches, curr_tup[3])

    def merge_info(tup):
        # Merge the per-event averages into the per-event mention counts.
        main_dict = tup[1]
        info_dict = tup[2]
        for key in info_dict:
            main_dict[key].update(info_dict[key])
        main_dict["TotalArticles"] = {"total": tup[3]}
        return (tup[0], main_dict)

    def event_todict(tup):
        lst = tup[1]
        dict_matches = {}
        for event_tup in lst:
            dict_matches[event_tup[0]] = {"ArticleMentions": event_tup[1]}
        return (tup[0], dict_matches, tup[2], tup[3])

    def sum_allevents(tup):
        # Total article mentions across all event types for the key.
        type_lst = tup[1]
        total_mentions = 0
        for event in type_lst:
            total_mentions += event[1]
        return (tup[0], type_lst, tup[2], total_mentions)

    rdd_format = rd.map(lambda y: ((y["ActionGeo_CountryCode"], y[timeframe]),
                                   ([(y["Actor1Type1Code"], y["NumArticles"])],
                                    [(y["Actor1Type1Code"], {"Goldstein": y["GoldsteinScale"],
                                                             "ToneAvg": y["AvgTone"]})]))) \
        .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
        .map(lambda v: (v[0],
                        sorted(v[1][0], key=itemgetter(1), reverse=True),
                        dict(v[1][1]))) \
        .map(sum_allevents) \
        .map(popular_avg) \
        .map(event_todict) \
        .map(merge_info) \
        .map(lambda d: (d[0][0], d[0][1], d[1]))

    return rdd_format

print("THIS IS THE FIRST ONE ######################################################")
daily_rdd = rddCleaning(dfdaily.rdd,"SQLDATE")
print(daily_rdd.take(5))
print("THIS IS THE SECOND ONE ######################################################")
monthly_rdd = rddCleaning(dfmonthly.rdd,"MonthYear")
print(monthly_rdd.take(5))
print("THIS IS THE THIRD ONE ######################################################")
yearly_rdd = rddCleaning(dfyearly.rdd,"Year")
print(yearly_rdd.take(5))
zero323
pandasCat
  • For starters you lose all the UDFs. None of these is necessary. – zero323 Feb 20 '17 at 00:34
  • Thank you for responding! I really appreciate it. I used UDFs because I was not sure how to use a Python dictionary in SQL. In two of the UDFs I am renaming values based on a dictionary stored in another Python file. For example, if a cell in a column has "US" then I rename it to "United States" in the UDF. Can I have a dictionary variable in SQL? – pandasCat Feb 20 '17 at 01:11
  • You can use `join` with `broadcast` or [literal map](http://stackoverflow.com/a/32788650/1560062). – zero323 Feb 20 '17 at 15:25
  • @zero323 I will make the changes you suggested. Is there anything else that you think is causing my Spark job to take a long time to run? Are there additional configurations I could make to make my job run faster? I am using a master node and three slave nodes, and processing 100GB of data is taking over three hours. – pandasCat Feb 20 '17 at 20:13
  • @zero323 Sorry to keep asking questions, but I can't figure out how to make lit() work for my use case. I read over the example you sent me, but I don't see how to use a dictionary in lit. My dictionary has over 40 keys. – pandasCat Feb 21 '17 at 06:25
  • `pyspark.sql.functions.create_map` – zero323 Feb 21 '17 at 13:09
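
For readers following the `create_map` hint, here is a minimal sketch of turning a Python dict such as `tofullname` into a literal map column, assuming Spark 2.0 or later. The helper names `flatten_pairs` and `lookup_column` are made up for illustration and are not part of PySpark. As far as I know, keys that are missing from the dict come back as null by default, not as the empty string the UDFs in the question return, so the later filter would need to check for null.

```python
from itertools import chain


def flatten_pairs(mapping):
    """Turn {'US': 'United States'} into ['US', 'United States']."""
    return list(chain.from_iterable(mapping.items()))


def lookup_column(mapping, column_name):
    """Build a Column that looks column_name up in mapping without a Python UDF."""
    # Imported here so flatten_pairs stays usable without a Spark install.
    from pyspark.sql.functions import col, create_map, lit

    # create_map expects alternating key and value columns.
    literal_map = create_map(*[lit(x) for x in flatten_pairs(mapping)])
    return literal_map[col(column_name)]
```

It could then replace the `c_exists` UDF with something like `df.withColumn("ActionGeo_CountryCode", lookup_column(tofullname, "ActionGeo_CountryCode"))`.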

1 Answer

The most immediate thing I can think of is to use DataFrames instead of RDDs. RDDs in Python are considerably slower than in Scala because of the conversions between Python and the JVM. DataFrames also benefit from many optimizations.

It is very difficult to follow all the code here and suggest a full conversion. As a starting point, however, you can use spark.read.csv to read the CSV files directly into a DataFrame (setting a schema so that much of the validation happens automatically), and the many existing functions should make the rest easy to write.
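
A rough sketch of that starting point, under a few assumptions: `spark` is an existing SparkSession, the Spark version accepts a DDL-formatted schema string in `spark.read.csv` (2.3 or later, as far as I know), and the field list below is a shortened, hypothetical stand-in. The real list would have to name every column of the files in order. `ddl_schema` and `read_events` are illustrative names, not library functions.

```python
# Hypothetical, shortened field list; the real one must cover all columns in file order.
EXAMPLE_FIELDS = [
    ("GLOBALEVENTID", "STRING"),
    ("SQLDATE", "INT"),
    ("MonthYear", "INT"),
]


def ddl_schema(fields):
    """Join (name, type) pairs into a DDL schema string such as 'a INT, b STRING'."""
    return ", ".join("%s %s" % (name, sql_type) for name, sql_type in fields)


def read_events(spark, path, fields):
    """Read tab-separated files into a typed DataFrame; spark is a SparkSession."""
    return spark.read.csv(path, schema=ddl_schema(fields), sep="\t", header=False)
```

Declaring numeric types up front would take over the job of the separate CAST query in the question.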

Assaf Mendelson
  • If I read the CSV as a DataFrame, then I can no longer use .map and .reduce, right? I need .map and .reduce to make several transformations to my data. I'm using a master and three slaves and I want to make use of all of my resources. If I use an RDD, the tasks are parallelized across the other nodes. – pandasCat Feb 13 '17 at 08:49
  • @CatherineAlv Well, let's say you can go back and forth between RDDs and DataFrames if you have some specifics. – eliasah Feb 13 '17 at 08:55
  • A quick glance at your code suggests that everything you do in .map and .reduce can be done with DataFrames (by using DataFrame functions). Spark DataFrames (part of the Spark SQL package, pyspark.sql) are distributed just like RDDs (and much more optimized). If something is missing you can generally solve it with a UDF (although Python UDFs are slower than Scala's and definitely slower than the functions available in pyspark.sql.functions). Try looking at the Spark SQL programming guide for more information. – Assaf Mendelson Feb 13 '17 at 09:00
  • @AssafMendelson Do you have additional suggestions on the updates I've made? Are there additional optimizations I could make? – pandasCat Feb 19 '17 at 22:02
  • Take a look at the built-in functions. Using built-in functions is much faster than using a UDF. I see, for example, that you are defining the modify_val UDF. Instead you might use when and otherwise; see pyspark.sql.functions. – Assaf Mendelson Feb 20 '17 at 05:06
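
To make the `when` / `otherwise` suggestion concrete, here is a small sketch. `modify_values` is the rule from the question, kept only as a plain-Python reference. `fill_actor_type` is a hypothetical helper name that builds the equivalent column expression. The two should agree for non-null strings, but I have not checked how they compare when a column contains nulls.

```python
def modify_values(r, y):
    """Plain-Python rule from the question, kept only as a reference."""
    return y if r == "" and y != "" else r


def fill_actor_type(primary="Actor1Type1Code", fallback="Actor2Type1Code"):
    """Column expression applying the same rule with built-in functions."""
    # Imported lazily so the reference function above works without Spark.
    from pyspark.sql.functions import col, when

    use_fallback = (col(primary) == "") & (col(fallback) != "")
    return when(use_fallback, col(fallback)).otherwise(col(primary))
```

In the question's code this would be used as `df.withColumn("Actor1Type1Code", fill_actor_type())` in place of the `modify_val` UDF.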