Questions tagged [apache-bahir]

Apache Bahir provides extensions to multiple distributed analytic platforms, extending their reach with a diversity of streaming connectors and SQL data sources.

What is Apache Bahir?


Currently, Bahir provides extensions for Apache Spark and Apache Flink.

Apache Spark extensions

  • Spark data source for Apache CouchDB/Cloudant
  • Spark Structured Streaming data source for Akka
  • Spark Structured Streaming data source for MQTT
  • Spark DStream connector for Apache CouchDB/Cloudant
  • Spark DStream connector for Akka
  • Spark DStream connector for Google Cloud Pub/Sub
  • Spark DStream connector for MQTT
  • Spark DStream connector for Twitter
  • Spark DStream connector for ZeroMQ
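Many of the questions on this tag concern the Structured Streaming MQTT source. A minimal Scala sketch of reading from it is below; it assumes the matching Bahir package (e.g. `org.apache.bahir:spark-sql-streaming-mqtt_2.11`, version matching your Spark build) is on the classpath, and the broker URL and topic are placeholders:

```scala
// Sketch (untested): reading an MQTT topic with Bahir's Structured Streaming source.
// Submit with the matching package, e.g.:
//   spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2 ...
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("MQTTExample").getOrCreate()

val lines = spark.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "sensors/temperature") // placeholder topic
  .load("tcp://localhost:1883")           // placeholder broker URL

val query = lines.writeStream
  .outputMode("append")
  .format("console")
  .start()
query.awaitTermination()
```

Note that the Scala version in the artifact name and the Bahir version must match your Spark distribution; several questions below stem from mismatches here.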

Apache Flink extensions

  • Flink streaming connector for ActiveMQ
  • Flink streaming connector for Flume
  • Flink streaming connector for Redis
  • Flink streaming connector for Akka
  • Flink streaming connector for Netty
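As an illustration on the Flink side, the Redis streaming connector can be wired into a job roughly as follows. This is a sketch assuming the `org.apache.bahir:flink-connector-redis_2.11` dependency is on the classpath; the host, port, and mapper are illustrative:

```scala
// Sketch (untested): writing a stream of (word, count) pairs to a Redis hash
// via Bahir's Flink Redis sink.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper}

// Maps each (word, count) pair to an HSET on the "wordcounts" hash.
class WordCountMapper extends RedisMapper[(String, Int)] {
  override def getCommandDescription: RedisCommandDescription =
    new RedisCommandDescription(RedisCommand.HSET, "wordcounts")
  override def getKeyFromData(data: (String, Int)): String = data._1
  override def getValueFromData(data: (String, Int)): String = data._2.toString
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
val conf = new FlinkJedisPoolConfig.Builder()
  .setHost("localhost") // placeholder host
  .setPort(6379)        // placeholder port
  .build()

val counts: DataStream[(String, Int)] = env.fromElements(("bahir", 1), ("flink", 2))
counts.addSink(new RedisSink[(String, Int)](conf, new WordCountMapper))
env.execute("RedisSinkExample")
```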

The Apache Bahir community welcomes proposals for new extensions.

Contact the Bahir community

For Bahir updates and news, subscribe to our development mailing list. Check out All Mailing Lists.

22 questions
3 votes, 1 answer

DataFrame returned by getBatch from MQTTTextStreamSource did not have isStreaming=true

I am trying to use MQTT together with PySpark Structured Streaming. from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split spark = SparkSession \ .builder \ .appName("Test") \ …
2 votes, 1 answer

Causes of "java.lang.NoSuchMethodError: org.eclipse.paho.client.mqttv3.MqttConnectOptions.setAutomaticReconnect(Z)V"

I am trying to run Spark Structured Streaming with MQTT using Apache Bahir, by modifying the sample word-count example provided. Spark version: spark-2.2.0-bin-hadoop2.7. I am using this command to run the program: bin\spark-submit --packages…
nbbk
1 vote, 0 answers

KuduSink fails to start

I'm trying to write an ETL pipeline from Kafka to HDFS using Flink. I'm using the Bahir KuduSink and a PojoOperationMapper. It throws an exception before starting. I've included my code, pom, and exception stack trace. Is there something obvious I'm…
1 vote, 0 answers

How to send data from spark-structured-streaming to a topic of ActiveMQ

I am a beginner in Spark and want to send a dataframe from my spark-structured-streaming application to a defined topic in ActiveMQ. How can I do this? EDIT: My versions: activeMQ-5.16, spark-2.4.0, bahir-2.4.0, scala-2.11.11 Currently, I am able to…
1 vote, 1 answer

MqttException while connecting to ActiveMQ via Spark Structured Streaming

I am trying to connect my spark application to ActiveMQ, but while running the application I am getting this error: [error] at org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence.checkIsOpen(MqttDefaultFilePersistence.java:130) My…
WinterSoldier
1 vote, 1 answer

Unable to start Spark application with Bahir

I am trying to run a Spark application in Scala to connect to ActiveMQ. I am using Bahir for this purpose: format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider"). When I use Bahir 2.2 in my build.sbt the application is running…
WinterSoldier
1 vote, 0 answers

Databricks MQTT Streaming AbstractMethodError

I'm trying to set up SQL structured streaming from an MQTT broker: test = spark.readStream.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")\ .option("clientId",…
1 vote, 1 answer

pyspark MQTT structured streaming with apache bahir

I'm using spark 2.4 and I've run pyspark like this: ./bin/pyspark --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.2 pyspark runs successfully. (But when I run spark-sql-streaming-mqtt_2.11:2.4.0-SNAPSHOT, got an error) I'm trying to…
1 vote, 1 answer

Why does query throw ClassCastException "SerializedOffset cannot be cast to org.apache.spark.sql.execution.streaming.LongOffset" with MQTT Source?

I am getting the following exception when running Spark Structured Streaming code: 18/12/05 15:00:38 ERROR StreamExecution: Query [id = 48ec92a0-811a-4d57-a65d-c0b9c754e093, runId = 5e2adff4-855e-46c6-8592-05e3557544c6] terminated with error …
Hasif Subair
1 vote, 1 answer

On adding a jar using mvn install:install-file: On project standalone-pom: The artifact information is incomplete or not valid

I want to push the bahir jar to my local m2 repository. I'm using maven-3.5.0 downloaded tar.gz and jdk8, both are set as environment variables and are working fine. I built apache bahir for spark from the available download from git using maven,…
awisha
1 vote, 1 answer

Apache Bahir, send stuff to ActorReceiver

I am trying to set up a simple process with Spark Streaming, using Apache Bahir to connect to Akka. I tried to follow their example together with this older one. I have a simple forwarder actor: class ForwarderActor extends ActorReceiver { def…
ticofab
0 votes, 1 answer

java.lang.ClassNotFoundException: Failed to find data source: org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider. Please find packages

I have added "org.apache.bahir" %% "spark-streaming-mqtt" % "2.4.0" to my build.sbt, and using df .writeStream .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSinkProvider") .outputMode("complete") .option("topic", "mytopic") …
Eljah
0 votes, 1 answer

What is the difference between two Flink Redis sink dependencies with groupId `org.apache.bahir` and `org.apache.flink`?

I am learning Flink and am trying to add a Redis sink. The official Flink doc links to an Apache Bahir doc, which uses org.apache.bahir:flink-connector-redis_2.11
Hongbo Miao
0 votes, 1 answer

Large data with Spark and CouchDB

I used Spark 2.4.0 with "org.apache.bahir - spark-sql-cloudant - 2.4.0". I have to download all JSON files from CouchDB to HDFS. val df = spark .read .format("org.apache.bahir.cloudant") .load("demo") df.persist(StorageLevel.MEMORY_AND_DISK) …
0 votes, 1 answer

Scala 2.11 compilation for the Apache Bahir Library

Not sure if this is the right place to post this question. (Apologies if it isn't. And if so, please point me in the right direction.) I am attempting to compile Apache Bahir to generate scala 2.11 artifacts (as mvn clean install -P scala-2.11…
decimus phostle