
I am pretty new to WSO2 ESB and I have to implement a custom message processor with this specific behavior: perform an operation after an element is retrieved from the message store and before the sequence associated with this message processor is executed.

Let me explain in detail.

This is my ESB message processor definition:

<?xml version="1.0" encoding="UTF-8"?>
<!--<messageProcessor class="org.apache.synapse.message.processor.impl.sampler.SamplingProcessor" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">-->
<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="sequence">transferProcessorSequence</parameter>
    <parameter name="interval">1000</parameter>
    <parameter name="is.active">true</parameter>
    <parameter name="concurrency">1</parameter>
</messageProcessor>

It retrieves some elements (XML documents) from transferFromMessageStore (a queue) and passes them to the transferProcessorSequence.xml sequence, which uses them. As you can see, I have so far implemented a custom message processor, SamplingProcessorHeaderRateLimitation, that simply extends the org.apache.synapse.message.processor.impl.sampler.SamplingProcessor WSO2 class. For now it only writes a log entry when the init() method runs. I deployed it on my Carbon server and it works.

Here you can find the entire project code.

However, from what I have understood, simply extending the SamplingProcessor class is not enough to obtain the desired behavior: to run custom logic between each message consumption and its dispatch to the sequence, I need to extend the SamplingService class.

I think that I need to override execute() or fetch(MessageConsumer msgConsumer).

For now it would be enough to insert a log statement, something that writes to the log file each time an element is retrieved from the message store and before the sequence associated with the message processor is executed.
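
To make the goal concrete, here is a rough sketch of the hook I have in mind. It does not use the real Synapse classes: SimpleConsumer, BaseService and LoggingService are hypothetical stand-ins I made up for illustration, and messages are plain strings. It only shows the idea of overriding fetch() so that something happens after a message is retrieved and before it is dispatched.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a message store consumer (NOT the Synapse MessageConsumer).
interface SimpleConsumer {
    String receive(); // returns null when the store is empty
}

// Hypothetical stand-in for the stock service: fetch one message, then dispatch it.
class BaseService {
    final List<String> dispatched = new ArrayList<>();

    String fetch(SimpleConsumer consumer) {
        return consumer.receive();
    }

    void dispatch(String message) {
        // The real service would inject the message into the configured sequence.
        dispatched.add(message);
    }

    void execute(SimpleConsumer consumer) {
        String message = fetch(consumer);
        if (message != null) {
            dispatch(message);
        }
    }
}

// Hypothetical subclass: adds a step after fetch and before dispatch.
class LoggingService extends BaseService {
    final List<String> log = new ArrayList<>();

    @Override
    String fetch(SimpleConsumer consumer) {
        String message = super.fetch(consumer);
        if (message != null) {
            log.add("fetched: " + message); // custom logic would go here
        }
        return message;
    }
}

class FetchHookDemo {
    static LoggingService run() {
        Queue<String> store = new ArrayDeque<>(Arrays.asList("<a/>", "<b/>"));
        LoggingService service = new LoggingService();
        SimpleConsumer consumer = store::poll; // poll() returns null once the queue is empty
        for (int i = 0; i < 3; i++) {
            service.execute(consumer); // the third call finds the store empty
        }
        return service;
    }

    public static void main(String[] args) {
        LoggingService service = run();
        System.out.println(service.log);
        System.out.println(service.dispatched);
    }
}
```

In this sketch the log entry is always recorded before the message reaches dispatch(), which is the ordering I need in the real processor.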

Is it possible?

So my main doubts are:

1) Do I have to create a class extending SamplingService in the same project in which I am implementing my custom message processor? This behavior must be used only for this specific message processor in my WSO2 ESB project; all the other message processors in the project must keep using the standard SamplingService implementation.

2) My other doubt is about how this custom SamplingService implementation is passed to my custom message processor (inside the SamplingProcessor WSO2 class?), that is, how to associate a specific custom message processor implementation with the custom SamplingService implementation that handles its lifecycle.

AndreaNobili
  • Question: why don't you override the getTask() method (it has the protected modifier) in com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation and return your own class extending SamplingService? – simar Aug 04 '17 at 07:04
  • @simar What exactly do you mean? I have already overridden the getTask() method, simply putting some log/println messages in it (for testing purposes only). The problem is that I never get this output in my log files (whereas I do get the init() method log). So it seems that this getTask() method is never executed for some reason. Do you have any idea why? Could it be the protected modifier? – AndreaNobili Aug 04 '17 at 07:23
  • Output from log.info() is visible only if your logging is properly configured to write to a file. Output from System.out.println is visible only in the console. Did you try running in console mode and looking for the println message from the getTask() method? Have you tried running WSO2 ESB in debug mode to see what is going wrong? – simar Aug 04 '17 at 07:26
  • @simar Hmm, but the log/println output from the init() method is written correctly to my log file. The problem only concerns the getTask() method. – AndreaNobili Aug 04 '17 at 07:28

3 Answers

1) Do I have to create a class extending SamplingService in the same project in which I am implementing my custom message processor? This behavior must be used only for this specific message processor in my WSO2 ESB project; all the other message processors in the project must keep using the standard SamplingService implementation.

Your custom SamplingProcessorHeaderRateLimitation will consume only the messages that arrive in transferFromMessageStore, and will inject the messages it consumes and processes only into the transferProcessorSequence sequence. Messages on all other paths will not be processed by this message processor.

2) My other doubt is about how this custom SamplingService implementation is passed to my custom message processor (inside the SamplingProcessor WSO2 class?), that is, how to associate a specific custom message processor implementation with the custom SamplingService implementation that handles its lifecycle.

If you look at the source code of the SamplingProcessorHeaderRateLimitation.getTask() method you implemented, you have already tied your custom SamplingService2 to your custom SamplingProcessorHeaderRateLimitation:

@Override
protected Task getTask() {
    logger.info("getTask() START");
    System.out.println("getTask() START");
    logger.info("getTask() END");
    System.out.println("getTask() END");
    return (Task) new SamplingService2(this, synapseEnvironment, CONCURRENCY, SEQUENCE, isProcessorStartAsDeactivated());
}
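
One caveat with the snippet above: in Java, a cast from a non-final class to an interface compiles, but it only succeeds at runtime if the object really implements that interface. The following is a minimal, self-contained sketch of the getTask() wiring under that constraint. DemoTask, StockService, CustomService, NotATask and the three processor classes are hypothetical stand-ins defined here for illustration; they are not the Synapse classes and make no claim about how Synapse schedules tasks.

```java
// Hypothetical stand-in for the task contract (NOT org.apache.synapse.task.Task).
interface DemoTask {
    String execute();
}

// Stand-in for the stock service.
class StockService implements DemoTask {
    @Override
    public String execute() {
        return "stock";
    }
}

// Stand-in for a custom service that keeps the task contract by extending the stock one.
class CustomService extends StockService {
    @Override
    public String execute() {
        return "custom";
    }
}

// A class that does NOT implement DemoTask.
class NotATask {
}

// Stand-in for the stock processor: it asks its own factory method for the task.
class StockProcessor {
    protected DemoTask getTask() {
        return new StockService();
    }

    String start() {
        return getTask().execute();
    }
}

// Overriding the factory method is what ties a custom service to a custom processor.
class CustomProcessor extends StockProcessor {
    @Override
    protected DemoTask getTask() {
        return new CustomService();
    }
}

// Same override, but with a cast that compiles and then fails at runtime.
class BrokenProcessor extends StockProcessor {
    @Override
    protected DemoTask getTask() {
        return (DemoTask) new NotATask(); // throws ClassCastException when called
    }
}

class GetTaskDemo {
    static String tryStart(StockProcessor processor) {
        try {
            return processor.start();
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryStart(new StockProcessor()));
        System.out.println(tryStart(new CustomProcessor()));
        System.out.println(tryStart(new BrokenProcessor()));
    }
}
```

Only the processor whose getTask() is overridden picks up the custom service; any other processor instance keeps the stock one.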
shazin
  • My doubt is this: as you can see, I put some logger/println calls at the start of getTask() and before it returns, but this output doesn't appear in the log file (unlike the log from the init() method, which is present in my log file). So it seems that for some reason this getTask() method is never executed (it is only a hypothesis) and the standard SamplingService implementation is still being used. Do you have any further ideas? Thanks – AndreaNobili Jul 28 '17 at 11:43
  • Something is wrong here (source last checked out from GitHub). SamplingService2 does not implement the Task interface, so it can't be cast to Task. It should throw a ClassCastException. – simar Oct 18 '17 at 09:59

Did you configure the Task in XML?

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask"> 
  <trigger interval="5000"/>
</task>

or

<task name="CheckPrice" class="org.wso2.esb.tutorial.tasks.PlaceStockOrderTask">
  <trigger interval="5000" count="10"/>
</task>
vaquar khan

Q1:

Do I have to create a class extending SamplingService in the same project in which I am implementing my custom message processor? This behavior must be used only for this specific message processor in my WSO2 ESB project; all the other message processors in the project must keep using the standard SamplingService implementation.

<messageProcessor class="com.mycompany.toolkit.messageprocessor.SamplingProcessorHeaderRateLimitation" messageStore="transferFromMessageStore" name="transferFromMessageProcessor" xmlns="http://ws.apache.org/ns/synapse">
<parameter name="sequence">transferProcessorSequence</parameter>

The custom processor and service are used only when the custom class is specified as the processor class, as in your example above.

Q2:

My other doubt is about how this custom SamplingService implementation is passed to my custom message processor (inside the SamplingProcessor WSO2 class?), that is, how to associate a specific custom message processor implementation with the custom SamplingService implementation that handles its lifecycle.

You can directly extend ScheduledMessageProcessor rather than SamplingProcessor, since SamplingProcessor only implements getTask() and initializes the view object in its init() method. Also, your SamplingService2 class should implement the Task and ManagedLifecycle interfaces.
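
As a rough illustration of a service that implements both a task contract and a lifecycle contract, here is a self-contained sketch. PollTask and Lifecycle are hypothetical stand-ins defined locally; I am only assuming they resemble the general shape of Synapse's Task and ManagedLifecycle. The String environment argument is also an assumption made to keep the example runnable without Synapse on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a task contract: one unit of polling work.
interface PollTask {
    void execute();
}

// Hypothetical stand-in for a lifecycle contract: set up and tear down resources.
interface Lifecycle {
    void init(String environment);

    void destroy();
}

// A service that can be scheduled (PollTask) and managed (Lifecycle).
class LifecycleAwareService implements PollTask, Lifecycle {
    final List<String> events = new ArrayList<>();
    private boolean initialized;

    @Override
    public void init(String environment) {
        initialized = true;
        events.add("init:" + environment);
    }

    @Override
    public void execute() {
        if (!initialized) {
            events.add("skipped"); // refuse to work before init() has run
            return;
        }
        events.add("execute");
    }

    @Override
    public void destroy() {
        initialized = false;
        events.add("destroy");
    }
}

class LifecycleDemo {
    static List<String> run() {
        LifecycleAwareService service = new LifecycleAwareService();
        service.execute();        // called too early, recorded as skipped
        service.init("test-env"); // hypothetical environment name
        service.execute();
        service.destroy();
        return service.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Implementing both interfaces on the same class lets whatever schedules the task also initialize and clean it up, which is the point of the suggestion above.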

Muralidharan.rade