5

I have a camel application which receives a json array request from a jms queue upto size 13000,the structure of the json array request is as below. I would like to stream and split the json array with a group of 5. For example if I receive a json array of size 100 I would like to group as 5 and split it as 20 requests. Is there a inbuilt camel functionality to group and split json array or do I need to write a custom splitter?

I'm using camel 2.17 version.

Sample json array:

[{
    "name": "Ram",
    "email": "ram@gmail.com",
    "age": 23
 }, {
    "name": "Shyam",
    "email": "shyam23@gmail.com",
    "age": 28
 }, {
    "name": "John",
    "email": "john@gmail.com",
    "age": 33
 }, {
    "name": "Bob",
    "email": "bob32@gmail.com",
    "age": 41
 }, {
    "name": "test1",
    "email": "test1@gmail.com",
    "age": 41
 }, {
    "name": "test2",
    "email": "test2@gmail.com",
    "age": 41
 }, {
    "name": "test3",
    "email": "test3@gmail.com",
    "age": 41
 }, {
    "name": "test4",
    "email": "test4@gmail.com",
    "age": 41
}]
kometen
  • 6,536
  • 6
  • 41
  • 51
Selva Shanmugam
  • 53
  • 1
  • 2
  • 8

6 Answers6

10

You could try something like this:

@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {

        @Override
        public void configure() throws Exception {
            from("direct:start")
                .split().jsonpath("$")
                    .streaming()
                    .aggregate(AggregationStrategies.groupedExchange())
                    .constant("true")
                    .completionSize(5)
                    .completionTimeout(1000)
                    .log("${body}")
                .to("mock:result");
        }
    };
}

If the message doesn't have a size multiple of five, the route should wait 1 sec before aggregating and go ahead. Using your input, the result will be two messages with 5 and 3 items respectively:

INFO 5419 --- [           main] route1                                   : List<Exchange>(5 elements)
INFO 5419 --- [eTimeoutChecker] route1                                   : List<Exchange>(3 elements) 

A full sample could be viewed in here.

EDIT:

As requested, a Spring DSL example:

<camel:route>
    <camel:from uri="direct:start" />
    <camel:split streaming="true">
        <camel:jsonpath>$</camel:jsonpath>
        <camel:aggregate completionSize="5"
            completionTimeout="1000" groupExchanges="true">
            <camel:correlationExpression>
                <camel:constant>true</camel:constant>
            </camel:correlationExpression>
            <camel:log message="${body}"></camel:log>
            <camel:to uri="mock:result"></camel:to>
        </camel:aggregate>
    </camel:split>
</camel:route>
Ricardo Zanini
  • 1,041
  • 7
  • 12
  • Thanks a lot Ricardo i already tried the same,in our case we need to split a json array of 20000 to a completionsize of 20 does the completionTimeout 1000 ms holds good or do i need to revisit the value? – Ravi Jan 08 '18 at 12:12
  • I think you should run some load tests to assert this value. 1000 ms is a magic number, though. :) – Ricardo Zanini Jan 08 '18 at 12:22
  • Hi Ricardo,Iam able to split the json message to List(elements) how can i convert it in to json string in camel?i have done but it doesnt work – Ravi Jan 08 '18 at 15:12
  • You could implement your own aggregate strategy and concat each message into a string builder. – Ricardo Zanini Jan 08 '18 at 15:20
1

Try this

from("{{queue.endpoint}}")
.split().tokenize("},", 5)
.log("Incoming request : ${body} ")
;
Dharman
  • 30,962
  • 25
  • 85
  • 135
Rajkumar
  • 122
  • 7
1

I faced a similar problem not long ago. I had a JSON array containing over 2000 items and I needed to send it to a REST Service via PUT.

I'm using a later version of camel though, 3.17.0. This is what my pipeline looks like.

    from("direct-vm:someRoute")
            .process(new ListObjectCreator())
            .split(body())
            .aggregate(constant(true), new GroupedBodyAggregationStrategy())
            .completionSize(100)
            .completionTimeout(1000)
            .marshal().json(JsonLibrary.Jackson)
            .log("${body}")
            .to("direct:updateAPI")
    ;

What's going on here is that in my process the body is read from the current Exchange and adds it to POJOs in an ArrayList.

Then the body in the pipeline is split, which is splitted into single POJO objects.

The aggregate groups the splitted body into a list of objects.

With completionSize(100) the aggregation of each list should only consist of 100 objects.

With completionTimeout(1000) the aggregation to waits 1 second before each aggregation completes.

Marshal().json(JsonLibrary.Jackson) converts the list of objects to JSON. Since it's a list the JSON will be an array of 100 or less objects.

Log("${body}"), well, just logging the result which should be one log line per list/array.

Then the messages are routed to another pipeline that handles the headers and functions related to the REST call.

Hope this can be of some use.

0

This will work

@Autowired
@EndpointInject(uri = "direct://splitted-queue")
ProducerTemplate producerTemplate;

@Component
  class Router extends RouteBuilder {

    @Override
    public void configure() throws Exception {
      from("direct://direct-queue").split(ExpressionBuilder.languageExpression("jsonpath","$")).convertBodyTo(String.class).process(new Processor() {
        List<String> jsons = new ArrayList<>();

        @Override
        public void process(Exchange exchange) throws Exception {
          jsons.add(exchange.getIn().getBody().toString());
          if(jsons.size() == 5) {
            producerTemplate.sendBody(jsons);
            jsons.clear();
          }
        }
      });
    }  

You need camel-jsonpath dependency for this

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-jsonpath</artifactId>
        <version>2.19.0</version>
    </dependency>
pvpkiran
  • 25,582
  • 8
  • 87
  • 134
0
public class JSONArraySplitterBean {

 public static List<JSONObject> convertToListOfJsonObjects(String input) {
        JSONArray array = new JSONArray(input);
        return arrayToStream(array).parallel().map(JSONObject.class::cast).collect(Collectors.toList());
    }

}

Then in your route:

private JSONArraySplitterBean myFunkySplitterBean = new JSONArraySplitterBean();

    from("bla")
    .split().method(myFunkySplitterBean, "convertToListOfJsonObjects")
    .to("bla2");
Dharman
  • 30,962
  • 25
  • 85
  • 135
Johnny Alpha
  • 758
  • 1
  • 8
  • 35
0

This post is old: the question has been asked 5 years and 15 days ago, but as I don't like to let a question unanswered, I am trying to here.

Camel 2.17: The collate function iterates the message body and groups the data into sub lists of specified size. This can be used with the Splitter EIP to split a message body and group/batch the splitted sub message into a group of N sub lists. This method works similar to the collate method in Groovy.

How to use it:

<split ...>
    <!-- supposed an ArrayList of Json  -->        
    <simple>${collate(3)}</simple>
    
    <log message="Splited Body: ${body}" />
    <log message="Split dataSplitIndex: ${headers.dataSplitIndex}, isLast: ${exchangeProperty.CamelSplitComplete}" />
    
</split>
Meziane
  • 1,586
  • 1
  • 12
  • 22