
I need to delete rows that are duplicated in one column based on duplicates in another column using sparklyr.

The iris data set has a number of observations for which 4 features are identical: the values for Sepal.Width, Petal.Length, Petal.Width and Species are the same, and the rows differ only in the Sepal.Length column.

Let's create a copy of iris in Spark:

library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3") 

iris_spark <- copy_to(sc, iris)

Base R method

This is the base R method that would remove duplicated rows keeping only the row with the largest value for Sepal.Length:

iris_order = iris[order(iris[,'Sepal.Length'],-iris[,'Sepal.Length']),] ### sort first
iris_subset = iris_order[!duplicated(iris_order$Sepal.Length),] ### Keep highest
dim(iris_subset) # 35 5

but this doesn't work on a tbl_spark object:

iris_spark_order = iris_spark[order(iris_spark[,'Sepal.Length'],-iris_spark[,'Sepal.Length']),]

Error in iris_spark[, "Sepal.Length"] : incorrect number of dimensions

Tidyverse

There are two possible dplyr solutions that I can think of which work for a data.frame but not tbl_spark:

1)

library(dplyr)
iris %>% distinct(Sepal.Length)
iris_spark %>% distinct(Sepal.Length)

Error: org.apache.spark.sql.AnalysisException: cannot resolve '`Sepal.Length`' given input columns: [iris.Sepal_Length, iris.Sepal_Width, iris.Petal_Width, iris.Petal_Length, iris.Species]; line 1 pos 16;
'Distinct
+- 'Project ['Sepal.Length]
   +- SubqueryAlias iris
      +- LogicalRDD [Sepal_Length#13, Sepal_Width#14, Petal_Length#15, Petal_Width#16, Species#17], false

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:92)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sparklyr.Invoke.invoke(invoke.scala:147)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:123)
    at sparklyr.StreamHandler.read(stream.scala:66)
    at sparklyr.BackendHandler.channelRead0(handler.scala:51)
    at sparklyr.BackendHandler.channelRead0(handler.scala:4)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Unknown Source)

2)

iris_order <- arrange(iris, Sepal.Length)
iris_subset <- iris_order[!duplicated(iris_order$Sepal.Length), ]

but this doesn't work on a tbl_spark object:

library(dplyr)
iris_order <- arrange(iris_spark, Sepal.Length)
iris_subset <- iris_order[!duplicated(iris_order$Sepal.Length), ]

Error in iris_order[!duplicated(iris_order$Sepal.Length), ] : incorrect number of dimensions

data.table

The data.table solution for a data.frame:

library(data.table)
df <- iris # iris resides in package that is locked so copy to new object
unique(setDT(df)[order(Sepal.Length, -Species)], by = "Sepal.Length")

but this doesn't work on a tbl_spark object:

unique(setDT(iris_spark)[order(Sepal.Length)], by = "Sepal.Length")

Error in setDT(iris_spark) : All elements in argument 'x' to 'setDT' must be of same length, but the profile of input lengths (length:frequency) is: [1:1, 2:1] The first entry with fewer than 2 entries is 1

So how does one actually accomplish this task in Spark with sparklyr?

  • I guess `filter` would work `library(dplyr); iris_spark %>% group_by(Sepal.Length) %>% filter(n() ==1)` or try with `distinct` `iris_spark %>% distinct(Sepal.Length)` – akrun Jan 12 '20 at 18:40
  • Thanks @akrun. The `library(dplyr); iris_spark %>% group_by(Sepal.Length) %>% filter(n() == 1)` solution does the trick. I was trying the `iris_spark %>% distinct(Sepal.Length)` method last night which also tosses an error; however, I forgot to add it to this post (I'll do that now). – Matthew J. Oldach Jan 12 '20 at 18:44
  • Yes, some methods are not yet compatible – akrun Jan 12 '20 at 18:47
  • @Matthew: If there are two or more rows that have the same value for Sepal.length column, akrun's solution will remove both rows. Is this what you want, or do you want to keep at least one row? – Xiaojie Zhou Aug 11 '20 at 16:25
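The point raised in the last comment can be checked locally without Spark. The following is a minimal base R sketch, assuming a made-up data frame `toy` with hypothetical columns `key` and `value` (none of these names come from the question). It contrasts dropping every duplicated key, which is what `filter(n() == 1)` does, with keeping one row per key.

```r
# Hypothetical toy data: key "a" appears twice, key "b" once.
toy <- data.frame(
  key   = c("a", "a", "b"),
  value = c(1, 2, 3)
)

# Local equivalent of group_by(key) %>% filter(n() == 1):
# keep only rows whose key occurs exactly once.
group_size <- ave(seq_along(toy$key), toy$key, FUN = length)
only_singletons <- toy[group_size == 1, ]

# Alternative: keep, for each key, the row holding the largest value.
# Ties on the maximum would keep more than one row per key.
is_group_max <- toy$value == ave(toy$value, toy$key, FUN = max)
one_per_key <- toy[is_group_max, ]

print(only_singletons)  # only the "b" row survives
print(one_per_key)      # one row for "a" (value 2) and one for "b"
```

Which of the two behaviours is wanted depends on whether duplicated keys should disappear entirely or be reduced to a single representative row.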

2 Answers


The `filter` approach works with sparklyr:

library(dplyr)
library(sparklyr)
iris_spark %>% 
    group_by(Sepal.Length) %>% 
    filter(n() == 1)
akrun
  • Seems like you forgot to include `arrange`; right now it will take an arbitrary row, not "the row with the largest value for Sepal.Length". Also, `distinct` is compatible with sparklyr ([though not very useful here](https://stackoverflow.com/a/33878701/10938362)). Based on the traceback, it seems that the OP used the wrong column name (`sparklyr` doesn't support dotted names, so columns are renamed). – user10938362 Jan 13 '20 at 19:52
  • @user10938362 Thanks, I couldn't test because I had a Java compatibility issue. Perhaps it works if the name is backquoted. – akrun Jan 13 '20 at 19:57
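The renaming mentioned in the comments can be previewed locally. This is a small sketch, assuming the renaming is a plain dot-to-underscore substitution, which is what the column list in the question's error message suggests. The variable `spark_style_names` is a hypothetical name used only for illustration.

```r
# Replace every literal dot in the local column names with an underscore.
# fixed = TRUE makes "." a plain character rather than a regex wildcard.
spark_style_names <- gsub(".", "_", names(iris), fixed = TRUE)

print(spark_style_names)
```

These are the names to use inside dplyr verbs on the Spark copy, for example `Sepal_Length` rather than `Sepal.Length`.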

If the problem is as simple as stated in the question, where you want to take the highest value of one column given n - 1 grouping columns, a simple aggregation suffices:

iris_spark %>% 
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>% 
  summarise(Sepal_Length=max(Sepal_Length))
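For a quick sanity check without a Spark connection, the same aggregation can be sketched in base R on the local `iris` data frame. This is only a local analogue, not the sparklyr code itself; `group_cols` and `local_max` are hypothetical names, and the local columns keep their dotted names.

```r
# The four columns that define a group of duplicated rows.
group_cols <- c("Sepal.Width", "Petal.Length", "Petal.Width", "Species")

# One row per distinct combination of the grouping columns,
# carrying the largest Sepal.Length seen in that group.
local_max <- aggregate(
  Sepal.Length ~ Sepal.Width + Petal.Length + Petal.Width + Species,
  data = iris,
  FUN = max
)

# The result has as many rows as there are distinct groups.
print(nrow(local_max))
print(nrow(unique(iris[, group_cols])))
```

Comparing the row count with the collected Spark result is one way to confirm that both versions group on the same columns.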

If you don't care which value you get*, and the number of columns differs, you could drop duplicates (this internally uses first, which cannot be used in dplyr without a window):

iris_spark %>% 
  spark_dataframe() %>% 
  invoke(
    "dropDuplicates",
    list("Sepal_Width", "Petal_Length", "Petal_Width", "Species")) %>% 
  sdf_register()

If you care about the order, akrun's solution is technically correct, but not very scalable. Instead, you could combine the remaining columns into a struct and take its max (structs use lexicographic ordering):

iris_spark %>%
  group_by(Sepal_Width, Petal_Length, Petal_Width, Species) %>% 
  # You can put additional values in the struct
  summarise(values=max(struct(Sepal_Length))) %>% 
  mutate(Sepal_Length=values.Sepal_Length) 
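The lexicographic ordering that the struct approach relies on can be illustrated locally. The sketch below is a base R analogue under stated assumptions: `toy`, `key`, `first` and `second` are made-up names, and sorting followed by `duplicated` stands in for taking the max of a two-field struct. It is not how Spark evaluates the query.

```r
# Hypothetical data: key "a" has a tie on `first` that `second` must break.
toy <- data.frame(
  key    = c("a", "a", "a", "b"),
  first  = c(1, 2, 2, 5),
  second = c(9, 3, 4, 0)
)

# Sort each key's rows by `first` descending, then `second` descending.
# This is lexicographic order on the pair (first, second).
ordered <- toy[order(toy$key, -toy$first, -toy$second), ]

# The first row of each key is now its lexicographic maximum.
struct_max <- ordered[!duplicated(ordered$key), ]

print(struct_max)  # key "a" keeps (2, 4); key "b" keeps (5, 0)
```

Note that the row (1, 9) loses even though it has the largest `second`: later fields only matter when earlier ones are tied, which is why the column to maximise goes first in the struct.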

* It is important to stress that any preceding ordering is ignored, even if toy examples might indicate otherwise.

10465355