I have a requirement to import a large amount of data from a MySQL database and index it as documents (about 1000 documents). During indexing, I need to do special processing on a field by sending enhancement requests to an external Apache Stanbol server. I have configured my dataimport handler in solrconfig.xml to use the StanbolContentProcessor in the update chain, as shown below:
<updateRequestProcessorChain name="stanbolInterceptor">
  <processor class="com.solr.stanbol.processor.StanbolContentProcessorFactory"/>
  <processor class="solr.RunUpdateProcessorFactory" />
</updateRequestProcessorChain>

<requestHandler name="/dataimport" class="solr.DataImportHandler">
  <lst name="defaults">
    <str name="config">data-config.xml</str>
    <str name="update.chain">stanbolInterceptor</str>
  </lst>
</requestHandler>
My data-config.xml is as follows:
<dataConfig>
  <dataSource type="JdbcDataSource" driver="com.mysql.jdbc.Driver"
              url="jdbc:mysql://localhost:3306/solrTest"
              user="test" password="test123" batchSize="1" />
  <document name="stanboldata">
    <entity name="stanbolrequest" query="SELECT * FROM documents">
      <field column="id" name="id" />
      <field column="content" name="content" />
      <field column="title" name="title" />
    </entity>
  </document>
</dataConfig>
When running a large import of about 1000 documents, my Stanbol server goes down, I suspect because of the heavy load generated by the StanbolInterceptor above. I would like to throttle the data import into batches, so that Stanbol only has to process a manageable number of requests at a time.

Is this achievable using the batchSize parameter of the dataSource element in data-config.xml?

Can someone please suggest ways to throttle the dataimport load in Solr?
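To illustrate the kind of throttling I have in mind, something like the sketch below is what I am hoping to achieve, i.e. pacing the Stanbol calls made from processAdd. This is only an illustration, not code I currently have; the class name and the interval value are placeholders:

import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of my current code): enforces a minimum
// delay between consecutive Stanbol requests issued from processAdd().
public class RequestThrottle {

    private final long minIntervalMillis; // placeholder pacing value, e.g. 200
    private long lastRequestTime = 0;

    public RequestThrottle(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    // Call before each Stanbol request; sleeps if the previous one was too recent.
    public synchronized void pauseIfNeeded() {
        long elapsed = System.currentTimeMillis() - lastRequestTime;
        if (elapsed < minIntervalMillis) {
            try {
                TimeUnit.MILLISECONDS.sleep(minIntervalMillis - elapsed);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        lastRequestTime = System.currentTimeMillis();
    }
}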
This is my custom UpdateRequestProcessor class that handles the Stanbol requests during /dataimport:
public class StanbolContentProcessorFactory extends UpdateRequestProcessorFactory {

    public static final String NLP_ORGANIZATION = "nlp_organization";
    public static final String NLP_PERSON = "nlp_person";
    public static final String[] STANBOL_REQUEST_FIELDS = { "title", "content" };
    public static final String STANBOL_ENDPOINT = "http://localhost:8080/enhancer";

    @Override
    public UpdateRequestProcessor getInstance(SolrQueryRequest req,
            SolrQueryResponse res, UpdateRequestProcessor next) {
        return new StanbolContentProcessor(next);
    }

    class StanbolContentProcessor extends UpdateRequestProcessor {

        public StanbolContentProcessor(UpdateRequestProcessor next) {
            super(next);
        }
        @Override
        public void processAdd(AddUpdateCommand cmd) throws IOException {
            SolrInputDocument doc = cmd.getSolrInputDocument();
            // concatenate the configured fields into a single enhancement request
            String request = "";
            for (String field : STANBOL_REQUEST_FIELDS) {
                if (null != doc.getFieldValue(field)) {
                    request += (String) doc.getFieldValue(field) + ". ";
                }
            }
            try {
                EnhancementResult result = stanbolPost(request, getBaseURI());
                Collection<TextAnnotation> textAnnotations = result.getTextAnnotations();
                // extracting person and organization text annotations
                Set<String> personSet = new HashSet<String>();
                Set<String> orgSet = new HashSet<String>();
                for (TextAnnotation text : textAnnotations) {
                    String type = text.getType();
                    String selectedText = text.getSelectedText();
                    if (null != type && null != selectedText) {
                        if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_PERSON)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_PERSON)) {
                            personSet.add(selectedText);
                        } else if (type.equalsIgnoreCase(StanbolConstants.DBPEDIA_ORGANIZATION)
                                || type.equalsIgnoreCase(StanbolConstants.FOAF_ORGANIZATION)) {
                            orgSet.add(selectedText);
                        }
                    }
                }
                // add the extracted entities to the document before indexing
                for (String person : personSet) {
                    doc.addField(NLP_PERSON, person);
                }
                for (String org : orgSet) {
                    doc.addField(NLP_ORGANIZATION, org);
                }
                cmd.solrDoc = doc;
                super.processAdd(cmd);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }

    // POST the plain-text request to the Stanbol enhancer and parse the RDF/XML response
    private EnhancementResult stanbolPost(String request, URI uri) {
        Client client = Client.create();
        WebResource webResource = client.resource(uri);
        ClientResponse response = webResource.type(MediaType.TEXT_PLAIN)
                .accept(new MediaType("application", "rdf+xml"))
                .entity(request, MediaType.TEXT_PLAIN)
                .post(ClientResponse.class);
        int status = response.getStatus();
        if (status != 200 && status != 201 && status != 202) {
            throw new RuntimeException("Failed : HTTP error code : "
                    + response.getStatus());
        }
        String output = response.getEntity(String.class);
        // Parse the RDF model returned by Stanbol
        Model model = ModelFactory.createDefaultModel();
        StringReader reader = new StringReader(output);
        model.read(reader, null);
        return new EnhancementResult(model);
    }

    private static URI getBaseURI() {
        return UriBuilder.fromUri(STANBOL_ENDPOINT).build();
    }
}
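For completeness, I trigger the import with a normal DIH request such as the following (host, port and core name are placeholders for my setup):

http://localhost:8983/solr/mycore/dataimport?command=full-import&commit=true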