There is this syntax for replacing a string in a column:

df.withColumn('new', regexp_replace('old', 'str', ''))

My question is: what if I have a column consisting of both arrays and strings? That is, a row could contain either a string or an array containing that string. Is there any way of replacing this string regardless of whether it is alone or inside an array?

Wiktor Stribiżew
Barushkish
  • What's your Spark version? With 2.4 you can use the higher-order function transform to apply the replacement inside the array. – murtihash Apr 05 '20 at 17:33
  • I am not sure about the version. Can you show me how it's done? – Barushkish Apr 05 '20 at 17:36
  • No need to explode the array to apply a regex in 2.4. You can apply it directly on an array(string) column: `df.withColumn("new", F.expr("""transform(col1, x-> regexp_replace(x,'str',''))"""))` – murtihash Apr 06 '20 at 05:31
  • got an error u"cannot resolve '`df.my.ou`' given input columns: [my]; line 1 pos 10;\n'Project [my#914196, 'transform('df.my.ou, lambdafunction('regexp_replace(lambda 'x, yes, ), lambda 'x, false)) AS new#914198]\n+- Filter (my#914196.Source = 5)\n +- Project [my#914196]\n +- Generate explode(contents#914193), false, [my#914196]\n +- Relation[contents#914193] parquet\n" – Barushkish Apr 06 '20 at 07:31

2 Answers

Having a column with multiple types is not currently supported. However, if the column contained an array of strings, you could explode the array (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=explode#pyspark.sql.functions.explode), which creates a row for each element in the array, and apply the regular expression to the new column. Example:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Build a single-column DataFrame of strings.
df = spark.createDataFrame([("hello world",),
                            ("hello madam",),
                            ("hello sir",),
                            ("hello everybody",),
                            ("goodbye world",)], schema=['test'])

# Wrap each string in a one-element array to simulate an array column.
df = df.withColumn('test', F.array(F.col('test')))

# show() prints the table itself and returns None, so it is not wrapped in print().
df.show()

# explode() emits one row per array element.
df = df.withColumn('test-exploded', F.explode(F.col('test')))

# Apply the regex to the exploded (plain string) column.
df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))

df.show()

Output:

+-----------------+
|             test|
+-----------------+
|    [hello world]|
|    [hello madam]|
|      [hello sir]|
|[hello everybody]|
|  [goodbye world]|
+-----------------+

+-----------------+---------------+-------------------+
|             test|  test-exploded|test-exploded-regex|
+-----------------+---------------+-------------------+
|    [hello world]|    hello world|      goodbye world|
|    [hello madam]|    hello madam|      goodbye madam|
|      [hello sir]|      hello sir|        goodbye sir|
|[hello everybody]|hello everybody|  goodbye everybody|
|  [goodbye world]|  goodbye world|      goodbye world|
+-----------------+---------------+-------------------+

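And if you wanted to put the results back in an array (note that this wraps each exploded value in its own one-element array; it does not regroup elements that came from the same original array, which is covered in the update further down):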

df = df.withColumn('test-exploded-regex-array', F.array(F.col('test-exploded-regex')))

Output:

+-----------------+---------------+-------------------+-------------------------+
|             test|  test-exploded|test-exploded-regex|test-exploded-regex-array|
+-----------------+---------------+-------------------+-------------------------+
|    [hello world]|    hello world|      goodbye world|          [goodbye world]|
|    [hello madam]|    hello madam|      goodbye madam|          [goodbye madam]|
|      [hello sir]|      hello sir|        goodbye sir|            [goodbye sir]|
|[hello everybody]|hello everybody|  goodbye everybody|      [goodbye everybody]|
|  [goodbye world]|  goodbye world|      goodbye world|          [goodbye world]|
+-----------------+---------------+-------------------+-------------------------+

Hope this helps!

Update

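Updated to cover the case where the array column holds several strings. An id column is added before exploding so that the elements of each original array can be grouped back together: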

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([("hello world", "foo"),
                            ("hello madam", "bar"),
                            ("hello sir", "baz"),
                            ("hello everybody", "boo"),
                            ("goodbye world", "bah")], schema=['test', 'test2'])

# Combine both columns into one two-element array column.
df = df.withColumn('test', F.array(F.col('test'), F.col('test2'))).drop('test2')

# Tag every row with a unique id so its elements can be regrouped after explode().
df = df.withColumn('id', F.monotonically_increasing_id())

df.show()

df = df.withColumn('test-exploded', F.explode(F.col('test')))

df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))

# Regroup by id. This keeps only id and the new array column, and collect_list()
# does not guarantee element order after the shuffle.
df = df.groupBy('id').agg(F.collect_list(F.col('test-exploded-regex')).alias('test-exploded-regex-array'))

df.show()

Output:

+--------------------+-----------+
|                test|         id|
+--------------------+-----------+
|  [hello world, foo]|          0|
|  [hello madam, bar]| 8589934592|
|    [hello sir, baz]|17179869184|
|[hello everybody,...|25769803776|
|[goodbye world, bah]|25769803777|
+--------------------+-----------+

+-----------+-------------------------+
|         id|test-exploded-regex-array|
+-----------+-------------------------+
| 8589934592|     [goodbye madam, bar]|
|          0|     [goodbye world, foo]|
|25769803776|     [goodbye everybod...|
|25769803777|     [goodbye world, bah]|
|17179869184|       [goodbye sir, baz]|
+-----------+-------------------------+

Just drop the id column when you're finished processing!

danielcahall
  • Thanks. But when I try to convert it back to an array, it doesn't really turn back into the original column; it stays exploded. It takes each element and converts it to an array, but it does so for each element separately, even though two rows are supposed to be one, like before exploding. – Barushkish Apr 06 '20 at 07:15
  • You could potentially add an `id` column, so that when you `explode`, rows derived from the same array share the same `id`. Then perform a `groupBy('id').agg(F.collect_list(F.col('test-exploded-regex')))` to ensure all strings are added to the same array. – danielcahall Apr 06 '20 at 13:48
  • Thanks, but it doesn't seem to work quite smoothly. What you did gives an object with only the id column and 'test-exploded-regex', and it drops all previously existing columns. Also, when I try to add the newly grouped column back to the first dataframe, it gives me an error. It seems like this collapsed column doesn't exactly fit the original one. – Barushkish Apr 07 '20 at 10:29
  • How did you try to merge the new dataframe with the original? An inner join on the `id` column should work, since the number of rows in the generated dataframe should match the number of rows in the original dataframe. – danielcahall Apr 07 '20 at 15:00
  • OK, this finally worked for me. I upvoted your answer and have now accepted it. Can you answer this question https://stackoverflow.com/questions/61138777/valueerror-cannot-convert-column-into-bool-please-use-for-and-foron how to drop a nested column? – Barushkish Apr 10 '20 at 10:55
  • Great! What was the issue? And sure thing – danielcahall Apr 10 '20 at 12:19

I think this is not possible with a DataFrame in Spark, since a DataFrame does not allow a column to hold multiple types; it will raise an error when the DataFrame is created.

You can, however, do it using RDDs.

scala> val seq = Seq((1,"abc"),(2,List("abcd")))
seq: Seq[(Int, java.io.Serializable)] = List((1,abc), (2,List(abcd)))

scala> val rdd1 = sc.parallelize(seq)
rdd1: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = ParallelCollectionRDD[2] at parallelize at <console>:26

scala> rdd1.take(2)
res1: Array[(Int, java.io.Serializable)] = Array((1,abc), (2,List(abcd)))

scala> val rdd2 = rdd1.map(x => x._2 match {
     | case v: String => (x._1, v.replaceAll("abc","def"))
     | case p: List[String] => (x._1, p.map(s => s.replaceAll("abc","def")))
     | }
     | )
rdd2: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = MapPartitionsRDD[3] at map at <console>:25

scala> rdd2.take(2)
res2: Array[(Int, java.io.Serializable)] = Array((1,def), (2,List(defd)))
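
Since the question is about PySpark, here is a rough Python sketch of the same per-value logic as the Scala pattern match above: replace in a bare string, or in every string of a list. The helper name `replace_in_value` is made up for illustration, and it uses Python's `re` module, whose regex syntax differs in places from the Java regexes behind Scala's `replaceAll`.

```python
import re
from typing import List, Union

Value = Union[str, List[str]]


def replace_in_value(value: Value, pattern: str, replacement: str) -> Value:
    """Apply a regex replacement to a bare string or to each string in a list.

    Hypothetical helper, not part of any Spark API.
    """
    if isinstance(value, str):
        return re.sub(pattern, replacement, value)
    if isinstance(value, (list, tuple)):
        return [re.sub(pattern, replacement, item) for item in value]
    # Anything else (None, numbers, ...) is passed through unchanged.
    return value


# Same sample data as the Scala session: one bare string, one list of strings.
rows = [(1, "abc"), (2, ["abcd"])]
result = [(key, replace_in_value(value, "abc", "def")) for key, value in rows]
print(result)  # [(1, 'def'), (2, ['defd'])]
```

On a PySpark RDD of such pairs, the same helper could presumably be applied with something like `rdd.map(lambda kv: (kv[0], replace_in_value(kv[1], "abc", "def")))`, though I have not run that against a cluster.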

Let me know if it helps!!

Anand Sai