I am new to Spark (2.x.x) and am using Spark SQL. I created a DataFrame using the Spark SQL context:

dff = sqlCtx.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema","true").option("delimiter","\t").load("/home/hduser/Desktop/allFromDesktop/pyspark/creditData.csv")

dff.show()

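+---+------------------+-----+------+-----+---+---------+------+-------+-------+----------------+-------+---------+
|   |            Income|Limit|Rating|Cards|Age|Education|Gender|Student|Married|       Ethnicity|Balance|Age_class|
+---+------------------+-----+------+-----+---+---------+------+-------+-------+----------------+-------+---------+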
|  0|14.890999999999998| 3606|   283|    2| 34|       11|  Male|     No|    Yes|       Caucasian|    333|    25-34|
|  1|           106.025| 6645|   483|    3| 82|       15|Female|    Yes|    Yes|           Asian|    903|      65+|
|  2|104.59299999999999| 7075|   514|    4| 71|       11|  Male|     No|     No|           Asian|    580|      65+|

I tried testing the following statement piece by piece:

tab = dff.select(['Age_class','Balance','Limit']).groupby('Age_class') \
    .agg(F.count('Limit'),
         F.mean('Limit').alias('Limit_avg'),
         F.min('Limit').alias('Limit_min'),
         F.max('Limit').alias('Limit_max')) \
    .withColumn('total', sum(col('Limit')).over(Window)) \
    .withColumn('Percent', col('Limit')*100/col('total')) \
    .drop(col('total'))
tab.show()

and found that as soon as agg() executes, the original column is replaced by the new aliased columns. Test #1 (the first part of the Python code, which executed successfully):

tab = dff.select(['Age_class','Balance','Limit']).groupby('Age_class') \
    .agg(F.count('Limit'),
         F.mean('Limit').alias('Limit_avg'),
         F.min('Limit').alias('Limit_min'),
         F.max('Limit').alias('Limit_max'))
tab.show()

With output:

+---------+------------+------------------+---------+---------+
|Age_class|count(Limit)|         Limit_avg|Limit_min|Limit_max|
+---------+------------+------------------+---------+---------+
|    45-54|          65| 4836.630769230769|      855|    11200|
|      <25|          11|3932.6363636363635|     2120|     6375|
|    55-64|          68|            4530.0|     1311|    11966|
Here you can see that I lost my original 'Limit' column.

The 'Limit' column of the original DataFrame is gone (why?), replaced by the new columns. So when the second part of the code runs, it cannot find the original column in the DataFrame. The rest of the statement, i.e.

.withColumn('total', sum(col('Limit')).over(Window)).withColumn('Percent', col('Limit')*100/col('total')).drop(col('total'))

fails with the error:

Py4JJavaError: An error occurred while calling o2104.withColumn.
: org.apache.spark.sql.AnalysisException: cannot resolve '`Limit`' given input columns: [Age_class, Limit_max, Limit_min, Limit_avg, count(Limit)];;

When I changed my groupby clause to .groupby('Age_class','Limit') instead of .groupby('Age_class'), the code ran without an error.

QUESTION 1: Why do I need to add the "Limit" column to the groupby() clause when I have already mentioned it in the select statement?

QUESTION 2: After execution, the "Age_class" column was not collapsed into groups (bins) even though I am using groupby. I was expecting something like this:

+---------+-----+------------+------------------+---------+---------+----------+
|age_class|Limit|count(Limit)|         Limit_avg|Limit_min|Limit_max|Percentage|
+---------+-----+------------+------------------+---------+---------+----------+
|    45-54|     |         120|3183.0666666666666|      338|    12612|      12.0|
|      <25|     |         150| 2970.733333333333|      276|    15672|      15.0|
|    55-64|     |          56| 3493.660714285714|      385|    15945|       5.6|
|    35-44|     |         254| 3403.771653543307|      250|    15857|      25.4|
|    25-34|     |         397| 3298.823677581864|      343|    18424|      39.7|
|      65+|     |          23|3210.1739130434785|      571|    14896|       2.3|
+---------+-----+------------+------------------+---------+---------+----------+

The code with 'Limit' added to groupby():

tab = dff.select(['Age_class','Balance','Limit']).groupby('Age_class','Limit') \
    .agg(F.count('Limit'),
         F.mean('Limit').alias('Limit_avg'),
         F.min('Limit').alias('Limit_min'),
         F.max('Limit').alias('Limit_max')) \
    .withColumn('total', sum(col('Limit')).over(Window)) \
    .withColumn('Percent', col('Limit')*100/col('total')) \
    .drop(col('total'))
tab.show()

Actual output ('Age_class' is not collapsed into groups, a.k.a. bins):

+---------+-----+------------+---------+---------+---------+-------------------+
|Age_class|Limit|count(Limit)|Limit_avg|Limit_min|Limit_max|            Percent|
+---------+-----+------------+---------+---------+---------+-------------------+
|    45-54| 7838|           1|   7838.0|     7838|     7838| 0.4137807247233719|
|    35-44|  886|           1|    886.0|      886|      886|0.04677337612974069|
|    45-54| 4632|           1|   4632.0|     4632|     4632|  0.244530788073317|
|    55-64| 1448|           1|   1448.0|     1448|     1448|0.07644226708336853|
|    55-64| 5107|           1|   5107.0|     5107|     5107| 0.2696068080074331|
|    45-54| 2586|           1|   2586.0|     2586|     2586| 0.1365191316834192|
|    35-44| 4159|           1|   4159.0|     4159|     4159| 0.2195603513810288|
|    45-54| 4943|           1|   4943.0|     4943|     4943| 0.2609489821775488|
|    45-54| 2558|           1|   2558.0|     2558|     2558|0.13504096629782922|
|    25-34| 3969|           1|   3969.0|     3969|     3969|0.20952994340738237|
|    35-44| 5319|           1|   5319.0|     5319|     5319| 0.2807986316411859|
|    45-54| 8100|           1|   8100.0|     8100|     8100| 0.4276121294028212|
|    45-54| 6040|           1|   6040.0|     6040|     6040| 0.3188613903201284|
|    45-54| 4673|           1|   4673.0|     4673|     4673|0.24669524453078806|
|      65+| 2330|           1|   2330.0|     2330|     2330| 0.1230044767294535|
|    45-54| 6922|           1|   6922.0|     6922|     6922| 0.3654235999662134|
|      65+| 4263|           1|   4263.0|     4263|     4263|0.22505067995607736|
|    25-34| 4391|           1|   4391.0|     4391|     4391|0.23180800743306024|
|      65+| 7499|           1|   7499.0|     7499|     7499|0.39588436523355014|
|    45-54| 8732|           1|   8732.0|     8732|     8732|  0.460976433820424|
+---------+-----+------------+---------+---------+---------+-------------------+
  • Because you grouped by `Age_class`. Do you want to group by both? – pault May 15 '19 at 20:04
  • no only Age_class. – vikas singh May 15 '19 at 20:06
  • @vikassingh: Please don't add new questions (especially not when somebody has already answered; the answers will get out of date then). In this case, however, the reason it's not grouping as you want is that you are grouping on two columns, `Age_class` and `Limit`. You say you want to keep the `Limit` rows after grouping, but the new dataframe (grouping by only `Age_class`) will have a different number of rows. You need to think about which value of `Limit` you would like to keep. You already have min, max and mean. – Shaido May 16 '19 at 02:41

1 Answer

As @pault said, it is because you grouped only by Age_class. When you use aggregation functions, the resulting DataFrame contains only the aggregated columns (i.e. the results of your aggregations: count(Limit), Limit_avg, Limit_min, Limit_max) and the dimension columns you grouped by (i.e. Age_class).
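
To see why 'Limit' cannot survive the aggregation, here is a minimal plain-Python sketch of what groupby('Age_class').agg(...) does conceptually. The sample rows are made up for illustration, and this is not how Spark implements it internally; it only mirrors the shape of the result.

```python
from collections import defaultdict
from statistics import mean

# Made-up sample rows (Age_class, Limit); not taken from creditData.csv.
rows = [("25-34", 3606), ("65+", 6645), ("65+", 7075), ("25-34", 1000)]

# Step 1: bucket the Limit values by the grouping key.
buckets = defaultdict(list)
for age_class, limit in rows:
    buckets[age_class].append(limit)

# Step 2: collapse every bucket into ONE output row of aggregates.
summary = {
    age_class: {
        "count(Limit)": len(limits),
        "Limit_avg": mean(limits),
        "Limit_min": min(limits),
        "Limit_max": max(limits),
    }
    for age_class, limits in buckets.items()
}

for age_class, aggregates in summary.items():
    print(age_class, aggregates)
```

Each group holds several different Limit values but ends up as a single row, so there is no single 'Limit' value left to keep; only aggregates of it remain.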

If you want to preserve 'Limit', you have to group by it as well (or at least apply some aggregation function to it), for example:

tab = dff.select(['Age_class','Balance','Limit']) \
.groupby('Limit', 'Age_class') \
.agg(F.count('Limit').alias('Limit_count'),
     F.mean('Limit').alias('Limit_avg'),
     F.min('Limit').alias('Limit_min'),
     F.max('Limit').alias('Limit_max'))

As for why you should include 'Limit' in groupby(), generally speaking, you will write something like :

df.select(col1, col2, col3, col4) \
.groupBy(col1, col3) \
.agg(F.count(col1), F.sum(col3))

You can think of what you did this way:

For a given PySpark DataFrame df, we select some of its columns, col1 ... col4, to get a smaller DataFrame df.select(col1, col2, col3, col4).

For this smaller DataFrame, we want aggregated results over some dimensions: for each combination of the dimensions we care about, col1 and col3, we want to know how many rows there are (i.e. F.count(col1)) and what the total of col3 is (i.e. F.sum(col3)).

And which dimensions do we care about? They are the ones defined in .groupBy(col1, col3).

helloworld
  • Please look at question 2: after execution, the "Age_class" column did not get converted into groups (bins) even though I am using groupby; see the expected result table in the question. The Age_class data type is string. – vikas singh May 15 '19 at 22:09