I am trying to use Kafka as a streaming source and Spark to process the data.

My configuration:
Python 3.9
Kubuntu 21.10
echo $JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
echo $SPARK_HOME: /opt/spark
Spark version: 3.2.0
PySpark version: pyspark-3.2.1-py2.py3
Downloaded Kafka version: kafka_2.13-3.1.0.tgz
Kafka status (~$ sudo systemctl status kafka):
kafka.service - Apache Kafka Server
Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
Active: active (running) since Sat 2022-01-29 19:02:18 +0330; 4s ago
Docs: http://kafka.apache.org/documentation.html
Main PID: 5271 (java)
Tasks: 74 (limit: 19017)
Memory: 348.7M
CPU: 5.188s
My Python program:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import os
import findspark as fs

fs.init()

spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_3.1.0:{}'.format(spark_version)
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
# os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"

kafka_topic_name = "bdCovid19"
kafka_bootstrap_servers = 'localhost:9092'

if __name__ == "__main__":
    print("Welcome to DataMaking !!!")
    print("Stream Data Processing Application Started ...")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
        .master("local[*]") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    # Construct a streaming DataFrame that reads from the topic
    orders_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()
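As a side note, I suspect my `--packages` coordinate is malformed: the artifact suffix should apparently be the Scala version of the Spark build, not the Spark version, and `PYSPARK_SUBMIT_ARGS` seems to need to end with `pyspark-shell`. A minimal sketch of how that string could be built, assuming Spark 3.2.0 built against Scala 2.12:

```python
# Sketch: build a well-formed PYSPARK_SUBMIT_ARGS value.
# Assumption: the local Spark 3.2.0 is a Scala 2.12 build, so the artifact is
# spark-sql-kafka-0-10_2.12 (suffix = Scala version, not Spark version), and
# the string must end with 'pyspark-shell' for the Java gateway to start.
spark_version = '3.2.0'
scala_version = '2.12'

submit_args = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_{scala}:{spark} '
    'pyspark-shell'
).format(scala=scala_version, spark=spark_version)

print(submit_args)
# This value would go into os.environ['PYSPARK_SUBMIT_ARGS'] *before*
# the SparkSession is created.
```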
I am running this in PyCharm.

First error:

RuntimeError: Java gateway process exited before sending its port number

raised on this line: spark = SparkSession \

If I remove the os.environ lines from the code, that error disappears, but then I get:

pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

on this line: orders_df = spark \
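In case the os.environ route keeps failing, an alternative I have considered (a sketch, not tested here) is passing the package on the command line instead; `my_app.py` is a placeholder for this script, and the coordinate assumes a Spark 3.2.0 build on Scala 2.12:

```shell
# Supply the Kafka connector via --packages at submit time
# instead of via PYSPARK_SUBMIT_ARGS inside the script.
/opt/spark/bin/spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 \
  my_app.py  # placeholder filename
```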
I have read these:

Pyspark: Exception: Java gateway process exited before sending the driver its port number
Spark + Python - Java gateway process exited before sending the driver its port number?
Exception: Java gateway process exited before sending the driver its port number #743
Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka

None of them worked for me! Any suggestions?