
I am new to PySpark. What is the PySpark equivalent of the SQL below? I understand that I can create a temp view and run this SQL against it; however, I am looking for a PySpark-specific solution.

SELECT 
     MAX(CASE WHEN col1 = 'A' THEN 1
              WHEN col2 = 'B' THEN 2
              ELSE 3
         END) AS my_col
FROM mytab

How can I express the above SQL in PySpark code?
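
For reference, the temp-view route I mention above would look roughly like this (assuming mytab is already a DataFrame):

# Register the DataFrame and run the SQL unchanged (the approach I want to avoid)
mytab.createOrReplaceTempView("mytab")
spark.sql("""
    SELECT MAX(CASE WHEN col1 = 'A' THEN 1
                    WHEN col2 = 'B' THEN 2
                    ELSE 3
               END) AS my_col
    FROM mytab
""").show()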

Thanks in advance.

Matthew
    Does this answer your question? [Spark Equivalent of IF Then ELSE](https://stackoverflow.com/questions/39048229/spark-equivalent-of-if-then-else) – samkart Aug 28 '23 at 16:17

3 Answers


I would do it like this:

  1. Test setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, max, column

spark = (
    SparkSession
    .builder
    .getOrCreate()
)

columns = ["col1","col2"]
data = [("A", "B"), ("C", "B") ,("A", None), (None, "B"), ("B", "A")]
df = spark.createDataFrame(data).toDF(*columns)
  2. Create a new column "my_col_values"
df = df.withColumn(
    "my_col_values", 
    when(df.col1 == "A", 1)
    .when(df.col2 == "B", 2)
    .otherwise(None)  # note: the original SQL has ELSE 3; use .otherwise(3) to match it exactly
)
df.show()

which will result in:

+----+----+-------------+                                                       
|col1|col2|my_col_values|
+----+----+-------------+
|   A|   B|            1|
|   C|   B|            2|
|   A|null|            1|
|null|   B|            2|
|   B|   A|         null|
+----+----+-------------+
  3. Select the maximum
df.select(max(df.my_col_values).alias("my_col")).show()

which will result in

+------+
|my_col|
+------+
|     2|
+------+

Of course you can do it in one go as well:

df.withColumn(
    "my_col_values",
    when(df.col1 == "A", 1)
    .when(df.col2 == "B", 2)
    .otherwise(None)
).select(max(column("my_col_values")).alias("my_col")).show()
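
If you need the maximum as a plain Python value rather than a printed table, a small variation (my addition, using first()) would be:

row = df.select(max(column("my_col_values")).alias("my_col")).first()
my_col = row["my_col"]  # 2 for the sample data above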
kahobe

Try this:

from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, max as max_

spark = SparkSession.builder.appName("SQLtoPySpark").getOrCreate()

# mytab is assumed to be an existing DataFrame with columns col1 and col2
result = mytab.select(
    max_(
        when(col("col1") == 'A', 1)
        .when(col("col2") == 'B', 2)
        .otherwise(3)
    ).alias("my_col")
)

result.show()
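
Since mytab is not defined in the snippet, a minimal way to build one for testing (sample data of my own, not from the answer) is:

mytab = spark.createDataFrame(
    [("A", "B"), ("C", "D"), ("X", "Y")],  # hypothetical rows
    ["col1", "col2"],
)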
Aymen Azoui
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import col, max, when

sc = SparkContext('local')
sqlContext = SQLContext(sc)

df = sc.parallelize([
    [1, 'A', 'B'],
    [2, 'B', 'A'],
    [3, 'D', 'C'],
    [4, 'A', 'N'],
    [5, 'N', 'B'],
    [6, 'A', 'A'],
    [6, 'B', 'B']
]).toDF('id: integer, col1: string, col2: string')
df.show(20, False)

df = df.withColumn("my_col", when(df.col1 == 'A', 1).when(df.col2 == 'B', 2).otherwise(3))
df.show(20, False)

df_answer = df.select(max(col("my_col")))
df_answer.show(20, False)

Outputs:

+---+----+----+
|id |col1|col2|
+---+----+----+
|1  |A   |B   |
|2  |B   |A   |
|3  |D   |C   |
|4  |A   |N   |
|5  |N   |B   |
|6  |A   |A   |
|6  |B   |B   |
+---+----+----+

+---+----+----+------+
|id |col1|col2|my_col|
+---+----+----+------+
|1  |A   |B   |1     |
|2  |B   |A   |3     |
|3  |D   |C   |3     |
|4  |A   |N   |1     |
|5  |N   |B   |2     |
|6  |A   |A   |1     |
|6  |B   |B   |2     |
+---+----+----+------+

+-----------+
|max(my_col)|
+-----------+
|3          |
+-----------+
user238607