The problem: SOAP messages can be intercepted, but only in a Handler that is far removed from the
original call site. It's difficult to pass the SOAP message back to the original call site,
particularly in multithreaded or asynchronous environments.
The only solution I can see is to make sure each request has exclusive use of one JAX-WS Proxy,
which has exactly one Handler. Having only one Proxy for the whole application would be a
bottleneck, so this requires multithreading tools to allow for parallel and async execution.
Here's my idea, in code. First I go through it step-by-step, and at the end there's a dump of all
code.
UPDATE: I've replaced the LinkedBlockingQueue with a static ThreadLocal<SoapApiWrapper> instance,
and the executor with a newWorkStealingPool(). See the edit history for the changes!
I've set it up to use http://www.dneonline.com/calculator.asmx. It compiles and runs, but I haven't
spent a lot of time ensuring it works correctly or is optimal. I'm sure there are issues (my CPU fan
is working hard, even though I'm not running code). Be warned!
(Does anyone know of a better public SOAP API that I can either run locally or flood with
requests?)
If you'd like to test, here are some public SOAP APIs:
https://documenter.getpostman.com/view/8854915/Szf26WHn
Step by step
Implement a SOAPHandler class that will capture messages, called SoapMessageHandler.
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
    // capture messages in a list
    private final List<SOAPMessageContext> messages = new ArrayList<>();
    // get & clear messages
    public List<SOAPMessageContext> collectMessages() {
        var m = new ArrayList<>(messages);
        messages.clear();
        return m;
    }
    @Override
    public boolean handleMessage(SOAPMessageContext context) {
        messages.add(context); // collect message
        return true;
    }
    @Override
    public boolean handleFault(SOAPMessageContext context) {
        messages.add(context); // collect error
        return true;
    }
    // getHeaders() and close(...) are omitted here; see SoapMessageHandler.java under "All Code"
}
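The important behaviour of collectMessages() is that it drains the list: it returns a copy and resets the handler, so the next call starts empty. Here is a small sketch of just that behaviour. `DrainingCollector` is a hypothetical name, and it records plain strings instead of SOAPMessageContext so that it runs without JAX-WS.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SoapMessageHandler, using strings instead of SOAPMessageContext.
public class DrainingCollector {
    private final List<String> messages = new ArrayList<>();

    // Plays the role of handleMessage/handleFault: remember what passed through.
    public void record(String message) {
        messages.add(message);
    }

    // Plays the role of collectMessages(): hand out a copy, then reset.
    public List<String> drain() {
        List<String> copy = new ArrayList<>(messages);
        messages.clear();
        return copy;
    }

    public static void main(String[] args) {
        DrainingCollector collector = new DrainingCollector();
        collector.record("request-1");
        collector.record("response-1");
        System.out.println(collector.drain()); // [request-1, response-1]
        System.out.println(collector.drain()); // []
    }
}
```

Like the original, this is not thread-safe (plain ArrayList), which is fine as long as only one thread ever touches a given handler.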
Define a SoapApiWrapper class that
- creates one SoapMessageHandler,
- creates one JAX-WS Proxy,
- and adds the Handler to the Proxy.
class SoapApiWrapper {
    // 1. create a handler
    private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
    private final CalculatorSoap connection;
    public SoapApiWrapper() {
        // 2. create one connection
        var factoryBean = new JaxWsProxyFactoryBean();
        factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
        factoryBean.setServiceClass(CalculatorSoap.class);
        // 3. add the Handler
        factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
        connection = factoryBean.create(CalculatorSoap.class);
    }
}
Define a SoapApiManager that has
- an ExecutorService, which will manage the SOAP requests and responses
- a ThreadLocal<SoapApiWrapper>, so each thread has a JAX-WS Proxy (idea
from https://stackoverflow.com/a/16680215/4161471)
public class SoapApiManager {
    // 1. request executor
    private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
    // 2. one SoapApiWrapper (Proxy + Handler) per thread
    private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
}
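To check the claim that ThreadLocal.withInitial(...) builds one object per worker thread and then reuses it, here is a small sketch that counts constructions. `ExpensiveClient` is a hypothetical stand-in for SoapApiWrapper. I've assumed a fixed thread pool rather than the work-stealing pool, purely so the number of threads is easy to reason about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadInstanceDemo {
    // Counts how often the (hypothetical) expensive object is constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Stand-in for SoapApiWrapper: constructing one bumps the counter.
    static final class ExpensiveClient {
        ExpensiveClient() {
            CREATED.incrementAndGet();
        }
    }

    // Each thread lazily gets its own client the first time it calls get().
    static final ThreadLocal<ExpensiveClient> CLIENT =
        ThreadLocal.withInitial(ExpensiveClient::new);

    // Runs `tasks` jobs on `threads` workers and returns how many clients were built.
    public static int run(int threads, int tasks) {
        CREATED.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.runAsync(CLIENT::get, pool));
            }
            futures.forEach(CompletableFuture::join); // wait for every task
        } finally {
            pool.shutdown();
        }
        return CREATED.get();
    }

    public static void main(String[] args) {
        // 200 tasks, but never more clients than worker threads
        System.out.println("clients created: " + run(4, 200));
    }
}
```

The count is bounded by the number of worker threads, not by the number of tasks, which is why the Proxies get reused.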
The SoapApiManager has a method, submitRequest(...). It will return the SOAP API response **and**
the SOAP messages.
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
    SoapRequestRunner<ResponseT> requestRunner
) {
    //...
}
The parameter is a SoapRequestRunner, a lambda that accepts a JAX-WS Proxy and returns a SOAP
Response.
@FunctionalInterface
interface SoapRequestRunner<ResponseT> {
    ResponseT sendRequest(CalculatorSoap calculatorSoap);
}
When invoked, submitRequest(...) performs the following:
- wraps the SoapRequestRunner with CompletableFuture.supplyAsync(...), using our ExecutorService,
- fetches a SoapApiWrapper from the ThreadLocal,
- calls the SOAP API (by applying the SoapRequestRunner to the SoapApiWrapper's JAX-WS Proxy),
- awaits the SOAP result,
- extracts the SOAP messages from the SoapApiWrapper's SOAPHandler,
- and finally, bundles the SOAP result and SOAP messages in a DTO, SoapResponseHolder.
public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
    SoapRequestRunner<ResponseT> requestRunner
) { // 1. use CompletableFuture & executorService
    return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
}
private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
    SoapRequestRunner<ResponseT> requestRunner
) {
    return () -> {
        SoapApiWrapper api = soapApiWrapper.get(); // 2. fetch this thread's API Wrapper
        var response = requestRunner.sendRequest(api.connection); // 3&4. request & response
        var messages = api.soapMessageHandler.collectMessages(); // 5. extract raw SOAP messages
        return new SoapResponseHolder<>(response, messages); // 6. bundle into DTO
    };
}
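If you want to try this flow without a network or a SOAP stack, here is a rough, dependency-free sketch of the same shape. Everything in it is a stand-in that I've made up for illustration: `StubCalculator` plays the Proxy plus Handler and logs plain strings, `Holder` plays SoapResponseHolder, and a `Function` plays SoapRequestRunner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

public class StubApiManager implements AutoCloseable {
    // Hypothetical stand-in for the JAX-WS Proxy together with its Handler.
    public static final class StubCalculator {
        private final List<String> log = new ArrayList<>();

        public int add(int a, int b) {
            log.add("<Add>" + a + "," + b + "</Add>");     // fake "request" message
            int sum = a + b;
            log.add("<AddResult>" + sum + "</AddResult>"); // fake "response" message
            return sum;
        }

        List<String> collectMessages() {
            List<String> copy = new ArrayList<>(log);
            log.clear();
            return copy;
        }
    }

    // Stand-in for SoapResponseHolder: the result plus the messages captured for it.
    public static final class Holder<T> {
        public final T response;
        public final List<String> messages;

        Holder(T response, List<String> messages) {
            this.response = response;
            this.messages = messages;
        }
    }

    private static final ThreadLocal<StubCalculator> API =
        ThreadLocal.withInitial(StubCalculator::new);
    private final ExecutorService executor = Executors.newWorkStealingPool(4);

    public <T> CompletableFuture<Holder<T>> submitRequest(Function<StubCalculator, T> runner) {
        return CompletableFuture.supplyAsync(() -> {
            StubCalculator api = API.get();     // this thread's own stub
            T response = runner.apply(api);     // "send" the request
            return new Holder<T>(response, api.collectMessages()); // bundle result + messages
        }, executor);
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    public static void main(String[] args) {
        try (StubApiManager manager = new StubApiManager()) {
            Holder<Integer> h = manager.submitRequest(api -> api.add(73, 32)).join();
            System.out.println(h.response + " " + h.messages);
            // 105 [<Add>73,32</Add>, <AddResult>105</AddResult>]
        }
    }
}
```

Because the request runs and the messages are collected inside the same task, on the same thread, with that thread's own stub, no other request can slip its messages in between.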
Example Usage
public class Main {
    public static void main(String[] args) {
        SoapApiManager apiManager = new SoapApiManager();
        apiManager
            .submitRequest((soapApi) -> soapApi.add(73, 32))
            .thenAccept(response -> {
                // we can get the SOAP API response
                var sum = response.getResponse();
                // and also the intercepted messages!
                var messages = response.getMessages();
                var allXml = messages.stream().map(Main::getRawXml).collect(Collectors.joining("\n---\n"));
                System.out.println("sum: " + sum + ",\n" + allXml);
            })
            .join(); // wait for the result: the work-stealing pool uses daemon threads, so main() must not return early
    }
    public static String getRawXml(SOAPMessageContext context) {
        try {
            ByteArrayOutputStream byteOS = new ByteArrayOutputStream();
            context.getMessage().writeTo(byteOS);
            return byteOS.toString(StandardCharsets.UTF_8);
        } catch (SOAPException | IOException e) {
            throw new RuntimeException(e);
        }
    }
}
output
sum: 105,
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
<soap:Body>
<Add xmlns="http://tempuri.org/">
<intA>73</intA>
<intB>32</intB>
</Add>
</soap:Body>
</soap:Envelope>
---
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/" xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<soap:Header/>
<soap:Body>
<AddResponse xmlns="http://tempuri.org/">
<AddResult>105</AddResult>
</AddResponse>
</soap:Body>
</soap:Envelope>
All Code
Here's a working example, complete with validation of the responses.
It creates a lot (REQUESTS_COUNT) of requests and submits them all to SoapApiManager.
Each request prints out the thread's name, and the hashcode of the JAX-WS Proxy (I wanted to check
they were being reused), and the basic input/output (e.g. -9 - 99 = -108).
There's validation to make sure each SoapResponseHolder has the correct result and raw SOAP
messages, and that the correct number of requests were sent.
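One thing to watch with this kind of validation: an exception thrown inside a CompletableFuture stage only surfaces if something waits on that future. Here is a minimal sketch of one way to wait for a batch and notice failures, using CompletableFuture.allOf(...). `requestAndValidate` and the `brokenAt` parameter are hypothetical. They fake a request and deliberately return a wrong sum for one input.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AwaitAllDemo {
    // Hypothetical async call: stands in for submitRequest(...) followed by a validation stage.
    static CompletableFuture<Void> requestAndValidate(int a, int b, int brokenAt) {
        return CompletableFuture
            .supplyAsync(() -> a == brokenAt ? a + b + 1 : a + b) // wrong result for one input
            .thenAccept(sum -> {
                if (sum != a + b) {
                    throw new IllegalStateException("Bad sum for " + a + " + " + b + ": " + sum);
                }
            });
    }

    // Returns true if every validation passed, false if any stage failed.
    public static boolean runAll(int count, int brokenAt) {
        List<CompletableFuture<Void>> futures = IntStream.range(0, count)
            .mapToObj(i -> requestAndValidate(i, 1, brokenAt))
            .collect(Collectors.toList());
        try {
            // join() rethrows the first failure wrapped in a CompletionException
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return true;
        } catch (CompletionException e) {
            System.out.println("validation failed: " + e.getCause().getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(50, -1)); // no input is broken
        System.out.println(runAll(50, 7));  // input 7 produces a wrong sum
    }
}
```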
Main.java
import com.github.underscore.lodash.Xml;
import com.github.underscore.lodash.Xml.XmlStringBuilder.Step;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.xml.soap.SOAPException;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class Main implements AutoCloseable {
    private final SoapApiManager apiManager = new SoapApiManager();
    private static final int THREAD_COUNT = 4;
    private static final int REQUESTS_COUNT = 500;
    private final AtomicInteger i = new AtomicInteger();
    public static void main(String[] args)
        throws InterruptedException {
        try (var m = new Main()) {
            m.run();
        }
    }
    private void run() throws InterruptedException {
        var executor = Executors.newFixedThreadPool(THREAD_COUNT);
        var tasks = Stream.generate(() -> Map.entry(randomInt(), randomInt()))
            .limit(REQUESTS_COUNT)
            .map(intA -> (Callable<Boolean>) () -> {
                sendAndValidateRequest(intA.getKey(), intA.getValue());
                i.incrementAndGet();
                return true;
            })
            .collect(Collectors.toList());
        executor.invokeAll(tasks);
        var waiter = Executors.newSingleThreadScheduledExecutor();
        waiter.scheduleWithFixedDelay(
            () -> {
                var size = i.get();
                System.out.println(">waiting... (size " + size + ")");
                if (size >= REQUESTS_COUNT) {
                    System.out.println(">finished waiting! " + size);
                    waiter.shutdownNow();
                }
            },
            3, 3, TimeUnit.SECONDS
        );
        System.out.println("Finished sending tasks " + waiter.awaitTermination(10, TimeUnit.SECONDS));
        waiter.shutdownNow();
        Thread.sleep(TimeUnit.SECONDS.toMillis(5));
        executor.shutdown();
        System.out.println(
            "executor.awaitTermination " + executor.awaitTermination(10, TimeUnit.SECONDS));
        if (!executor.isTerminated()) {
            System.out.println("executor.shutdownNow " + executor.shutdownNow());
        }
        if (i.get() != REQUESTS_COUNT) {
            throw new RuntimeException(
                "Test did not execute " + REQUESTS_COUNT + " times, actual: " + i.get()
            );
        }
    }
    private int randomInt() {
        return ThreadLocalRandom.current().nextInt(-100, 100);
    }
    private void sendAndValidateRequest(int a, int b) {
        apiManager
            .submitRequest((soapApi) -> {
                var response = soapApi.add(a, b);
                System.out.printf(
                    "[%-12s / %-18s] %4d %s %3d = %4d\n",
                    soapApi.hashCode(),
                    Thread.currentThread().getName(),
                    a,
                    (b >= 0 ? "+" : "-"),
                    Math.abs(b),
                    response
                );
                return response;
            })
            .thenAcceptAsync(response -> {
                var sum = response.getResponse();
                var messages = response.getMessages();
                var allXml = messages.stream().map(Main::getRawXml)
                    .collect(Collectors.joining("\n---\n"));
                if (sum != a + b) {
                    throw new RuntimeException(
                        "Bad sum, sent " + a + " + " + b + ", result: " + sum + ", xml: " + allXml
                    );
                }
                if (messages.size() != 2) {
                    throw new RuntimeException(
                        "Bad messages, expected 1 request and 1 response, but got " + messages.size()
                            + ", xml: " + allXml
                    );
                }
                if (!allXml.contains("<AddResult>" + (a + b) + "</AddResult>")) {
                    throw new RuntimeException(
                        "Bad result, did not contain AddResult=" + (a + b) + ", actual: " + allXml
                    );
                }
            })
            .join(); // block until validated, so a failed check fails this task instead of being silently dropped
    }
    public static String getRawXml(SOAPMessageContext context) {
        try (var byteOS = new ByteArrayOutputStream()) {
            context.getMessage().writeTo(byteOS);
            var rawSoap = byteOS.toString(StandardCharsets.UTF_8);
            return Xml.formatXml(rawSoap, Step.TWO_SPACES);
        } catch (SOAPException | IOException e) {
            throw new RuntimeException(e);
        }
    }
    @Override
    public void close() {
        apiManager.close();
    }
}
SoapApiManager.java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import javax.xml.ws.handler.soap.SOAPMessageContext;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.tempuri.CalculatorSoap;
public class SoapApiManager implements AutoCloseable {
    private static final int THREAD_LIMIT = Math.min(Runtime.getRuntime().availableProcessors(), 5);
    private static final ExecutorService executorService = Executors.newWorkStealingPool(THREAD_LIMIT);
    private static final ThreadLocal<SoapApiWrapper> soapApiWrapper = ThreadLocal.withInitial(SoapApiWrapper::new);
    @Override
    public void close() {
        executorService.shutdown();
    }
    private static class SoapApiWrapper {
        private final CalculatorSoap connection;
        private final SoapMessageHandler soapMessageHandler = new SoapMessageHandler();
        public SoapApiWrapper() {
            var factoryBean = new JaxWsProxyFactoryBean();
            factoryBean.setAddress("http://www.dneonline.com/calculator.asmx");
            factoryBean.setServiceClass(CalculatorSoap.class);
            factoryBean.setHandlers(Collections.singletonList(soapMessageHandler));
            connection = factoryBean.create(CalculatorSoap.class);
        }
    }
    public <ResponseT> CompletableFuture<SoapResponseHolder<ResponseT>> submitRequest(
        SoapRequestRunner<ResponseT> requestRunner
    ) {
        return CompletableFuture.supplyAsync(createRequestCall(requestRunner), executorService);
    }
    private <ResponseT> Supplier<SoapResponseHolder<ResponseT>> createRequestCall(
        SoapRequestRunner<ResponseT> requestRunner
    ) {
        return () -> {
            SoapApiWrapper api = soapApiWrapper.get();
            var response = requestRunner.sendRequest(api.connection);
            var messages = api.soapMessageHandler.collectMessages();
            return new SoapResponseHolder<>(response, messages);
        };
    }
    @FunctionalInterface
    interface SoapRequestRunner<ResponseT> {
        ResponseT sendRequest(CalculatorSoap calculatorSoap);
    }
    public static class SoapResponseHolder<ResponseT> {
        private final List<SOAPMessageContext> messages;
        private final ResponseT response;
        SoapResponseHolder(
            ResponseT response,
            List<SOAPMessageContext> messages
        ) {
            this.response = response;
            this.messages = messages;
        }
        public ResponseT getResponse() {
            return response;
        }
        public List<SOAPMessageContext> getMessages() {
            return messages;
        }
    }
}
SoapMessageHandler.java
package org.example;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.handler.soap.SOAPHandler;
import javax.xml.ws.handler.soap.SOAPMessageContext;
public class SoapMessageHandler implements SOAPHandler<SOAPMessageContext> {
    private final List<SOAPMessageContext> messages = new ArrayList<>();
    public List<SOAPMessageContext> collectMessages() {
        var m = new ArrayList<>(messages);
        messages.clear();
        return m;
    }
    @Override
    public Set<QName> getHeaders() {
        return Collections.emptySet();
    }
    @Override
    public boolean handleMessage(SOAPMessageContext context) {
        messages.add(context);
        return true;
    }
    @Override
    public boolean handleFault(SOAPMessageContext context) {
        messages.add(context);
        return true;
    }
    @Override
    public void close(MessageContext context) {
    }
}
build.gradle.kts
plugins {
    java
    id("com.github.bjornvester.wsdl2java") version "1.2"
}
group = "org.example"
version = "1.0-SNAPSHOT"
repositories {
    mavenCentral()
}
dependencies {
    implementation(enforcedPlatform("org.apache.cxf:cxf-bom:3.4.4"))
    implementation("org.apache.cxf:cxf-core")
    implementation("org.apache.cxf:cxf-rt-frontend-jaxws")
    implementation("org.apache.cxf:cxf-rt-transports-http")
    implementation("org.apache.cxf:cxf-rt-databinding-jaxb")
    // implementation("org.apache.cxf:cxf-rt-transports-http-jetty")
    implementation("org.apache.cxf:cxf-rt-transports-http-hc")
    implementation("com.sun.activation:javax.activation:1.2.0")
    implementation("javax.annotation:javax.annotation-api:1.3.2")
    implementation("com.sun.xml.messaging.saaj:saaj-impl:1.5.1")
    implementation("com.github.javadev:underscore:1.68")
    //<editor-fold desc="JAXB">
    implementation("org.jvnet.jaxb2_commons:jaxb2-basics-runtime:1.11.1")
    xjcPlugins("org.jvnet.jaxb2_commons:jaxb2-basics:1.11.1")
    //</editor-fold>
    //<editor-fold desc="Test">
    testImplementation(enforcedPlatform("org.junit:junit-bom:5.7.2")) // JUnit 5 BOM
    testImplementation("org.junit.jupiter:junit-jupiter")
    testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.0")
    testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine")
    //</editor-fold>
}
java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(11))
    }
}
wsdl2java {
    cxfVersion.set("3.4.4")
    options.addAll("-xjc-Xequals", "-xjc-XhashCode")
}
tasks.test {
    useJUnitPlatform()
}