I am using PySpark 2.4 and want to write data to SQL Server, but the write is failing.

I've placed the jar file downloaded from here in the spark path:

D:\spark-2.4.3-bin-hadoop2.7\spark-2.4.3-bin-hadoop2.7\jars\

But to no avail. The following is the PySpark code that writes the data to SQL Server:

sql_server_dtls = {'user': 'john', 'password': 'doe'}

ports_budget_joined_DF.write.jdbc(url="jdbc:sqlserver://endpoint:1433;databaseName=poc", table='dbo.test_tmp', mode='overwrite', properties=sql_server_dtls)

I get the error below:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\sql\readwriter.py", line 982, in jdbc
    self.mode(mode)._jwrite.jdbc(url, table, jprop)
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o45.jdbc.
: java.sql.SQLException: No suitable driver

Am I missing something? Also, I want to truncate the table before writing the new data to it. Does mode='overwrite' in the DataFrame writer do that when the target system is SQL Server?

Aakash Basu
  • Possible duplicate of [The infamous java.sql.SQLException: No suitable driver found](https://stackoverflow.com/questions/1911253/the-infamous-java-sql-sqlexception-no-suitable-driver-found) – Max Sep 18 '19 at 07:38

1 Answer

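You need to name the JDBC driver class explicitly through the "driver" property. The example below uses MySQL's com.mysql.cj.jdbc.Driver; for SQL Server the class is com.microsoft.sqlserver.jdbc.SQLServerDriver. Spark can download the driver jar for you if you pass its package specification through spark.jars.packages.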

Use this function:

def connect_to_sql(
    spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
    """Read data_table over JDBC and return it as a DataFrame (MySQL URL and driver shown)."""
    jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)

    # "driver" names the JDBC driver class so that Spark loads it by name.
    connection_details = {
        "user": username,
        "password": password,
        "driver": "com.mysql.cj.jdbc.Driver",
    }

    df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
    return df
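
For the SQL Server write in the question, the same idea might look like the sketch below. It is a minimal sketch under assumptions, not a definitive implementation: the helper names `sqlserver_jdbc_url` and `write_to_sqlserver` are hypothetical, the driver class name is the one the asker uses in the comments, and `truncate` is the Spark JDBC writer option that, in overwrite mode, asks Spark to truncate the existing table instead of dropping and recreating it. The driver jar still has to be available to Spark (for example through `--jars` or `spark.jars.packages`).

```python
def sqlserver_jdbc_url(host, port, database):
    """Build a SQL Server JDBC URL in the form used in the question."""
    return "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, database)


def write_to_sqlserver(df, url, table, username, password, truncate=True):
    """Overwrite `table` with the rows of `df` (hypothetical helper)."""
    properties = {
        "user": username,
        "password": password,
        # Name the driver class explicitly so Spark loads it by name.
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }
    writer = df.write
    if truncate:
        # With mode="overwrite", truncate the table instead of dropping it.
        writer = writer.option("truncate", "true")
    writer.jdbc(url=url, table=table, mode="overwrite", properties=properties)
    return properties
```

With the values from the question, the call would be `write_to_sqlserver(ports_budget_joined_DF, sqlserver_jdbc_url("endpoint", 1433, "poc"), "dbo.test_tmp", "john", "doe")`.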

ADDITION:

You can use the function below (edit it to suit your use case) to pass packages when creating your SparkSession. Pass the package specifications (Maven coordinates in the form groupId:artifactId:version) as a list or as a comma-separated string. You can look them up in the Maven Central Repository.

from pyspark.sql import SparkSession


def create_spark_session(master_url, packages=None):
    """
    Creates a Spark session.
    :param master_url: master URL of the cluster you want to submit the job to, or local with all cores
    :param packages: optional external packages. This can be a comma-separated string of package
        specifications or a list of package specifications.
    :return: SparkSession object
    """
    builder = (
        SparkSession.builder.master(master_url)
        .config("spark.io.compression.codec", "snappy")
        .config("spark.ui.enabled", "false")
    )
    if packages:
        # spark.jars.packages expects a single comma-separated string.
        packages = ",".join(packages) if isinstance(packages, list) else packages
        builder = builder.config("spark.jars.packages", packages)

    return builder.getOrCreate()
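
As a usage sketch for the function above, the package specification for the Microsoft SQL Server JDBC driver could be assembled as shown here. The helper `maven_coordinate` is hypothetical, and the version string is an assumption; check the repository for the release that matches your Java runtime.

```python
def maven_coordinate(group_id, artifact_id, version):
    """Format a package specification as groupId:artifactId:version."""
    return "{0}:{1}:{2}".format(group_id, artifact_id, version)


# Assumed version of the SQL Server JDBC driver; adjust to your environment.
mssql_jdbc = maven_coordinate("com.microsoft.sqlserver", "mssql-jdbc", "7.4.1.jre8")

# Several packages are joined with commas, as spark.jars.packages expects.
packages = ",".join([mssql_jdbc])

# Then, with the function from the answer:
# spark = create_spark_session("local[*]", packages=packages)
```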
pissall
  • Sorry for late update, I just did that. But, it is very slow for bigger datasets, probably because a row-wise load is happening. Can the load performance be increased? – Aakash Basu Sep 18 '19 at 14:54
  • Second point: even if I give the driver option, it fails if I don't pass the jar with the --jars parameter when I submit the job or start the pyspark shell. To my understanding, when we provide the driver, it downloads it from the internet and doesn't look for the jar in the jars path inside Spark or in the jars parameter, but that's not happening in my local machine's test pyspark shell. Any idea why? – Aakash Basu Sep 18 '19 at 14:58
  • @AakashBasu Spark will download and save it to a jar path, and next time it will look for it in the same path (unless the environment changes). If we pass the `--jars` option in the console, I think it looks straight into the jars directory (without trying to download it). I will update my answer to include something more. – pissall Sep 18 '19 at 15:04
  • Can you please help me with the first comment? It is very slow. How can I speed up the write to SQL Server? – Aakash Basu Sep 19 '19 at 07:00
  • @AakashBasu Are you filtering the data after importing it into spark? – pissall Sep 19 '19 at 10:21
  • No. I read two tables, do a join on some key and then write it. What can be done? – Aakash Basu Sep 19 '19 at 12:18
  • @AakashBasu Have you figured out whether the reading itself takes a lot of time, or is it the join that takes more time? – pissall Sep 19 '19 at 12:22
  • I am running in the pyspark shell, so since it is line by line (REPL), I can see where it takes time. The join is smooth and fast, but when I hit the line to save, it's very slow, with 6 stages created, and runs for approx 1 - 1.5 mins for around 75,000 rows (which I feel is quite slow): [Stage 4:=============================> (3 + 3) / 6] Count: 76206 66.1410448551178 secs taken. – Aakash Basu Sep 23 '19 at 07:08
  • @AakashBasu You need to call an 'action' after reading the data and after joining the data to know exactly which one takes time. As you know, Apache Spark evaluates lazily. – pissall Sep 23 '19 at 12:36
  • I'm doing exactly that my friend, as I'm quite aware of Spark's lazy execution model. This is my code, run after .show() on the DF: `from time import time`; `t1 = time()`; `ports_budget_joined_DF.write.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").jdbc(url="xyz", table='dbo.test', mode='overwrite', properties=sql_server_dtls)`; `t2 = time()`; `print(t2 - t1, ' secs taken.')` – Aakash Basu Sep 23 '19 at 12:41
  • Since you're doing it in the console, when you save, all your stages get executed all over again. – pissall Sep 23 '19 at 12:42
  • But when I run it in batch mode, as a submit, it takes the same time. The test seems to be consistent. That's why I wanted to know from you or anyone else if there's a parallel loader rather than a sequential one. For example, in Redshift (PostgreSQL), when I load through Python, 1 million records get pushed in approx 30 - 40 secs. I don't really know why this is so slow. I read some discussions about an Azure driver for SQL Server. How can I use it? Any idea? – Aakash Basu Sep 23 '19 at 12:48
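
One possible starting point for the slow write discussed in these comments is to adjust Spark's JDBC write options. The sketch below is an assumption about what might help, not a measured fix: `batchsize` and `numPartitions` are Spark JDBC options, while the helper name `tuned_jdbc_write` and the example values are hypothetical and would need tuning against the actual database.

```python
def tuned_jdbc_write(df, url, table, properties, partitions=8, batch_size=10000):
    """Write df over JDBC with explicit parallelism and batch size (hypothetical helper)."""
    writer = (
        # Each partition is written over its own JDBC connection.
        df.repartition(partitions)
        .write
        # Rows sent per JDBC batch insert round trip.
        .option("batchsize", str(batch_size))
        # Upper bound on concurrent JDBC connections for the write.
        .option("numPartitions", str(partitions))
    )
    writer.jdbc(url=url, table=table, mode="overwrite", properties=properties)
```

With the names from the question, this would be called as `tuned_jdbc_write(ports_budget_joined_DF, url, 'dbo.test_tmp', sql_server_dtls)`, where `sql_server_dtls` also carries the "driver" entry.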