
I would like to process data with Spark on an Azure HDInsight cluster (16 nodes, 96 cores). The data is stored in blob storage as text files (about 1000 files, each 100k-10M). Although the cluster should be sufficient, processing a few GB of data takes ages (even just counting). What am I doing wrong? Also, if I save the DataFrame to Parquet, will it be distributed over the nodes?

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.info('start')

sc = SparkContext('spark://headnodehost:7077', 'pyspark')
sqlContext = SQLContext(sc)
logging.info('context: %s', sc)

def get_df(path, sep='\t', has_header=True):
    # read all matching text files; 50 is the minimum number of partitions
    rdd = sc.textFile(path, 50)
    # split every line on the separator
    rddsplit = rdd.map(lambda x: [str(xx) for xx in x.split(sep)], preservesPartitioning=True)
    if has_header:
        # use the first row as the schema and filter header rows out of the data
        header = rddsplit.first()
        logging.info(header)
        schema = StructType([StructField(h, StringType(), True) for h in header])
        rdd_no_header = rddsplit.filter(lambda x: x != header)
        df = sqlContext.createDataFrame(rdd_no_header, schema).persist()
    else:
        df = sqlContext.createDataFrame(rddsplit).persist()
    return df

path = r"wasb://blob_name@storage_name.blob.core.windows.net/path/*/*.tsv"
df = get_df(path)
logging.info('type=%s' , type(df))
logging.info('count=%s', df.count())
alert_count = df.groupBy('ColumnName').count()
alert_count.show()
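
For reference, this is roughly what I mean by saving to Parquet; the output path below is just a placeholder and I am assuming the Spark 1.4+ DataFrameWriter:

# Sketch only: write the DataFrame back to blob storage as Parquet.
# The wasb output path is a placeholder, not a real container.
out_path = r"wasb://blob_name@storage_name.blob.core.windows.net/path/output_parquet"
df.write.parquet(out_path)  # each partition is written in parallel as its own part file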

Thanks, Hanan

  • I don't see any particular reason why it is slow, but here are some notes: 1) With 96 cores you should be able to safely increase the number of partitions; right now you utilize only half of your cluster. 2) `preservesPartitioning` has no effect here. 3) [`cacheTable`](http://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory) is the preferred method of in-memory caching. 4) [Dropping the header can be done without a full filter](http://stackoverflow.com/a/27857878/1560062). 5) I am not sure it makes sense to reinvent [spark-csv](https://github.com/databricks/spark-csv). (A sketch of these suggestions follows after the comments.) – zero323 Nov 14 '15 at 02:26
  • Thanks! Would (4) work for multiple files (the number of files is larger than the number of partitions)? Regarding (5), I have seen it but I cannot find a way to install this on an Azure cluster (HDInsight). – Hanan Shteingart Dec 07 '15 at 02:46
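
A minimal sketch of suggestions (1), (3) and (4) above; it assumes the sc, sqlContext and path variables defined in the question, Spark 1.x APIs, and an example temp-table name:

# Sketch of suggestions (1), (3) and (4); assumes sc, sqlContext and path
# from the question and Spark 1.x APIs.
from pyspark.sql.types import StructType, StructField, StringType

# (1) With 96 cores, ask for more partitions than 50 so the whole cluster is used.
rdd = sc.textFile(path, 192)
rddsplit = rdd.map(lambda x: x.split('\t'))

header = rddsplit.first()
schema = StructType([StructField(h, StringType(), True) for h in header])

# (4) Drop a header without filtering every row: remove the first row of the
# first partition only, as in the linked answer. With ~1000 input files (one
# header line each) this removes just the first file's header, so for this
# multi-file layout the existing filter(x != header) -- or a per-file reader
# such as spark-csv -- is still the simpler route.
def drop_first_row(idx, it):
    return iter(list(it)[1:]) if idx == 0 else it

rdd_no_header = rddsplit.mapPartitionsWithIndex(drop_first_row)

# (3) cacheTable is the preferred way to cache a DataFrame in memory.
df = sqlContext.createDataFrame(rdd_no_header, schema)
df.registerTempTable('alerts')   # temp-table name is just an example
sqlContext.cacheTable('alerts')
df.count()                       # materializes the cache

For (5), the usual spark-csv pattern would be sqlContext.read.format('com.databricks.spark.csv').options(header='true', delimiter='\t').load(path), assuming the package can be added to the cluster (e.g. via spark-submit --packages); whether that is practical on HDInsight is exactly the open question above.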

0 Answers