
I have an RDD that is distributed across multiple machines in a Spark environment. I would like to execute a function on this RDD on each worker machine, without collecting the RDD and running the function on the driver. The function should be executed separately on each executor, against that executor's own partition of the RDD. How can I do that?

Update (adding code): I am running all of this in the Spark shell.

import org.apache.spark.sql.cassandra.CassandraSQLContext
import java.util.Properties

val cc = new CassandraSQLContext(sc)
val rdd = cc.sql("select * from sams.events where appname = 'test'")
val df = rdd.select("appname", "assetname")

Here I have a df with 400 rows. I need to save this df to a SQL Server table. When I try to use the df.write method, it gives me errors, which I have posted in a separate thread: spark dataframe not appending to the table.
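For context, the df.write call I am attempting looks roughly like this (a sketch only; the connection URL, table name, and credentials are placeholders):

// java.util.Properties is imported above
val props = new Properties()
props.setProperty("user", "MyUserName")
props.setProperty("password", "*****")
props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

// append the 400-row DataFrame to a SQL Server table over JDBC
df.write
  .mode("append")
  .jdbc("jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks", "Customers", props)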

I can open a DriverManager connection and insert rows, but that would run in the driver process of Spark:

import java.sql._
import com.microsoft.sqlserver.jdbc.SQLServerDriver

// build the connection URL and open the connection first
val connectionUrl = "jdbc:sqlserver://localhost:1433;" +
   "databaseName=AdventureWorks;user=MyUserName;password=*****;"
val conn = DriverManager.getConnection(connectionUrl)

// create a Statement from the connection
val statement = conn.createStatement()

// insert the data
statement.executeUpdate("INSERT INTO Customers VALUES (1001, 'Simpson', 'Mr.', 'Springfield', 2001)")

I need to do this writing on the executor machines. How can I achieve this?

  • That's a normal Spark operation. Would you post the code you're working on? You might just need some guidance in the right direction. – maasg Jul 14 '15 at 10:01
  • I have a dataframe which contains collections of SQL rows. I am getting this dataframe from Cassandra CQL. I process this dataframe and need to write it to SQL Server. I am not able to use the jdbc method of DataFrameWriter, so I am using an insert into statement. – Nipun Jul 14 '15 at 10:04
  • After reading about Spark, I understand that code generally runs in the driver, and operations that need to be performed on the data are executed on the executors. – Nipun Jul 14 '15 at 10:07
  • Could you add your code to the question to make it clear? – maasg Jul 14 '15 at 10:12

1 Answer


In order to set up connections from the workers to other systems, we should use rdd.foreachPartition(iter => ...)

foreachPartition lets you execute an operation for each partition, giving you access to the partition's data as a local iterator. With enough data per partition, the cost of setting up resources (like DB connections) is amortized by reusing those resources across the whole partition.

An abstract example:

rdd.foreachPartition { iter =>
   // set up one DB connection per partition
   val dbconn = Driver.connect(ip, port)
   iter.foreach { element =>
       val query = makeQuery(element)
       dbconn.execute(query)
   }
   // release the connection once the whole partition is processed
   dbconn.close()
}
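Applied to the DataFrame in your question, a minimal sketch could look like this (the connection URL, the target table name, and its columns are assumptions for illustration, not tested code):

import java.sql.DriverManager

df.rdd.foreachPartition { rows =>
   // open one JDBC connection per partition, on the executor
   val conn = DriverManager.getConnection(
       "jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks;user=MyUserName;password=*****;")
   val ps = conn.prepareStatement("INSERT INTO events (appname, assetname) VALUES (?, ?)")
   rows.foreach { row =>
       ps.setString(1, row.getString(0)) // appname
       ps.setString(2, row.getString(1)) // assetname
       ps.executeUpdate()
   }
   ps.close()
   conn.close()
}

Since the closure runs on the executors, everything referenced inside it must be serializable or created inside the closure, which is why the connection is opened within foreachPartition rather than on the driver.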

It's also possible to create singleton resource managers that manage those resources once per JVM in the cluster. See also this answer for a complete example of such a local resource manager: spark-streaming and connection pool implementation
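A minimal sketch of that singleton pattern, reusing the makeQuery placeholder from the abstract example above (the ConnectionPool object and the connection URL are assumptions):

import java.sql.{Connection, DriverManager}

// a Scala object is instantiated once per JVM, so each executor
// lazily creates its own connection on first use
object ConnectionPool {
   lazy val conn: Connection = DriverManager.getConnection(
       "jdbc:sqlserver://localhost:1433;databaseName=AdventureWorks;user=MyUserName;password=*****;")
}

rdd.foreachPartition { iter =>
   val statement = ConnectionPool.conn.createStatement()
   iter.foreach(element => statement.executeUpdate(makeQuery(element)))
   statement.close()
}

Note that a single JDBC connection is not thread-safe, so a real resource manager would hand out a separate connection to each concurrent task.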

  • Thanks a lot maasg. I have data from the Cassandra connector populated in a dataframe. I process it and now I need to write it to SQL Server. For this I have created a separate dataframe. Do you know how I can copy one dataframe to another and then write it to SQL Server? – Nipun Jul 14 '15 at 15:17