
I have a case where I am using PySpark (or Spark, if I can't do it with Python and instead need to use Scala or Java) to pull data from several hundred database tables that lack primary keys. (Why Oracle would ever create an ERP product that contains tables without primary keys is a different subject... but regardless, we need to be able to pull the data from each database table and save it into a Parquet file.) I originally tried using Sqoop instead of PySpark, but due to a number of issues we ran into, it made more sense to try PySpark/Spark instead.

Ideally, I'd like to have each task node in my compute cluster take the name of a table, query that table from the database, and save that table as a Parquet file (or set of Parquet files) in S3. My first step is to get it working locally in standalone mode. (If I had a primary key for each given table, then I could partition the query and the file-saving process across different ranges of rows for that table and distribute the row partitions across the task nodes in the compute cluster, along the lines of the sketch below; but because Oracle's ERP product lacks primary keys for the tables of concern, that's not an option.)
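
To illustrate that point: if a table did expose a numeric key column, the JDBC read itself could be partitioned and Spark would parallelize it for me. This is only a sketch of that idea; the table name, key column, and bounds are made up, and it assumes the SparkSession created in the setup further down.

# Hypothetical sketch only: "SOME_TABLE" and "SOME_NUMERIC_KEY" are made-up names.
partitionedDF = (
    spark.read.format("jdbc")
    .option("url", "OURCONNECTIONURL;")
    .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver")
    .option("dbtable", "SOME_TABLE")
    .option("user", "USERNAME")
    .option("password", "PASSWORD")
    # Spark splits the read into numPartitions range queries over the key column:
    .option("partitionColumn", "SOME_NUMERIC_KEY")
    .option("lowerBound", 1)
    .option("upperBound", 1000000)
    .option("numPartitions", 16)
    .load()
)
# Each partition is then read and written by a separate task across the cluster.
partitionedDF.write.parquet("C:\\src\\NetsuiteSparkProject\\SOME_TABLE\\SOME_TABLE.parquet")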

I'm able to successfully query the target database with PySpark, and I'm able to successfully save the data into a Parquet file with multithreading, but for some reason only a single thread does anything. What happens is that a single thread takes a tableName, queries the database, and saves the file to the desired directory as a Parquet file, and then the job ends as if no other threads were executed. I'm guessing that there may be some type of locking issue taking place. If I correctly understood the comments here: How to run multiple jobs in one Sparkcontext from separate threads in PySpark? then what I'm trying to do should be possible, unless there are specific issues related to executing parallel JDBC SQL queries.

Edit: I'm specifically looking for a way that allows me to use a thread pool of some type so that I don't need to manually create a thread for each one of the tables that I need to process and manually load-balance them across the task nodes in my cluster.

Even when I tried setting:

--master local[*]

and

--conf 'spark.scheduler.mode=FAIR'

the problem remained.
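
For reference, the allocation file that spark.scheduler.allocation.file points at uses Spark's standard fair-scheduler format; an illustrative example (not necessarily my exact file) with a pool matching the "production" pool used below looks like this:

<?xml version="1.0"?>
<!-- Illustrative fair-scheduler allocation file; the pool name matches the
     "production" pool that the code below assigns jobs to. -->
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
</allocations>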

Also, to briefly explain my code, I needed to use a custom JDBC driver, and I'm running the code in a Jupyter notebook on Windows, so I'm using a workaround to ensure that PySpark starts with the correct parameters. (For the record, I have nothing against other operating systems, but my Windows machine is my fastest workstation, so that's why I'm using it.)

Here's my setup:

import os

# Point PySpark at the custom JDBC driver and the fair-scheduler config before
# findspark and the SparkSession are initialized.
driverPath = r'C:\src\NetSuiteJDBC\NQjc.jar'
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--driver-class-path '{0}' --jars '{0}' --master local[*] --conf 'spark.scheduler.mode=FAIR' --conf 'spark.scheduler.allocation.file=C:\\src\\PySparkConfigs\\fairscheduler.xml' pyspark-shell".format(driverPath)
)

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Column, Row, SQLContext
from pyspark.sql.functions import col, split, regexp_replace, when
from pyspark.sql.types import ArrayType, IntegerType, StringType

spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
spark.sparkContext.setLogLevel("INFO")
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")
sc = SparkContext.getOrCreate()

Then, to test the multiprocessing, I created the file sparkMethods.py in the directory where I'm running my Jupyter notebook and put this method in it:

def testMe(x):
    return x*x

When I run:

from multiprocessing import Pool
import sparkMethods

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes

    # print "[0, 1, 4,..., 81]"
    print(pool.map(sparkMethods.testMe, range(10)))

in my Jupyter notebook, I get the expected output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Now, before anyone objects to the way I wrote the next method, please know that I initially tried passing the Spark context via a closure and then ran into a pickling error, as documented here: I can "pickle local objects" if I use a derived class? So I included all of the Spark context setup in this next method that I put into the sparkMethods.py file (at least until I can find a better way). The reason that I put the methods into the external file (instead of just including them in the Jupyter notebook) was to deal with this problem: https://bugs.python.org/issue25053, as discussed here: Multiprocessing example giving AttributeError and here: python multiprocessing: AttributeError: Can't get attribute "abc"
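
To make the constraint concrete: on Windows, multiprocessing starts its workers with the "spawn" method, so the target function has to be importable by the child processes; a function defined only in the notebook, or a closure capturing the SparkSession, can't be pickled for them. A rough illustration (the square function here is just a stand-in):

from multiprocessing import Pool
import sparkMethods   # a module on disk, so spawned workers can import it

if __name__ == '__main__':
    # Works: sparkMethods.testMe is pickled by reference and re-imported
    # by the spawned worker processes.
    with Pool(4) as p:
        print(p.map(sparkMethods.testMe, range(10)))

    # By contrast, a function defined only in the notebook (or a closure that
    # captures the SparkSession) fails with something like
    # "AttributeError: Can't get attribute 'square'" because it can't be
    # pickled for the spawned workers:
    #
    # def square(x):
    #     return x * x
    # with Pool(4) as p:
    #     p.map(square, range(10))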

This is the method that contains the logic for making the JDBC connection:

# In sparkMethods.py file:
def getAndSaveTableInPySpark(tableName):
    import os
    import os.path
    from pyspark.sql import SparkSession, SQLContext
    # getOrCreate() reuses the SparkSession/SparkContext already running in this process.
    spark = SparkSession.builder.appName("sparkNetsuite").getOrCreate()
    spark.sparkContext.setLogLevel("INFO")
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "production")

    # Read the whole table over JDBC (no partitioning column is available).
    jdbcDF = spark.read \
        .format("jdbc") \
        .option("url", "OURCONNECTIONURL;") \
        .option("driver", "com.netsuite.jdbc.openaccess.OpenAccessDriver") \
        .option("dbtable", tableName) \
        .option("user", "USERNAME") \
        .option("password", "PASSWORD") \
        .load()

    # Spark writes a directory of part files at this path rather than a single
    # file, so os.path.exists() below checks that the output directory was created.
    filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
    jdbcDF.write.parquet(filePath)
    fileExists = os.path.exists(filePath)
    if fileExists:
        return (filePath + " exists!")
    else:
        return (filePath + " could not be written!")

Then, back in my Jupyter notebook, I run:

import sparkMethods
from multiprocessing import Pool

if __name__ == '__main__':
    with Pool(5) as p:
        p.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)

The problem is that only one thread seems to execute.

When I execute it, in the console output, I see that it includes this initially:

The process cannot access the file because it is being used by another process. The system cannot find the file C:\Users\DEVIN~1.BOS\AppData\Local\Temp\spark-class-launcher-output-3662.txt. . . .

which leads me to suspect that perhaps there is some type of locking taking place.

Regardless, one of the threads will always run to completion, successfully query its corresponding table, and save it to a Parquet file as desired. There is some non-determinism in the process, because different executions result in a different thread winning the race and consequently processing a different table. Interestingly, only a single job gets executed, as shown in the Spark UI (the screenshot shows only one Spark job was executed). However, the article here: https://medium.com/@rbahaguejr/threaded-tasks-in-pyspark-jobs-d5279844dac0 implies that I should expect to see multiple jobs in the Spark UI if they were successfully started.

Now, if the problem is that PySpark is not actually capable of running multiple JDBC queries in parallel across different task nodes, then perhaps my solution would be to use a JDBC connection pool or even just open a connection for each table (as long as I close the connection at the end of the thread). When getting the list of tables to process, I had success with connecting to the database through the jaydebeapi library like this:

import jaydebeapi
import pandas as pd
conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",  
                          "OURCONNECTIONURL;", 
                          ["USERNAME", "PASSWORD"], 
                          r"C:\src\NetSuiteJDBC\NQjc.jar")

top5Tables = list(pd.read_sql("SELECT TOP 5 TABLE_NAME FROM OA_TABLES WHERE TABLE_OWNER != 'SYSTEM';", conn)["TABLE_NAME"].values)
conn.close()
top5Tables

Output is:

['SALES_TERRITORY_PLAN_PARTNER',
 'WORK_ORDER_SCHOOLS_TO_INSTALL_MAP',
 'ITEM_ACCOUNT_MAP',
 'PRODUCT_TRIAL_STATUS',
 'ACCOUNT_PERIOD_ACTIVITY']

So, conceivably, if the problem is that PySpark cannot be used to distribute multiple queries across task nodes like this, then perhaps I can use the jaydebeapi library to make the connections. However, in that case, I'd still need a way to write the output of each JDBC SQL query to a Parquet file (which ideally would leverage Spark's schema inference capability), but I'm open to taking that approach if it's feasible; a rough sketch of what I have in mind follows.
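
This is the rough, untested shape of that fallback (it reuses the SparkSession from above purely for schema inference and the Parquet write, and it pulls each table through the driver, so it wouldn't be distributed; the function name is just a placeholder):

import jaydebeapi
import pandas as pd

# Rough, untested sketch of the jaydebeapi fallback: pull one table's rows into
# pandas over JDBC, then hand them to Spark to infer a schema and write Parquet.
def getAndSaveTableViaJaydebeapi(tableName):
    conn = jaydebeapi.connect("com.netsuite.jdbc.openaccess.OpenAccessDriver",
                              "OURCONNECTIONURL;",
                              ["USERNAME", "PASSWORD"],
                              r"C:\src\NetSuiteJDBC\NQjc.jar")
    try:
        pandasDF = pd.read_sql("SELECT * FROM " + tableName + ";", conn)
    finally:
        conn.close()
    sparkDF = spark.createDataFrame(pandasDF)  # Spark infers the schema here
    filePath = "C:\\src\\NetsuiteSparkProject\\" + tableName + "\\" + tableName + ".parquet"
    sparkDF.write.parquet(filePath)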

So, how do I successfully query the database and save the output to Parquet files in parallel (i.e. distributed across the task nodes) without the master node performing all of the querying sequentially?

devinbost
  • I think the problem is that you shouldn't use `multiprocess` but `multithread`, as you don't (and shouldn't) create multiple Spark contexts. What you need is multiple workers, not multiple drivers. – Sraw Nov 21 '18 at 02:12
  • Possible duplicate of [How to run independent transformations in parallel using PySpark?](https://stackoverflow.com/questions/38048068/how-to-run-independent-transformations-in-parallel-using-pyspark) – 10465355 Nov 21 '18 at 02:18
  • What does this have to do with Netsuite? FWIW All tables in Netsuite have a primary key ( generally a sequence) – bknights Nov 21 '18 at 07:14
  • @bknights When was the last time you joined OA_TABLES to OA_FKEYS? – devinbost Nov 21 '18 at 17:02
  • recently. What are you trying to find? – bknights Nov 21 '18 at 20:17
  • @bknights Try executing: "SELECT DISTINCT COL1.TABLE_NAME FROM OA_COLUMNS COL1 WHERE COL1.TABLE_OWNER != 'SYSTEM' AND NOT EXISTS (SELECT OA_FKEYS.PKTABLE_NAME FROM OA_FKEYS WHERE COL1.TABLE_NAME = OA_FKEYS.PKTABLE_NAME);" and tell me what you find. – devinbost Nov 21 '18 at 20:25
  • sure. That gives you a list of tables that are not part of a declared foreign key constraint. i.e. the database doesn't have declarative referential integrity. That's not the same as not having primary keys. I get that it's annoying for tooling but it might be useful to remember that mysql conquered the internet without referential integrity or transactions. – bknights Nov 22 '18 at 17:34
  • @bknights that may be true, but if I can't computationally derive the primary key columns, then I can't computationally split the columns by ( (max_value - min_value ) / num_partitions ) to distribute the partitions of rows across the Spark nodes in my cluster, nor do I have a way to computationally compare values to identify deletes for incremental imports. If they don't expose the primary keys, then for my purposes, they don't have them available. – devinbost Nov 23 '18 at 05:33

2 Answers


With some hints provided by the comments in response to my question, as well as the answer here: How to run independent transformations in parallel using PySpark? I investigated the use of threading instead of multiprocessing. I took a more careful look at one of the answers here: How to run multiple jobs in one Sparkcontext from separate threads in PySpark? and noticed the use of:

from multiprocessing.pool import ThreadPool

I was able to make it work, like this:

from multiprocessing.pool import ThreadPool
pool = ThreadPool(5)
results = pool.map(sparkMethods.getAndSaveTableInPySpark, top5Tables)
pool.close() 
pool.join() 
print(*results, sep='\n')

which prints:

C:\src\NetsuiteSparkProject\SALES_TERRITORY_PLAN_PARTNER\SALES_TERRITORY_PLAN_PARTNER.parquet exists!
C:\src\NetsuiteSparkProject\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP\WORK_ORDER_SCHOOLS_TO_INSTALL_MAP.parquet exists!
C:\src\NetsuiteSparkProject\ITEM_ACCOUNT_MAP\ITEM_ACCOUNT_MAP.parquet exists!
C:\src\NetsuiteSparkProject\PRODUCT_TRIAL_STATUS\PRODUCT_TRIAL_STATUS.parquet exists!
C:\src\NetsuiteSparkProject\ACCOUNT_PERIOD_ACTIVITY\ACCOUNT_PERIOD_ACTIVITY.parquet exists!
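
For completeness, the same pattern can also be written with the standard library's concurrent.futures (a sketch of the equivalent; either pool works because all the threads share the single SparkContext and each just submits its own Spark job):

from concurrent.futures import ThreadPoolExecutor
import sparkMethods

# Equivalent sketch using concurrent.futures: the worker threads share the one
# SparkContext in this process and each submits its own Spark job.
with ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(sparkMethods.getAndSaveTableInPySpark, top5Tables))
print(*results, sep='\n')
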
devinbost

Basically, Spark takes care of parallelization under the hood and doesn't require using the multiprocessing package; in fact, multiprocessing probably interferes with Spark and is completely unnecessary. A few things must be done to take advantage of this, though. The key is to build the queries and transformations first, but don't perform any actions. Also make sure your Spark cluster is set up with multiple worker nodes to which the work can be distributed. An easy way to do this is to use Databricks notebooks or other services from the large cloud vendors, which set all of this up for you.

Spark has two kinds of operations: transformations, which don't execute anything but simply set up the queries and transformations (sort of like SQL), and actions, which actually execute the query and act on the results. count() is an action, show() is an action, a query is a transformation, and combining tables (e.g. a union) is a transformation.

To use the inherent parallelism built into Spark, write several queries and transformations against different tables, but don't collect(), count(), or show() the results (don't perform any actions at this point, only transformations). This internally arranges the queries but does not execute them (this is Spark's lazy evaluation).

Then, later in the code, when you run an action (like count, show, or collect), Spark automatically distributes the work across all the available nodes in parallel. That's the whole beauty of Spark: no special multiprocessing is required on your local device; it is all handled by Spark.

Here is a PySpark example:

        # First build the queries but don't collect any data.
        part1_sdf = spark.sql(
          "SELECT UtcTime, uSecDelay, sender, Recipient, date , ID "
          "FROM Delay_table "
          "WHERE date between DATE_ADD(now(), - 60) AND DATE_ADD(now(), -59) "
          "AND ID = 'my_id' "
          "ORDER BY UtcTime DESC "
        )
        part2_sdf = spark.sql(
          "SELECT UtcTime, uSecDelay, sender, Recipient, date, ID "
          "FROM Delay_table "
          "WHERE date between DATE_ADD(now(), -58) AND DATE_ADD(now(), -57) "
          "AND ID = 'my_id' "
          "ORDER BY UtcTime DESC "
        )
        # Perform a transformation on the 2 queries. No data is pulled up to this point.
        transformed_df = part1_sdf.union(part2_sdf)
        # Finally when an action is called, the data is pulled in parallel:
        transformed_df.show(10)
        ### Output
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        |                   UtcTime|                             uSecDelay|           sender|           Recipient|      date|      ID|
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        |      2020-01-05 01:39:...|                                    69|                4|                  28|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    65|                4|                  26|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    62|                4|                   0|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                   108|                4|                  16|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    68|                4|                  27|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    71|                4|                  53|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    68|                4|                   7|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    65|                4|                  57|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    64|                4|                  56|2020-01-05|  my_id|
        |      2020-01-05 01:39:...|                                    66|                4|                  44|2020-01-05|  my_id|
        +--------------------------+--------------------------------------+-----------------+--------------------+----------+--------+
        only showing top 10 rows
Gerard G