Using the boxfuse cloudwatchlogs-java-appender as a starting point, I created a generic version of the log4j2 appender for logging to AWS CloudWatch. However, I am running into an issue where the log4j2 appender does not shut down at all.
Here is my appender plugin CloudwatchLogsLog4J2Appender.java -
package ...
imports ...
@Plugin(name = CloudwatchLogsLog4J2Appender.APPENDER_NAME, category = "Core", elementType = Appender.ELEMENT_TYPE, printObject = true)
public class CloudwatchLogsLog4J2Appender extends AbstractAppender {
static final String APPENDER_NAME = "CloudwatchLogs-Appender";
private final CloudwatchLogsConfig config = new CloudwatchLogsConfig();
private BlockingQueue<CloudwatchLogsLogEvent> eventQueue;
private CloudwatchLogsLogEventPutter putter;
private long discardedCount;
public CloudwatchLogsLog4J2Appender(String name, Filter filter, Layout<? extends Serializable> layout) {
super(name, filter, layout);
}
public CloudwatchLogsLog4J2Appender(String name, Filter filter, Layout<? extends Serializable> layout, boolean ignoreExceptions) {
super(name, filter, layout, ignoreExceptions);
}
// Your custom appender needs to declare a factory method
// annotated with `@PluginFactory`. Log4j will parse the configuration
// and call this factory method to construct an appender instance with
// the configured attributes.
@PluginFactory
public static CloudwatchLogsLog4J2Appender createAppender(
@PluginAttribute(value = "name", defaultString = APPENDER_NAME) String name,
@PluginElement("Filter") final Filter filter,
@PluginAttribute("debug") Boolean debug,
@PluginAttribute("stdoutFallback") Boolean stdoutFallback,
@PluginAttribute("endpoint") String endpoint,
@PluginAttribute("logGroupName") String logGroupName,
@PluginAttribute("module") String module,
@PluginAttribute(value = "maxEventQueueSize", defaultInt = CloudwatchLogsConfig.DEFAULT_MAX_EVENT_QUEUE_SIZE) Integer maxEventQueueSize,
@PluginAttribute("region") String region,
@PluginAttribute("flushDelayInMillis") int flushDelayInMillis) {
System.out.println("CloudwatchLogsLog4J2Appender:createAppender() called...");
CloudwatchLogsLog4J2Appender appender = new CloudwatchLogsLog4J2Appender(name, filter, null, true);
if (debug != null) {
appender.getConfig().setDebug(debug);
}
if (stdoutFallback != null) {
appender.getConfig().setStdoutFallback(stdoutFallback);
}
if (endpoint != null) {
appender.getConfig().setEndpoint(endpoint);
}
if (logGroupName != null) {
appender.getConfig().setLogGroupName(logGroupName);
}
if (module != null) {
appender.getConfig().setModule(module);
}
appender.getConfig().setMaxEventQueueSize(maxEventQueueSize);
if (region != null) {
appender.getConfig().setRegion(region);
}
if (flushDelayInMillis > 0) {
appender.getConfig().setFlushDelayInMills(flushDelayInMillis);
}
return appender;
}
/**
* @return The config of the appender. This instance can be modified to override defaults.
*/
public CloudwatchLogsConfig getConfig() {
return config;
}
@Override
public void start() {
System.out.println("CloudwatchLogsLog4J2Appender:start() called...");
super.start();
eventQueue = new LinkedBlockingQueue<>(config.getMaxEventQueueSize());
putter = CloudwatchLogsLogEventPutter.create(config, eventQueue);
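// Note: this is a regular (non-daemon) thread, so it keeps the JVM alive until the run() loop exits.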
new Thread(putter).start();
}
@Override
public void stop() {
System.out.println("CloudwatchLogsLog4J2Appender:stop() called...");
putter.terminate();
super.stop();
}
@Override
protected boolean stop(Future<?> future) {
System.out.println("CloudwatchLogsLog4J2Appender:stop(future) called...");
putter.terminate();
return super.stop(future);
}
@Override
public boolean stop(long timeout, TimeUnit timeUnit) {
System.out.println("CloudwatchLogsLog4J2Appender:stop(timeout, timeunit) called...");
putter.terminate();
System.out.println("CloudwatchLogsLog4J2Appender:stop(timeout, timeunit) Done calling terminate()... passing to super");
return super.stop(timeout, timeUnit);
}
/**
* @return The number of log events that had to be discarded because the event queue was full.
* If this number is non-zero without having been affected by AWS CloudWatch Logs availability issues,
* you should consider increasing maxEventQueueSize in the config to allow more log events to be buffered before having to drop them.
*/
public long getDiscardedCount() {
return discardedCount;
}
@Override
public void append(LogEvent event) {
String message = event.getMessage().getFormattedMessage();
Throwable thrown = event.getThrown();
while (thrown != null) {
message += "\n" + dump(thrown);
thrown = thrown.getCause();
if (thrown != null) {
message += "\nCaused by:";
}
}
Marker marker = event.getMarker();
String eventId = marker == null ? null : marker.getName();
CloudwatchLogsLogEvent logEvent = new CloudwatchLogsLogEvent(event.getLevel().toString(), event.getLoggerName(), eventId, message, event.getTimeMillis(), event.getThreadName());
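// If the queue is full, drop the oldest event to make room for the new one and count the drop.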
while (!eventQueue.offer(logEvent)) {
eventQueue.poll();
discardedCount++;
}
}
private String dump(Throwable throwableProxy) {
StringBuilder builder = new StringBuilder();
builder.append(throwableProxy.getClass().getName()).append(": ").append(throwableProxy.getMessage()).append("\n");
for (StackTraceElement step : throwableProxy.getStackTrace()) {
builder.append("\t").append(step);
builder.append("\n");
}
return builder.toString();
}
}
Here is the CloudwatchLogsLogEventPutter -
public class CloudwatchLogsLogEventPutter implements Runnable {
private static int MAX_FLUSH_DELAY = 500 * 1000 * 1000;
private static final int MAX_BATCH_COUNT = 10000;
private static final int MAX_BATCH_SIZE = 1048576;
private final CloudwatchLogsConfig config;
private final BlockingQueue<CloudwatchLogsLogEvent> eventQueue;
private final AWSLogs logsClient;
private final ObjectMapper objectMapper = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL);
private final boolean enabled;
private boolean running;
private String module;
private String logGroupName;
private int batchSize;
private long lastFlush;
private List<InputLogEvent> eventBatch;
private String nextSequenceToken;
private final AtomicLong processedCount = new AtomicLong(0);
/**
* Creates a new EventPutter for the current AWS region.
*
* @param config The config to use.
* @param eventQueue The event queue to consume from.
* @return The new EventPutter.
*/
public static CloudwatchLogsLogEventPutter create(CloudwatchLogsConfig config, BlockingQueue<CloudwatchLogsLogEvent> eventQueue) {
boolean enabled = config.getRegion() != null || config.getEndpoint() != null;
AWSLogs logsClient = enabled ? createLogsClient(config) : null;
CloudwatchLogsLogEventPutter logPutter = new CloudwatchLogsLogEventPutter(config, eventQueue, logsClient, enabled);
return logPutter;
}
/**
* For internal use only. This constructor lets us switch the AWSLogs implementation for testing.
*/
public CloudwatchLogsLogEventPutter(CloudwatchLogsConfig config, BlockingQueue<CloudwatchLogsLogEvent> eventQueue,
AWSLogs awsLogs, boolean enabled) {
this.config = config;
module = config.getModule();
this.eventQueue = eventQueue;
this.enabled = enabled;
logsClient = awsLogs;
if(config.getFlushDelayInMills() > 0) {
MAX_FLUSH_DELAY = config.getFlushDelayInMills() * 1_000_000; // convert millis to nanos, since this is compared against System.nanoTime() deltas
}
logGroupName = config.getLogGroupName();
}
static AWSLogs createLogsClient(CloudwatchLogsConfig config) {
AWSLogsClientBuilder builder = AWSLogsClientBuilder.standard();
if (config.getEndpoint() != null) {
// Non-AWS mock endpoint
builder.setCredentials(new AWSStaticCredentialsProvider(new AnonymousAWSCredentials()));
builder.setEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(config.getEndpoint(), config.getRegion()));
} else {
builder.setRegion(config.getRegion());
}
return builder.build();
}
/**
* @return The number of log events that have been processed by this putter.
*/
public long getProcessedCount() {
return processedCount.get();
}
@Override
public void run() {
if (!enabled && !config.isStdoutFallback()) {
System.out.println("WARNING: AWS CloudWatch Logs appender is disabled (Unable to detect the AWS region and no CloudWatch Logs endpoint specified)");
return;
}
running = true;
nextSequenceToken = null;
eventBatch = new ArrayList<>();
batchSize = 0;
lastFlush = System.nanoTime();
printWithTimestamp(new Date(), "Initiating the while loop...");
while (running) {
CloudwatchLogsLogEvent event = eventQueue.poll();
printWithTimestamp(new Date(), "Inside Loopity loop...");
if (event != null) {
Map<String, Object> eventMap = new TreeMap<>();
eventMap.put("context", config.getContext());
eventMap.put("module", config.getModule());
eventMap.put("level", event.getLevel());
eventMap.put("event", event.getEvent());
eventMap.put("message", event.getMessage());
eventMap.put("logger", event.getLogger());
eventMap.put("thread", event.getThread());
String eventJson;
try {
eventJson = toJson(eventMap);
} catch (JsonProcessingException e) {
printWithTimestamp(new Date(), "Unable to serialize log event: " + eventMap);
continue;
}
// Source: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
// The maximum batch size is 1,048,576 bytes,
int eventSize =
// and this size is calculated as the sum of all event messages in UTF-8,
eventJson.getBytes(StandardCharsets.UTF_8).length
// plus 26 bytes for each log event.
+ 26;
if (eventSize > MAX_BATCH_SIZE) {
printWithTimestamp(new Date(), "Unable to send log event as its size (" + eventSize + " bytes)"
+ " exceeds the maximum size supported by AWS CloudWatch Logs (" + MAX_BATCH_SIZE + " bytes): " + eventMap);
continue;
}
if (config.isDebug()) {
printWithTimestamp(new Date(), "Event Size: " + eventSize + " bytes, Batch Size: " + batchSize
+ " bytes, Batch Count: " + eventBatch.size() + ", Event: " + eventJson);
}
if ((eventBatch.size() + 1) >= MAX_BATCH_COUNT || (batchSize + eventSize) >= MAX_BATCH_SIZE) {
flush();
}
eventBatch.add(new InputLogEvent().withMessage(eventJson).withTimestamp(event.getTimestamp()));
batchSize += eventSize;
printWithTimestamp(new Date(event.getTimestamp()), "batchSize = " + batchSize);
} else {
printWithTimestamp(new Date(), "No events, just flush attempts...");
if (!eventBatch.isEmpty() && isTimeToFlush()) {
printWithTimestamp(new Date(), "eventbatch is not empty and its time to flush");
flush();
}
try {
printWithTimestamp(new Date(), "going to sleep...");
Thread.sleep(100);
printWithTimestamp(new Date(), "done sleeping...");
} catch (InterruptedException e) {
printWithTimestamp(new Date(), "Exception while flusing and sleeping...");
running = false;
}
}
}
printWithTimestamp(new Date(), "Done with that while loop...");
}
private void finalFlush() {
printWithTimestamp(new Date(), "finalFlush() called...");
if (!eventBatch.isEmpty()) {
printWithTimestamp(new Date(), "finalFlush() ==> flush()...");
flush();
printWithTimestamp(new Date(), "finalFlush() ==> flush()... DONE");
}
try {
printWithTimestamp(new Date(), "finalFlush() ==> Sleeping...");
Thread.sleep(100);
printWithTimestamp(new Date(), "finalFlush() ==> Sleeping... DONE");
} catch (InterruptedException e) {
printWithTimestamp(new Date(), "Exception while finalFlusing and sleeping... setting running to false");
running = false;
}
}
private boolean isTimeToFlush() {
return lastFlush <= (System.nanoTime() - MAX_FLUSH_DELAY);
}
private void flush() {
printWithTimestamp(new Date(),"flush() called");
Collections.sort(eventBatch, new Comparator<InputLogEvent>() {
@Override
public int compare(InputLogEvent o1, InputLogEvent o2) {
return o1.getTimestamp().compareTo(o2.getTimestamp());
}
});
if (config.isStdoutFallback()) {
for (InputLogEvent event : eventBatch) {
printWithTimestamp(new Date(event.getTimestamp()), logGroupName + " " + module + " " + event.getMessage());
}
} else {
int retries = 15;
do {
printWithTimestamp(new Date(),"flush() - prepping PutLogEventsRequest");
PutLogEventsRequest request =
new PutLogEventsRequest(logGroupName, module, eventBatch).withSequenceToken(nextSequenceToken);
try {
long start = 0;
if (config.isDebug()) {
start = System.nanoTime();
}
PutLogEventsResult result = logsClient.putLogEvents(request);
if (config.isDebug()) {
long stop = System.nanoTime();
long elapsed = (stop - start) / 1000000;
printWithTimestamp(new Date(), "Sending " + eventBatch.size() + " events took " + elapsed + " ms");
}
processedCount.addAndGet(request.getLogEvents().size());
nextSequenceToken = result.getNextSequenceToken();
break;
} catch (DataAlreadyAcceptedException e) {
nextSequenceToken = e.getExpectedSequenceToken();
printWithTimestamp(new Date(),"flush() - received DataAlreadyAcceptedException");
} catch (InvalidSequenceTokenException e) {
nextSequenceToken = e.getExpectedSequenceToken();
printWithTimestamp(new Date(),"flush() - received InvalidSequenceTokenException");
} catch (ResourceNotFoundException e) {
printWithTimestamp(new Date(), "Unable to send logs to AWS CloudWatch Logs at "
+ logGroupName + ">" + module + " (" + e.getErrorMessage() + "). Dropping log events batch ...");
break;
} catch (SdkClientException e) {
try {
printWithTimestamp(new Date(),"flush() - received SDKClientException. Sleeping to retry");
Thread.sleep(1000);
printWithTimestamp(new Date(),"flush() - received SDKClientException. Sleeping DONE");
} catch (InterruptedException e1) {
System.out.println("SDKException while pushing logs to cloudwatch ...");
}
if (--retries > 0) {
printWithTimestamp(new Date(), "Attempt " + (15-retries) + "Unable to send logs to AWS CloudWatch Logs ("
+ e.getMessage() + "). Dropping log events batch ...");
}
}
} while (retries > 0); // && eventBatch.size() > 0
}
eventBatch = new ArrayList<>();
batchSize = 0;
lastFlush = System.nanoTime();
}
/* private -> for testing */
String toJson(Map<String, Object> eventMap) throws JsonProcessingException {
// Compensate for https://github.com/FasterXML/jackson-databind/issues/1442
Map<String, Object> nonNullMap = new TreeMap<>();
for (Map.Entry<String, Object> entry : eventMap.entrySet()) {
if (entry.getValue() != null) {
nonNullMap.put(entry.getKey(), entry.getValue());
}
}
return objectMapper.writeValueAsString(nonNullMap);
}
private void printWithTimestamp(Date date, String str) {
System.out.println(new SimpleDateFormat("YYYY-MM-dd HH:mm:ss.SSS").format(date) + " " + str);
}
public void terminate() {
printWithTimestamp(new Date(),"terminate() ==> finalFlush()");
//finalFlush();
printWithTimestamp(new Date(),"terminate() ==> finalFlush() DONE. Setting running=false");
running = false;
}
}
And here is the CloudwatchLogsLogEvent -
public class CloudwatchLogsLogEvent {
private final String level;
private final String logger;
private final String event;
private final String message;
private final long timestamp;
private final String thread;
public CloudwatchLogsLogEvent(String level, String logger, String event, String message, long timestamp, String thread) {
this.level = level;
this.logger = logger;
this.event = event;
this.message = message;
this.timestamp = timestamp;
this.thread = thread;
}
public String getLevel() {
return level;
}
public String getLogger() {
return logger;
}
public String getEvent() {
return event;
}
public String getMessage() {
return message;
}
public long getTimestamp() {
return timestamp;
}
public String getThread() {
return thread;
}
}
And lastly, a sample log4j2.xml configuration -
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="trace" package="com.cloudwatchlogs.appender.log4j2">
<Appenders>
<CloudwatchLogs-Appender name="myCloudWatchLogger">
<region>us-west-2</region>
<logGroupName>myCloudWatchLogGroup</logGroupName>
<module>myCloudWatchLogStream</module>
<flushDelayInMillis>1</flushDelayInMillis>
<!-- Optional config parameters -->
<!-- Whether to fall back to stdout instead of disabling the appender when no AWS region or endpoint can be detected. Default: false -->
<stdoutFallback>false</stdoutFallback>
<!-- The maximum size of the async log event queue. Default: 1000000.
Increase to avoid dropping log events at very high throughput.
Decrease to reduce maximum memory usage, at the risk of occasionally dropping log events when the queue fills up. -->
<maxEventQueueSize>1000000</maxEventQueueSize>
</CloudwatchLogs-Appender>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="DEBUG">
<AppenderRef ref="console"/>
</Root>
<Logger name="com.mycompany.src" level="DEBUG" additivity="false">
<AppenderRef ref="myCloudWatchLogger" level="DEBUG"/>
</Logger>
</Loggers>
</Configuration>
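(Side note: my understanding - I have not verified whether it matters for this problem - is that the Configuration element also exposes shutdown-related attributes such as shutdownHook and shutdownTimeoutMillis, e.g.:
<Configuration status="trace" packages="com.cloudwatchlogs.appender.log4j2" shutdownHook="enable" shutdownTimeoutMillis="5000">
I have left these at their defaults above.)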
I tried using this config in a very simple app -
package ...
import ...
public class MyApp
{
private static Logger logger = LogManager.getLogger(MyApp.class);
AmazonS3 s3Client = null;
AmazonDynamoDB dynamoDBClient = null;
MyApp() {
initS3Client(new DefaultAWSCredentialsProviderChain());
}
public void listObjects(String bucketName) {
ObjectListing objectListing = s3Client.listObjects(bucketName);
logger.info("Listing objects in bucket - " + bucketName);
List<String> commonPrefixes = objectListing.getCommonPrefixes();
commonPrefixes.stream().forEach(s -> System.out.println("commonPrefix - " + s));
List<S3ObjectSummary> objectSummaries = objectListing.getObjectSummaries();
for(S3ObjectSummary objectSummary : objectSummaries) {
logger.info("key = " + objectSummary.getKey());
logger.info("ETag = " + objectSummary.getETag());
logger.info("Size = " + objectSummary.getSize());
logger.info("Storage Class = " + objectSummary.getStorageClass());
logger.info("Last Modified = " + objectSummary.getLastModified());
}
s3Client.shutdown();
}
public static void main(String[] args){
MyApp myApp = new MyApp();
myApp.listObjects("test-bucket");
}
void initS3Client(AWSCredentialsProvider credentialsProvider) {
AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(Regions.US_WEST_2);
s3Client = clientBuilder.build();
}
void initDynamoDBClient(AWSCredentialsProvider credentialsProvider) {
AmazonDynamoDBClientBuilder clientBuilder = AmazonDynamoDBClientBuilder.standard()
.withCredentials(credentialsProvider)
.withRegion(Regions.US_WEST_2);
dynamoDBClient = clientBuilder.build();
}
}
When I run MyApp, I see that after all the relevant logs are streamed to CloudWatch, the while loop in CloudwatchLogsLogEventPutter's run() method never terminates. I understand it is a separate thread that keeps running, but shouldn't log4j2 initiate the stop() method in the lifecycle by itself once the application-related tasks in MyApp.main() are complete?
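For comparison, I assume I could force the lifecycle to stop by shutting log4j2 down explicitly at the end of main() - a rough sketch (assuming log4j 2.6+, where LogManager.shutdown() is available) would be:
public static void main(String[] args) {
    MyApp myApp = new MyApp();
    myApp.listObjects("test-bucket");
    // Explicitly stop the logging subsystem; this should call stop() on every appender,
    // which in turn calls putter.terminate() so the run() loop can exit.
    LogManager.shutdown();
}
But my expectation was that log4j2 would trigger this on its own.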
If I do a Ctrl+C, I see the overridden stop() method below from CloudwatchLogsLog4J2Appender.java being called -
public boolean stop(long timeout, TimeUnit timeUnit)
I am not sure where I am going wrong, and there seems to be very little documentation on the Appender lifecycle and how its various lifecycle methods should be handled. This is my first time writing an appender. Any help is appreciated. Thanks.
Update 1: Sample log file - https://gist.github.com/dev-usa/822309bcd8b4f8a5fb0f4e1eca70d67e