
I have a large amount of data stored in a SQLite database. I am using Java (the JDBC driver) to retrieve the data from the SQLite table in batches and then process it. Finally, the processed data is written back as a new column in the table (database). Since the processing of the data is fairly straightforward, I tried to use multithreading in Java to speed up the calculations.

The steps I followed were:

  1. Spawn child threads.
  2. Each child then reads its range of data from the SQLite DB and processes it.
  3. When the data processing is done, the result is written back to the database using a synchronized method (insert and commit).

However, I see no improvement in processing speed (calculations). In fact, as the number of threads increases, the speed decreases.

No multithreading: 1000 records ~ 2 min
2 threads: 1000 records ~ 2 min 3 sec
4 threads: 1000 records ~ 2 min 30 sec
10 threads: 1000 records ~ 2 min 52 sec

I am using a MacBook Pro: Mountain Lion; 2.4 GHz Intel Core 2 Duo (4 GB 1067 MHz DDR3).

The code is as follows:

package org.openscience.jch.diversity;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.fingerprint.MACCSFingerprinter;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.jch.utilities.IteratingMolTableReader;

/**
 *
 * @author chandu
 */
public class MultiThreadCalculator {
    // Main Class
    public static void main(String args[]) throws SQLException {
        int range = 0;
        int start = 0;
        int stop = 0;
        int a = 0;
        int numberOfThreads = 4;
        int count = 10000;
        Connection connection = connectDb("Zinc.db");
        connection.setAutoCommit(false);
        range = (int) Math.ceil(count /(double)(numberOfThreads));

        // generate the child threads and assign each of them a range of rows to read from the db

        for (int i = 1; i <= numberOfThreads; i++) {
            stop = range * i;
            System.out.println(start + "," + stop);
            new NewThread(start, stop, i,connection);
            start = stop + 1;
        }
        System.out.println("Main thread exiting." + a);
    }

    // method to connect to db
    private static Connection connectDb(String path) {
        Connection c = null;
        try {
            Class.forName("org.sqlite.JDBC");
            c = DriverManager.getConnection("jdbc:sqlite:" + path);
        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
            System.exit(0);
        }
        System.out.println("Opened database successfully");
        return c;
    }

    // Child thread
    public static class NewThread implements Runnable {
        Thread t;
        int ii;
        int tStart = 0;
        int tStop = 0;
        static int ince = 0;
        int a = 0;
        Connection connection = null;

        NewThread(int start, int stop, int threadID, Connection c) {
            tStart = start;
            tStop = stop;
            ii = threadID;
            System.out.println("child thread"+ii);
            t = new Thread(this, "Demo Thread");
            connection = c;
            t.setPriority( Thread.NORM_PRIORITY + 1 ); 
            t.start(); 
        }

        // This is the data processing part
        public void run() {
            Map< Integer, byte[]> map = new HashMap< Integer, byte[]>();

            try (Statement stmt = connection.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT * FROM MOLDATA WHERE ID>=" + tStart + " and ID<=" + tStop + ";")) {
                //SmilesGenerator sg = new SmilesGenerator(true);
                MACCSFingerprinter mp = new MACCSFingerprinter();
                while (rs.next()) {
                    IAtomContainer molecule = null;
                    int id = rs.getInt("ID");
                    InputStream is = new ByteArrayInputStream(rs.getString("STUCTURE").getBytes());
                    IteratingMolTableReader reader = new IteratingMolTableReader(is, DefaultChemObjectBuilder.getInstance(), true);
                    while (reader.hasNext()) {
                        molecule = reader.next();
                        break;
                    }

                    byte[] bi = mp.getBitFingerprint(molecule).asBitSet().toByteArray();

                    //System.out.println(bi.length);
                    //String smiles = sg.createSMILES(molecule);
                    map.put(id, bi);
                    System.out.println(id);
                }
            } catch (Exception e) {
                System.err.println(e.getClass().getName() + ": " + e.getMessage());
                System.exit(0);
            }
            try {
                writer(connection, map);
            } catch (SQLException ex) {
                Logger.getLogger(MultiThreadCalculator.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println("Exiting child thread." + a);
        }

        // Synchronised method to insert processed data and commit changes.

        public synchronized static void writer(Connection connection, Map<Integer, byte[]> mp) throws SQLException {
            String sql = "UPDATE MOLDATA SET FP = ? WHERE ID = ?";
            PreparedStatement psUpdateRecord = connection.prepareStatement(sql);
            int[] iNoRows = null;
            for (int a : mp.keySet()) {
                byte[] bi = mp.get(a);
                psUpdateRecord.setBytes(1, bi);
                psUpdateRecord.setInt(2, a);

                psUpdateRecord.addBatch();
            }
            iNoRows = psUpdateRecord.executeBatch();
            connection.commit();
            System.out.println("Commit Done");
        }
    }
}
  • Have you done some profiling? What takes up the time: database activity or the calculations? Generally, you should avoid sharing the same `Connection` among multiple threads and have one Connection per thread. – qqilihq Nov 21 '13 at 18:45
  • No, I haven't done any profiling. I tried one connection per thread, but it gives the error: [SQLITE_BUSY] The database file is locked (database is locked). I don't think SQLite allows multiple connections across multiple threads: http://stackoverflow.com/a/10707791/2995634 . I think the problem is not the DB query calls but the multiple threads processing the data. – CS76 Nov 21 '13 at 19:16
  • The calculations are what take the long time (for example, generating the MACCS structure keys for the chemical structures in the database)... – CS76 Nov 21 '13 at 19:24

1 Answer


Keep in mind that SQLite is a very small database implementation, optimized for size and single-user/single-thread usage. You need to check in detail with profiling (a minimal timing sketch follows the list below), but I expect the following behavior:

  1. Each thread reads a block of data, concurrently with the other threads. Most or even all of the data must be read from disk, because each thread reads a different block. Caching in SQLite doesn't help here, as none of the data is read twice. The threads effectively run serially already at this point, because they get serialized on disk access.
  2. Each thread does some complex calculation. No matter how complex it is, it is done in memory, while SQLite reads and writes on disk, which is slower by large factors (thousands).
  3. The insert/update and commit at the end do the rest of the serialization: the commit must write to disk and has to wait until the write is done. Only after that step can the next thread start to insert/update its result.
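
If you want to see how much of the two minutes SQLite itself is responsible for, a minimal timing sketch is enough. It assumes the Zinc.db file and the MOLDATA/ID/STUCTURE names from the question; the class name ReadTiming is just an illustrative choice:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Times a plain read of one 1000-row batch, without any fingerprint calculation.
public class ReadTiming {
    public static void main(String[] args) throws Exception {
        Class.forName("org.sqlite.JDBC");
        try (Connection c = DriverManager.getConnection("jdbc:sqlite:Zinc.db");
             Statement stmt = c.createStatement()) {
            long t0 = System.nanoTime();
            int rows = 0;
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT ID, STUCTURE FROM MOLDATA WHERE ID >= 1 AND ID <= 1000")) {
                while (rs.next()) {
                    rs.getInt("ID");
                    rs.getString("STUCTURE");
                    rows++;
                }
            }
            long ms = (System.nanoTime() - t0) / 1_000_000;
            System.out.println("Read " + rows + " rows in " + ms + " ms");
        }
    }
}

If the plain read finishes in a few seconds, the bottleneck is the fingerprint calculation plus the way the threads are structured; if it already eats a large part of the two minutes, the database access itself is the limit.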

Even the decreasing speed with more threads can be explained: the more threads you use, the more overhead SQLite has to handle, and it is not optimized for many users or threads.

That's why some professional databases are so expensive: they handle tens of thousands of users and have extremely clever algorithms so that the next thing to be read is already in memory (95% of the time).

But what can you do better now?

  • The most practical way would be to rewrite the code: read all data from the database up front, then do the processing in threads, and at the end have one thread do all updates/inserts and commit only once (a sketch follows this list).
  • You could change the database, which is a rather expensive solution. Even MySQL is far better than SQLite for this kind of application. Some databases (Oracle, Teradata, ...) can run Java code directly in the database, so you would not have to move the data around before and after processing (which is a common performance bottleneck, e.g. in SAS).
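
A minimal sketch of that first option: it reuses the table/column names (MOLDATA, STUCTURE, FP), the Zinc.db file and the CDK/reader classes from the question, while the class name SingleWriterPipeline, the ExecutorService setup and the error handling are only illustrative choices.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.openscience.cdk.DefaultChemObjectBuilder;
import org.openscience.cdk.fingerprint.MACCSFingerprinter;
import org.openscience.cdk.interfaces.IAtomContainer;
import org.openscience.jch.utilities.IteratingMolTableReader;

public class SingleWriterPipeline {

    public static void main(String[] args) throws Exception {
        Class.forName("org.sqlite.JDBC");
        final int numberOfThreads = 4;

        try (Connection connection = DriverManager.getConnection("jdbc:sqlite:Zinc.db")) {
            connection.setAutoCommit(false);

            // 1. Read all rows up front on the main thread: one reader, no lock contention.
            List<Object[]> rows = new ArrayList<Object[]>();   // [0] = ID, [1] = mol table text
            try (Statement stmt = connection.createStatement();
                 ResultSet rs = stmt.executeQuery("SELECT ID, STUCTURE FROM MOLDATA")) {
                while (rs.next()) {
                    rows.add(new Object[]{rs.getInt("ID"), rs.getString("STUCTURE")});
                }
            }

            // 2. Do only the CPU-bound fingerprint calculation in the worker threads.
            final Map<Integer, byte[]> fingerprints = new ConcurrentHashMap<Integer, byte[]>();
            ExecutorService pool = Executors.newFixedThreadPool(numberOfThreads);
            for (Object[] row : rows) {
                final int id = (Integer) row[0];
                final String molText = (String) row[1];
                pool.submit(new Runnable() {
                    public void run() {
                        try {
                            // One fingerprinter per task, in case the implementation is not thread-safe.
                            MACCSFingerprinter mp = new MACCSFingerprinter();
                            InputStream is = new ByteArrayInputStream(molText.getBytes());
                            IteratingMolTableReader reader = new IteratingMolTableReader(
                                    is, DefaultChemObjectBuilder.getInstance(), true);
                            if (reader.hasNext()) {
                                IAtomContainer molecule = reader.next();
                                fingerprints.put(id, mp.getBitFingerprint(molecule).asBitSet().toByteArray());
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.HOURS);

            // 3. A single thread writes everything back and commits exactly once.
            try (PreparedStatement ps = connection.prepareStatement("UPDATE MOLDATA SET FP = ? WHERE ID = ?")) {
                for (Map.Entry<Integer, byte[]> e : fingerprints.entrySet()) {
                    ps.setBytes(1, e.getValue());
                    ps.setInt(2, e.getKey());
                    ps.addBatch();
                }
                ps.executeBatch();
                connection.commit();
            }
        }
    }
}

Because only the main thread ever touches the Connection, the SQLITE_BUSY error mentioned in the comments cannot occur, the worker threads are purely CPU-bound, and there is exactly one batch update and one commit at the very end.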
– jboi