1

I have a requirement to import a large amount of data from a mysql database and index documents (about 1000 documents). During indexing process I need to do a special processing of a field by sending a enhancement requests to an external Apache Stanbol server. I have configured my dataimport-handler in solrconfig.xml to use the StanbolContentProcessor in the update chain, as below;

<updateRequestProcessorChain name="stanbolInterceptor">
    <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
    <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>

<requestHandler name="/dataimport" class="solr.DataImportHandler">   
    <lst name="defaults">  
        <str name="config">data-config.xml</str>
        <str name="update.chain">stanbolInterceptor</str>
    </lst>  
</requestHandler>

My sample data-config.xml is as below;

<dataConfig>
    <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver" 
                url="jdbc:mysql://localhost:3306/solrTest" 
                user="test" password="test123" batchSize="1" />
    <document name="stanboldata">
        <entity name="stanbolrequest" query="SELECT * FROM documents">
            <field column="id" name="id" />
            <field column="content" name="content" />
            <field column="title" name="title" />
        </entity>
    </document>
</dataConfig>

When running a large import with about 1000 documents, my stanbol server goes down, I suspect due to heavy load from the above Solr Stanbolnterceptor. I would like to throttle the dataimport in batches, so that Stanbol can process a manageable number of requests concurrently.

Is this achievable using batchSize parameter in dataSource element in the data-config?

Can someone please give some ideas to throttle the dataimport load in Solr?

This is my custom UpdateProcessor class handling Stanbol requests during /dataimport

public class StanbolContentProcessorFactory extends
        UpdateRequestProcessorFactory {

    public static final String NLP_ORGANIZATION = "nlp_organization";
    public static final String NLP_PERSON = "nlp_person";
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req,
            SolrQueryResponse res, UpdateRequestProcessor next) {

        return new StanbolContentProcessor(next);
    }

    class StanbolContentProcessor extends UpdateRequestProcessor {

        public StanbolContentProcessor(UpdateRequestProcessor next) {
            super(next);
        }

        @Override
        public void processAdd(AddUpdateCommand cmd) throws IOException {
            SolrInputDocument doc = cmd.getSolrInputDocument();
            String request = "";
            for (String field : STANBOL_REQUEST_FIELDS) {
                if (null != doc.getFieldValue(field)) {
                    request += (String) doc.getFieldValue(field) + ". ";
                }

            }
            try {
                EnhancementResult result = stanbolPost(request, getBaseURI());
                Collection<TextAnnotation> textAnnotations = result
                        .getTextAnnotations();
                // extracting text annotations
                Set<String> personSet = new HashSet<String>();
                Set<String> orgSet = new HashSet<String>();
                for (TextAnnotation text : textAnnotations) {
                    String type = text.getType();
                    String selectedText = text.getSelectedText();

                    if (null != type && null != selectedText) {
                        if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
                            personSet.add(selectedText);

                        } else if (type
                                .equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
                            orgSet.add(selectedText);

                        }
                    }
                }
                for (String person : personSet) {
                    doc.addField(NLP_PERSON, person);
                }
                for (String org : orgSet) {
                    doc.addField(NLP_ORGANIZATION, org);
                }
                cmd.solrDoc = doc;
                super.processAdd(cmd);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

    }

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
                .accept(new MediaType("application", "rdf+xml"))
                .entity(request, MediaType.TEXT_PLAIN)
                .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                    + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model

        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);

    }


    private static URI getBaseURI() {
        return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
    }

}
Dileepa Jayakody
  • 535
  • 1
  • 6
  • 19

2 Answers2

4

The batchSize option is used to retrieve the rows of a database table in batches in order to reduce memory usage (it is often used to prevent running out of memory when running the data import handler). While a lower batch size might be slower, the option does not intend to affect the speed of the import process.

My suggestion would be to limit the requests some other way, such as using a firewall rule. If you are using Linux and have access to Netfilter, you could run something like the following command:

iptables -A INPUT -p tcp --dport 12345 -m limit --limit 10/s -j ACCEPT

Where '12345' is the Stanbol port and '10/s' is the number of packets a second to accept.

Kevin
  • 4,070
  • 4
  • 45
  • 67
3

Mowgli is right, the batchsizewill not help you with this. Since most people got the problem the other way around (like My dataimport is too slow, please help) there is nothing like this in Solr. At least nothing I am aware of.


Personally I would not opt to configure your Linux system to handle the throttling for you. If you move from stage to stage or you migrate to a different server some-when you are required to remember for this. And if people change during the lifetime of your system, they will not know this.

So, I do not know the code of your StanbolContentProcessorFactory, but as it already got mentioned in your other question it appears to be custom code. As it is your custom code, you might add a throttle mechanism in there. To elaborate more on that, I would need some code to look at.


Update

Solr does have Google's guava, so I would use RateLimiter as proposed here. If you are building with Maven this would mean, you could use the scope provided. If you are not using Maven, there is no need to make a fatjar or place guava with Solr's lib folder.

import com.google.common.util.concurrent.RateLimiter;

public class StanbolContentProcessorFactory extends
    UpdateRequestProcessorFactory {

    // ...

    // add a rate limiter to throttle your requests
    // this setting would allow 10 requests per second
    private RateLimiter throttle = RateLimiter.create(0.1);

    // ...

    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();

        // this will throttle your requests
        throttle.acquire();

        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
            .accept(new MediaType("application", "rdf+xml"))
            .entity(request, MediaType.TEXT_PLAIN)
            .post(ClientResponse.class);

        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model
        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);
}
Community
  • 1
  • 1
cheffe
  • 9,345
  • 2
  • 46
  • 57
  • I have added the Class here for your reference..stanbolPost() method is responsible for connecting to Stanbol server and return the enhancement results, while the document is updated with new fields(nlp_person, nlp_organization) using the enhancementResult in processAdd() method. – Dileepa Jayakody Nov 28 '13 at 09:12