I would like to process data with Spark on an Azure HDInsight (HDI) cluster (16 nodes, 96 cores). The data is stored in blob storage as text files (~1000 files, each about 100k-10M). Although the cluster should be more than sufficient, processing a few GB of data takes ages, even a simple count. What am I doing wrong? Also, if I save the DataFrame to Parquet, will it be distributed across the nodes? (A sketch of the Parquet write I have in mind follows the code below.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
logging.info('start')
sc = SparkContext('spark://headnodehost:7077', 'pyspark')
sqlContext = SQLContext(sc)
logging.info('context: %s', sc)
def get_df(path, sep='\t', has_header=True):
    # read every file matching the glob into one RDD (hint of 50 partitions)
    rdd = sc.textFile(path, 50)
    # split each line on the separator
    rddsplit = rdd.map(lambda x: [str(xx) for xx in x.split(sep)], preservesPartitioning=True)
    if has_header:
        # use the first row as the header and build an all-string schema from it
        header = rddsplit.first()
        logging.info(header)
        schema = StructType([StructField(h, StringType(), True) for h in header])
        # drop the header row(s) before building the DataFrame
        rdd_no_header = rddsplit.filter(lambda x: x != header)
        df = sqlContext.createDataFrame(rdd_no_header, schema).persist()
    else:
        df = sqlContext.createDataFrame(rddsplit).persist()
    return df
path = r"wasb://blob_name@storage_name.blob.core.windows.net/path/*/*.tsv"
df = get_df(path)
logging.info('type=%s', type(df))
# count() returns an int, so log it rather than calling show() on it
logging.info('count=%d', df.count())
df.groupBy('ColumnName').count().show()
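For the Parquet part of the question, this is roughly what I have in mind (an untested sketch; the output path and the repartition count are placeholders I made up, and it assumes the Spark 1.4+ DataFrameWriter API):

# hypothetical output location in the same blob storage account
out_path = r"wasb://blob_name@storage_name.blob.core.windows.net/output/data_parquet"
# repartition() here only illustrates controlling the number of output files;
# write.parquet() should produce one part-file per partition
df.repartition(50).write.mode('overwrite').parquet(out_path)

The idea would be to read the Parquet back with sqlContext.read.parquet(out_path) in later jobs instead of re-parsing the raw TSV files.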
Thanks, Hanan