I am trying to convert a java.util.Map to a Scala Map inside a Java static function.
OS: Windows 11
JDK: 17
Spark: spark-3.4.1-bin-hadoop3
IDE: Visual Studio Code
Below is the conversion function:
import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;

private static <A, B> scala.collection.immutable.Map<A, B> toScalaMap(java.util.Map<A, B> m) {
    return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.<Tuple2<A, B>>$conforms());
}
But the toMap call fails to compile with the following error:
The method toMap($less$colon$less<Tuple2<A,B>,Tuple2<K$,V$>>) in the type IterableOnceOps<Tuple2<A,B>,Iterable,Map<A,B>> is not applicable for the arguments (Function1<Tuple2<A,B>,Tuple2<A,B>>)
It seems that the argument I pass to toMap is not what it expects, but I am a Scala newbie and I have no idea how to correct it. Kindly inform me what the correct argument has to be.
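To take Spark out of the picture, here is a stripped-down, self-contained version of the same call (the class name, the main method, and the placeholder map entry are just for illustration). Compiling it against scala-library 2.13 (the version that spark-core_2.13 pulls in) should show the same error on the toMap line:

import java.util.HashMap;
import java.util.Map;

import scala.Predef;
import scala.Tuple2;
import scala.collection.JavaConverters;

public class ScalaMapRepro {

    // Same helper as above; the compile error is reported on the toMap call.
    // The compiler seems to expect a scala.$less$colon$less<Tuple2<A,B>, Tuple2<K,V>> argument,
    // while Predef.$conforms() hands it a scala.Function1<Tuple2<A,B>, Tuple2<A,B>>.
    private static <A, B> scala.collection.immutable.Map<A, B> toScalaMap(Map<A, B> m) {
        return JavaConverters.mapAsScalaMapConverter(m).asScala()
                .toMap(Predef.<Tuple2<A, B>>$conforms());
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("sep", ",");  // placeholder entry, just so the map is not empty
        System.out.println(toScalaMap(options));
    }
}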
== Updated Part
Kindly check my code. The code below also produces an error.
private static <A, B> scala.collection.immutable.Map<A, B> toScalaMap(java.util.Map<A, B> m) {
    List<Tuple2<A, B>> javaTuples = m.entrySet().stream()
            .map(x -> Tuple2.apply(x.getKey(), x.getValue()))
            .collect(Collectors.toList());
    scala.collection.Map<A, B> scalaMap = scala.collection.Map$.MODULE$.apply(JavaConversions.asScalaBuffer(javaTuples).toSeq());
    return scalaMap;
}
The error is:
The method asScalaBuffer(List<Tuple2<A,B>>) is undefined for the type JavaConversions.
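For completeness, these are the imports I believe this second version relies on (listing them in case one of them is the actual problem):

import java.util.List;
import java.util.stream.Collectors;

import scala.Tuple2;
import scala.collection.JavaConversions;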
== Updated Part 2
This is my Java code:
private static <A, B> scala.collection.immutable.Map<A, B> toScalaMap(Map<A, B> m) {
    return JavaConverters.mapAsScalaMapConverter(m).asScala().toMap(Predef.$conforms());
}
SparkSession spark = SparkSession.builder().master("local[*]").appName(appName).getOrCreate();

Dataset<Row> df = spark.read().format("kafka")
        .options(kafkaParams)
        .load()
        .selectExpr("CAST(value AS STRING) as column")
        .filter(not(col("column").startsWith("date")));

Map<String, String> options = new HashMap<String, String>();

Dataset<Row> dfs = df.select(from_csv(col("column"), StoredColumnPojo.getStructType(), toScalaMap(options))
                .as("entityStoredPojo"))
        .selectExpr("entityStoredPojo.date", "entityStoredPojo.value", "entityStoredPojo.id",
                "entityStoredPojo.title", "entityStoredPojo.state", "entityStoredPojo.frequency_short",
                "entityStoredPojo.units_short", "entityStoredPojo.seasonal_adjustment_short")
        .toDF();
As you can see, toScalaMap(options) builds the options argument for the Spark DataFrame API call (from_csv). The code above works successfully with spark-core_2.12, version 3.2.1, but after I upgraded to spark-core_2.13 the toScalaMap function no longer compiles and reports the error shown above. I think this is a version problem, but I have no idea why.
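Since I suspect a version mismatch, here is a throwaway check I can run to print the Scala version that is actually on my classpath (the class name is just for illustration; scala.util.Properties is the only API used):

public class ScalaVersionCheck {
    public static void main(String[] args) {
        // Prints the scala-library version found on the classpath (e.g. something like 2.13.8),
        // to confirm which Scala version spark-core_2.13 actually resolves to.
        System.out.println(scala.util.Properties$.MODULE$.versionNumberString());
    }
}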