I have a monthly data-archiving webjob running on Azure that pulls a month's worth of data for about 5,000 remote devices that regularly post data into Azure tables. The basic approach, once the data is retrieved, is to serialize it into a MemoryStream, GZip-compress that stream into a second MemoryStream, and upload the result to Azure Data Lake storage. Here is the code currently in use:
public static async Task<String> ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data)
{
    try
    {
        Console.WriteLine("Attempting to serialize, compress and upload data to data lake...");
        using (var uncompressedMs = new MemoryStream())
        using (var compressedMs = new MemoryStream())
        using (var streamWriter = new StreamWriter(uncompressedMs))
        {
            // Serialize data
            Console.WriteLine("Starting serialization...");
            var serializer = new JsonSerializer
            {
                NullValueHandling = NullValueHandling.Ignore,
                Formatting = Formatting.None
            };
            serializer.Serialize(streamWriter, data);
            uncompressedMs.Seek(0, SeekOrigin.Begin);
            Console.WriteLine("Serialization completed successfully.");

            Console.WriteLine("Starting data compression...");
            using (var compressionStream = new GZipStream(compressedMs, CompressionMode.Compress))
            {
                // Compress data
                await uncompressedMs.CopyToAsync(compressionStream);
                await compressionStream.FlushAsync();
                compressedMs.Seek(0, SeekOrigin.Begin);
                Console.WriteLine($"Data compression successfully completed. Compression rate achieved: {Math.Round(100 - (100.0 * compressedMs.Length / uncompressedMs.Length), 1)}%.");

                // Save data to Data Lake
                Console.WriteLine($"Starting data upload to {path + "/" + filenameWithoutExtension}.json.gz...");
                var dataLakeServiceClient = InSysDataLake.BlobStorage.GetDataLakeServiceClient();
                var dataLakeFileSystemClient = InSysDataLake.BlobStorage.GetFileSystemClient(dataLakeServiceClient, container);
                await InSysDataLake.BlobStorage.UploadFileFromMemoryStreamAsync(dataLakeFileSystemClient, path, filenameWithoutExtension + ".json.gz", compressedMs);
                Console.WriteLine("Data upload completed successfully.");
                return "OK";
            }
        }
    }
    catch (Exception ex)
    {
        Trace.TraceError(Helper.GenerateErrorMessage("InSysDataProcessing.DataArchiving.ArchiveDataAsync", ex));
        return $"ERROR: {ex.StackTrace}";
    }
}
The UploadFileFromMemoryStreamAsync method is:
public static async Task UploadFileFromMemoryStreamAsync(DataLakeFileSystemClient fileSystemClient, String directoryName, String filename, MemoryStream contentMS)
{
    DataLakeDirectoryClient directoryClient = fileSystemClient.GetDirectoryClient(directoryName);
    DataLakeFileClient fileClient = await directoryClient.CreateFileAsync(filename).ConfigureAwait(false);
    contentMS.Position = 0;
    var contentSize = contentMS.Length;
    await fileClient.AppendAsync(contentMS, offset: 0).ConfigureAwait(false);
    await fileClient.FlushAsync(position: contentSize).ConfigureAwait(false);
}
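For reference, the helper could also be collapsed onto the SDK's single-call upload, which wraps the create/append/flush sequence internally. A minimal sketch using the same Azure.Storage.Files.DataLake client types (I haven't switched to this, so treat it as untested):

public static async Task UploadFileFromMemoryStreamAsync(DataLakeFileSystemClient fileSystemClient, String directoryName, String filename, MemoryStream contentMS)
{
    DataLakeDirectoryClient directoryClient = fileSystemClient.GetDirectoryClient(directoryName);
    DataLakeFileClient fileClient = directoryClient.GetFileClient(filename);
    contentMS.Position = 0;
    // UploadAsync creates the file, appends the stream content and flushes in one call;
    // overwrite: true replaces any partial file left behind by an earlier failed attempt.
    await fileClient.UploadAsync(contentMS, overwrite: true).ConfigureAwait(false);
}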
The compressed streams end up around 5-20 MB, and the uncompressed serialized data can be 5-10 times bigger (as large as 250 MB). The calling process uses this method to store the monthly data for each device, and then aggregates all the data for the devices at a particular location and archives that as well. In total the method is called 5,000-10,000 times over the space of 5-10 hours.
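To put those numbers in perspective: a MemoryStream grows by doubling its internal buffer, so filling a default-constructed stream to 250 MB churns through a chain of progressively larger arrays, every one of them above 85,000 bytes landing on the large object heap. A small standalone illustration of the growth pattern (the 16 MB target is just to keep the demo quick):

using System;
using System.IO;

class CapacityGrowthDemo
{
    static void Main()
    {
        var ms = new MemoryStream(); // default capacity, grows on demand
        var chunk = new byte[64 * 1024];
        long lastCapacity = -1;
        while (ms.Length < 16 * 1024 * 1024) // 16 MB stand-in for the real 250 MB
        {
            ms.Write(chunk, 0, chunk.Length);
            if (ms.Capacity != lastCapacity)
            {
                // Each capacity step allocates a brand-new array and copies the old
                // one over; arrays above 85,000 bytes go to the large object heap.
                Console.WriteLine($"Capacity grew to {ms.Capacity:N0} bytes");
                lastCapacity = ms.Capacity;
            }
        }
    }
}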
Every now and then (but frequently enough to be a problem) I see the following error:
[07/30/2020 16:43:07 > 2ef8ea: INFO] Monthly location data for ... for 3/2019 FAILED to be archived: ERROR:
at System.IO.MemoryStream.set_Capacity(Int32 value)
at System.IO.MemoryStream.EnsureCapacity(Int32 value)
at System.IO.MemoryStream.Write(Byte[] buffer, Int32 offset, Int32 count)
at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder)
at System.IO.StreamWriter.Write(Char value)
at Newtonsoft.Json.JsonTextWriter.WriteValueDelimiter()
at Newtonsoft.Json.JsonWriter.AutoComplete(JsonToken tokenBeingWritten)
at Newtonsoft.Json.JsonTextWriter.WritePropertyName(String name, Boolean escape)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.Serialize(JsonWriter jsonWriter, Object value, Type objectType)
at Newtonsoft.Json.JsonSerializer.SerializeInternal(JsonWriter jsonWriter, Object value, Type objectType)
at InSysDataProcessing.DataArchiving.ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data) in E:\InergySystems\GitHub\InSysCore\InSysDataManagement\DataArchiving.cs:line 60
This only happens for the aggregated location data, i.e., for the larger data sets. My suspicion is that over time the large object heap becomes too fragmented to satisfy the MemoryStream's ever-growing buffer. In the process that calls the archiving function I added:
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect();
after each call to the archiver, but this has not helped. I've also tried scaling up the App Service Plan the webjob runs on (currently a P3v2 with 16 GB RAM), but it performed no better than the 7 GB plan.
At this point I'm not sure what to try next.
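The only concrete idea I have left is to pre-size both MemoryStreams from an estimate of the payload, so that each allocates one large buffer up front instead of doubling its way through a chain of LOH arrays. A rough sketch of what I mean (the estimate input is hypothetical; I don't track payload sizes yet):

private static (MemoryStream uncompressed, MemoryStream compressed) CreatePresizedStreams(long estimatedJsonBytes)
{
    // Hypothetical: estimatedJsonBytes would come from my own payload history,
    // e.g. last month's uncompressed size for the same location padded by ~20%.
    // A single up-front allocation per stream avoids the EnsureCapacity doubling
    // chain entirely, provided the estimate is not exceeded while writing.
    var uncompressedCapacity = (int)Math.Min(estimatedJsonBytes, int.MaxValue);
    // Observed compression is 5-10x, so a fifth of the JSON size is a safe upper bound.
    var compressedCapacity = (int)Math.Min(estimatedJsonBytes / 5, int.MaxValue);
    return (new MemoryStream(uncompressedCapacity), new MemoryStream(compressedCapacity));
}

Would that plausibly address the fragmentation, or is there a better pattern for this kind of workload?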