I have a large data stored in a sqlite database. I am using java (jdbc driver) to retrieve the data from the sqlite table in batches and then process the data. Finally the processed data is rewritten back as a new column in the table(database). Since the processing of the data is fairly straight forward I tried to use multi threading in java to speed up the calculations.
steps which I followed were:
- spawn child threads
- each child then reads data from the sqlite db and processes the data
- when the data processing is done it is rewritten to the database using a synchronised function(insert and commit).
How ever I find no improvement in processing speed(calculations). In fact as the number of threads increases the speed decreased.
no multi threading:
1000 records ~ 2 min
2 threads : 1000 records ~ 2 min: 3 sec
4 threads : 1000 records ~ 2 min: 30 sec
10 threads : 1000 records ~ 2 min: 52 sec
I am using a Mac book pro: Mountain Lion; 2.4 GHz Intel core 2 Duo (4GB 1067 MHz DDR3).
The code is as follows:
package org.openscience.jch.diversity;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.fingerprint.MACCSFingerprinter;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.jch.utilities.IteratingMolTableReader;
/**
*
* @author chandu
*/
public class MultiThreadCalculator {
// Main Class
public static void main(String args[]) throws SQLException {
int range = 0;
int start = 0;
int stop = 0;
int a = 0;
int numberOfThreads = 4;
int count = 10000;
Connection connection = connectDb("Zinc.db");
connection.setAutoCommit(false);
range = (int) Math.ceil(count /(double)(numberOfThreads));
// generate the child threads and assigns them the range of rows to read from the db
for (int i = 1; i <= numberOfThreads; i++) {
stop = range * i;
System.out.println(start + "," + stop);
new NewThread(start, stop, i,connection);
start = stop + 1;
}
System.out.println("Main thread exiting." + a);
}
// method to connect to db
private static Connection connectDb(String path) {
Connection c = null;
try {
Class.forName("org.sqlite.JDBC");
c = DriverManager.getConnection("jdbc:sqlite:" + path);
} catch (Exception e) {
System.err.println(e.getClass().getName() + ": " + e.getMessage());
System.exit(0);
}
System.out.println("Opened database successfully");
return c;
}
// Child thread
public static class NewThread implements Runnable {
Thread t;
int ii;
int tStart = 0;
int tStop = 0;
static int ince = 0;
int a = 0;
Connection connection = null;
NewThread(int start, int stop, int threadID, Connection c) {
tStart = start;
tStop = stop;
ii = threadID;
System.out.println("child thread"+ii);
t = new Thread(this, "Demo Thread");
connection = c;
t.setPriority( Thread.NORM_PRIORITY + 1 );
t.start();
}
// This is the data processing part
public void run() {
Map< Integer, byte[]> map = new HashMap< Integer, byte[]>();
try (Statement stmt = connection.createStatement();
ResultSet rs = stmt.executeQuery("SELECT * FROM MOLDATA WHERE ID>=" + tStart + " and ID<=" + tStop + ";")) {
//SmilesGenerator sg = new SmilesGenerator(true);
MACCSFingerprinter mp = new MACCSFingerprinter();
while (rs.next()) {
IAtomContainer molecule = null;
int id = rs.getInt("ID");
InputStream is = new ByteArrayInputStream(rs.getString("STUCTURE").getBytes());
IteratingMolTableReader reader = new IteratingMolTableReader(is, DefaultChemObjectBuilder.getInstance(), true);
while (reader.hasNext()) {
molecule = reader.next();
break;
}
byte[] bi = mp.getBitFingerprint(molecule).asBitSet().toByteArray();
//System.out.println(bi.length);
//String smiles = sg.createSMILES(molecule);
map.put(id, bi);
System.out.println(id);
}
stmt.close();
} catch (Exception e) {
System.err.println(e.getClass().getName() + ": " + e.getMessage());
System.exit(0);
}
try {
writer(connection, map);
} catch (SQLException ex) {
Logger.getLogger(MultiThreadCalculator.class.getName()).log(Level.SEVERE, null, ex);
}
System.out.println("Exiting child thread." + a);
}
// Synchronised method to insert processed data and commit changes.
public synchronized static void writer(Connection connection, Map<Integer, byte[]> mp) throws SQLException {
String sql = "UPDATE MOLDATA SET FP = ? WHERE ID = ?";
PreparedStatement psUpdateRecord = connection.prepareStatement(sql);
int[] iNoRows = null;
for (int a : mp.keySet()) {
byte[] bi = mp.get(a);
psUpdateRecord.setBytes(1, bi);
psUpdateRecord.setInt(2, a);
psUpdateRecord.addBatch();
}
iNoRows = psUpdateRecord.executeBatch();
connection.commit();
System.out.println("Commit Done");
}
}
}