6

I want to make a flow where:

  1. waiting for two files: file_name.xdf and file_name.dff : if both files (I want to process two files in the same time (waiting for both), the file name of those files shoud be the same)
  2. process those files convert to byte array
  3. run groovy script

How can I make the first point?

David Dossot
  • 33,403
  • 4
  • 38
  • 72
lukisp
  • 1,031
  • 3
  • 14
  • 27

3 Answers3

2

You can aggregate the files based on their "base name" (i.e. take the file name without the extension), and then process each file in the aggregated set.

Aggregation could be performed using a Collection-Aggregator or a Custom-Aggregator, here an example for each one:

Using a Collection-Aggregator:

<flow name="two-files-per-process-with-collection-aggregator">
    <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files" >
        <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
    </file:inbound-endpoint>
    <set-property propertyName="MULE_CORRELATION_ID" value="#[message.inboundProperties.originalFilename.substring(0,message.inboundProperties.originalFilename.lastIndexOf('.'))]" doc:name="Set Correlation-Id"/>
    <set-property propertyName="MULE_CORRELATION_GROUP_SIZE" value="2" doc:name="Set Correlation-Group-Size"/>
    <collection-aggregator failOnTimeout="true" doc:name="Collection Aggregator"/>
    <foreach doc:name="For Each">
        <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
    </foreach>
</flow>


Using a Custom-Aggregator (you will need a custom java class):

<flow name="two-files-per-process-with-custom-aggregator">
    <file:inbound-endpoint path="/file-in" moveToDirectory="/file-in-process" responseTimeout="10000" doc:name="Read files">
        <file:filename-regex-filter pattern=".*\.aaa|.*\.bbb" caseSensitive="true"/>
    </file:inbound-endpoint>
    <custom-aggregator failOnTimeout="true" class="org.mnc.MatchFileNames" doc:name="Custom Aggregator"/>
    <foreach doc:name="For Each">
        <logger message="#[message.inboundProperties.originalFilename]" level="INFO" doc:name="Some process"/>
    </foreach>
</flow>

And this is a possible implementation for the custom aggregator (it could be more elegant:

package org.mnc;

import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.routing.RoutingException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.EventCorrelatorCallback;

public class MatchFileNames extends AbstractAggregator {

    @Override
    protected EventCorrelatorCallback getCorrelatorCallback(final MuleContext muleContext) {
        return new EventCorrelatorCallback() {

            @Override
            public boolean shouldAggregateEvents(EventGroup events) {
                return events.size()==2;
            }

            @Override
            public EventGroup createEventGroup(MuleEvent event, Object id) {
                String filename = event.getMessage().getInboundProperty("originalFilename");
                String commonFilename = filename.substring(0, filename.lastIndexOf('.'));
                System.out.println(filename + " -> " + commonFilename);
                return new EventGroup(commonFilename, muleContext, 2, true, storePrefix);
            }

            @Override
            public MuleEvent aggregateEvents(EventGroup events) throws RoutingException {
                return events.getMessageCollectionEvent();
            }
        };
    }
}
MarcosNC
  • 356
  • 1
  • 6
0

Using a quartz component - trigger your flow at a desired interval. Read more about that here: http://www.mulesoft.org/documentation/display/current/Quartz+Transport+Reference

When triggered - write Java code that compares the two directories and finds file pairs between the two.

Off the top of my head, I'm not sure if there is a way to set an inbound file filter dynamically. Otherwise - one could always handle the entire process in Java; read the files, convert to byte array, lastly propagate the message on to the groovy script.

seybold
  • 1
  • 1
0

You can have two file inbound endpoints, one for each of the files you wait for. When your flow read the files it copies the file to an other directory. If the other file was already processed and moved to the directory (you can track this using a variable in an Object Store) you save with name.ready and move the previously moved file to name2.ready.

You have a third flow with a file inbound endpoint that reads from that directory using *.ready wildcard pattern. And then use Requester Module to load the other file into a variable.

Ale Sequeira
  • 2,039
  • 1
  • 11
  • 19