
I have the following function, which compiles.

  def compare(dbo: Dataset[Cols], ods: Dataset[Cols]) = {
    val j = dbo.crossJoin(ods)
    // Tried val j = dbo.joinWith(ods, func.expr("true")) too
    j.take(5).foreach(r => println(r)) 
  }

But it throws a runtime error when submitted to Spark.

Join condition is missing or trivial. (if using joinWith instead of crossJoin)
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1067)
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1064)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257)
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1064)
        at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1049)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
        at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
        at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
        at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89)
        at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2814)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2127)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2342)
        at MappingPoint$.compare(MappingPoint.scala:43)
        at MappingPoint$.main(MappingPoint.scala:33)
        at MappingPoint.main(MappingPoint.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
  • Why don't you use Spark 2.2 where this (and many more things) got improved a lot? Any reason to stick to 2.1.1? At least give the version a try and see if that fixes it. I think 2.2 could (as the code has changed). – Jacek Laskowski Jul 17 '17 at 21:18
  • I think I installed 2.1.1 (the newest version at that time?) a couple of months ago and didn't upgrade it. – ca9163d9 Jul 17 '17 at 21:19
  • There's no real "installation" of Spark; you just define it as a library dependency or put it on your `PATH`. Time to upgrade. At least you'll know whether we're fighting something that's already been fixed (and you won't waste your time). – Jacek Laskowski Jul 17 '17 at 21:21
  • 1
    Tried 2.2.0 with scala 2.11.8 and still get the same error. – ca9163d9 Jul 17 '17 at 22:16

2 Answers


I found the solution in "How to enable Cartesian join in Spark 2.0?":

sparkConf.set("spark.sql.crossJoin.enabled", "true")
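For reference, here is a minimal sketch of how that setting can be applied when building the session (the app name is just a placeholder); it can equally be passed to spark-submit as --conf spark.sql.crossJoin.enabled=true:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("MappingPoint")                        // hypothetical app name
  .config("spark.sql.crossJoin.enabled", "true")  // allow cartesian products
  .getOrCreate()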

The following works for me. I simplified the Cols case class so that I don't have to type so much, but otherwise I believe it is what you are attempting.

I used Spark 2.1.1:

import org.apache.spark.sql.{Dataset, Row}
import spark.implicits._  // Dataset encoders; assumes `spark` is your SparkSession

case class Cols (
    A: Int,
    B: String
)

val dbo: Dataset[Cols] = spark.createDataset(
    Seq[Cols](
        Cols(1, "One"),
        Cols(2, "Two")
    )
)
val ods: Dataset[Cols] = spark.createDataset(
    Seq[Cols](
        Cols(3, "Three"),
        Cols(4, "Four")
    )
)

// crossJoin returns a DataFrame (Dataset[Row]), so map each Row back to a pair of Cols
val cartesian: Dataset[(Cols,Cols)] = dbo.crossJoin(ods).map {
    case Row(lA: Int, lB: String, rA: Int, rB: String) => (Cols(lA, lB), Cols(rA, rB))
}
// a trivial follow-on map, just to show the pairs can be consumed with a (Cols, Cols) match
val result: Dataset[Int] = cartesian.map {
    case (l: Cols, r: Cols) => 0
}

As long as Cols has fewer than 11 elements, you should be OK. Otherwise, you might run into issues trying to pattern match on >22 elements after the crossJoin.

It looks to me like what you are submitting to Spark may still be using the joinWith line; Spark apparently detects that as a trivial join condition and blocks the Cartesian product.
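As a minimal sketch (assuming spark.sql.crossJoin.enabled is set to true, as in the other answer), joinWith with a literal true condition returns a Dataset[(Cols, Cols)] directly, so no Row pattern matching is needed:

import org.apache.spark.sql.{functions => func}

// Still requires spark.sql.crossJoin.enabled=true; otherwise the optimizer's
// CheckCartesianProducts rule rejects the trivial join condition.
val pairs: Dataset[(Cols, Cols)] = dbo.joinWith(ods, func.lit(true))
pairs.take(5).foreach(println)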

  • Can I always use `dbo.joinWith(ods, func.lit(true))` to avoid the >22-element pattern-match issue? BTW, `dbo.joinWith(ods, func.lit(true))` already returns a value of type `Dataset[(Cols,Cols)]`, so there is no need for the extra mapping (your `cartesian` code). But I still got the error. Maybe there are some system settings to allow cross join? – ca9163d9 Jul 18 '17 at 15:10