this is a continuation of Stateful Rsocket Application thread. Here is my example.
We currently have three CORBA objects that are used as demonstrated in the diagram:
- LoginObject (to which a reference is retrieved via NamingService). Clients can call a login() method to obtain a session
- The Session object has various methods for query details about the current serivce context and most importatly to obtain a Transaction object
- The Transaction object can be used to execute various commands via a generic method that take a commandName and a list of key-value pairs as parameters. After the client executed n commands he can commit or rollback the transaction (also via methods on the Transaction object).
so here we use the session object to execute transactions on our service. In order to replace this with rsocket, we wrote a simple protobuf:
message LoginRequest {
string username = 1;
string password = 2;
}
message LoginResponse {
bool success = 1;
}
message Command {
string command = 1;
}
message TransactionResult {
string result = 1;
}
service SimpleService {
rpc login (LoginRequest) returns (LoginResponse) {}
rpc transaction (stream Command) returns (TransactionResult) {}
}
The idea is that once the user logs in, he will start streaming the commands to the servers. In the end, the client will either commit or drop the changes.
So here is the client code to start emiting values to the server:
public class SimpleClientWrapper {
SimpleServiceClient client;
...
public void runClient() {
SimpleServiceProto.LoginRequest login_request= SimpleServiceProto.LoginRequest.newBuilder().setUsername(this.username).setPassword(this.pass).build();
System.out.println("Trying to log in...");
client.login(login_request).doOnNext(response->{
if(response.getSuccess()) {
runCommands();
}else {
disconnect();
}
}).block();
}
public void runCommands() {
System.out.println("Login successfull. About to run some commands.");
Flux<Command> requests =
Flux.range(1, 11)
.map(i -> "sending -> " + i)
.map(s -> Command.newBuilder().setCommand(s).build());
TransactionResult response = client.transaction(requests).block();
System.out.println("Result was: " + response.getResult());
}
...
}
As you might see, the client will login in the runClient method, and if the login is successfull, the client will execute runCommands method, which will just emit some values.
On my server, according to the protobuf, i have created the transaction method:
@Override
public Mono<TransactionResult> transaction(Publisher<Command> messages, ByteBuf metadata) {
return Flux.from(messages)
.windowTimeout(10, Duration.ofSeconds(500))
.take(1)
.flatMap(Function.identity())
.reduce(
new ConcurrentHashMap<Character, AtomicInteger>(),
(map, s) -> {
char[] chars = s.getCommand().toCharArray();
for (char c : chars) {
map.computeIfAbsent(c, _c -> new AtomicInteger()).incrementAndGet();
}
return map;
})
.map(
map -> {
StringBuilder builder = new StringBuilder();
map.forEach(
(character, atomicInteger) -> {
builder
.append("character -> ")
.append(character)
.append(", count -> ")
.append(atomicInteger.get())
.append("\n");
});
String s = builder.toString();
return TransactionResult.newBuilder().setResult(s).build();
});
}
Now at this point when i run it, it won't work because of the blocking errors on the client:
Exception in thread "main" java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-tcp-nio-4
However my problem is not that it doesnt work, instead, i want each client to have a session with the server, and for each of them, the server is supposed to hold the transaction state. Here is the git link for the full code: https://github.com/oe19fyfa/rsocket-clientstream-example
So after all this writing, my question is: what is a correct way of establishing the transaction session? I know my code is very amateur so I am open for any kinds of proposals?