I understand that joins between two different streaming DataFrames are not supported in Spark 2.2.0, but I am attempting a self-join, so there is only one stream involved. Below is my code:
val jdf = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "join_test")
.option("startingOffsets", "earliest")
.load()
jdf.printSchema
which prints the following:
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
Now, after reading through this SO post, I run the join query below:
jdf.as("jdf1").join(jdf.as("jdf2"), $"jdf1.key" === $"jdf2.key")
And I get the following exception:
org.apache.spark.sql.AnalysisException: cannot resolve '`jdf1.key`' given input columns: [timestamp, value, partition, timestampType, topic, offset, key];;
'Join Inner, ('jdf1.key = 'jdf2.key)
:- SubqueryAlias jdf1
: +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@f662b5,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#243, value#244, topic#245, partition#246, offset#247L, timestamp#248, timestampType#249]
+- SubqueryAlias jdf2
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@f662b5,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#243, value#244, topic#245, partition#246, offset#247L, timestamp#248, timestampType#249]