We are experiencing a problem in production where consumers are having zero utilization and the queues keep growing and performance degrades.
Each of the consumers is a container which contains a single instance of a non thread-safe listener bean.
Each listener needs to write to its own set of files. In order to avoid thread contention I would like only one thread to write to its own set of files.
Each listener is only instantiated once by using @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
I'm using a configuration similar to the one in this question
Each container is also configured with a retry advice which has the following code:
public class RetryMessageAdvice extends StatelessRetryOperationsInterceptorFactoryBean {
private static final int DEFAULT_RETRY_COUNT = 5;
private static final int DEFAULT_BACKOFF_MS = 250;
private int retryCount;
private int backOffPeriodInMS;
public RetryMessageAdvice() {
this.retryCount = DEFAULT_RETRY_COUNT;
this.backOffPeriodInMS = DEFAULT_BACKOFF_MS;
initializeRetryPolicy();
}
public RetryMessageAdvice(int retryCount, int backoff) {
this.retryCount = retryCount;
this.backOffPeriodInMS = backoff;
initializeRetryPolicy();
}
public void initializeRetryPolicy() {
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(this.retryCount);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(backOffPeriodInMS);
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);
this.setRetryOperations(retryTemplate);
this.setMessageRecoverer(new RetryMessageRecoverer());
}
public int getRetryCount() {
return retryCount;
}
public void setRetryCount(int retryCount) {
this.retryCount = retryCount;
}
}
The consumer looks something like this:
@Component("exportListenerImpl")
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class ExportListenerImpl extends ExportListenerBase {
private static final Logger LOG = LoggerFactory.getLogger(ExportListenerImpl.class);
private final ExportMapper exportMapper;
private final ExportFormatter exportFormatter;
@Autowired
public ExportListenerImpl(@Qualifier("exportFormatter") ExportFormatter exportFormatter,
@Qualifier("exportMapper") ExportedMapper exportedMapper,
@Value("${export.root.dir}") String exportDirectory) {
super(exportDirectory);
this.exportedFormatter = exportFormatter;
this.exportedMapper = exportedMapper;
}
@Override
public void handle(AnalyticsEvent analyticsEvent) throws Exception {
ExportedEvent exportedEvent = exportMapper.mapPlace(analyticsEvent);
File csvFile = getCsvFile(exportedEvent);
String csvRow = exportFormatter.writeAsString(exportedEvent);
writeCsvRow(csvRow, csvFile);
}
}
Other things to note
- Export mapper and export formatter are thread-safe but not using @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
- The method writeCsvRow is synchronized.
- There is a high number of errors which cause the exportMapper to throw an exception and trigger the retry advice
- The incoming message rage is 120/s
- The ratio between the incoming and deliver rate is usually 5:1
My theories on what is wrong are
- The high number of errors is causing a large number of retries and degrading performance. I would be better off putting the bad message in an error queue.
- Somehow the synchronized method in writeCsvRow is causing problems with some higher level thread managed by spring-amqp.
My question is, which theory is right? Is the impact of the retry advice the problem?