3

I have an empty dataframe created as below. schema:

table_schema = StructType([
    StructField('bookingNumber', StringType(), True),
    StructField('bookingProfile', StringType(), True),
    StructField('json_string', StringType(), True),
    StructField('json_ingestion_time', TimestampType(), True)
])

def prepare_empty_df(schema: StructType):
    empty_rdd = spark.sparkContext.emptyRDD()
    empty_df = spark.createDataFrame(empty_rdd, schema)
    return empty_df

My data is coming from an API call. Each GET call will return on JSON and I am converting the API response, which is a JSON into a text. I am parsing this JSON for some attributes and then insert into a table. Because I have 200k jsons, I dont want to run 200k insert queries on my table and wanted to append all the results of API JSON calls to an empty dataframe and simply ingest the dataframe. The API calls I make are not sequential rather are parallell threads. i.e., I am running 4 parallell API calls at a time and trying to append the 4 outputs to an empty dataframe. Below is how I am converting API JSON and appending it into the empty dataframe.

Main method:

if __name__ == '__main__':
    spark = SparkSession.builder.appName('Raw_Check').getOrCreate()
    batch_size = 4
    booking_ids = []
    initial_df = prepare_empty_df(schema=raw_table_schema)

    initial_load = True

            
    cquery = f'select booking_id from db.table limit 20'
    booking_ids = get_booking_ids(spark=spark, query=cquery) # returns a list of bookings

    for i in range(0, len(booking_ids), batch_size):
        sub_list = booking_ids[i:i + batch_size]
        threads = []
        for index in range(batch_size):
            t = threading.Thread(target=get_json, name=str(index), args=(spark, sub_list[index], initial_df))
            threads.append(t)
            t.start()
        for index, thread in enumerate(threads):
            thread.join()

    print('Final Dataframe count')
    print(initial_df.count())
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
    print('Final Dataframe contents')
    initial_df.show()
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')

get_json method:

def get_json(spark: SparkSession, booking_number: str, init_df: DataFrame):
    headers = {"Content-type": "some_content_type"}
    token = doing_something_to_get_token

    token_headers = {'Authorization': f"Bearer {token}"}
    api_response = requests.get(f'https://api_url?booking_number={booking_number}', headers=token_headers)
    json_data = spark.sparkContext.parallelize([api_response.text])
    df = spark.read.json(json_data)

    api_df = (df.select('id').withColumnRenamed('id', 'bookingProfile')
        .withColumn('bookingNumber', lit(booking_number))
        .withColumn('json_string', lit(api_response.text))
        .withColumn('json_ingestion_time', lit(current_timestamp()))
    )
    api_df.show()
    init_df.unionAll(api_df)

I am unioning every row from the API output to initial_df I created in the main method. I can also see data from the api_df due to api_df.show() when the script runs. Four parallell threads are launching and I can see 4 API calls running at a time. But the at the end, empty dataframe: initial_df I created is still empty by the end of the script. The count is zero and basically it prints NULL when I displayed the contents of it.

-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe count
0
-------------------------------------------------------------------------------------------------------------------------------------------------
Final Dataframe contents
+--------------+-------------------+-----------+------------------------+
|bookingNumber |bookingProfile     |json_string|     json_ingestion_time|
+--------------+-------------------+-----------+------------------------+
+--------------+-------------------+-----------+------------------------+

Could anyone let me know what is the mistake I am doing here and how can I correct it? Any help is massively appreciated.

Metadata
  • 2,127
  • 9
  • 56
  • 127
  • You can use `createGlobalTempView` function & to create in-memory global table and write data to it. This will be visible for all threads across the session. – Srinivas Aug 09 '23 at 13:34
  • it's always a mess when you try to share a variable across threads. Threads fight for that shared variable and only your OS manages it. Not sure how to help, I would start by putting `sleep()` in `get_json` method. If that doesn't work you should use semaphores. – Siddhant Tandon Aug 12 '23 at 22:22
  • can you please add some sample booking ids & rest api response to simulate your use case – Srinivas Aug 13 '23 at 13:15
  • do you want solution in python or scala ? – Srinivas Aug 13 '23 at 13:18
  • @Srinivas I am implementing the requirement in Python. – Metadata Aug 13 '23 at 16:53
  • @SiddhantTandon Noted, will update my code with `sleep()` – Metadata Aug 13 '23 at 16:54

8 Answers8

2

MapPartitions

As others have already pointed out, the idiomatic way in Spark is to map the booking_ids to the expected result by calling the REST api for each booking_id.

However, when mapping the booking_ids to the result, each REST call comes with a certain overhead for initiating the HTTP connection, doing the SSL handshake or getting the access token that is required to access the api. This slows down the process.

HTTP Sessions

The requests library provides a solution to reduce this overhead by using session-objects. The docs say:

So if you’re making several requests to the same host, the underlying TCP connection will be reused, which can result in a significant performance increase

Basically the HTTP(s) connection is left open after each API call (using the keep-alive header) and reused for the next call. The only requirement is that that request object is passed from one api call to the next one. So instead of initiating three HTTPS connections

requests.get('https://api_url?booking_number=1')
requests.get('https://api_url?booking_number=2')
requests.get('https://api_url?booking_number=3')

only one session this created and reused:

s = requests.Session()
s.get('https://api_url?booking_number=1')
s.get('https://api_url?booking_number=2')
s.get('https://api_url?booking_number=3')

If the logging of the requests is enabled we see in the first case that three HTTPs connections are opened and in the second case only one.

MapPartitions

Spark provides a mechanism to reuse the session object: mapPartitions. The idea is to map a complete partition of rows instead of single rows. The function that does the actual mapping gets an iterator that contains all rows of the partition and also returns an iterator containing the result partition.

Before starting to use the iterator the session object is created and then reused when iterating:

def callRestApi(rowIt):
    import requests, json, datetime
    headers = {"Content-type": "some_content_type"}
    token = "doing_something_to_get_token"
    token_headers = {'Authorization': f"Bearer {token}"}
    s = requests.Session()
    s.headers.update(headers)
    s.headers.update(token_headers)

    for row in rowIt:
        booking_number = row['booking_id']        
        api_response = s.get(f'https://api_url?booking_number={booking_number}').text
        json_data = json.loads(api_response)
        yield [booking_number, json_data['id'], api_response, datetime.datetime.now()]

Calling mapPartitions involves a (small) detour to RDDs as the Pyspark API for dataframes is missing this feature:

api_df = df.rdd.mapPartitions(callRestApi).toDF(table_schema).cache()

Enabling again the request logging we see that now only one connection per partition is created and that all requests contain the header Connection: keep-alive.

Discussion

  • mapPartitions is slightly more complex to implement than using map directly but shows better performance characteristics. Depending on the Spark cluster, the REST api server and the performance of the authentication process mapPartitions was at least 20% faster than map in my tests.
  • mapPartitions can reduce the pressure on the REST api server as fewer connections and SSL handshakes are required.
  • in a production system, some kind of error handling is required inside of callRestApi. When getting and parsing the response from the REST api server we could see connection issues or malformed json returns. These should be handled as they might break the processing of all rows within the specific partition.
werner
  • 13,518
  • 6
  • 30
  • 45
1

Why you are not trying to store api responses into a combined json file. Then read that json in a dataframe and ingest in the table you want. The part where you are -->unioning every row from the API output to initial_df. Instead of dataframe append it to a file. After each api call completes you will have a json file containing all records from all api calls . Read the json in to a dataframe , do whatever transformation youwant and insert into desired table.

Currently i am using similar approach , where i am reading around 650k of Total json rows/records from API call in batches of 10,000 records per api call . In each api call i extract required json and append it to a json file. After all api calls are done i am left with 650k records in Json file , which i use next to read into dataframe , do required flattening & transformation and store into a table.

  • You could also write each API response to a separate file, which will be much easier to parallelize as well as restart in case of an error or crash. – shadowtalker Aug 13 '23 at 08:30
  • @vivek I want each JSON output to be one row. If I create one huge JSON, do I then load this one json and loop thru each sub JSON to create a final dataframe? – Metadata Aug 13 '23 at 16:49
  • @vivek, I also have 200k-250k API calls. So is it good to create a huge JSON file? – Metadata Aug 13 '23 at 16:52
  • This sounds like the best option-- assemble all REST responses into a JSon file , then use Spark to read and parse it into a dataframe in one go. – mazaneicha Aug 18 '23 at 13:27
  • @Metadata: sorry I got busy. So what I am doing is that the output of each api call is a json response. I append each json response output separated by comma into a consolidated json file. – Vivek Kaushik Aug 19 '23 at 10:15
  • @Metadata: after you create consolidated json . You don’t need to iterate over every row. Just use sparks read json with multiline option and it will create a dataframe of that consolidated json – Vivek Kaushik Aug 19 '23 at 10:18
  • @Metadata: working fine for me now so far . My json records are going as big as 750 to 800 k and my consolidated json reaches maximum of 2.5 gb. It can be read easily in spark and I am doing a lot of joining and transformation to create flattened csv from it. Working okay for now – Vivek Kaushik Aug 19 '23 at 10:20
1

Move complete logic to udf function & spark will use multiple thread to fetch data from api.

You can increase or decrease parallel threads using repartition(<batch_size>) function.

Please check below logic & modify as per your requirement.

import pyspark
from pyspark.sql.functions import col, from_json, current_timestamp, udf
from pyspark.sql import Row
from pyspark.sql.types import StringType
import requests

# creating session object globally so that it can be re used inside udf function for multiple rows across partitions.

session = requests.Session() 

batch_size = 4 # Number of thread or tasks to fetch data from api.

# Dummy Schema, You can change as per your requirement.
table_schema = 'completed STRING,id STRING,title STRING,userId STRING'
# Dummy Function to get list of booking_id, You can change as per your requirement.
def get_booking_ids(spark):
    return spark.createDataFrame(map(lambda n: Row(n), range(1, 100)), ['booking_id'])

# Dummy Function to invoke rest api & return response as a text
# You can add header, authentications, check if session exists or not if not create new session .... etc.
def get_api_response(booking_number, session):
    return session.get(f'https://jsonplaceholder.typicode.com/todos/{booking_number}').text
# Registering get_api_response function in udf.
get_response_udf = udf(lambda booking_id: get_api_response(booking_id, session), StringType()) # passing session object so that it can be reused for every request.
df = get_booking_ids(spark)\
.repartition(batch_size)\
# Invoking api & getting response as json string format.
.withColumn('api_response', get_response_udf(col('booking_id')))\ 
# Cache is important, else It will send request to api for every transformations.
.cache()\ 
# applying schema to json message
.withColumn('api_response', from_json(col('api_response'), table_schema))\ 
# adding current_timestamp(), You can also put this in get_api_response function
.withColumn('json_ingestion_time', current_timestamp())\
.selectExpr('booking_id', 'api_response.*', 'json_ingestion_time') # extracting all properties from json response.
df.printSchema()

root
 |-- booking_id: long (nullable = true)
 |-- completed: string (nullable = true)
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- json_ingestion_time: timestamp (nullable = false)

df.show(100 , False)

+----------+---------+---+----------------------------------+------+--------------------------+
|booking_id|completed|id |title                             |userId|json_ingestion_time       |
+----------+---------+---+----------------------------------+------+--------------------------+
|3         |false    |3  |fugiat veniam minus               |1     |2023-08-13 20:23:11.425511|
|1         |false    |1  |delectus aut autem                |1     |2023-08-13 20:23:11.425511|
|4         |true     |4  |et porro tempora                  |1     |2023-08-13 20:23:11.425511|
|2         |false    |2  |quis ut nam facilis et officia qui|1     |2023-08-13 20:23:11.425511|
+----------+---------+---+----------------------------------+------+--------------------------+

Cluster enter image description here

Below screen shot which I have executed in cluster

enter image description here

Srinivas
  • 8,957
  • 2
  • 12
  • 26
  • Using UDF as suggested means opening a new http session for every row. `mapPartitions` as proposed by @werner won't carry this enormous overhead. – mazaneicha Aug 18 '23 at 14:27
  • That is why i have changed logic little bit instead of creating new session for every row or every partition.. I am passing global session to udf.. so that it can be reused for every messages. – Srinivas Aug 18 '23 at 14:29
  • I always thought HttpSession objects are not serializable, so you won't be able to pass a session between driver and executors. Have you tried your changed logic? – mazaneicha Aug 18 '23 at 14:36
  • Yes, I have executed above logic in cluster. Please check attached images. – Srinivas Aug 18 '23 at 14:53
  • I meant HttpSession (sorry for confusion), a session that you can `keep-alive` and reuse to make multiple requests once connection to a service provider is established. Your code will still be creating new HttpSession for each request. – mazaneicha Aug 18 '23 at 17:47
  • Ok,I didn't get you.. how it will create new season object every request? – Srinivas Aug 18 '23 at 20:53
0

It makes sense to execute the API requests directly by Spark. You can use a combination of parallelize and map to achieve this:

First, reduce your get_json function to only getting the JSON:

def get_json(booking_number):
    import requests
    import datetime

    response = requests.get(f"https://jsonplaceholder.typicode.com/posts/{booking_number}")
    
    json_data = response.json()
    json_data['json_ingestion_time'] = datetime.datetime.now().isoformat()
    json_data['bookingNumber'] = booking_number
    
    return json_data

Then create the following function which fetches the data from the API in parallel:

max_parallel_requests = 4

# get booking ids as a list
booking_ids = get_booking_ids(spark=spark, query=cquery)

# fetch data from API and store them directly in the data_rdd
data_rdd = spark.sparkContext.parallelize(booking_ids, numSlices=max_parallel_requests).map(get_json)

# create a df from the rdd
df = data_rdd.toDF()

df.show()

The amount of parallelism depends also on the resources that you provide to Spark.

elyptikus
  • 936
  • 8
  • 24
0

actually I was coding something for these types of problems, you can make a json template and append in the json to have a big json string to parse, maybe the problem will be in the parsing process but pandas can handle the parsing part with json.

the package is called JsonDF you can download it from GitHub or with pip install JsonDf, and you can edit on the code if there is some functionallity that it doesn't exist and it will be helpful.

the code that I suggesst you to go with is :

from JsonDF.utils.Json.Json import Json

template = Json('template', {}).objectify() # empty structure
template.insert('table_name': [1, 2, 3, 4]) # insert 'key: value' in the template

print(template)

what is above will outputs :

{
    'template': {
        'table_name': [1, 2, 3, 4],
    }
}

and you can access the template values with tempalte.template which is the way that the JsonDF handle the Json objects.

you can use it to create a json template and add to it, or even to parse an already existing json, just pass the json in the second param of the Json() object, which will parse it and create an object from it that you will have the ability to inseart, update, search, and delete from the json.

this will help you in just handle all the processing on the json only, and then pass the final json to convert to the dataframe.

hope it helps.

0

The unionAll operation does not modify the DataFrame the method is called on, but instead returns a new DataFrame. Since you are not assigning the result of the unionAll operation back to initial_df, the changes are not persistent. I don't know if its fully correct but try following the example below. Also if I am not mistaken I think unionAll is deprecated but you will have to double check that.

Best of luck!

get_json

def get_json(spark: SparkSession, booking_number: str):
    headers = {"Content-type": "some_content_type"}
    token = doing_something_to_get_token

    token_headers = {'Authorization': f"Bearer {token}"}
    api_response = requests.get(f'https://api_url?booking_number={booking_number}', headers=token_headers)
    json_data = spark.sparkContext.parallelize([api_response.text])
    df = spark.read.json(json_data)

    api_df = (df.select('id').withColumnRenamed('id', 'bookingProfile')
        .withColumn('bookingNumber', lit(booking_number))
        .withColumn('json_string', lit(api_response.text))
        .withColumn('json_ingestion_time', lit(current_timestamp()))
    )
    return api_df

main.py

if __name__ == '__main__':
    spark = SparkSession.builder.appName('Raw_Check').getOrCreate()
    batch_size = 4
    booking_ids = []
    initial_df = prepare_empty_df(schema=raw_table_schema)

    initial_load = True
            
    cquery = f'select booking_id from db.table limit 20'
    booking_ids = get_booking_ids(spark=spark, query=cquery) # returns a list of bookings

    for i in range(0, len(booking_ids), batch_size):
        sub_list = booking_ids[i:i + batch_size]
        threads = []
        results = []

        for index in range(batch_size):
            t = threading.Thread(target=get_json, name=str(index), args=(spark, sub_list[index]))
            threads.append(t)
            t.start()

        for index, thread in enumerate(threads):
            thread.join()
            results.append(thread.result())  # Collect the result DataFrame

        for result_df in results:
            initial_df = initial_df.unionAll(result_df)  # Union collected DataFrames

    print('Final Dataframe count')
    print(initial_df.count())
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
    print('Final Dataframe contents')
    initial_df.show()
    print('-------------------------------------------------------------------------------------------------------------------------------------------------')
SomeSimpleton
  • 350
  • 1
  • 2
  • 12
0

Thanks to everyone for helping, I took another approach by creating a class and initialising my dataframe in the constructor as given below:

class PrepareApiData:
    def __init__(self, df=None):
        self.finalDf = df

    def get_json(self, spark, api_param):
        try:
            # Generate a token => token_headers
            api_response = requests.get(f'API_URL={api_param}', headers=token_headers)
            json_data = spark.sparkContext.parallelize([api_response.text])
            df = spark.read.json(json_data)
            self.finalDf = self.finalDf.unionAll(df)
        except Exception as error:
            traceback.print_exc()

    def api_calls(self, spark, batch_size, booking_ids):
        try:
            for i in range(0, len(booking_ids), batch_size):
                sub_list = booking_ids[i:i + batch_size]
                threads = []
                for index in range(len(sub_list)):
                    t = threading.Thread(target=self.get_json, name=str(index), args=(spark, sub_list[index]))
                    threads.append(t)
                    t.start()
                for index, thread in enumerate(threads):
                    thread.join()
            return self.finalDf
        except Exception as e:
            traceback.print_exc()

Here is how I am calling the objects from the class above:

init_df = prepare_empty_df(schema=some_schema, spark=spark) => This method return me an empty DF with a schema
pad = PrepareApiData(init_df)
booking_ids = [LIST_OF_MY_BOOKING_IDs]
pad.api_calls(spark, batch_size, booking_ids)
Metadata
  • 2,127
  • 9
  • 56
  • 127
-1

My take is, instead of passing dataframe to argument of thread, use starmap of python multiprocessing library. Get the output as the list of dataframe

Then iterate the dataframe to union the results in empty dataframe

I hope this will solve your problem

Tejas
  • 391
  • 3
  • 11