
I have the following code, which takes names as input and uses a `pandas_udf` for vectorization: each batch of names is passed to the `insert` function, which makes an HTTP request for each name and returns a series of results:

```python
import json

import pandas as pd
import requests
from faker import Faker
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

# `args` is presumably produced by an argparse parser that is not shown here;
# it provides args.subparser, args.numberOfRecordsToBeInserted and args.insertURL.

spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

def insert(row):

    url = insertURL
    try:
        payload = json.dumps({
            "records": [
                {
                    "fields": {
                        "name": row["name"]
                    }
                }
            ]
        })

        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Authorization': f'Bearer {bearerToken}'
        }
        response = requests.request(
            "POST", url, headers=headers, data=payload)
        dataRes = response.json()
        return dataRes

    except Exception as e:
        print("Error Occurred:", e)

@pandas_udf("string")
def insertUDF(name: pd.Series) -> pd.Series:
    tok_df = pd.DataFrame({"name": name})
    tok_df["Tok_Name"] = tok_df.apply(insert, axis=1)
    return tok_df["Tok_Name"]


def main():

    match args.subparser:

        case 'Insert-Records':

            fake = Faker()
            global numberOfRecords
            global insertURL
            global bearerToken
            bearerToken = "abc"

            numberOfRecords = args.numberOfRecordsToBeInserted
            insertURL = args.insertURL

            columns = ["seqno", "name"]
            data = []
            for i in range(numberOfRecords):
                data.append((i, fake.name()))

            df = spark.createDataFrame(data=data, schema=columns)

            df.withColumn("Tok_Name", insertUDF(df["name"]))
            df.show()

        case _:
            print("Invalid operation")


if __name__ == "__main__":

    main()
```
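Once the UDF does run, two other details in `insert` may cause trouble. It returns `response.json()`, which is typically a dict, while `insertUDF` is declared with return type `"string"`, so the Arrow conversion of the result may be rejected. The `except` branch also only prints (that output goes to the Spark Python worker's stdout, which is not necessarily the driver console) and returns `None`, so failures can surface as silent nulls. A minimal pandas-only sketch, assuming a hypothetical `post_name` callable in place of the `requests` call, serializes each response to a JSON string and records errors in the column instead:

```python
import json
from typing import Callable

import pandas as pd


def tokenize_names(names: pd.Series, post_name: Callable[[str], dict]) -> pd.Series:
    """Map each name to a JSON string so the output matches a
    pandas_udf declared with return type "string"."""

    def one(name: str) -> str:
        try:
            # post_name is a hypothetical stand-in for the HTTP POST in insert()
            return json.dumps(post_name(name))
        except Exception as e:
            # Keep the failure visible in the result column instead of printing it
            return json.dumps({"error": str(e)})

    return names.map(one)


print(tokenize_names(pd.Series(["James Medina"]), lambda n: {"name": n}).tolist())
```

Inside the real UDF, the body would become `return tokenize_names(name, post)`, where `post` is a hypothetical helper that performs the `requests` POST from `insert` and returns `response.json()`.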

`df.show()` does not display the `Tok_Name` column, and the HTTP request does not seem to be sent either, since no record is inserted at the endpoint. The code does not throw any error.

This is the output:

```
+-----+--------------------+
|seqno|                name|
+-----+--------------------+
|    0|        James Medina|
|    1|         Lisa Rivera|
|    2|           Jill Park|
+-----+--------------------+
```

I had a look at this post as well.

For some reason, the line

```python
tok_df["Tok_Name"] = tok_df.apply(insert, axis=1)
```

does not invoke the insert function.
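In plain pandas, `DataFrame.apply(func, axis=1)` calls `func` once per row as soon as the line executes. The sketch below uses a hypothetical `fake_insert` that records each call instead of sending a request, which suggests the line itself works; if `insert` never runs under Spark, the likelier explanation is that Spark never evaluates the UDF at all.

```python
import pandas as pd

calls = []


def fake_insert(row: pd.Series) -> str:
    # Hypothetical stand-in for insert(): records the name instead of POSTing it
    calls.append(row["name"])
    return f"tok-{row['name']}"


tok_df = pd.DataFrame({"name": ["James Medina", "Lisa Rivera", "Jill Park"]})
tok_df["Tok_Name"] = tok_df.apply(fake_insert, axis=1)

print(calls)
print(tok_df)
```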
