
In this code, I am reading a CSV file containing 2000 entries. The CSV contains the data needed to hit the API. The api_hit() function is called from the processing_chunks() function.

slice_device_list() divides the 2000 entries into chunks of 20, and I then use a ThreadPoolExecutor with max_workers=20 to loop over those chunks and call the API for each chunk in a thread. But I didn't get any improvement.

Here is the code that I am using:

with open('/home/kapilsharma/Downloads/smpa_replace (1).csv', 'r') as read_object:
    csv_reader = reader(read_object)

    # to ignore the heading row
    next(csv_reader)
    try:
        # storing rows in list
        for row in csv_reader:
            list.append(row)

        #logger.info("Data read from csv :: {}".format(list))

        # dividing the list into 20-20 chunk, this would be 3-D array
        data_chunks = slice_device_list(list)

        try:
            for chunks in data_chunks:
                with ThreadPoolExecutor(max_workers=20) as exe:
                    logger.info("Started executing...")
                    exe.submit(processing_chunks, chunks)
                    logger.info("Execution completed...")

        except Exception as e:
            logger.info("Error occurred while hitting the api :: {}".format(e))

    except Exception as ex:
        logger.info('Exception occurred in main part :: {}'.format(ex))

Is there a mistake in what I am doing? The code takes the same time to hit the API for the 2000 entries as it did before, without threading.

Here is the processing_chunks() function:

# Function to process the chunks
def processing_chunks(chunks):
    # Hitting the api
    for chunk in chunks:
        serial_no = chunk[1]
        oneCloudUserId = chunk[0]

        # Checking if deviceId exists in device table
        crsr.execute('select deviceId from device where serialNo = %s', (serial_no,))
        deviceId = crsr.fetchone()
        logger.info("deviceId :: {}".format(deviceId))

        # If deviceId not exists
        if deviceId is None:
            logger.info("Device not exist in CHP DB")
            # fields = ['x_user_id', 'serial_number', 'newService', 'Error']

            # Writing error in csv_file
            with open('false_api_hit2', 'a') as csvfile:
                csvwriter = csv.writer(csvfile)
                row = [chunk[0], chunk[1], chunk[2], "Device not exist in CHP DB"]
                csvwriter.writerow(row)

        # If deviceId exists
        else:
            # Check deviceId and Xid(userId) combination exist in subscriptionInfo table
            # crsr.execute('select * from subscriptionInfo where deviceId=%s and oneCloudUserId=%s and cloudServiceName=%s', (deviceId[0], oneCloudUserId, "armor"))
            # row = crsr.fetchone()

            crsr.execute(
                'select serviceId, expiryDate, serviceType from subscriptionInfo where deviceId = %s and oneCloudUserId=%s and cloudServiceName=%s',
                (deviceId[0], oneCloudUserId, "armor"))
            row1 = crsr.fetchone()

            # If the combination does not exist
            if row1 is None:
                logger.info('Device and User mapping not exist in subscriptionInfo')
                # Writing error in csv_file
                with open('false_api_hit2', 'a') as csvfile:
                    csvwriter = csv.writer(csvfile)
                    row = [chunk[0], chunk[1], chunk[2], "Device and User mapping not exist in subscriptionInfo"]
                    csvwriter.writerow(row)


            # If the combination exists then hit the api
            else:
                service_id = row1[0]
                expiry_date = row1[1]
                service_type = row1[2]
                api_hit(chunk, service_id, str(expiry_date), service_type, deviceId[0])

And here is the api_hit() function:

def api_hit(data_chunks, service_id, expiry_date, service_type, deviceId):
    '''
    Function to hit the SMPA api
    :param data_chunks: chunk of data that we will get
    :return:
    '''

    try:
        try:
            payload = info
            # Updating payload
            payload["params"]["x_user_id"] = data_chunks[0]
            payload["params"]["reason"]["serial_number"] = data_chunks[1]
            payload["params"]["subscription"]["new_service"] = data_chunks[2]

            if (data_chunks[3] != ''):
                payload["params"]["reason"]["reason"] = data_chunks[3]

            payload["params"]["reason"]["comment"] = data_chunks[4]
            payload["params"]["subscription"]["service_id"] = service_id
            payload["params"]["subscription"]["expiry_date"] = expiry_date
            payload["params"]["subscription"]["service_type"] = service_type


        except Exception as ep:
            logger.info("Error occurred while updating payload :: {}".format(ep))

        try:
            # getting header to hit the api by calling function getWsseHeader
            HEADERS_VALUE = getWsseHeader("602a12-475e-4dc0-9d05",
                                          "BUZRKypEGGHansseAWWzDxhCk5lj9STV8i2WX")
            header = {"X-WSSE": HEADERS_VALUE}
        except Exception as eh:
            logger.info("Error occurred while getting header :: {}".format(eh))

        # Hitting api
        try:
            response = requests.request("POST", API_URL, json=payload, headers=header)
            response.raise_for_status()
            response = response.json()
            logger.info("Payload :: {}".format(payload))

        except requests.exceptions.HTTPError as error:
            logger.info("Error occurred while getting response :: {}".format(error))
        except Exception as er:
            logger.info("Error while getting response :: {}".format(er))

        if response["result"]["status"] == 1:
            # fields = ['x_user_id', 'serial_number', 'newService', 'Error']

            # Writing error in csv_file
            with open('false_api_hit2', 'a') as csvfile:
                csvwriter = csv.writer(csvfile)
                row = [data_chunks[0], data_chunks[1], data_chunks[2], response['result']['reason']]
                csvwriter.writerow(row)

        # status of response = 0 means success response
        elif response["result"]["status"] == 0:

            # Writing in csv_file
            # fields = ['x_user_id', 'serial_number', 'newService']
            with open('true_api_hit2', 'a') as csvfile:
                csvwriter = csv.writer(csvfile)
                row = [data_chunks[0], data_chunks[1], data_chunks[2]]
                csvwriter.writerow(row)

            now = datetime.datetime.now().replace(microsecond=0)
            # If SMPA gives a success response, update newService in the subscriptionInfo 'service' column
            crsr.execute('UPDATE subscriptionInfo SET service = %s, modifiedAt = %s where deviceId = %s and oneCloudUserId=%s and cloudServiceName=%s',
                         (data_chunks[2], now, deviceId, data_chunks[0], "armor"))

            now = datetime.datetime.now().replace(microsecond=0)
            # if 'serviceId' exists in the response then update this in subscriptionInfo table and if it is not there then do nothing
            if "serviceId" in response["result"]:
                crsr.execute('UPDATE subscriptionInfo SET serviceId = %s, modifiedAt = %s where deviceId = %s and oneCloudUserId=%s and cloudServiceName=%s',
                             (response["result"]["serviceId"], now, deviceId, data_chunks[0], "armor"))
            sql_connection.commit()

    except Exception as e:
        logger.info("Some error occurred as :: {}".format(e))

The getWsseHeader() function returns the header for the API call.

Kapil Sharma

1 Answer

As Corralien suggested, your first step should be to determine whether processing_chunks is I/O-bound or CPU-bound.
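One rough way to check this is to compare CPU time with wall-clock time for a single call. The sketch below uses only the standard library; `cpu_fraction`, `fake_io_task` and `fake_cpu_task` are hypothetical names made up for illustration, and the two fake tasks are stand-ins, not your real processing_chunks.

```python
import time


def cpu_fraction(func, *args):
    """Run func once and return (wall_seconds, cpu_seconds / wall_seconds)."""
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    func(*args)
    cpu = time.process_time() - cpu_start
    wall = time.perf_counter() - wall_start
    return wall, (cpu / wall if wall > 0 else 0.0)


def fake_io_task():
    # Stand-in for waiting on a database or an HTTP response.
    time.sleep(0.2)


def fake_cpu_task():
    # Stand-in for pure-Python number crunching.
    sum(i * i for i in range(2_000_000))


if __name__ == "__main__":
    for task in (fake_io_task, fake_cpu_task):
        wall, fraction = cpu_fraction(task)
        print(f"{task.__name__}: wall={wall:.3f}s cpu/wall={fraction:.2f}")
```

A ratio near 0 suggests the call mostly waits (I/O-bound); a ratio near 1 suggests it mostly computes (CPU-bound). In your case you would pass processing_chunks and one chunk instead of the fake tasks.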

For CPU-bound tasks, one can generally improve performance using a ProcessPoolExecutor, which spreads the computation across the CPU cores you have.

For I/O-bound tasks, more processes are unlikely to help: ProcessPoolExecutor only spawns more processes that wait for their turn to read or write the same file.

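Due to Python's GIL, only one thread runs Python bytecode at a time, so threads are mainly a way to schedule multiple pieces of a program in time slices. Threading doesn't automatically improve speed; it helps when threads spend most of their time blocked on I/O, during which the GIL is released.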
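To make that concrete, here is a minimal sketch comparing a sequential loop with a thread pool when each task only waits. `fake_api_call` is a hypothetical stand-in that sleeps instead of doing a real request, so the numbers only illustrate the effect and say nothing about your actual API.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_api_call(item):
    # Assumed stand-in for a blocking network call; sleeping releases the GIL.
    time.sleep(0.05)
    return item * 2


def run_sequential(items):
    return [fake_api_call(i) for i in items]


def run_threaded(items, workers=8):
    # One pool for all items; map() preserves input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fake_api_call, items))


def timed(func, *args):
    """Return (result, elapsed_seconds) for one call of func."""
    start = time.perf_counter()
    result = func(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    items = list(range(8))
    _, t_seq = timed(run_sequential, items)
    _, t_thr = timed(run_threaded, items)
    print(f"sequential: {t_seq:.2f}s, threaded: {t_thr:.2f}s")
```

If the task were a CPU-heavy pure-Python loop instead of a sleep, the two timings would be expected to come out roughly equal, which is the GIL effect described above.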

I'm guessing the bottleneck in processing_chunks is a combination of database operations (which lead to disk I/O) and network operations (in requests). And since you said the times are the same, I assume the network part is local. Threading might improve speed only when your network part is very slow.
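One more thing worth checking in the posted loop, independent of the I/O question: the ThreadPoolExecutor is created inside the for loop in a with block. Leaving a with block waits for all submitted work to finish, so each iteration submits a single chunk and then blocks until it is done, which makes the chunks run one after another. A minimal sketch of the alternative, creating the pool once and submitting every chunk to it, could look like this. `process_all_chunks` and its `worker` parameter are hypothetical names; in your code the worker would be processing_chunks.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_all_chunks(data_chunks, worker, max_workers=20):
    """Submit every chunk to one shared pool; return (index, result) pairs."""
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as exe:
        # Submit everything first so the chunks can overlap.
        futures = {exe.submit(worker, chunk): idx
                   for idx, chunk in enumerate(data_chunks)}
        for future in as_completed(futures):
            idx = futures[future]
            try:
                results.append((idx, future.result()))
            except Exception as exc:
                # Keep the exception so one failing chunk does not hide the rest.
                results.append((idx, exc))
    # Restore the original chunk order.
    return sorted(results, key=lambda pair: pair[0])
```

Usage would be along the lines of `process_all_chunks(data_chunks, processing_chunks)`. Once the chunks really do run concurrently, the shared crsr cursor and the CSV files opened in append mode would be touched by several threads at once, so each worker may need its own database connection, and the file writes may need a lock.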
