...and access it in a .process(exchange -> ...) once the route is running.

.process(exchange -> exchange.setProperty(...)) doesn't help here, since it only executes while the route is running, not at configuration time.

I could pack my data into the .routeId(...) (in fact, it is already there to make the route IDs unique) and extract it from there whenever I need it, but that's an ugly hack.

UPDATE

Going into detail:

I have tasks in my standalone application (I use Camel's Main class). Each task can have one or more transfers, each identified by a transferNo. Each transfer comprises up to eight steps (= routes) in a synchronous sequence. Some are mandatory, some optional, depending on the first route's from and on properties from a properties file: save, decrypt[Optional], decompress[O], adaptEOL[O], transfer, verify[O], removeTemp, purgeSaves.

I tried to implement this as follows:

int transferNo;
boolean[] isTaskCompleted = new boolean[transferCount]; 

main.addRouteBuilder(new RouteBuilder() {

  @Override
  public void configure() throws Exception {
    for (transferNo = 1; transferNo <= transferCount; transferNo++) {

       from(...)
         .routeId("first-" + transferNo)
         ...

       ...

       from(...)
         .routeId("last-" + transferNo)
         ...
         // ArrayIndexOutOfBoundsException at runtime
         .process(exchange -> isTaskCompleted[transferNo - 1] = true);
    }
  }
});
main.start();

while (!BooleanUtils.and(isTaskCompleted))
  Thread.sleep(1000);

But this doesn't work: the lambda reads the field transferNo only when the route runs, and by then the for loop has finished and transferNo == transferCount + 1, which leads to the ArrayIndexOutOfBoundsException.
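One way to pin a value at configuration time is to copy the loop counter into an effectively final local variable inside the loop, so that each lambda captures its own copy. The following is a minimal sketch in plain Java with no Camel dependency; `CaptureDemo`, `runAll` and the list of `Runnable`s are hypothetical stand-ins for the route builder and its `.process(...)` lambdas.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CaptureDemo {

    // Builds one callback per transfer at "configuration time" and runs them
    // all afterwards, the way processors run once the routes are started.
    static boolean[] runAll(int transferCount) {
        boolean[] isTaskCompleted = new boolean[transferCount];
        List<Runnable> callbacks = new ArrayList<>();

        for (int transferNo = 1; transferNo <= transferCount; transferNo++) {
            // Effectively final copy: its value is fixed when the lambda is created.
            final int index = transferNo - 1;
            callbacks.add(() -> isTaskCompleted[index] = true);
        }

        // "Runtime": the loop has long finished, yet each callback still uses its own index.
        for (Runnable callback : callbacks) {
            callback.run();
        }
        return isTaskCompleted;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runAll(3)));
    }
}
```

The same pattern should carry over to a lambda passed to `.process(...)` inside the `for` loop of `configure()`, as long as the captured variable is a local and not a field.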

The idea I thought of is to store the transferNo together with the route(s) at configuration time to use it as proper index for isTaskCompleted at route(s) runtime.

Another idea is to add transferNo as a URI parameter of the endpoints, but logically it doesn't belong there, and the definition would then be repeated for every endpoint (6 at the moment).

Gerold Broser
  • _"`.process(exchange -> exchange.setProperty(...))` doesn't help in this respect since it's only done when the route is running."_ Why does it not help? I thought you need to retrieve it at a later time during `.process(exchange -> ...)`, so what's the problem? Can you please describe your scenario in more detail? – Morfic Nov 12 '18 at 14:26
  • @Morfic See the header: I want to _set_ it at _configuration_ time, i.e. when `RouteBuilder.configure()` runs. – Gerold Broser Nov 12 '18 at 14:33
  • I saw it but for me it's not clear why you need it specifically there. What is the use case? So I'm asking for details to either help me understand and come with a suggestion (if I can think of one), or maybe you can figure out if perhaps your initial thought/problem is not actually an issue. And btw, you only have an exchange during route execution, as it "represents" the payload passed around your routes. – Morfic Nov 12 '18 at 14:37
  • @Morfic See the update to my question. – Gerold Broser Nov 12 '18 at 18:55
  • I may have an idea, but I need to clarify some more aspects to be able to provide a sample closer to what you have. 1) How do you "feed" your task/transfer data to camel? 2) Your app does not run continuously. It starts once in a while, counts the "transfers" it needs to handle, processes them, and then stops when they're all done, right? 3) Why do you create routes for each transfer instead of having the same routes processing all of your payloads? 4) How do you route your payloads _depending on the first route's `from`_? – Morfic Nov 13 '18 at 11:36
  • 1) Via file(s) from various sources: File system, (S)FTP, FTPS, (S/)MIME body/attachments, ... 2) Exactly. 3) Not clear what you're asking for here. Do you mean just _one_ route for all 8 steps? 4) Note: Not only depending on the first route's `from` but also on properties from a properties file. I do this with a method that calls itself recursively and returns an according `to()` endpoint (of type `direct:` or `controlbus:`) for a step/route (depending on a `switch (step)` and `case: if (property) ... else ...` inside). Why do you ask? That works perfectly. – Gerold Broser Nov 13 '18 at 12:20
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/183560/discussion-between-morfic-and-gerold-broser). – Morfic Nov 13 '18 at 12:24

3 Answers

I still don't have all the details, but here's one possible solution using Camel 2.22.1:

  • define one set of routes as per your endpoints and processing requirements (I don't see why it would be necessary to define one set of routes for each transferNo)
  • add a final "done" route with an aggregator that has a completionSize configured and a "shutdown processor".
  • make all route-flows eventually end up in the "done" route

When all the messages are processed, the expected "completionSize" is reached so the aggregation is complete and the shutdown processor is called.
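The counting idea behind `completionSize` can be illustrated without Camel. The sketch below is only an analogy in plain Java, not Camel's aggregator: a `CountDownLatch` plays the role of the completion counter, and every simulated transfer counts down whether it succeeded or failed, just as failed elements are also sent to the "done" route. The class and method names, the pool size and the timeout are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CompletionCounterDemo {

    // Processes the given number of simulated transfers on a small pool and
    // returns how many reached the "done" step once all of them were counted.
    static int processAll(int numberOfTransfers) {
        CountDownLatch allDone = new CountDownLatch(numberOfTransfers);
        AtomicInteger reachedDone = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (int id = 0; id < numberOfTransfers; id++) {
                final int transferId = id;
                pool.submit(() -> {
                    try {
                        if (transferId % 3 == 0) {
                            throw new IllegalStateException("failed by design");
                        }
                    } catch (RuntimeException e) {
                        // Treated as handled, so the failed transfer still counts below.
                    } finally {
                        reachedDone.incrementAndGet();
                        allDone.countDown();
                    }
                });
            }
            // Blocks until every transfer has counted down, instead of polling.
            if (!allDone.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for transfers");
            }
            return reachedDone.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("All processing complete: " + processAll(100));
    }
}
```

In the Camel version the aggregator's completion triggers the shutdown processor; here the caller simply continues after `await` returns.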

Model class:

// naive simulation of a model class
public class Transfer {
    // id of this transfer
    private final int id;

    // whether this transfer requires decryption (used for conditional routing)
    private final boolean encrypted;

    public Transfer(int id, boolean encrypted) {
        this.id = id;
        this.encrypted = encrypted;
    }

    public int getId() {
        return id;
    }

    public boolean isEncrypted() {
        return encrypted;
    }
}

Application launcher:

import org.apache.camel.CamelContext;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListener;
import org.apache.camel.main.MainSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Launcher {

    private static final Logger log = LoggerFactory.getLogger(Launcher.class);

    public static void main(String[] args) throws Exception {
        // define number of items to simulate
        int numberOfTransfersToProcess = 100;

        // setup camel
        Main main = new Main();
        main.addRouteBuilder(new TransferRouteBuilder(numberOfTransfersToProcess));

        // use this to simulate some input when the context is up
        main.addMainListener(new InputDataSimulator(numberOfTransfersToProcess));

        // run camel
        main.run();
    }

    private static class InputDataSimulator implements MainListener {
        private final int numberOfTransfersToProcess;

        public InputDataSimulator(int numberOfTransfersToProcess) {
            this.numberOfTransfersToProcess = numberOfTransfersToProcess;
        }

        @Override
        public void beforeStart(MainSupport main) {
        }

        @Override
        public void configure(CamelContext context) {
        }

        @Override
        public void afterStart(MainSupport mainSupport) {
            try {
                new TransferProducer(mainSupport.getCamelTemplate()).send(numberOfTransfersToProcess);
            } catch (Exception e) {
                log.error("Could not send simulated data", e);
            }
        }

        @Override
        public void beforeStop(MainSupport main) {
        }

        @Override
        public void afterStop(MainSupport main) {
        }
    }
}

Route setup:

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.ThreadPoolProfile;

public class TransferRouteBuilder extends RouteBuilder {

    private int numberOfTransfersToProcess;

    public TransferRouteBuilder(int numberOfTransfersToProcess) {
        this.numberOfTransfersToProcess = numberOfTransfersToProcess;
    }

    @Override
    public void configure() throws Exception {
        //some pooling for fun, don't take these values as references
        ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile();
        threadPoolProfile.setId("meh");
        threadPoolProfile.setPoolSize(5);
        threadPoolProfile.setMaxPoolSize(10);
        getContext().getExecutorServiceManager().setDefaultThreadPoolProfile(threadPoolProfile);

        // handle failed element
        onException(Exception.class)
                .handled(true)
                .log("Handling: - ${exception.message}")
                /* go to final route */
                .to("direct:done")
                .end();

        // simulate one type of input
        from("seda:from-file")
                .routeId("from-file")
                .log("Processing file element ${in.body.id}")
                /* go directly to the final route */
                .to("direct:done")
                .end();

        // simulate another type of input
        from("seda:from-sftp")
                .routeId("from-sftp")
                .log("Processing sftp element ${in.body.id}")
                /* go to an intermediate route */
                .to("direct:decompress")
                .end();

        // simulate failing elements
        from("seda:throw-exception")
                .routeId("throw exception")
                .throwException(RuntimeException.class, "Element ${in.body.id} failed by design")
                .end();

        // some intermediate route
        from("direct:decompress")
                .routeId("decompress")
                .log("Decompressing element ${in.body.id}")
                .choice()
                /* go to an intermediate route based on some condition */
                .when(simple("${in.body.encrypted}"))
                .to("direct:decrypt")
                .otherwise()
                /* or directly to the final route */
                .to("direct:done")
                .endChoice()
                .end();

        // another intermediate route
        from("direct:decrypt")
                .routeId("decrypt")
                .log("Decrypting element ${in.body.id}")
                /* eventually go to the final route */
                .to("direct:done")
                .end();

        // final route "aggregating all elements" and shutting down afterwards
        from("direct:done")
                .routeId("done")
                .log("Element ${in.body.id} successfully processed")
                /* wait for everything to finish */
                .aggregate(simple("whatever"), (oldExchange, newExchange) -> newExchange)
                .completionSize(numberOfTransfersToProcess)
                .log("All processing complete")
                /* once all expected transfers are complete, stop the app */
                .process(new ShutdownCommand())
                .end();

    }

    // https://stackoverflow.com/a/39275258/474287
    private class ShutdownCommand implements Processor {
        @Override
        public void process(Exchange exchange) throws Exception {
            final CamelContext camelContext = exchange.getContext();

            Thread shutdownThread = new Thread(() -> {
                Thread.currentThread().setName("ShutdownThread");
                try {
                    camelContext.stop();
                } catch (Exception e) {
                    log.error("Error during shutdown", e);
                }
            });

            shutdownThread.start();
        }
    }
}

Producer to simulate input:

import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Random;

public class TransferProducer {

    private static final Logger log = LoggerFactory.getLogger(TransferProducer.class);
    private final ProducerTemplate template;
    private final Random random = new Random();

    public TransferProducer(ProducerTemplate template) {
        this.template = template;
    }

    public void send(long numberOfTransfersToProcess) {
        log.info("Simulating " + numberOfTransfersToProcess + " transfers");
        // simulate data input to multiple endpoints
        String[] endpoints = {"seda:from-file", "seda:from-sftp", "seda:throw-exception"};
        for (int id = 0; id < numberOfTransfersToProcess; id++) {
            // get a random endpoint
            String nextEndpoint = endpoints[random.nextInt(endpoints.length)];

            // send some data to process
            template.sendBody(nextEndpoint, ExchangePattern.InOnly, new Transfer(id, random.nextBoolean()));
        }
        log.info("Simulation of " + numberOfTransfersToProcess + " transfers complete");
    }
}

Sample output:

18:26:04.627 INFO  o.a.c.i.DefaultExecutorServiceManager - Using custom DefaultThreadPoolProfile: ThreadPoolProfile[meh (null) size:5-10, keepAlive: 60 SECONDS, maxQueue: 1000, allowCoreThreadTimeOut:false, rejectedPolicy:CallerRuns]
18:26:04.774 INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.22.1 (CamelContext: camel-1) is starting
18:26:04.777 INFO  o.a.c.m.ManagedManagementStrategy - JMX is enabled
18:26:05.661 INFO  o.a.c.i.c.DefaultTypeConverter - Type converters loaded (core: 195, classpath: 1)
18:26:06.306 INFO  o.a.camel.impl.DefaultCamelContext - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
18:26:06.313 INFO  o.a.c.component.seda.SedaEndpoint - Endpoint seda://from-file is using shared queue: seda://from-file with size: 1000
18:26:06.523 INFO  o.a.c.component.seda.SedaEndpoint - Endpoint seda://from-sftp is using shared queue: seda://from-sftp with size: 1000
18:26:06.547 INFO  o.a.c.component.seda.SedaEndpoint - Endpoint seda://throw-exception is using shared queue: seda://throw-exception with size: 1000
18:26:06.713 INFO  o.a.c.p.aggregate.AggregateProcessor - Defaulting to MemoryAggregationRepository
18:26:06.990 INFO  o.a.camel.impl.DefaultCamelContext - Route: from-file started and consuming from: seda://from-file
18:26:07.001 INFO  o.a.camel.impl.DefaultCamelContext - Route: from-sftp started and consuming from: seda://from-sftp
18:26:07.023 INFO  o.a.camel.impl.DefaultCamelContext - Route: throw exception started and consuming from: seda://throw-exception
18:26:07.027 INFO  o.a.camel.impl.DefaultCamelContext - Route: decompress started and consuming from: direct://decompress
18:26:07.041 INFO  o.a.camel.impl.DefaultCamelContext - Route: decrypt started and consuming from: direct://decrypt
18:26:07.059 INFO  o.a.camel.impl.DefaultCamelContext - Route: done started and consuming from: direct://done
18:26:07.060 INFO  o.a.camel.impl.DefaultCamelContext - Total 6 routes, of which 6 are started
18:26:07.075 INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.22.1 (CamelContext: camel-1) started in 2.295 seconds
18:26:07.104 INFO  com.example.TransferProducer - Simulating 100 transfers
18:26:07.234 INFO  com.example.TransferProducer - Simulation of 100 transfers complete
18:26:08.068 INFO  from-file - Processing file element 1
18:26:08.068 INFO  from-sftp - Processing sftp element 0
18:26:08.071 INFO  done - Element 1 successfully processed
18:26:08.071 INFO  decompress - Decompressing element 0
18:26:08.075 INFO  from-file - Processing file element 9
18:26:08.075 INFO  throw exception - Handling: - Element 5 failed by design
18:26:08.076 INFO  done - Element 5 successfully processed
18:26:08.076 INFO  done - Element 9 successfully processed
18:26:08.079 INFO  from-file - Processing file element 10
18:26:08.079 INFO  done - Element 10 successfully processed
18:26:08.081 INFO  throw exception - Handling: - Element 6 failed by design
18:26:08.082 INFO  from-file - Processing file element 15
18:26:08.082 INFO  done - Element 6 successfully processed
18:26:08.082 INFO  done - Element 15 successfully processed
18:26:08.083 INFO  from-file - Processing file element 17
18:26:08.084 INFO  throw exception - Handling: - Element 7 failed by design
18:26:08.085 INFO  done - Element 7 successfully processed
18:26:08.085 INFO  decrypt - Decrypting element 0
18:26:08.085 INFO  done - Element 17 successfully processed
18:26:08.086 INFO  done - Element 0 successfully processed
18:26:08.086 INFO  from-file - Processing file element 18
18:26:08.086 INFO  throw exception - Handling: - Element 14 failed by design
18:26:08.087 INFO  done - Element 18 successfully processed
18:26:08.087 INFO  done - Element 14 successfully processed
18:26:08.087 INFO  from-sftp - Processing sftp element 2
18:26:08.088 INFO  from-file - Processing file element 20
18:26:08.088 INFO  decompress - Decompressing element 2
18:26:08.088 INFO  throw exception - Handling: - Element 16 failed by design
18:26:08.089 INFO  done - Element 20 successfully processed
18:26:08.089 INFO  done - Element 16 successfully processed
18:26:08.089 INFO  done - Element 2 successfully processed
18:26:08.090 INFO  from-file - Processing file element 21
18:26:08.091 INFO  throw exception - Handling: - Element 22 failed by design
18:26:08.091 INFO  from-sftp - Processing sftp element 3
18:26:08.091 INFO  done - Element 21 successfully processed
18:26:08.092 INFO  done - Element 22 successfully processed
18:26:08.092 INFO  decompress - Decompressing element 3
18:26:08.092 INFO  from-file - Processing file element 23
18:26:08.093 INFO  done - Element 23 successfully processed
18:26:08.094 INFO  throw exception - Handling: - Element 25 failed by design
18:26:08.094 INFO  decrypt - Decrypting element 3
18:26:08.095 INFO  done - Element 3 successfully processed
18:26:08.095 INFO  done - Element 25 successfully processed
18:26:08.095 INFO  from-file - Processing file element 26
18:26:08.096 INFO  done - Element 26 successfully processed
18:26:08.096 INFO  from-sftp - Processing sftp element 4
18:26:08.096 INFO  throw exception - Handling: - Element 33 failed by design
18:26:08.096 INFO  decompress - Decompressing element 4
18:26:08.097 INFO  from-file - Processing file element 27
18:26:08.097 INFO  done - Element 33 successfully processed
18:26:08.097 INFO  done - Element 27 successfully processed
18:26:08.097 INFO  done - Element 4 successfully processed
18:26:08.098 INFO  from-file - Processing file element 28
18:26:08.099 INFO  throw exception - Handling: - Element 36 failed by design
18:26:08.099 INFO  from-sftp - Processing sftp element 8
18:26:08.100 INFO  done - Element 36 successfully processed
18:26:08.100 INFO  done - Element 28 successfully processed
18:26:08.100 INFO  decompress - Decompressing element 8
18:26:08.102 INFO  throw exception - Handling: - Element 38 failed by design
18:26:08.102 INFO  done - Element 8 successfully processed
18:26:08.104 INFO  done - Element 38 successfully processed
18:26:08.104 INFO  from-sftp - Processing sftp element 11
18:26:08.104 INFO  from-file - Processing file element 29
18:26:08.105 INFO  decompress - Decompressing element 11
18:26:08.105 INFO  done - Element 29 successfully processed
18:26:08.105 INFO  throw exception - Handling: - Element 39 failed by design
18:26:08.106 INFO  done - Element 39 successfully processed
18:26:08.106 INFO  decrypt - Decrypting element 11
18:26:08.106 INFO  from-file - Processing file element 30
18:26:08.107 INFO  done - Element 11 successfully processed
18:26:08.108 INFO  throw exception - Handling: - Element 43 failed by design
18:26:08.108 INFO  done - Element 30 successfully processed
18:26:08.109 INFO  done - Element 43 successfully processed
18:26:08.109 INFO  from-sftp - Processing sftp element 12
18:26:08.109 INFO  from-file - Processing file element 31
18:26:08.109 INFO  decompress - Decompressing element 12
18:26:08.110 INFO  done - Element 31 successfully processed
18:26:08.110 INFO  throw exception - Handling: - Element 44 failed by design
18:26:08.110 INFO  done - Element 44 successfully processed
18:26:08.110 INFO  from-file - Processing file element 34
18:26:08.111 INFO  decrypt - Decrypting element 12
18:26:08.111 INFO  done - Element 34 successfully processed
18:26:08.111 INFO  done - Element 12 successfully processed
18:26:08.111 INFO  throw exception - Handling: - Element 45 failed by design
18:26:08.112 INFO  from-file - Processing file element 35
18:26:08.112 INFO  done - Element 45 successfully processed
18:26:08.112 INFO  from-sftp - Processing sftp element 13
18:26:08.112 INFO  done - Element 35 successfully processed
18:26:08.113 INFO  decompress - Decompressing element 13
18:26:08.113 INFO  throw exception - Handling: - Element 47 failed by design
18:26:08.113 INFO  from-file - Processing file element 37
18:26:08.114 INFO  done - Element 47 successfully processed
18:26:08.114 INFO  done - Element 37 successfully processed
18:26:08.114 INFO  decrypt - Decrypting element 13
18:26:08.115 INFO  done - Element 13 successfully processed
18:26:08.115 INFO  from-file - Processing file element 41
18:26:08.115 INFO  throw exception - Handling: - Element 48 failed by design
18:26:08.116 INFO  done - Element 41 successfully processed
18:26:08.116 INFO  done - Element 48 successfully processed
18:26:08.116 INFO  from-sftp - Processing sftp element 19
18:26:08.117 INFO  decompress - Decompressing element 19
18:26:08.117 INFO  from-file - Processing file element 42
18:26:08.118 INFO  throw exception - Handling: - Element 53 failed by design
18:26:08.118 INFO  done - Element 42 successfully processed
18:26:08.118 INFO  done - Element 53 successfully processed
18:26:08.118 INFO  decrypt - Decrypting element 19
18:26:08.119 INFO  from-file - Processing file element 46
18:26:08.119 INFO  done - Element 19 successfully processed
18:26:08.119 INFO  throw exception - Handling: - Element 58 failed by design
18:26:08.120 INFO  done - Element 46 successfully processed
18:26:08.120 INFO  done - Element 58 successfully processed
18:26:08.120 INFO  from-sftp - Processing sftp element 24
18:26:08.121 INFO  from-file - Processing file element 50
18:26:08.121 INFO  decompress - Decompressing element 24
18:26:08.121 INFO  throw exception - Handling: - Element 67 failed by design
18:26:08.121 INFO  done - Element 50 successfully processed
18:26:08.122 INFO  done - Element 67 successfully processed
18:26:08.122 INFO  done - Element 24 successfully processed
18:26:08.122 INFO  from-file - Processing file element 51
18:26:08.123 INFO  done - Element 51 successfully processed
18:26:08.123 INFO  throw exception - Handling: - Element 68 failed by design
18:26:08.123 INFO  from-sftp - Processing sftp element 32
18:26:08.124 INFO  done - Element 68 successfully processed
18:26:08.124 INFO  from-file - Processing file element 55
18:26:08.124 INFO  decompress - Decompressing element 32
18:26:08.125 INFO  done - Element 55 successfully processed
18:26:08.125 INFO  throw exception - Handling: - Element 70 failed by design
18:26:08.125 INFO  decrypt - Decrypting element 32
18:26:08.126 INFO  from-file - Processing file element 57
18:26:08.126 INFO  done - Element 32 successfully processed
18:26:08.126 INFO  done - Element 70 successfully processed
18:26:08.127 INFO  done - Element 57 successfully processed
18:26:08.127 INFO  from-sftp - Processing sftp element 40
18:26:08.128 INFO  throw exception - Handling: - Element 71 failed by design
18:26:08.128 INFO  from-file - Processing file element 62
18:26:08.128 INFO  decompress - Decompressing element 40
18:26:08.129 INFO  done - Element 71 successfully processed
18:26:08.129 INFO  done - Element 62 successfully processed
18:26:08.130 INFO  decrypt - Decrypting element 40
18:26:08.130 INFO  from-file - Processing file element 65
18:26:08.130 INFO  throw exception - Handling: - Element 82 failed by design
18:26:08.131 INFO  done - Element 65 successfully processed
18:26:08.131 INFO  done - Element 82 successfully processed
18:26:08.133 INFO  from-file - Processing file element 66
18:26:08.133 INFO  throw exception - Handling: - Element 83 failed by design
18:26:08.131 INFO  done - Element 40 successfully processed
18:26:08.134 INFO  done - Element 83 successfully processed
18:26:08.134 INFO  done - Element 66 successfully processed
18:26:08.135 INFO  from-sftp - Processing sftp element 49
18:26:08.135 INFO  from-file - Processing file element 69
18:26:08.135 INFO  throw exception - Handling: - Element 84 failed by design
18:26:08.136 INFO  done - Element 69 successfully processed
18:26:08.136 INFO  decompress - Decompressing element 49
18:26:08.136 INFO  done - Element 84 successfully processed
18:26:08.137 INFO  from-file - Processing file element 73
18:26:08.137 INFO  done - Element 49 successfully processed
18:26:08.137 INFO  throw exception - Handling: - Element 85 failed by design
18:26:08.137 INFO  done - Element 73 successfully processed
18:26:08.137 INFO  done - Element 85 successfully processed
18:26:08.138 INFO  from-file - Processing file element 74
18:26:08.138 INFO  from-sftp - Processing sftp element 52
18:26:08.139 INFO  done - Element 74 successfully processed
18:26:08.139 INFO  throw exception - Handling: - Element 86 failed by design
18:26:08.139 INFO  decompress - Decompressing element 52
18:26:08.139 INFO  done - Element 86 successfully processed
18:26:08.140 INFO  from-file - Processing file element 80
18:26:08.140 INFO  decrypt - Decrypting element 52
18:26:08.140 INFO  done - Element 80 successfully processed
18:26:08.141 INFO  throw exception - Handling: - Element 87 failed by design
18:26:08.141 INFO  done - Element 52 successfully processed
18:26:08.141 INFO  from-file - Processing file element 88
18:26:08.142 INFO  done - Element 87 successfully processed
18:26:08.142 INFO  done - Element 88 successfully processed
18:26:08.142 INFO  from-sftp - Processing sftp element 54
18:26:08.143 INFO  from-file - Processing file element 89
18:26:08.143 INFO  done - Element 89 successfully processed
18:26:08.144 INFO  decompress - Decompressing element 54
18:26:08.144 INFO  from-file - Processing file element 93
18:26:08.144 INFO  throw exception - Handling: - Element 90 failed by design
18:26:08.145 INFO  done - Element 93 successfully processed
18:26:08.145 INFO  done - Element 90 successfully processed
18:26:08.145 INFO  done - Element 54 successfully processed
18:26:08.146 INFO  from-file - Processing file element 94
18:26:08.146 INFO  throw exception - Handling: - Element 97 failed by design
18:26:08.148 INFO  done - Element 97 successfully processed
18:26:08.149 INFO  from-sftp - Processing sftp element 56
18:26:08.150 INFO  decompress - Decompressing element 56
18:26:08.148 INFO  done - Element 94 successfully processed
18:26:08.152 INFO  from-file - Processing file element 96
18:26:08.153 INFO  done - Element 96 successfully processed
18:26:08.153 INFO  done - Element 56 successfully processed
18:26:08.154 INFO  from-file - Processing file element 98
18:26:08.161 INFO  done - Element 98 successfully processed
18:26:08.161 INFO  from-sftp - Processing sftp element 59
18:26:08.162 INFO  decompress - Decompressing element 59
18:26:08.164 INFO  decrypt - Decrypting element 59
18:26:08.165 INFO  done - Element 59 successfully processed
18:26:08.166 INFO  from-sftp - Processing sftp element 60
18:26:08.167 INFO  decompress - Decompressing element 60
18:26:08.168 INFO  decrypt - Decrypting element 60
18:26:08.169 INFO  done - Element 60 successfully processed
18:26:08.170 INFO  from-sftp - Processing sftp element 61
18:26:08.170 INFO  decompress - Decompressing element 61
18:26:08.171 INFO  done - Element 61 successfully processed
18:26:08.172 INFO  from-sftp - Processing sftp element 63
18:26:08.173 INFO  decompress - Decompressing element 63
18:26:08.174 INFO  decrypt - Decrypting element 63
18:26:08.175 INFO  done - Element 63 successfully processed
18:26:08.177 INFO  from-sftp - Processing sftp element 64
18:26:08.178 INFO  decompress - Decompressing element 64
18:26:08.179 INFO  done - Element 64 successfully processed
18:26:08.181 INFO  from-sftp - Processing sftp element 72
18:26:08.181 INFO  decompress - Decompressing element 72
18:26:08.183 INFO  done - Element 72 successfully processed
18:26:08.184 INFO  from-sftp - Processing sftp element 75
18:26:08.184 INFO  decompress - Decompressing element 75
18:26:08.185 INFO  decrypt - Decrypting element 75
18:26:08.186 INFO  done - Element 75 successfully processed
18:26:08.187 INFO  from-sftp - Processing sftp element 76
18:26:08.187 INFO  decompress - Decompressing element 76
18:26:08.188 INFO  decrypt - Decrypting element 76
18:26:08.189 INFO  done - Element 76 successfully processed
18:26:08.190 INFO  from-sftp - Processing sftp element 77
18:26:08.191 INFO  decompress - Decompressing element 77
18:26:08.193 INFO  decrypt - Decrypting element 77
18:26:08.193 INFO  done - Element 77 successfully processed
18:26:08.194 INFO  from-sftp - Processing sftp element 78
18:26:08.195 INFO  decompress - Decompressing element 78
18:26:08.196 INFO  done - Element 78 successfully processed
18:26:08.201 INFO  from-sftp - Processing sftp element 79
18:26:08.202 INFO  decompress - Decompressing element 79
18:26:08.204 INFO  decrypt - Decrypting element 79
18:26:08.205 INFO  done - Element 79 successfully processed
18:26:08.207 INFO  from-sftp - Processing sftp element 81
18:26:08.209 INFO  decompress - Decompressing element 81
18:26:08.212 INFO  decrypt - Decrypting element 81
18:26:08.214 INFO  done - Element 81 successfully processed
18:26:08.216 INFO  from-sftp - Processing sftp element 91
18:26:08.217 INFO  decompress - Decompressing element 91
18:26:08.218 INFO  decrypt - Decrypting element 91
18:26:08.219 INFO  done - Element 91 successfully processed
18:26:08.220 INFO  from-sftp - Processing sftp element 92
18:26:08.220 INFO  decompress - Decompressing element 92
18:26:08.221 INFO  done - Element 92 successfully processed
18:26:08.222 INFO  from-sftp - Processing sftp element 95
18:26:08.222 INFO  decompress - Decompressing element 95
18:26:08.223 INFO  decrypt - Decrypting element 95
18:26:08.224 INFO  done - Element 95 successfully processed
18:26:08.224 INFO  from-sftp - Processing sftp element 99
18:26:08.224 INFO  decompress - Decompressing element 99
18:26:08.225 INFO  decrypt - Decrypting element 99
18:26:08.226 INFO  done - Element 99 successfully processed
18:26:08.232 INFO  done - All processing complete
18:26:08.235 INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.22.1 (CamelContext: camel-1) is shutting down
18:26:08.236 INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 6 routes (timeout 300 seconds)
18:26:10.165 INFO  o.a.c.impl.DefaultShutdownStrategy - Route: done shutdown complete, was consuming from: direct://done
18:26:10.166 INFO  o.a.c.impl.DefaultShutdownStrategy - Route: decrypt shutdown complete, was consuming from: direct://decrypt
18:26:10.166 INFO  o.a.c.impl.DefaultShutdownStrategy - Route: decompress shutdown complete, was consuming from: direct://decompress
18:26:10.166 INFO  o.a.c.impl.DefaultShutdownStrategy - Route: throw exception shutdown complete, was consuming from: seda://throw-exception
18:26:10.167 INFO  o.a.c.impl.DefaultShutdownStrategy - Route: from-sftp shutdown complete, was consuming from: seda://from-sftp
18:26:10.167 INFO  o.a.c.impl.DefaultShutdownStrategy - Route: from-file shutdown complete, was consuming from: seda://from-file
18:26:10.167 INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 6 routes completed in 1 seconds
18:26:10.210 INFO  o.a.camel.main.MainLifecycleStrategy - CamelContext: camel-1 has been shutdown, triggering shutdown of the JVM.
18:26:10.237 INFO  o.a.c.m.MainSupport$HangupInterceptor - Received hang up - stopping the main instance.

Process finished with exit code 0
Morfic
  • Why an Aggregator? I don't want to aggregate data along my routes. The output of one route is simply the input for the next. Or is it just to use `completionSize`? – Gerold Broser Nov 14 '18 at 08:49
  • @GeroldBroser we're not actually doing an aggregation per se, it's just to count your elements and move on once they're all done. If you look at the implementation you'll see that the aggregation key is just `simple("whatever")` to include any element and the aggregator simply returns the newest exchange without any other processing logic `(oldExchange, newExchange) -> newExchange`. A different, but somewhat similar solution, could be using a [splitter](http://camel.apache.org/splitter.html) to divide a list of elements into smaller parts, wait for them all to be processed and then continue. – Morfic Nov 14 '18 at 10:47
  • I got it. Thank you for your answer including the MCVE. I will, however, stick to my solution for the moment, since it works too and implementing yours needs quite a rewrite here. – Gerold Broser Nov 14 '18 at 11:45
  • @GeroldBroser sure, whatever floats your boat. However, I would at least rewrite those routes being set up in the for loop. Like I said, the routes are reusable for any similar payload, so I don't see why you need to create them for each item you have to process. Maybe you have a special situation, I don't know, but give it a thought. Good luck! – Morfic Nov 14 '18 at 11:49

You can inject data into the route builder via its constructor. The metadata is passed in when the route builder is instantiated, e.g. as a JSONObject.

In Groovy, this could look like this:

class MyRouteBuilder extends RouteBuilder {
  def timezone = 'UTC'

  MyRouteBuilder() {}
  MyRouteBuilder(JSONObject params) {
    if(params) {
      if(params.has('timezone')) timezone = params.getString('timezone')
    }
  }

  void configure() throws Exception {
   //...
  }
}
metters
  • I think this is not going to work since the crucial part happens inside `RouteBuilder.configure()`. See the update to my question. – Gerold Broser Nov 12 '18 at 18:54

For now I'm extracting transferNo from the route IDs, until an answer with another solution is given here.

My route IDs look like: <task ID>-<transfer no.>-<step>

I use the following at the beginning of every .process(...) in which I need transferNo:

final int transferNo_ = Integer.parseInt(exchange.getFromRouteId().split("-")[1]);

Works like a charm!
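The parsing step can be pulled into a small helper so that it lives in one place. This is a minimal sketch in plain Java; `RouteIds` and `transferNo` are hypothetical names, and it assumes that the step name never contains a hyphen. It counts from the end of the ID, so a task ID that itself contains a hyphen would not break it.

```java
class RouteIds {

    // Extracts the transfer number from an ID of the form
    // "<task ID>-<transfer no.>-<step>".
    static int transferNo(String routeId) {
        String[] parts = routeId.split("-");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Unexpected route ID: " + routeId);
        }
        // Second-to-last part, so hyphens inside the task ID are harmless.
        return Integer.parseInt(parts[parts.length - 2]);
    }

    public static void main(String[] args) {
        System.out.println(transferNo("task1-7-decrypt"));
    }
}
```

Inside a processor the call would then be something like `RouteIds.transferNo(exchange.getFromRouteId())`.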

Gerold Broser