I am using Apache HttpClient 4 to communicate with a REST API and most of the time I do lengthy PUT operations. Since these may happen over an unstable Internet connection I need to detect if the connection is interrupted and possibly need to retry (with a resume request).
To try my routines in the real world I started a PUT operation and then I flipped the Wi-Fi switch of my laptop, causing an immediate total interruption of any data flow. However it takes a looong time (maybe 5 minutes or so) until eventually a SocketException is thrown.
How can I speed up to process? I'd like to set a timeout of maybe something around 30 seconds.
Update:
To clarify, my request is a PUT operation. So for a very long time (possibly hours) the only operation is a write() operation and there are no read operations. There is a timeout setting for read() operations, but I could not find one for write operations.
I am using my own Entity implementation and thus I write directly to an OutputStream which will pretty much immediately block once the Internet connection is interrupted. If OutputStreams had a timeout parameter so I could write out.write(nextChunk, 30000);
I could detect such a problem myself. Actually I tried that:
public class TimeoutHttpEntity extends HttpEntityWrapper {
public TimeoutHttpEntity(HttpEntity wrappedEntity) {
super(wrappedEntity);
}
@Override
public void writeTo(OutputStream outstream) throws IOException {
try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) {
super.writeTo(wrapper);
}
}
}
public class TimeoutOutputStreamWrapper extends OutputStream {
private final OutputStream delegate;
private final long timeout;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) {
this.delegate = delegate;
this.timeout = timeout;
}
@Override
public void write(int b) throws IOException {
executeWithTimeout(() -> {
delegate.write(b);
return null;
});
}
@Override
public void write(byte[] b) throws IOException {
executeWithTimeout(() -> {
delegate.write(b);
return null;
});
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
executeWithTimeout(() -> {
delegate.write(b, off, len);
return null;
});
}
@Override
public void close() throws IOException {
try {
executeWithTimeout(() -> {
delegate.close();
return null;
});
} finally {
executorService.shutdown();
}
}
private void executeWithTimeout(final Callable<?> task) throws IOException {
try {
executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new IOException(e);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw new Error(cause);
} catch (InterruptedException e) {
throw new Error(e);
}
}
}
public class TimeoutOutputStreamWrapperTest {
private static final byte[] DEMO_ARRAY = new byte[]{1,2,3};
private TimeoutOutputStreamWrapper streamWrapper;
private OutputStream delegateOutput;
public void setUp(long timeout) {
delegateOutput = mock(OutputStream.class);
streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout);
}
@AfterMethod
public void teardown() throws Exception {
streamWrapper.close();
}
@Test
public void write_writesByte() throws Exception {
// Setup
setUp(Long.MAX_VALUE);
// Execution
streamWrapper.write(DEMO_ARRAY);
// Evaluation
verify(delegateOutput).write(DEMO_ARRAY);
}
@Test(expectedExceptions = DemoIOException.class)
public void write_passesThruException() throws Exception {
// Setup
setUp(Long.MAX_VALUE);
doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY);
// Execution
streamWrapper.write(DEMO_ARRAY);
// Evaluation performed by expected exception
}
@Test(expectedExceptions = IOException.class)
public void write_throwsIOException_onTimeout() throws Exception {
// Setup
final CountDownLatch executionDone = new CountDownLatch(1);
setUp(100);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
executionDone.await();
return null;
}
}).when(delegateOutput).write(DEMO_ARRAY);
// Execution
try {
streamWrapper.write(DEMO_ARRAY);
} finally {
executionDone.countDown();
}
// Evaluation performed by expected exception
}
public static class DemoIOException extends IOException {
}
}
This is somewhat complicated, but it works quite well in my unit tests. And it works in real life as well, except that the HttpRequestExecutor
catches the exception in line 127 and tries to close the connection. However when trying to close the connection it first tries to flush the connection which again blocks.
I might be able to dig deeper in HttpClient and figure out how to prevent this flush operation, but it is already a not too pretty solution, and it is just about to get even worse.
UPDATE:
It looks like this can't be done on the Java level. Can I do it on another level? (I am using Linux).