
I have a Structured Streaming program running on GCP Dataproc. When I import KafkaProducer, I get the error shown below.

Imports:

import sys, datetime, time, os
from pyspark.sql.functions import col, rank, dense_rank, to_date, to_timestamp, format_number, row_number, lead, lag,monotonically_increasing_id
from google.cloud import storage
import datetime
from pyspark.sql.functions import to_timestamp,split as split_df, broadcast as s_broad, window
from pyspark.sql import SparkSession, Window, Row
import calendar
from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType, LongType, DateType, TimestampType
from pyspark.sql.functions import lit, col, concat, regexp_replace, when, unix_timestamp, count as sql_count
import pandas as pd
import json
import copy
from google.cloud import bigquery
from kafka import KafkaProducer

The last import causes the error shown below:

Traceback (most recent call last):
  File "/tmp/1b6daf68f83749b88536401dbcf7eb71/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 14, in <module>
    from kafka import KafkaProducer
  File "/opt/conda/default/lib/python3.8/site-packages/kafka/__init__.py", line 23, in <module>
    from kafka.producer import KafkaProducer
  File "/opt/conda/default/lib/python3.8/site-packages/kafka/producer/__init__.py", line 4, in <module>
    from .simple import SimpleProducer
  File "/opt/conda/default/lib/python3.8/site-packages/kafka/producer/simple.py", line 54
    return '<SimpleProducer batch=%s>' % self.async
                                              ^
SyntaxError: invalid syntax
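The SyntaxError fits an older kafka-python release running on a newer interpreter. `async` became a reserved keyword in Python 3.7, so an attribute access like `self.async` in `kafka/producer/simple.py` no longer parses under the cluster's Python 3.8. The minimal sketch below uses only the standard library. `OLD_LINE` is a paraphrase of the line from the traceback, not the full file, and the `parses` helper is hypothetical. It reproduces the failure and shows that a renamed `async_` attribute parses fine.

```python
import keyword
import sys


def parses(source: str) -> bool:
    """Return True if `source` compiles under the running interpreter."""
    try:
        compile(source, "simple.py", "exec")
    except SyntaxError:
        return False
    return True


# Paraphrase of the offending line in the old kafka/producer/simple.py.
OLD_LINE = (
    "def __repr__(self):\n"
    "    return '<SimpleProducer batch=%s>' % self.async\n"
)
# Same code with the attribute renamed so it no longer collides with the keyword.
RENAMED_LINE = OLD_LINE.replace("self.async", "self.async_")

if __name__ == "__main__":
    print("Python", sys.version.split()[0])
    print("'async' is a keyword:", keyword.iskeyword("async"))  # True on 3.7+
    print("old line parses:", parses(OLD_LINE))  # False on 3.7+
    print("renamed line parses:", parses(RENAMED_LINE))  # True
```

If the old line fails to parse on the cluster's interpreter, the installed `kafka` package predates Python 3.7 support. Upgrading or replacing that package is the fix, not changing the application code.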

While creating the Dataproc cluster, I install the following as part of 'initialization-actions':

pip install pypi
pip install kafka-python
pip install google-cloud-storage
pip install pandas

Any ideas on how to debug or fix this? Thanks in advance!

Edited by OneCricketeer
Asked by Karan Alang
  • You **do not need** that kafka Python package. It's not how you use Spark with Kafka. Similarly, pandas operations should mostly be done by Spark itself. – OneCricketeer Feb 20 '22 at 08:05
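The comment's point is that Structured Streaming can publish to Kafka through Spark's built-in Kafka sink (`writeStream.format("kafka")`), so no Python Kafka client is needed. Below is a minimal sketch under several assumptions. The `spark-sql-kafka-0-10` connector matching the cluster's Spark and Scala versions must be available, for example via `--packages`. `agg_df` stands for the aggregated streaming DataFrame. Both function names are hypothetical, and the broker address, topic and checkpoint path are placeholders.

```python
def kafka_sink_options(bootstrap_servers: str, topic: str, checkpoint_location: str) -> dict:
    """Options for Spark's built-in Kafka sink; all values are caller-supplied."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "topic": topic,
        "checkpointLocation": checkpoint_location,
    }


def start_kafka_sink(agg_df, bootstrap_servers, topic, checkpoint_location,
                     output_mode="update"):
    """Serialize each row of a streaming DataFrame to JSON and publish it to Kafka.

    Hypothetical helper; requires PySpark and the spark-sql-kafka-0-10 connector.
    """
    # Imported lazily so kafka_sink_options() can be used without PySpark installed.
    from pyspark.sql.functions import struct, to_json

    # The Kafka sink reads the message payload from a string or binary `value` column.
    payload = agg_df.select(to_json(struct(*agg_df.columns)).alias("value"))
    return (
        payload.writeStream
        .format("kafka")
        .outputMode(output_mode)  # aggregations typically need "update" or "complete"
        .options(**kafka_sink_options(bootstrap_servers, topic, checkpoint_location))
        .start()
    )
```

For example, `start_kafka_sink(agg_df, "broker-1:9092", "aggregates", "gs://your-bucket/checkpoints/aggregates")` would return a `StreamingQuery`, and calling its `awaitTermination()` keeps the job running. Spark manages the producer here, so the connector ships as a JAR rather than a pip package.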

1 Answer


On Dataproc, the kafka-python package is not installed as standard.

Switch to root with `sudo su -` and install it as above.

As root:

pip list|grep kafka

root@ctpcluster-m:~# pip install kafka-python
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     |████████████████████████████████| 246 kB 22.0 MB/s
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2

As a normal user:

hduser@ctpcluster-m: /home/hduser> pip list|grep kafka
kafka-python                      2.0.2
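The traceback shows `kafka` already importing from `site-packages`. Before reinstalling anything, it may help to check which distribution actually provides that package. The sketch below uses only the standard library (`importlib.metadata`, Python 3.8+, matching the cluster). It locates the `kafka` package without executing it and reports the installed versions of `kafka-python` and of a separate distribution named `kafka`. Checking that second name is an assumption worth ruling out, not a confirmed cause. A `kafka/producer/simple.py` module like the one in the traceback suggests an older release than the 2.0.2 shown above, since the 2.x line appears to have dropped the legacy `SimpleProducer`. The `describe_kafka_install` name is hypothetical.

```python
import importlib.util
from importlib import metadata


def describe_kafka_install(dists=("kafka-python", "kafka")):
    """Locate the top-level `kafka` package without importing it and report
    installed versions of the given distributions (None if not installed)."""
    # find_spec on a top-level name only locates the package; it does not run
    # kafka/__init__.py, so it works even when importing would raise SyntaxError.
    spec = importlib.util.find_spec("kafka")
    versions = {}
    for dist in dists:
        try:
            versions[dist] = metadata.version(dist)
        except metadata.PackageNotFoundError:
            versions[dist] = None
    return {
        "module_origin": spec.origin if spec is not None else None,
        "versions": versions,
    }


if __name__ == "__main__":
    print(describe_kafka_install())
```

Running it with the same interpreter the Spark job uses (here `/opt/conda/default`) matters, because a `pip` on a different `PATH` can report a different environment.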
Edited by OneCricketeer
Answered by Mich Talebzadeh
  • While this answers the question, I don't think it's what the OP really wants. This isn't needed for Dataproc. Also, you shouldn't need to use sudo for pip. – OneCricketeer Feb 20 '22 at 08:08
  • Yes, indeed, this answers the question raised. I am answering the question, not offering a solution for deploying Kafka with Spark or judging the validity of this approach. – Mich Talebzadeh Feb 21 '22 at 09:11
  • Okay, and can you explain why sudo is needed? The OP already says they ran `pip install kafka-python` (which, by the way, will not give the error in the post, since the error comes from an import in `site-packages/kafka`). – OneCricketeer Feb 21 '22 at 15:14
  • The OP opened a thread on https://lists.apache.org/list.html?user@spark.apache.org: "I have a GCP Dataproc cluster, and I'm running a Spark Structured Streaming job on it. I'm trying to use KafkaProducer to push aggregated data into a Kafka topic; however, when I import KafkaProducer (from kafka import KafkaProducer), it gives an error ... As part of the initialization actions, I'm installing the following: pip install pypi, pip install kafka-python, pip install google-cloud-storage, pip install pandas". Hopefully you got it! – Mich Talebzadeh Feb 21 '22 at 16:15
  • Hi all, the issue was due to an incorrect pip_install.sh file I was using. Using the correct pip_install.sh (as the initialization action) resolved it. Contents of pip_install.sh: `pip install pypi`, `pip install kafka-python`, `pip install google-cloud-storage`, `pip install pandas`. – Karan Alang Feb 21 '22 at 22:33