12

I use spark-shell to do the operations below.

I recently loaded a table with an array column in spark-sql.

Here is the DDL for the same:

create table test_emp_arr(
    dept_id string,
    dept_nm string,
    emp_details Array<string>
)

The data looks something like this:

+-------+-------+-------------------------------+
|dept_id|dept_nm|                    emp_details|
+-------+-------+-------------------------------+
|     10|Finance|[Jon, Snow, Castle, Black, Ned]|
|     20|     IT|            [Ned, is, no, more]|
+-------+-------+-------------------------------+

I can query the emp_details column like this:

sqlContext.sql("select emp_details[0] from emp_details").show

Problem

I want to query a range of elements in the collection:

Expected query to work

sqlContext.sql("select emp_details[0-2] from emp_details").show

or

sqlContext.sql("select emp_details[0:2] from emp_details").show

Expected output

+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

In pure Scala, if I have an array such as:

val emp_details = Array("Jon","Snow","Castle","Black")

I can get the elements in the 0 to 2 range using

emp_details.slice(0,3)

which returns

Array(Jon, Snow, Castle)

I am not able to apply the above array operation in spark-sql.

Thanks

zero323
serious_black

8 Answers

13

Since Spark 2.4 you can use the slice function. In Python:

pyspark.sql.functions.slice(x, start, length)

Collection function: returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

...

New in version 2.4.

from pyspark.sql.functions import slice

df = spark.createDataFrame([
    (10, "Finance", ["Jon", "Snow", "Castle", "Black", "Ned"]),
    (20, "IT", ["Ned", "is", "no", "more"])
], ("dept_id", "dept_nm", "emp_details"))

df.select(slice("emp_details", 1, 3).alias("emp_details")).show()
+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

In Scala:

def slice(x: Column, start: Int, length: Int): Column

Returns an array containing all the elements in x from index start (or starting from the end if start is negative) with the specified length.

import org.apache.spark.sql.functions.slice

val df = Seq(
    (10, "Finance", Seq("Jon", "Snow", "Castle", "Black", "Ned")),
    (20, "IT", Seq("Ned", "is", "no", "more"))
).toDF("dept_id", "dept_nm", "emp_details")

df.select(slice($"emp_details", 1, 3) as "emp_details").show
+-------------------+
|        emp_details|
+-------------------+
|[Jon, Snow, Castle]|
|      [Ned, is, no]|
+-------------------+

The same thing can of course be done in SQL:

SELECT slice(emp_details, 1, 3) AS emp_details FROM df

Important:

Please note that, unlike Seq.slice, values are indexed from one (not zero) and the second argument is a length, not an end position.
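
A minimal sketch of the difference, assuming the df built above and a spark-shell session where spark.implicits._ is in scope:

import org.apache.spark.sql.functions.slice

// Scala collections: zero-based start, exclusive end position
Seq("Jon", "Snow", "Castle", "Black", "Ned").slice(0, 3)   // Seq(Jon, Snow, Castle)

// Spark SQL slice: one-based start, second argument is a length
df.select(slice($"emp_details", 1, 3)).show()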

zero323
  • for a large database, what if you want an array of values, ie. values that could be anywhere in the column? – sAguinaga Feb 28 '20 at 12:42
9

Here is a solution using a User Defined Function which has the advantage of working for any slice size you want. It simply builds a UDF around the Scala built-in slice method:

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val slice = udf((array : Seq[String], from : Int, to : Int) => array.slice(from,to))

Example with a sample of your data:

val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details")
df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show

This produces the expected output:

+--------------------+-------------------+
|         emp_details|              slice|
+--------------------+-------------------+
|[Jon, Snow, Castl...|[Jon, Snow, Castle]|
+--------------------+-------------------+

You can also register the UDF in your sqlContext and use it like this:

sqlContext.udf.register("slice", (array : Seq[String], from : Int, to : Int) => array.slice(from,to))
sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice(array('Jon','Snow','Castle','Black','Ned'),0,3)")

You won't need lit anymore with this solution.
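
With the UDF registered, the whole thing also fits in a single SQL statement against the question's table. A sketch, assuming that table is available as emp_details, as in the question's own queries:

sqlContext.sql("select emp_details, slice(emp_details, 0, 3) as slice from emp_details").show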

cheseaux
  • Thanks, it works but doesn't exactly suit my case. I have been trying to get everything in a single SQL statement, something like this: `sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned'),slice((array('Jon', 'Snow', 'Castle', 'Black', 'Ned')),0,3)").show`. I have to agree that I can make a workaround with the solution you gave, though it would make me go for a two-step process. Also I have been wondering why `lit` is needed (sorry, just started out with Scala). Including `lit` in my above code throws a not-found exception, else it throws a not-found exception for slice – serious_black Oct 20 '16 at 03:34
  • You have to import `[...]functions.lit` as stated in the answer. `lit` creates a column which contains the same value in all rows (it stands for "literal"). It's needed for type-checking (try without it to see). You can avoid it by using a higher-order function instead: `def slice(from : Int, to : Int) = udf((array : Seq[String]) => array.slice(from,to))`, which would enable the following syntax: `df.withColumn("slice", slice(0, 3)($"emp_details")).show`, but it is not necessarily simpler. – Wilmerton Oct 20 '16 at 06:24
  • @Wilmerton: I am sorry, but I am thinking that this is not an `import` issue, since I can run `val df = sqlContext.sql("select array('Jon', 'Snow', 'Castle', 'Black', 'Ned') as emp_details"); df.withColumn("slice", slice($"emp_details", lit(0), lit(3))).show` as pointed out by @cheseaux. The `lit` works in this case; what I am trying to achieve is this two-step process in a single step, which might be something like: `sqlContext.sql("select array('Jon','Snow','Castle','Black','Ned'),slice((array('Jon','Snow','Castle','Black','Ned')),lit(0),lit(3))")`. – serious_black Oct 20 '16 at 06:43
  • 1
    ha, I misunderstood. You can register udf's to the spark context so it would be available in `sql` http://stackoverflow.com/questions/31278110/how-do-i-register-a-function-to-sqlcontext-udf-in-scala – Wilmerton Oct 20 '16 at 06:47
  • @cheseaux: thanks a lot, it works. Just one last thing: what exactly does `lit` do [a simple example would suffice]? – serious_black Oct 20 '16 at 06:56
  • @thinkinbee Wilmerton already explained to you, this creates a column with a constant value. You can read more [here](http://spark.apache.org/docs/2.0.1/api/scala/index.html#org.apache.spark.sql.functions$) – cheseaux Oct 20 '16 at 06:59
  • @cheseaux: Thanks. How can I `sum up` the values of this array using `spark sql` in case my data has some `long or integers`? Expecting something like `sqlContext.sql("select sum(slice(Array_of_Ints,0,4)) from mytable")` to work, but it doesn't – serious_black Oct 20 '16 at 09:29
  • `sum` is an aggregate function (sums the element of a column). Please create a new question here on SO. – cheseaux Oct 20 '16 at 09:32
3

Edit 2: For those who want to avoid a udf at the expense of readability ;-)

If you really want to do it in one step, you will have to use Scala to create a lambda function returning a sequence of Column and wrap it in an array. This is a bit involved, but it's one step:

val df = List(List("Jon", "Snow", "Castle", "Black", "Ned")).toDF("emp_details")

df.withColumn("slice", array((0 until 3).map(i => $"emp_details"(i)):_*)).show(false)    


+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+

The `:_*` works a bit of magic to pass a list to a so-called variadic function (`array` in this case, which constructs the SQL array). But I would advise against using this solution as is; put the lambda function in a named function

def slice(from: Int, to: Int) = array((from until to).map(i => $"emp_details"(i)):_*)

for code readability. Note that in general, sticking to Column expressions (without using a `udf`) gives better performance.
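
A quick usage sketch of that named helper, assuming the df defined above (the helper hard-codes the emp_details column):

// slice(0, 3) returns a Column, so it plugs straight into withColumn
df.withColumn("slice", slice(0, 3)).show(false)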

Edit: In order to do it in a SQL statement (as you asked in your question...), following the same logic you would generate the SQL query using Scala (not saying it's the most readable):

def sliceSql(emp_details: String, from: Int, to: Int): String = "Array(" + (from until to).map(i => emp_details + "[" + i.toString + "]").mkString(",") + ")"
val sqlQuery = "select emp_details, " + sliceSql("emp_details", 0, 3) + " as slice from emp_details"

sqlContext.sql(sqlQuery).show

+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
+-------------------------------+-------------------+

Note that you can replace `until` with `to` in order to provide the last element taken rather than the element at which the iteration stops.
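
A sketch of that inclusive variant, under the same imports; sliceInclusive is just an illustrative name:

// `to` includes the last index, whereas `until` excludes it
def sliceInclusive(from: Int, last: Int) = array((from to last).map(i => $"emp_details"(i)):_*)

df.withColumn("slice", sliceInclusive(0, 2)).show(false)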

Wilmerton
1

You can use the function array to build a new Array out of the three values:

import org.apache.spark.sql.functions._

val input = sqlContext.sql("select emp_details from emp_details")

val arr: Column = col("emp_details")
val result = input.select(array(arr(0), arr(1), arr(2)) as "emp_details")

result.show()
// +-------------------+
// |        emp_details|
// +-------------------+
// |[Jon, Snow, Castle]|
// |      [Ned, is, no]|
// +-------------------+
Tzach Zohar
  • downvoters: care to explain? This is a working solution, and the shortest suggested here. – Tzach Zohar Oct 19 '16 at 16:18
  • 4
    I didn't down-vote, but my guess is that it does not give `slice`-like behaviour, as asked in the question. If you want to take a slice of the type `[4:400]`, your solution (as-is) would be cumbersome. – Wilmerton Oct 20 '16 at 06:30
0

Use selectExpr() and the split() function in Apache Spark.

For example:

fs.selectExpr("((split(emp_details, ','))[0]) as e1,((split(emp_details, ','))[1]) as e2,((split(emp_details, ','))[2]) as e3")
PradhanKamal
  • 1
    This doesn't even compile (seems to be missing `")` at the end?) and doesn't work even when fixed - the `emp_details` column is an Array, not a comma-separated String column. – Tzach Zohar Oct 19 '16 at 15:29
0

Here is my generic slice UDF, supporting arrays of any type. It is a little bit ugly because you need to know the element type in advance.

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction

def arraySlice(arr: Seq[AnyRef], from: Int, until: Int): Seq[AnyRef] =
  if (arr == null) null else arr.slice(from, until)

def slice(elemType: DataType): UserDefinedFunction =
  udf(arraySlice _, ArrayType(elemType))

fs.select(slice(StringType)($"emp_details", lit(1), lit(2)))
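
For example, the same factory should cover other element types as well; int_scores is a hypothetical integer-array column, and lit is needed because a UserDefinedFunction is applied to Column arguments:

// slicing a hypothetical array<int> column with the same generic UDF
fs.select(slice(IntegerType)($"int_scores", lit(0), lit(3)))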
Bewang
0

For those of you stuck on Spark < 2.4 without the slice function, here is a solution in PySpark (Scala would be very similar) that does not use UDFs. Instead it uses the Spark SQL functions concat_ws, substring_index, and split.

This will only work with string arrays. To make it work with arrays of other types, you will have to cast them to strings first, then cast back to the original type after you have 'sliced' the array.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (SparkSession.builder
    .master('yarn')
    .appName("array_slice")
    .getOrCreate()
)

emp_details = [
    ["Jon", "Snow", "Castle", "Black", "Ned"],
    ["Ned", "is", "no", "more"]
]

df1 = spark.createDataFrame(
    [tuple([emp]) for emp in emp_details],
    ["emp_details"]
)

df1.show(truncate=False)
+-------------------------------+
|emp_details                    |
+-------------------------------+
|[Jon, Snow, Castle, Black, Ned]|
|[Ned, is, no, more]            |
+-------------------------------+

last_string = 2

df2 = (
    df1
    .withColumn('last_string', (F.lit(last_string)))
    .withColumn('concat', F.concat_ws(" ", F.col('emp_details')))
    .withColumn('slice', F.expr("substring_index(concat, ' ', last_string + 1)" ))
    .withColumn('slice', F.split(F.col('slice'), ' '))
    .select('emp_details', 'slice')
)

df2.show(truncate=False)
+-------------------------------+-------------------+
|emp_details                    |slice              |
+-------------------------------+-------------------+
|[Jon, Snow, Castle, Black, Ned]|[Jon, Snow, Castle]|
|[Ned, is, no, more]            |[Ned, is, no]      |
+-------------------------------+-------------------+
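
Since Scala would be very similar, here is a hedged sketch of the same concat_ws / substring_index / split chain in Scala; df1 and the lastString value mirror the Python above and are otherwise assumptions:

import org.apache.spark.sql.functions.{col, concat_ws, split, substring_index}

val lastString = 2

// df1 is assumed to be the same two-row DataFrame of string arrays built above
val df2 = df1
  .withColumn("concat", concat_ws(" ", col("emp_details")))                 // join the array into one space-delimited string
  .withColumn("slice", substring_index(col("concat"), " ", lastString + 1)) // keep the first lastString + 1 tokens
  .withColumn("slice", split(col("slice"), " "))                            // split back into an array of strings
  .select("emp_details", "slice")

df2.show(false)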
Clay
-1

Use nested split. The idea: concat_ws joins the array into a comma-separated string, the outer split cuts that string at ',' followed by the fourth element (emp_details[3]) and keeps the part before it, and the inner split turns that prefix back into an array:

split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',')

scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession

scala> val spark=SparkSession.builder().getOrCreate()
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@1d637673

scala> val df = spark.read.json("file:///Users/gengmei/Desktop/test/test.json")
18/12/11 10:09:32 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

scala> df.createOrReplaceTempView("raw_data")

scala> df.show()
+-------+-------+--------------------+
|dept_id|dept_nm|         emp_details|
+-------+-------+--------------------+
|     10|Finance|[Jon, Snow, Castl...|
|     20|     IT| [Ned, is, no, more]|
+-------+-------+--------------------+


scala> val df2 = spark.sql(
     | s"""
     | |select dept_id,dept_nm,split(split(concat_ws(',',emp_details),concat(',',emp_details[3]))[0],',') as emp_details from raw_data
     | """.stripMargin)
df2: org.apache.spark.sql.DataFrame = [dept_id: bigint, dept_nm: string ... 1 more field]

scala> df2.show()
+-------+-------+-------------------+
|dept_id|dept_nm|        emp_details|
+-------+-------+-------------------+
|     10|Finance|[Jon, Snow, Castle]|
|     20|     IT|      [Ned, is, no]|
+-------+-------+-------------------+
Nick