
Is there any way in Apache Spark to store a JavaRDD in a MySQL database? I am taking input from two CSV files, and after joining their contents I need to save the output (a JavaRDD) to a MySQL database. I can already save the output to HDFS successfully, but I cannot find any information on connecting Apache Spark to MySQL. Below is my Spark SQL code, which might also serve as a reference for anyone looking for a Spark SQL example.

package attempt1;

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;


public class Spark_Mysql {
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }


      }

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {

          JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
          JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

          JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                  new Function<String, CompleteSample>() {
                    public CompleteSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      CompleteSample cs = new CompleteSample();
                      cs.setASSETNUM(parts[0]);
                      cs.setASSETTAG(parts[1]);
                      cs.setCALNUM(parts[2]);

                      return cs;
                    }
                  });

          JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                  new Function<String, ExtendedSample>() {
                    public ExtendedSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      ExtendedSample es = new ExtendedSample();
                      es.setASSETNUM(parts[0]);
                      es.setCHANGEBY(parts[1]);
                      es.setCHANGEDATE(parts[2]);

                      return es;
                    }
                  });

          JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
          complete.registerAsTable("cs");

          JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
          extended.registerAsTable("es");

          JavaSchemaRDD fs = sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM");


          JavaRDD<String> result = fs.map(new Function<Row, String>() {
              public String call(Row row) {
                return row.getString(0);
              }
            });

          result.saveAsTextFile("hdfs://path/to/hdfs/dir-name");   // instead of HDFS I need to save this to a MySQL database, but I cannot find any Spark-MySQL connection

    }



}

At the end I am saving the result to HDFS successfully, but now I want to save it into a MySQL database instead. Kindly help me out. Thanks

Amitabh Ranjan

2 Answers


There are two approaches you can use for writing your results back to the database. One is to use something like DBOutputFormat and configure it; the other is to use foreachPartition on the RDD you want to save and pass in a function that creates a connection to MySQL and writes the result back.
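For the foreachPartition approach, here is a minimal sketch written against the fs JavaSchemaRDD from the question, assuming a Spark version whose Java API exposes foreachPartition. The asset_result table, its columns, and the JDBC URL and credentials are made-up placeholders, not anything from the original post; adjust them to your own schema.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.api.java.Row;

// Open one connection per partition and write that partition's rows in a single batch.
fs.foreachPartition(new VoidFunction<Iterator<Row>>() {
    public void call(Iterator<Row> rows) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/yourdb", "username", "password");
        PreparedStatement stmt = conn.prepareStatement(
                "INSERT INTO asset_result (assettag, calnum, changeby, changedate) VALUES (?, ?, ?, ?)");
        try {
            while (rows.hasNext()) {
                Row row = rows.next();
                stmt.setString(1, row.getString(0));
                stmt.setString(2, row.getString(1));
                stmt.setString(3, row.getString(2));
                stmt.setString(4, row.getString(3));
                stmt.addBatch();
            }
            stmt.executeBatch();
        } finally {
            stmt.close();
            conn.close();
        }
    }
});

Opening the connection inside foreachPartition keeps it on the executor, so nothing unserializable is captured in the closure, and batching one partition at a time keeps the number of round trips to MySQL small.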

Holden
  • Hi, thanks for the answer. I will try these options and let you know. Meanwhile I have found out about JdbcRDD (link: https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/rdd/JdbcRDDSuite.scala). This might work, but it is in Scala and I am completely unaware of Scala, so I am still trying to figure the example out. Thanks – Amitabh Ranjan Jul 24 '14 at 07:18
  • Here "DBOutputFormat" is not working but "foreachPartition" worked. There is one other way I figured. Use "ForeachRDD" and then breakdown the rdd to arrays and using jdbc we can feed the input or retrieve the output. I am posting a sample code as answer just in case anyone else need it. – Amitabh Ranjan Jul 26 '14 at 07:43
  • Hi Amit, you said you would be posting sample code. Please post it if you can; it would be greatly appreciated. Thanks – Akshay Hazari Nov 03 '14 at 06:07
  • @RED I found another question with a foreachPartition sample here: http://stackoverflow.com/questions/24916852/how-can-i-connect-to-a-postgresql-database-into-apache-spark-using-scala – G-Mac Nov 08 '14 at 23:12

Here is an example using DBOutputFormat.

Create a class that represents your table row -

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// DBWritable from the old "mapred" API, to match the JobConf-based DBOutputFormat used below
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class TableRow implements DBWritable
{
    public String column1;
    public String column2;

    @Override
    public void write(PreparedStatement statement) throws SQLException
    {
        statement.setString(1, column1);
        statement.setString(2, column2);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException
    {
        // reading back from the database is not needed here, only writing
        throw new RuntimeException("readFields not implemented");
    }
}

Then configure your job and write a mapToPair function. The value in each pair doesn't appear to be used; if anyone knows otherwise, please post a comment. A sketch of the mapToPair call is shown after the snippet below.

String tableName = "YourTableName";
String[] fields = new String[] { "column1", "column2" };

// DBConfiguration and DBOutputFormat here are the old-API classes from
// org.apache.hadoop.mapred.lib.db, matching the JobConf passed to saveAsHadoopDataset
JobConf job = new JobConf();
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/DatabaseNameHere", "username", "password");
DBOutputFormat.setOutput(job, tableName, fields);

// map your rdd into a table row
JavaPairRDD<TableRow, Object> rows = rdd.mapToPair(...);

rows.saveAsHadoopDataset(job);
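
If it helps, here is a hedged sketch of what the elided mapToPair call might look like. The rdd variable is assumed to be a JavaRDD<String[]> of already-split records, and the two-column mapping is purely for illustration; neither comes from the original answer.

import org.apache.hadoop.io.NullWritable;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

// Turn each record into a (TableRow, placeholder) pair.
// DBOutputFormat writes only the key, so the value side of the pair is ignored.
JavaPairRDD<TableRow, Object> rows = rdd.mapToPair(
        new PairFunction<String[], TableRow, Object>() {
            public Tuple2<TableRow, Object> call(String[] record) throws Exception {
                TableRow row = new TableRow();
                row.column1 = record[0];
                row.column2 = record[1];
                return new Tuple2<TableRow, Object>(row, NullWritable.get());
            }
        });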
Josh Unger