
I am trying to use Kafka as a streaming source and Spark to process the data.

config:

  • python3.9

  • Kubuntu 21.10

  • echo $JAVA_HOME : /usr/lib/jvm/java-8-openjdk-amd64

  • echo $SPARK_HOME: /opt/spark

  • spark version: 3.2.0

  • pyspark version: pyspark-3.2.1-py2.py3

  • downloaded Kafka version: kafka_2.13-3.1.0.tgz

Kafka status:

    :~$ sudo systemctl status kafka
    kafka.service - Apache Kafka Server
    Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset: enabled)
    Active: active (running) since Sat 2022-01-29 19:02:18 +0330; 4s ago
    Docs: http://kafka.apache.org/documentation.html
    Main PID: 5271 (java)
    Tasks: 74 (limit: 19017)
    Memory: 348.7M
    CPU: 5.188s

My Python program:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import os
import findspark as fs
fs.init()


spark_version = '3.2.0'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_3.1.0:{}'.format(spark_version)
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

# os.environ['PYSPARK_SUBMIT_ARGS'] = "--master local[2] pyspark-shell"

kafka_topic_name = "bdCovid19"

kafka_bootstrap_servers = 'localhost:9092'

if __name__ == "__main__":
    print("Welcome to DataMaking !!!")
    print("Stream Data Processing Application Started ...")
    print(time.strftime("%Y-%m-%d %H:%M:%S"))

    spark = SparkSession \
        .builder \
        .appName("PySpark Structured Streaming with Kafka and Message Format as JSON") \
        .master("local[*]") \
        .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    # Construct a streaming DataFrame that reads from test-topic
    orders_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

I am running this in PyCharm.

Error:

raise RuntimeError("Java gateway process exited before sending its port number")
RuntimeError: Java gateway process exited before sending its port number

in this line: spark = SparkSession \

If I remove the os.environ lines from the code, that error disappears, but then I get this:

raise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

in this line: orders_df = spark \
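
As far as I understand the "deployment section" the exception points to, the spark-sql-kafka connector has to be on the classpath before the session starts. A sketch of the alternative I am considering, assuming the default Scala 2.12 build of Spark 3.2.0 under /opt/spark (I am not sure this coordinate is right):

# Alternative sketch: pass the Kafka connector through spark.jars.packages
# instead of PYSPARK_SUBMIT_ARGS. The _2.12 / 3.2.0 coordinate is my guess
# for the default Scala build of Spark 3.2.0 -- please correct me if wrong.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("kafka-connector-sketch") \
    .master("local[*]") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0") \
    .getOrCreate()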

I have read these:

  1. Pyspark: Exception: Java gateway process exited before sending the driver its port number

  2. Creating sparkContext on Google Colab gives: RuntimeError: Java gateway process exited before sending its port number

  3. Spark + Python - Java gateway process exited before sending the driver its port number?

  4. Exception: Java gateway process exited before sending the driver its port number #743

  5. Pyspark: Exception: Java gateway process exited before sending the driver its port number

  6. Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

  7. pyspark.sql.utils.AnalysisException: Failed to find data source: kafka

None of them worked for me. Any suggestions?

Comments:
  • 1) The `PYSPARK_SUBMIT_ARGS` should be assigned before findspark and before spark are imported. 2) You're using the wrong version of the kafka package; it should match your Spark version. 3) You can add the `--packages` flag when you actually run spark-submit, or modify the environment variables *in PyCharm* rather than in code to set PYSPARK_SUBMIT_ARGS. – OneCricketeer Jan 30 '22 at 08:01
  • Could you tell me what version of the Kafka package I should use? – mohsen Jan 30 '22 at 20:55
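
A minimal sketch of what the first comment describes: PYSPARK_SUBMIT_ARGS set before any findspark/pyspark import, with a connector version that matches Spark itself. The org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 coordinate is an assumption based on the default Scala 2.12 build of Spark 3.2.0 and should be verified against the actual install:

# Sketch per the first comment: environment variables set BEFORE findspark /
# pyspark are imported, and a connector whose version matches Spark itself.
# The _2.12 / 3.2.0 coordinate is an assumption; adjust it to your install.
import os

os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
# The trailing 'pyspark-shell' token is required when setting
# PYSPARK_SUBMIT_ARGS by hand; without it the Java gateway fails to start.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'
)

import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark Structured Streaming with Kafka") \
    .master("local[*]") \
    .getOrCreate()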
