In this code, I am reading a CSV containing 2000 entries. CSV contains data to hit the API. The api_hit() function is in processing_chunks function.
slice_device_list() dividing the data in 20 chunks of each from 2000 entries and then I am using the thread_pool_executor with max_chunks = 20 so that it can loop on those 20 chunks and call the api for each chunk with using thread. Bu I didn't get any improvement.
Here is the code that I am using:-
with open('/home/kapilsharma/Downloads/smpa_replace (1).csv', 'r') as read_object:
csv_reader = reader(read_object)
# to ignore the heading row
next(csv_reader)
try:
# storing rows in list
for row in csv_reader:
list.append(row)
#logger.info("Data read from csv :: {}".format(list))
# dividing the list into 20-20 chunk, this would be 3-D array
data_chunks = slice_device_list(list)
try:
for chunks in data_chunks:
with ThreadPoolExecutor(max_workers=20) as exe:
logger.info("Started executing...")
exe.submit(processing_chunks, chunks)
logger.info("Execution completed...")
except Exception as e:
logger.info("Error occurred while hitting the api :: {}".format(e))
except Exception as ex:
logger.info('Exception occurred in main part :: {}'.format(ex))
Is there any mistake in it that I am doing because the code is taking the same time as it was taking before without threading to hit the API for 2000 enteries.
Here is the processing_chunks() function:-
# Function to process the chunks
def processing_chunks(chunks):
# Hitting the api
for chunk in chunks:
serial_no = chunk[1]
oneCloudUserId = chunk[0]
# Checking if deviceId exists in device table
crsr.execute('select deviceId from device where serialNo = %s', (serial_no,))
deviceId = crsr.fetchone()
logger.info("deviceId :: {}".format(deviceId))
# If deviceId not exists
if deviceId == None:
logger.info("Device not exist in CHP DB")
# fields = ['x_user_id', 'serial_number', 'newService', 'Error']
# Writing error in csv_file
with open('false_api_hit2', 'a') as csvfile:
csvwriter = csv.writer(csvfile)
row = [chunk[0], chunk[1], chunk[2], "Device not exist in CHP DB"]
csvwriter.writerow(row)
# If deviceId exists
else:
# Check deviceId and Xid(userId) combination exist in subscriptionInfo table
# crsr.execute('select * from subscriptionInfo where deviceId=%s and oneCloudUserId=%s and cloudServiceName=%s', (deviceId[0], oneCloudUserId, "armor"))
# row = crsr.fetchone()
crsr.execute(
'select serviceId, expiryDate, serviceType from subscriptionInfo where deviceId = %s and oneCloudUserId=%s and cloudServiceName=%s',
(deviceId[0], oneCloudUserId, "armor"))
row1 = crsr.fetchone()
# If the combination does not exist
if (row1 == None):
logger.info('Device and User mapping not exist in subscriptionInfo')
# Writing error in csv_file
with open('false_api_hit2', 'a') as csvfile:
csvwriter = csv.writer(csvfile)
row = [chunk[0], chunk[1], chunk[2], "Device and User mapping not exist in subscriptionInfo"]
csvwriter.writerow(row)
# If the combination exists then hit the api
else:
service_id = row1[0]
expiry_date = row1[1]
service_type = row1[2]
api_hit(chunk, service_id, str(expiry_date), service_type, deviceId[0])
And here is the api_hit() function:-
def api_hit(data_chunks, service_id, expiry_date, service_type, deviceId):
'''
Function to hit the SMPA api
:param data_chunks: chunk of data that we will get
:return:
'''
try:
try:
payload = info
# Updating payload
payload["params"]["x_user_id"] = data_chunks[0]
payload["params"]["reason"]["serial_number"] = data_chunks[1]
payload["params"]["subscription"]["new_service"] = data_chunks[2]
if (data_chunks[3] != ''):
payload["params"]["reason"]["reason"] = data_chunks[3]
payload["params"]["reason"]["comment"] = data_chunks[4]
payload["params"]["subscription"]["service_id"] = service_id
payload["params"]["subscription"]["expiry_date"] = expiry_date
payload["params"]["subscription"]["service_type"] = service_type
except Exception as ep:
logger.info("Error occurred while updating payload :: {}".format(ep))
try:
# getting header to hit the api by calling function getWsseHeader
HEADERS_VALUE = getWsseHeader("602a12-475e-4dc0-9d05",
"BUZRKypEGGHansseAWWzDxhCk5lj9STV8i2WX")
header = {"X-WSSE": HEADERS_VALUE}
except Exception as eh:
logger.info("Error occurred while getting header :: {}".format(eh))
# Hitting api
try:
response = requests.request("POST", API_URL, json=payload, headers=header)
response.raise_for_status()
response = response.json()
logger.info("Payload :: {}".format(payload))
except requests.exceptions.HTTPError as error:
logger.info("Error occurred while getting response :: {}".format(error))
except Exception as er:
logger.info("Error while getting response :: {}".format(er))
if response["result"]["status"] == 1:
# fields = ['x_user_id', 'serial_number', 'newService', 'Error']
# Writing error in csv_file
with open('false_api_hit2', 'a') as csvfile:
csvwriter = csv.writer(csvfile)
row = [data_chunks[0], data_chunks[1], data_chunks[2], response['result']['reason']]
csvwriter.writerow(row)
# status of response = 0 means success response
elif response["result"]["status"] == 0:
# Writing in csv_file
# fields = ['x_user_id', 'serial_number', 'newService']
with open('true_api_hit2', 'a') as csvfile:
csvwriter = csv.writer(csvfile)
row = [data_chunks[0], data_chunks[1], data_chunks[2]]
csvwriter.writerow(row)
now = datetime.datetime.now().replace(microsecond=0)
# SMPA gives success reponse then update newService in - subscriptionInfo 'service' column
crsr.execute('UPDATE subscriptionInfo SET service = %s, modifiedAt = %s where deviceId = %s and oneCloudUserId=%s and cloudServiceName=%s',
(data_chunks[2], now, deviceId, data_chunks[0], "armor"))
now = datetime.datetime.now().replace(microsecond=0)
# if 'serviceId' exists in the response then update this in subscriptionInfo table and if it is not there then do nothing
if "serviceId" in response["result"]:
crsr.execute('UPDATE subscriptionInfo SET serviceId = %s, modifiedAt = %s where deviceId = %s and oneCloudUserId=%s and cloudServiceName=%s',
(response.result.serviceId, now, deviceId,data_chunks[0], "armor"))
sql_connection.commit()
except Exception as e:
logger.info("Some error occurred as :: {}".format(e))
Function getWsseHeader() will return the header for the api call.