Is there a way to cancel a copy operation started by calling the copyIn() method in a separate thread?

Say I have a list of CSV files to copy from, and I want to make full use of the database server. I create n threads, each with its own connection, for n files, but I cannot find a way to abort a single operation if, for example, the wrong file has been chosen.

Killing the threads does not work; COPY just keeps running.

The threads are created with the FutureTask<> class, so there is a list of tasks, one for each CSV file.

Calling task.cancel(true) does nothing to the copy process on the server. Only System.exit() can kill it with fire.

Any ideas?

Some of my code:

Uploader.java (implements Callable<Long>):

public static long uploadFile(final File file, final String tableName) {

    long status = 0;
    // try-with-resources closes the reader even if the copy fails
    try (FileReader reader = new FileReader(file)) {
        CopyManager copyManager =
            new CopyManager((BaseConnection) new DataSource().connect());
        // sql is the COPY ... FROM STDIN statement; it is not shown in this excerpt
        status = copyManager.copyIn(sql, reader);
    } catch (SQLException | IOException e) {
       ...
    }
    return status;
}

@Override
public Long call() throws Exception {
    return uploadFile(file, tableName);
}

Upload files method body:

for (File file : files) {
    FutureTask<Long> ftask =
            new FutureTask<>(new Uploader(defaultTableName, file));
    tasks.add(ftask);
    execService.execute(ftask);
}

SOLVED:

I found a solution, although it required some changes to the code.

The upload files method body now looks like this:

for (File file : files) {
    Uploader uploader = new Uploader(defaultTableName, file);
    uploaders.add(uploader);
    // keep the Future to get the copy result when finished
    Future<Long> f = execService.submit(uploader);
}

With references to the Uploader objects kept in a list, we can call a method on any Uploader that closes its database connection and handles the resulting exception properly. That stops the copy on the server.

I accept that this might not be the most elegant solution, but it works, it is fast, and it needs little code.
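A minimal sketch of what such an Uploader might look like. The names CancellableUploader and CopyJob are hypothetical; CopyJob only stands in for the blocking copyManager.copyIn(sql, reader) call so that the sketch depends on nothing but java.sql. It assumes, as described above, that closing the connection from another thread makes the blocked copy fail with an exception.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;

/** Hypothetical stand-in for the blocking call, e.g. copyManager.copyIn(sql, reader). */
interface CopyJob {
    long run(Connection connection) throws Exception;
}

/** Sketch of an uploader that can be aborted by closing its own connection. */
class CancellableUploader implements Callable<Long> {
    private final Connection connection;
    private final CopyJob job;
    private volatile boolean cancelled;

    CancellableUploader(Connection connection, CopyJob job) {
        this.connection = connection;
        this.job = job;
    }

    @Override
    public Long call() throws Exception {
        try {
            return job.run(connection);
        } catch (Exception e) {
            if (cancelled) {
                // The failure was provoked by cancel(), so report it as a cancellation.
                CancellationException ce =
                        new CancellationException("copy aborted by closing the connection");
                ce.initCause(e);
                throw ce;
            }
            throw e;
        } finally {
            // Per the JDBC contract, closing an already closed connection is a no-op.
            connection.close();
        }
    }

    /** Intended to be called from a thread other than the one running call(). */
    void cancel() throws SQLException {
        cancelled = true;
        connection.close();
    }
}
```

The caller keeps each uploader in a list next to its Future, calls cancel() on the one to abort, and then treats a CancellationException from Future.get() (wrapped in an ExecutionException) as an expected outcome rather than an error.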

phil_g

2 Answers

PostgreSQL doesn't actually support in-band query cancels.

When you request a query cancel from the JDBC driver, it opens a new connection to send the cancel message. (This means that if you're at max_connections a cancel will fail, which is kind of perverse.)

The upshot of this is that you can do the same thing yourself:

  • Use pg_backend_pid() to get the process ID of the worker before starting the copy operation;

  • When you want to cancel a copy, open a new connection and call pg_cancel_backend(?) with the pid recorded earlier. If the copy doesn't stop, you can wait a bit, then call pg_terminate_backend(?).

These are ordinary SQL-level functions.
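The two steps above can be sketched with plain JDBC. The class and method names (BackendControl, backendPid, cancelBackend, terminateBackend) are made up for illustration; only the three SQL functions come from PostgreSQL. The sketch assumes the control connection is logged in as a role that is allowed to signal the worker's backend.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/** Hypothetical helper wrapping the SQL-level cancel functions. */
final class BackendControl {
    private BackendControl() {}

    /** Run on the worker connection before the COPY starts. */
    static int backendPid(Connection worker) throws SQLException {
        try (Statement st = worker.createStatement();
             ResultSet rs = st.executeQuery("SELECT pg_backend_pid()")) {
            rs.next();
            return rs.getInt(1);
        }
    }

    /** Run on a separate control connection; asks the backend to cancel its current query. */
    static boolean cancelBackend(Connection control, int pid) throws SQLException {
        return signal(control, "SELECT pg_cancel_backend(?)", pid);
    }

    /** Harsher fallback: terminates the whole backend session. */
    static boolean terminateBackend(Connection control, int pid) throws SQLException {
        return signal(control, "SELECT pg_terminate_backend(?)", pid);
    }

    // Both functions return a boolean saying whether the signal was sent.
    private static boolean signal(Connection control, String sql, int pid) throws SQLException {
        try (PreparedStatement ps = control.prepareStatement(sql)) {
            ps.setInt(1, pid);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() && rs.getBoolean(1);
            }
        }
    }
}
```

In the uploader, backendPid would be called right after connecting and the result stored where the cancelling thread can read it.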

The only real issue is that cancel and terminate requests are session-level, not statement-level. So they can race with the completion of one statement and the start of the next, e.g.:

  • client1: COPY starts
  • client2: connects to send cancel message
  • client1: copy finishes
  • client1: new separate copy starts
  • client2: sends pg_cancel_backend(...)

At this point the second copy will be cancelled, which may not be what you wanted. So you must use appropriate exclusion on the client side to prevent this, making sure any outstanding cancel requests have finished before starting a new statement.
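One possible shape for that client-side exclusion is a small guard object per worker connection. CancelGuard and its methods are hypothetical names, not part of any driver; this is only a sketch of the idea that a cancel is sent while holding the same lock that a new statement needs in order to start.

```java
/** Hypothetical per-connection guard that keeps a cancel from hitting a later statement. */
class CancelGuard {
    private final Object lock = new Object();
    private long activeStatement = 0; // 0 means no statement is running
    private long nextId = 1;

    /** Call just before starting a statement; blocks while a cancel is being sent. */
    long begin() {
        synchronized (lock) {
            activeStatement = nextId++;
            return activeStatement;
        }
    }

    /** Call when the statement has finished, whether it succeeded or failed. */
    void end(long id) {
        synchronized (lock) {
            if (activeStatement == id) {
                activeStatement = 0;
            }
        }
    }

    /**
     * Runs sendCancel only if the given statement is still the active one.
     * The lock is held while sending, so no new statement can begin meanwhile.
     */
    boolean cancelIfActive(long id, Runnable sendCancel) {
        synchronized (lock) {
            if (activeStatement != id) {
                return false; // stale request: that statement already finished
            }
            sendCancel.run();
            return true;
        }
    }
}
```

The worker wraps each COPY in begin() and end(id), and the cancelling thread passes the code that calls pg_cancel_backend as sendCancel. A cancel that arrives after the copy has finished but before end(id) runs can still reach an idle session, which this sketch treats as acceptable.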

IIRC the JDBC driver has the same issue internally anyway. It's one of the reasons the team really want a way to cancel a particular unique per-session statement sequence number, like a (currently non-existent) pg_cancel_backend(pid, statementnumber) that aborts with an error if the statement has already terminated, instead of sending the cancel anyway.

Craig Ringer

Disclaimer: I haven't tried this; I just got the idea by looking at the source code.

There is a CopyManager.copyIn(String sql) method that returns an instance of the CopyIn interface, which in turn extends CopyOperation. That interface has a cancelCopy() method.

See the JavaDocs here: http://jdbc.postgresql.org/documentation/publicapi/org/postgresql/copy/CopyOperation.html#cancelCopy%28%29

But the methods that take a stream to copy the data only return a long value, so there is no way to get at the CopyOperation instance used there.

However, when looking at the source code of the copyIn() method this seems quite easy to do on your own.

So instead of calling copyIn(String, Reader) you basically use the code from that method in your code:

// your code 
CopyManager copyManager = 
       new CopyManager((BaseConnection) new DataSource().connect());
FileReader from = ...  // different name!
int bufferSize = 65536;

// here starts the copy of the driver's implementation of the copyIn() method.

char[] cbuf = new char[bufferSize];
int len;

// if you store the instance of the CopyIn interface in an instance variable you 
// should be able to call cancelCopy() on it
CopyIn cp = copyManager.copyIn(sql);  

try {
    while ( (len = from.read(cbuf)) > 0) {
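        // the driver uses its internal connection encoding here; UTF-8 is assumed instead
        byte[] buf = new String(cbuf, 0, len).getBytes(java.nio.charset.StandardCharsets.UTF_8);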
        cp.writeToCopy(buf, 0, buf.length);
    }
    return cp.endCopy();
} finally { // see to it that we do not leave the connection locked
    if(cp.isActive())
        cp.cancelCopy();
}
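A variation on the loop above, sketched under assumptions: instead of calling cancelCopy() from another thread, the copying thread checks a volatile flag between chunks and cancels the copy itself. StoppableCopy and CopySink are hypothetical names; CopySink only mirrors the four CopyIn methods the loop uses, so that the sketch compiles without the driver. UTF-8 is assumed as the client encoding.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.concurrent.Callable;

/** Hypothetical stand-in mirroring the CopyIn methods used by the loop. */
interface CopySink {
    void writeToCopy(byte[] buf, int off, int len) throws SQLException;
    long endCopy() throws SQLException;
    void cancelCopy() throws SQLException;
    boolean isActive();
}

/** Copy loop that another thread can stop by calling requestStop(). */
class StoppableCopy implements Callable<Long> {
    private final CopySink sink;
    private final Reader from;
    private volatile boolean stopRequested;

    StoppableCopy(CopySink sink, Reader from) {
        this.sink = sink;
        this.from = from;
    }

    /** Safe to call from any thread; takes effect before the next chunk is read. */
    void requestStop() {
        stopRequested = true;
    }

    @Override
    public Long call() throws IOException, SQLException {
        char[] cbuf = new char[65536];
        int len;
        try {
            while (!stopRequested && (len = from.read(cbuf)) > 0) {
                byte[] buf = new String(cbuf, 0, len).getBytes(StandardCharsets.UTF_8);
                sink.writeToCopy(buf, 0, buf.length);
            }
            if (stopRequested) {
                return -1L; // the finally block cancels the still-active copy
            }
            return sink.endCopy();
        } finally {
            if (sink.isActive()) {
                sink.cancelCopy();
            }
        }
    }
}
```

Because the caller holds the StoppableCopy object itself rather than waiting for a result from Future.get(), requestStop() is reachable while the copy is still running. The stop only takes effect between chunks, so a write that is blocked on the server will not be interrupted by it.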
  • I've just tried that. Unfortunately, it does not work and here's why: I can return `CopyIn` instance only via the `FutureTask` `get()` method (remember multithreading), which returns its result only when the thread is done with the task. So, obviously, it will give me an instance of the `CopyIn` when the copy operation is finished. – phil_g Sep 04 '13 at 11:00