I have a Stream containing a collection of objects serialized into JSON, for example:

{"Name":"Alice","Age":20}{"Name":"Bob","Age":30}{"Name":"Charlie","Age":35}{"Name":"Danielle","Age":50}...

This stream may be very large, so I want to process the items as they arrive rather than reading the entire stream into a string. Also, if the stream is a network stream, it may be open indefinitely.

Some of the objects may be quite complex so I would like to take advantage of the JSON serialization provided by JSON.NET.

Is there a way I can use Newtonsoft Json to convert the stream into an IObservable<Person> and process the items reactively as they arrive?

svick
Oenotria
  • http://stackoverflow.com/a/9026821/468244 should give you an idea on how to continuously read from a json stream. Wrapping into a subject/observable/... should not be an issue. – Simon Opelt Mar 30 '13 at 14:32
  • Would that mean reconstructing the JObject manually from all the tokens? The reader doesn't seem to provide access to the underlying string so I can't see a way to accumulate the text until I hit an EndObject token? – Oenotria Mar 30 '13 at 14:35

1 Answer

If you could modify your input so that it looks like a JSON array, then you could use JsonTextReader directly to read the array parts and then use JsonSerializer to read the objects inside the array.

This way, the results can be streamed one item at a time, and you don't have to write the deserialization of the individual objects yourself.

The method producing an IObservable could look something like:

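using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Newtonsoft.Json;

// Extension method: it must be declared inside a static class.
public static IObservable<T> DeserializeAsObservable<T>(this TextReader reader)
{
    var jsonReader = new JsonTextReader(reader);

    // The input must start with '[' so that the items can be read one at a time.
    if (!jsonReader.Read() || jsonReader.TokenType != JsonToken.StartArray)
        throw new Exception();

    var serializer = new JsonSerializer();

    return Observable.Create<T>(o => 
        {
            while (true)
            {
                // Input ended before the closing ']': report the error and stop.
                if (!jsonReader.Read())
                {
                    o.OnError(new Exception());
                    return Disposable.Empty;
                }
                if (jsonReader.TokenType == JsonToken.EndArray)
                    break;
                // The reader is positioned on the next item; deserialize only that item.
                var obj = serializer.Deserialize<T>(jsonReader);
                o.OnNext(obj);
            }
            o.OnCompleted();
            return Disposable.Empty;
        });
}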

(Obviously use this only after replacing new Exception() with a more suitable exception type.)
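If the input cannot be changed to look like a JSON array, one possible alternative is the SupportMultipleContent property of JsonTextReader, which allows the reader to continue past the end of one top-level value. The following is only a sketch of that different technique, not the method described above. The names JsonStreamExtensions, DeserializeConcatenated and Program are hypothetical, and the Person class is assumed from the sample data in the question.

```csharp
using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Newtonsoft.Json;

// Assumed shape of the objects in the question's sample data.
public class Person
{
    public string Name { get; set; }
    public int Age { get; set; }
}

public static class JsonStreamExtensions
{
    // Hypothetical helper: reads JSON values that follow each other without separators.
    public static IObservable<T> DeserializeConcatenated<T>(this TextReader reader)
    {
        return Observable.Create<T>(o =>
        {
            try
            {
                // SupportMultipleContent lets the reader move on to the next
                // top-level value instead of failing after the first one.
                var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true };
                var serializer = new JsonSerializer();

                // Each Read() positions the reader on the start of the next value;
                // it returns false once the input is exhausted.
                while (jsonReader.Read())
                    o.OnNext(serializer.Deserialize<T>(jsonReader));

                o.OnCompleted();
            }
            catch (Exception ex)
            {
                // Route parse and I/O failures through the observable.
                o.OnError(ex);
            }
            return Disposable.Empty;
        });
    }
}

public static class Program
{
    public static void Main()
    {
        var input = "{\"Name\":\"Alice\",\"Age\":20}{\"Name\":\"Bob\",\"Age\":30}";

        new StringReader(input)
            .DeserializeConcatenated<Person>()
            .Subscribe(
                p => Console.WriteLine(p.Name + " is " + p.Age),
                ex => Console.WriteLine("Failed: " + ex.Message),
                () => Console.WriteLine("Done"));
    }
}
```

In this sketch the reading loop runs on the subscribing thread, so for a network stream that stays open indefinitely you would probably want to move the subscription to another thread, for example with SubscribeOn.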

svick
  • ...and using OnError(...) instead of throwing in the Observable.Create lambda. You may also want to substitute return (IDisposable)null; for return Disposable.Empty; – Lee Campbell Apr 02 '13 at 11:57