6

I have a JSON-RPC service which for one of the requests returns a continuous stream of JSON objects.

I.e. :

{id:'1'}
{id:'2'}
//30 minutes of no data
{id:'3'}
//...

Of course, there's no Content-Length because the stream is endless.

I'm using custom TStream descendant to receive and parse the data. But internally TIdHttp buffers the data and does not pass it to me until RecvBufferSize bytes are received.

This results in:

{id:'1'} //received
{id:'2'} //buffered by Indy but not received
//30 minutes of no data
{id:'3'} //this is where Indy commits {id:'2'} to me

Obviously this won't do because the message which mattered 30 minutes ago should have been delivered 30 minutes ago.

I'd like Indy to do just what sockets do: read up to RecvBufferSize or less if there's data available and return immediately.

I've found this discussion from 2005 where some poor soul tried to explain the problem to Indy developers but they didn't understand him. (Read it; it's a sad sight)

Anyway, he worked around this by writing custom IOHandler descendant, but that was back in 2005, maybe there are some ready solutions today?

himself
  • 4,806
  • 2
  • 27
  • 43

4 Answers4

4

Sounds to me like a WebSocket task, since your connection is not plain HTTP question/answer oriented any more, but a stream of content.

See WebSocket server implementations for Delphi for some code.

There is at least one based on Indy, from the author of AsmProfiler.

AFAIK there are two kind of stream in websockets: binary and text. I suspect your JSON stream is some text content, from the websocket point of view.

Another option is to use long-pooling or some older protocols, which are more rooter-friendly - when the connection switch to websockets mode, it is no standard HTTP any more, so some "sensible" packet-inspection tools (on a corporate network) may identify it as a security attack (e.g. DoS), so may stop the connection.

Community
  • 1
  • 1
Arnaud Bouchez
  • 42,305
  • 3
  • 71
  • 159
  • If I get it right, both solutions require rewriting the service? Because I don't have access to it. – himself Mar 26 '13 at 09:08
  • @himself If your request is to have the connection open and not use Content-Length headers, this is not HTTP any more, therefore I suppose you will have to change the service side! – Arnaud Bouchez Mar 26 '13 at 12:42
  • Mhm, guess what the service side will say? "Nowhere in HTTP standard it says that HTTP middleware can buffer data for extended periods of time. Therefore our service is fine, I suppose you'll have to fix your HTTP client code". Back to square one. – himself Mar 26 '13 at 13:51
  • @ArnaudBouchez it is still HTTP - content-length header can be ommitted, and the server simply closes the connection at the end (see [RFC 2616](http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.4)), and connections are persistent in HTTP 1.1 by default – mjn Mar 27 '13 at 10:55
  • @mjn My point was that even if it "may" be compliant with HTTP standard, websockets layout does not smell good for some OSI level 7 security inspectors. Server or router may stop the connection with some kind of timeout, if it did not recognize the stream as a "classic" HTTP stateless process. It may be the open door to DoS - see https://bugs.webkit.org/show_bug.cgi?id=32246 and http://media.blackhat.com/bh-us-12/Briefings/Shekyan/BH_US_12_Shekyan_Toukharian_Hacking_Websocket_Slides.pdf – Arnaud Bouchez Mar 28 '13 at 16:39
2

You do not need to write a IOHandler descendant, it is already possible with the TIdTCPClient class. It exposes a TIdIOHandler object, which has methods to read from the socket. These ReadXXX methods block until the requested data has been read or a timeout occurs. As long as the connection exists, ReadXXX can be executed in a loop and whenever it receives a new JSON object, pass it to the application logic.

Your example looks like all JSON objects only have one line. JSON objects however could be multi-lined, in this case the client code needs to know how they are separated.


Update: in a similar Stackoverflow question (for .Net) for a 'streaming' HTTP JSON web service, the most upvoted solution used a lower-level TCP client instead of a HTTP client: Reading data from an open HTTP stream

Community
  • 1
  • 1
mjn
  • 36,362
  • 28
  • 176
  • 378
2

While using TCP stream was an option, in the end I went with original solution of writing custom TIdIOHandlerStack descendant.

The motivation was that with TIdHTTP I know what doesn't work and only need to fix that, while switching to lower level TCP means new problems can arise.

Here's the code that I'm using, and I'm going to discuss the key points here.

New TIdStreamIoHandler has to inherit from TIdIOHandlerStack.

Two functions need to be rewritten: ReadBytes and ReadStream:

function TryReadBytes(var VBuffer: TIdBytes; AByteCount: Integer;
  AAppend: Boolean = True): integer; virtual;
procedure ReadStream(AStream: TStream; AByteCount: TIdStreamSize = -1;
  AReadUntilDisconnect: Boolean = False); override;

Both are modified Indy functions which can be found in IdIOHandler.TIdIOHandler. In ReadBytes the while clause has to be replaced with a singe ReadFromSource() request, so that TryReadBytes returns after reading up to AByteCount bytes in one go.

Based on this, ReadStream has to handle all combinations of AByteCount (>0, <0) and ReadUntilDisconnect (true, false) to cyclically read and then write to stream chunks of data arriving from the socket.

Note that ReadStream need not terminate prematurely even in this stream version if only part of the requested data is available in the socket. It just has to write that part to the stream instantly instead of caching it in FInputBuffer, then block and wait for the next part of data.

himself
  • 4,806
  • 2
  • 27
  • 43
  • as Indy is open source, modified sources may (and, if helpful for others, should) be made public – mjn Apr 24 '13 at 13:03
0

There is actually a length data right before the content of packet which transferred in chunked encoding transfer mode. Using this length data, IOhandler of idhttp read one packet by one packet to stream. The minimum meaningful unit is a packet, So there should be no need to read characters one by one from a packet and then no need to change the functions of IOHandler. The only problem is idhttp wouldn't stop an turn the stream data to next step because of the endless of stream data: there is no ending packet. So the solution is using idhttp onwork event to trigger a reading from stream and setting the stream position to zero in order to avoid overflow .like this:

    //add a event handler to idhttp    
    IdHTTP.OnWork :=  IdHTTPWork;


    procedure  TRatesStreamWorker.IdHTTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    begin
         .....
         ResponseStringStream.Position :=0; 
         s:=ResponseStringStream.ReadString(ResponseStringStream.Size) ;//this is the packet conten
         ResponseStringStream.Clear;
         ... 
    end;

procedure TForm1.ButtonGetStreamPricesClick(Sender: TObject);
var
 begin
    .....    
    source := RatesWorker.RatesURL+'EUR_USD';  
    RatesWorker.IdHTTP.Get(source,RatesWorker.ResponseStringStream);  
 end;

Yet use a custom write() function of Tstream may be a better solution for this kind of requirement.

Ming
  • 1