I have tried this Java code with the latest Maven artifact ( https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.4.0 ) as well as an earlier one ( https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer_2.10/0.1.0 ). However, the following code does not compile with Java 1.8 in the latest Eclipse Oxygen IDE.

---IMPORT SECTION---

import java.util.HashMap;
import java.util.Map;

import com.github.benfradet.spark.kafka.writer.DStreamKafkaWriter;
import com.github.benfradet.spark.kafka.writer.KafkaWriter;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import scala.Function1;
import scala.Option;
import scala.Serializable;

Map<String, Object> producerConfig = new HashMap<String, Object>();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("key.serializer", StringSerializer.class);
producerConfig.put("value.serializer", StringSerializer.class);

KafkaWriter<String> kafkaWriter = new DStreamKafkaWriter<>(lines.dstream(), scala.reflect.ClassTag$.MODULE$.apply(String.class));

Function1<String, ProducerRecord<String, String>> f = new MyFunc<String, ProducerRecord<String, String>>() {
    @Override
    public ProducerRecord<String, String> apply(final String s) {
        return new ProducerRecord<>("my-topic", s);
    }
};

kafkaWriter.writeToKafka(producerConfig, f, Option.empty());

The line kafkaWriter.writeToKafka(producerConfig, f, Option.empty()); gives me the following error in the Eclipse IDE:

[screenshot: spark-kafka-writer-error]

Any help is appreciated.

sunone5

1 Answer

The Kafka writer's writeToKafka method takes a scala.collection.Map, while this attempt passes a java.util.Map.

The official Java example uses an asScala transformation, but I don't know where that comes from. (It doesn't seem legit: it looks like it's using the Scala implicit conversions from scala.collection.JavaConverters, and those won't work in Java.)

The simplest solution would be to instantiate a scala.collection.immutable.HashMap although I would recommend migrating the job to use Scala instead.
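
For illustration, here is a minimal sketch of the Java-to-Scala map conversion discussed above. It assumes Scala 2.10 to 2.12 (scala.collection.JavaConverters is deprecated from 2.13 on). The object name ProducerConfigConversion and the helper toScalaConfig are hypothetical, and the serializer class names are given as strings only to keep the snippet free of a Kafka dependency. Whether a particular spark-kafka-writer version accepts the resulting map is not verified here.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters

// Hypothetical helper object, for illustration only.
object ProducerConfigConversion {

  // Copies a java.util.Map into an immutable Scala Map.
  // The converter is invoked explicitly here, without relying on the
  // implicit asScala decoration; the same call can be written from Java as
  // JavaConverters.mapAsScalaMapConverter(javaConfig).asScala().
  def toScalaConfig(javaConfig: JMap[String, Object]): Map[String, Object] =
    JavaConverters.mapAsScalaMapConverter(javaConfig).asScala.toMap

  def main(args: Array[String]): Unit = {
    val javaConfig = new JHashMap[String, Object]()
    javaConfig.put("bootstrap.servers", "localhost:9092")
    javaConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    javaConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val scalaConfig = toScalaConfig(javaConfig)
    println(scalaConfig("bootstrap.servers")) // prints localhost:9092
  }
}
```

The intermediate asScala result is a mutable wrapper around the Java map; toMap copies it into an immutable Scala Map, so later changes to the Java map do not affect the converted configuration.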


If you are using Spark >= v2.2, the easiest way to write to Kafka is to transform your data into a Dataset or DataFrame and use the DataFrameWriter like this:

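// `data` needs a `value` column (and optionally a `key` column) for the Kafka sink
data.write.format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("topic", "abcTopic")
  // a checkpoint location is needed for streaming queries (writeStream); a batch save() does not use it
  .option("checkpointLocation", "/path/to/checkpoint/loc")
  .save()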

(AFAIK, this is not documented anywhere I could find.)

maasg
  • Yes, for "producerConfig" they used an "asScala" transformation; see this comment: https://github.com/benfradet/spark-kafka-writer/issues/59#issuecomment-308849837 I have also left a message for the original spark-kafka-writer creators here: https://github.com/benfradet/spark-kafka-writer/issues/59#issuecomment-333434591 – sunone5 Oct 02 '17 at 08:21
  • See https://stackoverflow.com/questions/16918956/convert-java-map-to-scala-map for converting a Java map to a Scala map. `JavaConverters` does not rely on implicits, unlike `JavaConversions`. – BenFradet Oct 02 '17 at 08:27
  • @BenFradet I'm afraid it does rely on implicits and only works in Scala. It adds explicit `asJava`/`asScala` methods using the 'pimp my library' pattern. See: https://github.com/scala/scala/blob/2.12.x/src/library/scala/collection/convert/DecorateAsJava.scala#L100 – maasg Oct 02 '17 at 09:55
  • @BenFradet this can probably be used instead: `JavaConverters.mapAsScalaMap(javaMap)` : http://www.scala-lang.org/api/2.12.1/scala/collection/JavaConverters$.html#mapAsScalaMap[A,B](m:java.util.Map[A,B]):scala.collection.mutable.Map[A,B] – maasg Oct 02 '17 at 09:58