
I have written a small piece of PySpark code to generate quantiles on a set of columns, and I am calling this function using concurrent.futures because I want it to run on two sets of columns in parallel.

But instead of only the function submitted to the ThreadPoolExecutor running, the whole code gets executed three times.

I am calling the function generate_advisor_quartiles() from the main() method of another Python program.
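For reference, here is a minimal, Spark-free sketch of the behaviour I expect from ThreadPoolExecutor (work and its label argument are just illustrative stand-ins for my real function):

from concurrent.futures import ThreadPoolExecutor, as_completed

def work(label):
    # Stand-in for generate_Quantiles; should run exactly once per submit()
    return "done: {}".format(label)

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(work, "first half"),
               executor.submit(work, "second half")]
    for future in as_completed(futures):
        print(future.result())  # each result printed exactly once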

from src.utils import sql_service, apa_constant as constant
from pyspark.sql.functions import monotonicallyIncreasingId
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext,HiveContext
from pyspark.sql.functions import lit
from pyspark.sql.types import *
from pyspark.ml.feature import Bucketizer
from concurrent.futures import ThreadPoolExecutor, as_completed


import numpy as np
import pandas as pd
import os

def generate_Quantiles(df, attr_list, spark_context):
    jdf = df._jdf
    quantileList = []
    sqlContext = SQLContext(spark_context)
    fields = [StructField('attribute', StringType(), True),
              StructField('col1', DoubleType(), True),
              StructField('col2', DoubleType(), True),
              StructField('col3', DoubleType(), True),
              StructField('col4', DoubleType(), True),
              StructField('col5', DoubleType(), True)]
    schema = StructType(fields)
    for var in attr_list:
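        # Py4J call into a custom JVM helper; Spark 1.6's Python DataFrame
        # API does not expose approxQuantile, so the work is delegated to Scala.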
        bindt = spark_context._jvm.com.dstsystems.apa.util.DFQuantileFunction.approxQuantile(jdf,[var.col_name],[0.0, 0.25, 0.5, 0.75, 1.0],0.0)
        q0 = bindt[0][0]
        q1 = bindt[0][1]
        q2 = bindt[0][2]
        q3 = bindt[0][3]
        q4 = bindt[0][4]
        colQuantileList = [var.col_name, q0, q1, q2, q3, q4]  # attribute name + 5 quantiles, to match the six-field schema
        quantileList.append(colQuantileList)
        bindt = sorted(set(list(bindt[0])))
        # Extend the splits to cover the full double range; NaN is appended last
        bindt = [-float("inf")] + bindt + [float("inf"), float("NaN")]
        bucketizer = Bucketizer().setInputCol(var.col_name).setOutputCol("{}_quantile".format(var.col_name)).setSplits(bindt)
        df = bucketizer.transform(df)
        df = df.withColumn("{}_quantile".format(var.col_name),(lit(4.0) - df["{}_quantile".format(var.col_name)]))
        df = df.drop(var.col_name)  # drop() returns a new DataFrame; the result must be reassigned

    quantileRDD = spark_context.parallelize(quantileList)
    quantileDF = sqlContext.createDataFrame(quantileRDD,schema)
    df.count()          # force evaluation before returning
    quantileDF.count()  # force evaluation before returning
    return df,quantileDF
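
For context, a self-contained sketch of how Bucketizer assigns bucket indices (assuming a Spark 2.x SparkSession for brevity; my actual code targets Spark 1.6):

from pyspark.sql import SparkSession
from pyspark.ml.feature import Bucketizer

spark = SparkSession.builder.master("local[2]").appName("bucketizer-demo").getOrCreate()
df = spark.createDataFrame([(-0.5,), (0.3,), (1.5,)], ["feature"])
# Splits define half-open buckets: [-inf, 0.0), [0.0, 1.0), [1.0, inf)
bucketizer = Bucketizer(splits=[-float("inf"), 0.0, 1.0, float("inf")],
                        inputCol="feature", outputCol="bucket")
bucketizer.transform(df).show()  # bucket column holds 0.0, 1.0, 2.0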




def generate_advisor_quartiles(spark_context, hive_context, log, **kwargs):

    log.info("Started - Generate advisor quartile reports")

    sql = """describe dbName.tablename""" #.format(kwargs['sem_db'])
    op = hive_context.sql(sql)
    res = op.withColumn("ordinal_position", monotonicallyIncreasingId())
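    # Note: monotonicallyIncreasingId() guarantees increasing, unique IDs,
    # but not consecutive ones, so the "ordinal_position <= 24" filter below
    # assumes more than the function actually promises.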
    res.registerTempTable('attribs')
    id_lst = hive_context.sql(
        "select col_name from attribs where ordinal_position <= 24 order by ordinal_position").collect()

    sql = "select %s from %s.tablename " % ((", ".join(str(v.col_name) for v in id_lst)), kwargs['sem_db'])
    id_tbl = hive_context.sql(sql)

    attr_lst = hive_context.sql(
        """select col_name from attribs where ordinal_position > 24 AND col_name not like '%vehicle%'
            AND col_name not like '%cluster_num%'
            AND col_name not like '%value_seg%' order by ordinal_position limit 2""").collect()

    vhcl_lst = hive_context.sql(
        """select col_name from attribs where ordinal_position > 24
            AND (   col_name like '%vehicle%'
            OR col_name IN ('cluster_num', 'value_seg')
            ) order by ordinal_position""").collect()

    sqltemp = "select %s from %s.Tablename" % ((", ".join(['entity_id'] + [str(vhcl.col_name) for vhcl in vhcl_lst])), kwargs['sem_db'])
    id_tbl = hive_context.sql(sqltemp)
    attr_lst1 = attr_lst[:len(attr_lst) // 2]
    attr_lst2 = attr_lst[len(attr_lst) // 2:]
    # sqltemp = "select cast(entity_id as decimal(38,20)) , %s from %s.tablename where ud_rep = 1" % (", ".join("cast(" + str(attr.col_name) + " as decimal(38,20))" for attr in attr_lst), kwargs['sem_db'])
    sqltemp1 = "select cast(entity_id as double) , %s from %s.tablename where ud_rep = 1" % (", ".join("cast(" + str(attr.col_name) + " as double)" for attr in attr_lst1), kwargs['sem_db'])
    sqltemp2 = "select cast(entity_id as double) , %s from %s.tablename where ud_rep = 1" % (", ".join("cast(" + str(attr.col_name) + " as double)" for attr in attr_lst2), kwargs['sem_db'])

    df1 = hive_context.sql(sqltemp1)
    df1 = df1.replace(0, np.nan)

    df2 = hive_context.sql(sqltemp2)
    df2 = df2.replace(0, np.nan)
    with ThreadPoolExecutor(max_workers=2) as executor2:
        result1 = executor2.submit(generate_Quantiles, df1, attr_lst1, spark_context)
        result2 = executor2.submit(generate_Quantiles, df2, attr_lst2, spark_context)
        future_list = [result1, result2]
        for future in as_completed(future_list):
            print("completed")

    df1, df2 = result1.result()
    df3, df4 = result2.result()

    finalQuantiles = df1.join(df3,"entity_id","inner")
    quantilValuesDF = df2.union(df4)
    finalQuantiles.show()
    quantilValuesDF.show()
  • Can't you use the one provided by Spark? See https://stackoverflow.com/questions/31432843/how-to-find-median-and-quantiles-using-spark – sramalingam24 Apr 10 '19 at 14:58
  • @sramalingam24: I am using Spark 1.6, which does not provide quantiles by default. BTW, the question is not about computing quantiles; it is about concurrent.futures not working as expected. Thanks for your response though :) – Vishwanath560 Apr 12 '19 at 04:59
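
For reference, on Spark 2.0 and later the same per-column quantiles are available directly from the Python DataFrame API, so the Py4J helper is unnecessary there (a minimal sketch; df and col_name stand in for the question's DataFrame and column):

quantiles = df.approxQuantile("col_name", [0.0, 0.25, 0.5, 0.75, 1.0], 0.0)
# Returns [min, q1, median, q3, max] for the requested probabilities;
# the final argument is the relative error (0.0 = exact computation).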

0 Answers