I have a project where I'm integrating Django REST Framework (DRF) and Apache Spark, so I have some endpoints and I need to execute Spark jobs using UDFs. The project has the following structure:
core/
├── udf/
│   ├── pyspark_udf.py
│   └── (and the rest of the files of a Django app)
├── api/
│   ├── api.py
│   ├── urls.py
│   └── (and the rest of the files of a Django app)
└── udf.zip
So, according to that structure:

- First, in the api.py file I have the endpoint where I need to execute the job.
- Second, inside pyspark_udf.py, I call an endpoint that lives in the api.py file, do some logic there, and return the response.
- Then, back in the api.py file, I use the returned value to create a new column in a PySpark dataframe.
- Finally, I return the JSON data response from the DRF endpoint.
I achieved that behavior, but using requests in the pyspark_udf.py function, which I consider is neither elegant nor best practice: I'm going over HTTP to an API that lives in my own project, instead of making a simple call to the class behind the DRF endpoint.
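What I have in mind is something like the following sketch, where the viewset action is invoked in-process through DRF's APIRequestFactory; the sketch is hypothetical, because the import of ClassBarViewSet is exactly what fails for me:

# Sketch of the "direct call" I would like to make instead of requests.
# APIRequestFactory comes from rest_framework.test; the second import
# is the one that currently raises ImportError for me.
from rest_framework.test import APIRequestFactory
from api.api import ClassBarViewSet

factory = APIRequestFactory()
request = factory.get('/api/bar/endpointExampleB')
view = ClassBarViewSet.as_view({'get': 'endpointExampleB'})
response = view(request)
print(response.data['data'])  # [{'msj': 'Hello World'}]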
I created the minimal code below to reproduce the error; if someone can find a solution, I will appreciate it:
api/urls.py
from django.conf.urls import url
from rest_framework import routers

from .api import (ClassFooViewSet, ClassBarViewSet)

router = routers.DefaultRouter()

urlpatterns = [
    url(r'^api/foo/endpointExampleA', ClassFooViewSet.as_view({'post': 'endpointExampleA'}), name='endpointExampleA'),
    url(r'^api/bar/endpointExampleB', ClassBarViewSet.as_view({'get': 'endpointExampleB'}), name='endpointExampleB')
]

urlpatterns += router.urls
api/api.py
from rest_framework import viewsets
from rest_framework import views
from rest_framework import status
from rest_framework.decorators import action
from rest_framework.response import Response

import json

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

from udf.pyspark_udf import UdfExample


class ClassFooViewSet(viewsets.ViewSet):

    def __init__(self):
        self.response_data = {'error': [], 'data': []}
        self.data = {}
        self.code = 0

    @action(methods=['post'], detail=False)
    def endpointExampleA(self, request, *args, **kwargs):
        # I also tried to import the udf here
        # from udf.pyspark_udf import UdfExample
        try:
            sc = SparkSession \
                .builder \
                .master("spark://192.168.0.105:7077") \
                .appName('word_cloud') \
                .config("spark.executor.memory", '2g') \
                .config('spark.executor.cores', '2') \
                .config('spark.cores.max', '2') \
                .config("spark.driver.memory", '2g') \
                .getOrCreate()
            sc.sparkContext.addPyFile("core/udf.zip")

            schema = StructType([
                StructField("id", IntegerType(), True),
                StructField("user", StringType(), True),
                StructField("text", StringType(), True),
            ])
            df = sc.createDataFrame([(1, "user1", ""), (2, "user2", ""), (3, "user3", "")], schema)

            text_udf = udf(UdfExample().msj, StringType())
            text_df = df.withColumn("text", text_udf(df["text"]))
            text_df.show(text_df.count(), False)  # Here, I receive the error
            import pdb; pdb.set_trace()

            text_df = text_df.select('id', 'user', 'text').toJSON().collect()
            for elem in text_df:
                self.response_data['data'].append(json.loads(elem))
            sc.stop()
            self.code = status.HTTP_200_OK
        except Exception as e:
            self.code = status.HTTP_500_INTERNAL_SERVER_ERROR
        return Response(self.response_data, status=self.code)


class ClassBarViewSet(viewsets.ModelViewSet):

    def __init__(self):
        self.response_data = {'error': [], 'data': []}
        self.data = {}

    @action(methods=['get'], detail=False)
    def endpointExampleB(self, request, *args, **kwargs):
        self.response_data['data'].append({"msj": "Hello World"})
        return Response(self.response_data, status=status.HTTP_200_OK)
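For completeness, I trigger the Spark job endpoint with a plain HTTP call like the sketch below (host and port are just my local dev server):

import requests

# Hypothetical client call that triggers endpointExampleA above
r = requests.post("http://localhost:8000/api/foo/endpointExampleA")
print(r.json())  # {'error': [...], 'data': [...]}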
udf/pyspark_udf.py
# from api.api import ClassBarViewSet  # This line fails


class UdfExample():

    def __init__(self):
        self.response_data = {'error': [], 'data': []}
        self.data = {}

    def msj(self, tweet):
        import json
        import requests
        # from api.api import ClassBarViewSet  # This line also fails here
        try:
            # This works
            response = requests.get("http://localhost:8000/api/bar/endpointExampleB")
            response = response.content.decode('utf-8')
            json_response = json.loads(response)
            return json_response['data']
            # But I'd like to use this code; I can't import
            # api.api, so this part doesn't work
            classB = ClassBarViewSet()
            classB.endpointExampleB(request)
            self.response_data = classB.response_data['data']
        except Exception as e:
            self.response_data['error'].append(str(e))
        return self.response_data
The error that I receive when I try to do from api.api import ClassBarViewSet is:

ImportError: cannot import name 'ClassBarViewSet'
Things to know about the error and the related code:

- First I created the udf folder with only the pyspark_udf.py file and an __init__.py file, but I was receiving the error "You must either define the environment variable DJANGO_SETTINGS_MODULE or call settings.configure() before accessing settings." I tried several of the related SO answers: exporting the DJANGO_SETTINGS_MODULE variable, running the server with the --settings=core.settings parameter (I'm using only one settings.py file), but I couldn't find a solution, so I turned udf into a Django app and that fixed the error (see the settings bootstrap sketch after this list). Finally, I generated udf.zip with the zip -r udf udf command from the top level, i.e. zipping the full udf folder.
- In the PySpark dataframe I have another column that is generated by another PySpark UDF; in that function I don't use any DRF endpoint and it works fine.
- If I try to import a Django model like from api.models import (User) inside the msj() function in the pyspark_udf.py file, then I receive this error: ModuleNotFoundError: No module named 'api', which is related to the way the .zip is created. If I import the model at the top of the file, this error doesn't appear; I don't understand why.
- I have a frontend layer where I import the API and call the endpoints without any problem, in the same way that I'm trying to import in the pyspark_udf.py file.
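For reference, the in-code equivalent of exporting that variable, which those SO answers suggest, looks like the sketch below (core.settings is my single settings module); this is the kind of bootstrap I was experimenting with at the top of the zipped module:

# Sketch of the Django settings bootstrap suggested by related SO answers,
# so that settings are configured before any Django/DRF import runs.
import os
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
django.setup()

# Only after the setup above would an import like this have a chance to work:
# from api.models import User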
So, why can't I import the api module in the PySpark UDF app? I searched but couldn't find another related SO question. Thanks in advance for any help!