
We're using the Stream functionality in RavenDB to load, transform, and migrate data between two databases, like so:

var query = originSession.Query<T>(IndexForQuery);

using (var stream = originSession.Advanced.Stream(query))
{
    while (stream.MoveNext())
    {
        var streamedDocument = stream.Current.Document;

        OpenSessionAndMigrateSingleDocument(streamedDocument);
    }
}

The problem is that one of the collections has millions of rows, and we keep receiving an IOException in the following format:

Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
   at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
   at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
   at System.IO.StreamReader.Read(Char[], Int32, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
   at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
   at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
   at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
   at MigrateToNewSchema.Program.Main(System.String[])

This happens quite a long way into streaming, and of course transient connection issues will occur over a period this long (the migration takes hours to complete).

However, because we are using a Query, when we retry we have to start from scratch. So ultimately, if there is a connection failure at any point during the Stream, we have to try it again, and again, until it works end to end.

I know you can use an ETag with Stream to effectively restart at a certain point; however, there is no overload that combines this with a Query, which we need in order to filter the results being migrated and specify the correct collection.
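
For reference, this is roughly what an ETag-based restart would look like without a Query (just a sketch; the Stream(Etag) overload and StreamResult members are as I understand the client API, and the lack of server-side filtering is exactly the problem):

// Sketch only: resume the stream from the last ETag we saw whenever the connection drops.
Etag lastEtag = Etag.Empty;
var finished = false;

while (!finished)
{
    try
    {
        using (var stream = originSession.Advanced.Stream<T>(lastEtag))
        {
            while (stream.MoveNext())
            {
                // Remember how far we got so a retry can pick up from here.
                lastEtag = stream.Current.Etag;

                // No Query means no server-side filtering; we'd have to skip
                // documents from other collections ourselves before migrating.
                OpenSessionAndMigrateSingleDocument(stream.Current.Document);
            }

            finished = true;
        }
    }
    catch (IOException)
    {
        // Transient connection failure: loop round and reopen the stream from lastEtag.
    }
}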

So, in RavenDB, is there a way either to improve the internal resilience of the connection (a connection string property, internal settings, etc.) or to effectively "recover" a stream after an error?

Luke Merrett
  • I've discovered [Data Subscriptions](http://ravendb.net/docs/article-page/3.0/csharp/client-api/data-subscriptions/how-to-create-data-subscription), a RavenDb 3.0 feature that provides a reliable mechanism for iterating across a collection of documents matching specified criteria, and allowing you to easily pick up where you left off. If someone were willing to put together some code samples showing how that feature might answer this question, I'd consider that worthy of the bounty. – StriplingWarrior Oct 06 '15 at 14:30
  • Are you tied to using a query? Although it will be more inefficent, this is a migration so memory is not a problem--why not iterate the raw doc collections and filter in-memory, so you can resume at an Etag? This is how I handle all streaming, I never use queries. – kamranicus Dec 30 '15 at 14:51
  • @StriplingWarrior It has been a while :-) I don't work for the company using RavenDB anymore but this still interests me so I'll stick an answer up with the data subscription code today – Luke Merrett Feb 07 '16 at 11:06

1 Answer


As per the suggestion from @StriplingWarrior I've recreated the solution using Data Subscriptions.

Using this approach I was able to iterate over all 2 million records (though admittedly with much less processing per item). Two points here would have helped when we were trying to implement the same logic using Streams:

  1. Batches only get removed from the subscription "queue" once acknowledged (like most standard queues)
    1. The subscribed IObserver<T> has to complete successfully for this acknowledgment to be set.
    2. This information is handled by the server rather than the client, which allows the client to restart without affecting the last successfully processed position in the subscription
    3. See here for more details
  2. As @StriplingWarrior indicated, because you can create subscriptions with filters right down to the property level, it would be possible to replay with a smaller result set in the event of an exception within the subscription itself (see the sketch after this list).
    1. The first point really supersedes this, but it gives us additional flexibility not seen in the Stream API.
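
For illustration, a filtered subscription might be set up along these lines. This is a sketch only: I'm assuming the criteria class exposes a PropertiesMatch dictionary as described in the data subscriptions docs, so check the exact shape in your client version.

// Sketch, assuming SubscriptionCriteria exposes a PropertiesMatch dictionary:
// only documents whose Lastname is "Smith" would be delivered to the subscriber,
// giving a much smaller result set to replay after a failure.
var criteria = new SubscriptionCriteria<Person>
{
    PropertiesMatch = new Dictionary<string, RavenJToken>
    {
        { "Lastname", new RavenJValue("Smith") }
    }
};

var filteredSubscriptionId = store.Subscriptions.Create(criteria);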

The testing environment is a RavenDB 3.0 database (local machine, running as a Windows service) with default settings, against a collection of 2 million records.

Code to generate the dummy records:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    using (var bulkInsert = store.BulkInsert())
    {
        for (var i = 0; i != recordsToCreate; i++)
        {
            var person = new Person
            {
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            };

            bulkInsert.Store(person);
        }
    }
}
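
For completeness, the Person class and GetDocumentStore helper referenced in these samples are simple stand-ins along these lines (the URL and database name are placeholders for the local test instance):

// Simple POCO matching the fields generated above.
public class Person
{
    public Guid Id { get; set; }
    public string Firstname { get; set; }
    public string Lastname { get; set; }
}

// Assumed helper: points the client at the local RavenDB 3.0 instance used for testing.
private static IDocumentStore GetDocumentStore()
{
    return new DocumentStore
    {
        Url = "http://localhost:8080",   // placeholder URL
        DefaultDatabase = "PersonTestDb" // placeholder database name
    };
}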

Subscribing to this collection is then a case of:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());

    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
    {
        BatchOptions = new SubscriptionBatchOptions()
        {
            // Max number of docs that can be sent in a single batch
            MaxDocCount = 16 * 1024,  
            // Max total batch size in bytes
            MaxSize = 4 * 1024 * 1024,
            // Max time the subscription needs to confirm that the batch
            // has been successfully processed
            AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
        },
        IgnoreSubscribersErrors = false,
        ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
    });

    personSubscription.Subscribe(new PersonObserver());

    // Keep the process alive; the subscription delivers batches to the observer on its own thread.
    while (true)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    }
}

Note the PersonObserver; this is just a basic implementation of IObserver<Person>, like so:

public class PersonObserver : IObserver<Person>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }

    public void OnNext(Person person)
    {
        Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'");
    }
}
Luke Merrett
  • Nice write-up. I found it useful to pass in a `Task` (or create a `Task` based on a given `CancellationToken`), and `await` the task rather than `while(true)`. That way, the calling code can safely cancel the operation without killing the whole thread or process. I also came up with an ETag-based mechanism to help the migration know when it's "done" hitting all the target documents, so it can stop itself, but it's pretty tricky and isn't great for all purposes. – StriplingWarrior Feb 08 '16 at 19:09
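
A minimal sketch of the cancellation-friendly wait described in the comment above (how the CancellationToken is created and signalled is assumed and left to the calling code):

// Instead of while (true) { Thread.Sleep(...); }, await a task that completes
// when the caller cancels, so the subscription loop can be stopped cleanly.
public static async Task WaitUntilCancelledAsync(CancellationToken cancellationToken)
{
    var tcs = new TaskCompletionSource<bool>();
    using (cancellationToken.Register(() => tcs.TrySetResult(true)))
    {
        await tcs.Task; // completes when cancellation is requested
    }
}

// Usage inside the subscription setup, replacing the while (true) loop:
// await WaitUntilCancelledAsync(cancellationToken);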