I am new to DDS and am trying to write a simple Java program in IntelliJ IDEA that consists of 3 parts:
- Client Simulator that sends data.
- My program simulator that receives data from the client, manipulates it, and sends it back to the client.
- Client Simulator that reads the manipulated data.
All the data that I am trying to send in my example is a simple String.
I am using RTI Code Gen to auto-generate most of the code.
Without the unboundedSupport flag (the string is limited to 255 characters), everything worked just fine. However, when applying the unboundedSupport flag, I am getting the following out-of-memory error:
java.lang.OutOfMemoryError: Java heap space
at com.rti.dds.cdr.CdrBuffer.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.cdr.CdrOutputStream.<init>(Unknown Source)
at com.rti.dds.infrastructure.EntityImpl.DDS_Entity_enable(Native Method)
at com.rti.dds.infrastructure.EntityImpl.enable(Unknown Source)
at com.rti.dds.infrastructure.NativeFactoryMixin.create_entityI(Unknown Source)
at com.rti.dds.subscription.SubscriberImpl.create_datareader(Unknown Source)
at json_dds.JsonMessageSubscriber.<init>(JsonMessageSubscriber.java:71)
at results_consumers.ResultsConsumersMain.main(ResultsConsumersMain.java:10)
create_datareader error
I start the client simulator that reads data before the other two parts.
This is my .idl file:
struct JsonMessage {
    string msg;
};
This is my main program (line 10 is the initialization of subscriber1):
public static void main(String... args) {
    ClientResultsConsumer clientResultsConsumer = new ClientResultsConsumer();
    JsonMessageSubscriber subscriber1 = new JsonMessageSubscriber(0, clientResultsConsumer,
            Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber1.consume();
    ClientResultsConsumer2 clientResultsConsumer2 = new ClientResultsConsumer2();
    JsonMessageSubscriber subscriber2 = new JsonMessageSubscriber(0, clientResultsConsumer2,
            Topics.CLIENT_TOPIC_OUTPUT_1);
    subscriber2.consume();
    ClientResultsConsumer3 clientResultsConsumer3 = new ClientResultsConsumer3();
    JsonMessageSubscriber subscriber3 =
            new JsonMessageSubscriber(0, clientResultsConsumer3, Topics.CLIENT_TOPIC_OUTPUT_2);
    subscriber3.consume();
}
This is my ClientResultsConsumer class:
public class ClientResultsConsumer implements Consumer {
    @Override
    public void consume(String msg) {
        System.out.println("Client results consumer got " + msg);
    }
}
This is my JsonMessageSubscriber class (line 71 is the call to subscriber.create_datareader):
public class JsonMessageSubscriber implements DataConsumer {
    ExecutorService executor = Executors.newSingleThreadExecutor();

    public JsonMessageSubscriber(int domainId, Consumer consumer, String topicName) {
        DomainParticipant participant = DomainParticipantFactory.TheParticipantFactory
                .create_participant(domainId,
                        DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT,
                        null /* listener */,
                        StatusKind.STATUS_MASK_NONE);
        if (participant == null) {
            System.err.println("create_participant error\n");
            System.exit(-1);
        }
        // --- Create subscriber --- //
        /* To customize subscriber QoS, use
           the configuration file USER_QOS_PROFILES.xml */
        Subscriber subscriber = participant.create_subscriber(
                DomainParticipant.SUBSCRIBER_QOS_DEFAULT, null /* listener */,
                StatusKind.STATUS_MASK_NONE);
        if (subscriber == null) {
            System.err.println("create_subscriber error\n");
            System.exit(-1);
        }
        // --- Create topic --- //
        /* Register type before creating topic */
        String typeName = JsonMessageTypeSupport.get_type_name();
        JsonMessageTypeSupport.register_type(participant, typeName);
        /* To customize topic QoS, use
           the configuration file USER_QOS_PROFILES.xml */
        Topic topic = participant.create_topic(
                topicName,
                typeName, DomainParticipant.TOPIC_QOS_DEFAULT,
                null /* listener */, StatusKind.STATUS_MASK_NONE);
        if (topic == null) {
            System.err.println("create_topic error\n");
            System.exit(-1);
        }
        // --- Create reader --- //
        DataReaderListener listener = new JsonMessageListener(consumer);
        /* To customize data reader QoS, use
           the configuration file USER_QOS_PROFILES.xml */
        JsonMessageDataReader reader = (JsonMessageDataReader)
                subscriber.create_datareader(
                        topic, Subscriber.DATAREADER_QOS_DEFAULT, listener,
                        StatusKind.STATUS_MASK_ALL);
        if (reader == null) {
            System.err.println("create_datareader error\n");
            System.exit(-1);
        }
    }

    // -----------------------------------------------------------------------
    @Override
    public void consume() {
        final long scanTimeMillis = 1000;
        // Keep the process alive; samples are delivered to the listener
        Runnable task = () -> {
            while (true) {
                try {
                    TimeUnit.MILLISECONDS.sleep(scanTimeMillis);
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                }
            }
        };
        executor.submit(task);
    }
}
Unfortunately, the only solution I have found is to limit the sequence size. I understand that limiting it to a large enough number would solve my problem, but it would also require a lot of memory, and I would rather each message take no more than the minimum memory it requires.
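To make the memory concern concrete, here is a rough back-of-the-envelope sketch in plain Java (no DDS calls). It assumes that a bounded string is pre-allocated at its maximum length for every sample in a pool. The class name, the pool size of 32, and the one-byte-per-character figure are assumptions for illustration only, not values taken from my configuration or from the RTI documentation.

```java
public class BoundedStringMemoryEstimate {

    /**
     * Rough number of bytes reserved for one bounded string field across a
     * pool of pre-allocated samples. Assumes one byte per character plus a
     * terminator, and that every sample reserves the full bound up front.
     */
    static long estimateBytes(long maxStringLength, long preallocatedSamples) {
        // multiplyExact throws instead of silently overflowing on huge bounds
        return Math.multiplyExact(maxStringLength + 1, preallocatedSamples);
    }

    public static void main(String[] args) {
        long samples = 32; // hypothetical pool size, for illustration only

        // The 255-character bound that works fine for me
        System.out.println("bound 255:       " + estimateBytes(255, samples) + " bytes");

        // A bound large enough for big JSON messages
        System.out.println("bound 1,000,000: " + estimateBytes(1_000_000, samples) + " bytes");
    }
}
```

Under these assumptions the reserved memory grows linearly with the bound, for every reader and writer, even if most messages are short. That is the cost I am hoping to avoid.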
Any help will be appreciated. Thanks!