I am trying to learn PySpark and have written a simple script that loads some JSON files from one of my HDFS directories, parses each one into a Python dictionary (using json.loads()), and then extracts some fields from each object.
The relevant info is stored in a Spark DataFrame, and I want to insert this data into a MySQL table that I created locally.
But when I run it, I get an error with my connection URL. It says:

java.lang.RuntimeException: [1.5] failure: ``.'' expected but `:' found

at this point:

jdbc:mysql://localhost:3306/bigdata?user=root&password=pwd
    ^
The database name is "bigdata".
The username and password are included in the URL above.
The port number is, I believe, correct.
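For context, those connection details can be sanity-checked outside Spark with plain Python, along these lines (just a sketch; it assumes the separate mysql-connector-python package, which my Spark script doesn't use):

# Sanity-check sketch (assumes the mysql-connector-python package,
# which is separate from Spark and from the JDBC connector jar)
import mysql.connector

conn = mysql.connector.connect(
    host="localhost",
    port=3306,
    user="root",
    password="pwd",
    database="bigdata",
)
print(conn.is_connected())  # True if host/port/credentials/db name are right
conn.close()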
Here's the full script I have:
import json
import pandas as pd
import numpy as np
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
SQL_CONNECTION="jdbc:mysql://localhost:3306/bigdata?user=root&password=pwd"
sc=SparkContext()
sqlContext = SQLContext(sc)
cols = ['Title', 'Site']
df = pd.DataFrame(columns=cols)
# First, load my files as an RDD and parse each one as JSON
rdd1 = sc.wholeTextFiles("hdfs://localhost:8020/user/ashishu/Project/sample data/*.json")
rdd2 = rdd1.map(lambda kv: json.loads(kv[1]))
# Read a couple of records from the RDD and extract the fields I need
for record in rdd2.take(2):
    title = record['title']
    site = record['thread']['site_full']
    vals = np.array([title, site])
    df.loc[len(df)] = vals
sdf = sqlContext.createDataFrame(df)
sdf.show()
sdf.insertInto(SQL_CONNECTION, "sampledata")
SQL_CONNECTION is the connection URL defined at the beginning, and "sampledata" is the name of the MySQL table I want to insert into. The specific database to use ("bigdata") was specified in the connection URL.
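For what it's worth, my mental model of how that URL is meant to be used comes from the JDBC read path; here's a minimal sketch of what I believe a read-back of the same table would look like (the driver class name is my assumption for the 5.1.x connector):

# Sketch (my assumptions): reading the same MySQL table back over JDBC,
# just to illustrate how I understand the URL/table/driver pieces fit together.
df_check = sqlContext.read.format("jdbc").options(
    url=SQL_CONNECTION,              # the jdbc:mysql://... URL above
    dbtable="sampledata",            # the MySQL table name
    driver="com.mysql.jdbc.Driver",  # assumed driver class for connector 5.1.x
).load()
df_check.show()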
This is my spark-submit statement:
./bin/spark-submit /Users/ashishu/Desktop/sample.py --driver-class-path /Users/ashishu/Documents/Spark/.../bin/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar
I am using Spark 1.6.1
Am I missing something stupid here about the MySQL connection? I tried replacing the ":" (between jdbc and mysql) with a ".", but that obviously didn't fix anything and just generated a different error.
Thanks
EDIT
I modified my code per the suggestions, so that instead of using sdf.insertInto I now have:
sdf.write.jdbc(SQL_CONNECTION, table="sampledata", mode="append")
However, I now get a new error after using the following submit command in the terminal:
./bin/spark-submit sample.py --jars <path to mysql-connector-java-5.1.42-bin.jar>
The error is basically saying "an error occurred while calling o53.jdbc, no suitable driver found".
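In case it helps narrow things down, I gather from the DataFrameWriter docs that the driver class can also be named explicitly via the properties dict; something like this is what I'd try next (the class name is my assumption based on the connector jar):

# Same write as above, but naming the JDBC driver class explicitly.
# "com.mysql.jdbc.Driver" is my assumption for the 5.1.42 connector jar.
sdf.write.jdbc(
    url=SQL_CONNECTION,
    table="sampledata",
    mode="append",
    properties={"driver": "com.mysql.jdbc.Driver"},
)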
Any idea about this one?