9

I am trying to connect to the Twitter streaming API endpoint. It looks like URLSession supports streaming via URLSessionStreamTask, however I can't figure out how to use the API. I have not been able to find any sample code either.

I tried testing the following, but there is no network traffic recorded:

let session = URLSession(configuration: .default, delegate: self, delegateQueue: nil)
let stream = session.streamTask(withHostName: "https://stream.twitter.com/1.1/statuses/sample.json", port: 22)
stream.startSecureConnection()
stream.readData(ofMinLength: 0, maxLength: 100000, timeout: 60, completionHandler: { (data, bool, error) in
   print("bool = \(bool)")
   print("error = \(String(describing: error))")
})
stream.resume()

I've also implemented the delegate methods (including URLSessionStreamDelegate), but they do not get called.

It would be really helpful if someone code post a sample of how to open a persistent connection for chunked responses from a streaming endpoint. Also, I am seeking solutions which don't involve third party libraries. A response similar to https://stackoverflow.com/a/9473787/5897233 but updated with the URLSession equivalent would be ideal.

Note: Authorization info was omitted from the sample code above.

iOS dev
  • 522
  • 6
  • 14

2 Answers2

30

Received lots of info courtesy of Quinn "The Eskimo" at Apple.

Alas, you have the wrong end of the stick here. URLSessionStreamTask is for wrangling a naked TCP (or TLS over TCP) connection, without the HTTP framing on top. You can think of it as a high-level equivalent to the BSD Sockets API.

The chunked transfer encoding is part of HTTP, and is thus supported by all of the other URLSession task types (data task, upload task, download task). You don’t need to do anything special to enable this. The chunked transfer encoding is a mandatory part of the HTTP 1.1 standard, and is thus is always enabled.

You do, however, have an option as to how you receive the returned data. If you use the URLSession convenience APIs (dataTask(with:completionHandler:) and so on), URLSession will buffer all the incoming data and then pass it to your completion handler in one large Data value. That’s convenient in many situations but it doesn’t work well with a streamed resource. In that case you need to use the URLSession delegate-based APIs (dataTask(with:) and so on), which will call the urlSession(_:dataTask:didReceive:) session delegate method with chunks of data as they arrive.

As for the specific endpoint I was testing, the following was uncovered: It seems that the server only enables its streaming response (the chunked transfer encoding) if the client sends it a streaming request. That’s kinda weird, and definitely not required by the HTTP spec.

Fortunately, it is possible to force URLSession to send a streaming request:

  1. Create your task with uploadTask(withStreamedRequest:)

  2. Implement the urlSession(_:task:needNewBodyStream:) delegate method to return an input stream that, when read, returns the request body

  3. Profit!

I’ve attached some test code that shows this in action. In this case it uses a bound pair of streams, passing the input stream to the request (per step 2 above) and holding on to the output stream.

If you want to actually send data as part of the request body you can do so by writing to the output stream.

class NetworkManager : NSObject, URLSessionDataDelegate {

static var shared = NetworkManager()

private var session: URLSession! = nil

override init() {
    super.init()
    let config = URLSessionConfiguration.default
    config.requestCachePolicy = .reloadIgnoringLocalCacheData
    self.session = URLSession(configuration: config, delegate: self, delegateQueue: .main)
}

private var streamingTask: URLSessionDataTask? = nil

var isStreaming: Bool { return self.streamingTask != nil }

func startStreaming() {
    precondition( !self.isStreaming )

    let url = URL(string: "ENTER STREAMING URL HERE")!
    let request = URLRequest(url: url)
    let task = self.session.uploadTask(withStreamedRequest: request)
    self.streamingTask = task
    task.resume()
}

func stopStreaming() {
    guard let task = self.streamingTask else {
        return
    }
    self.streamingTask = nil
    task.cancel()
    self.closeStream()
}

var outputStream: OutputStream? = nil

private func closeStream() {
    if let stream = self.outputStream {
        stream.close()
        self.outputStream = nil
    }
}

func urlSession(_ session: URLSession, task: URLSessionTask, needNewBodyStream completionHandler: @escaping (InputStream?) -> Void) {
    self.closeStream()

    var inStream: InputStream? = nil
    var outStream: OutputStream? = nil
    Stream.getBoundStreams(withBufferSize: 4096, inputStream: &inStream, outputStream: &outStream)
    self.outputStream = outStream

    completionHandler(inStream)
}

func urlSession(_ session: URLSession, dataTask: URLSessionDataTask, didReceive data: Data) {
    NSLog("task data: %@", data as NSData)
}

func urlSession(_ session: URLSession, task: URLSessionTask, didCompleteWithError error: Error?) {
    if let error = error as NSError? {
        NSLog("task error: %@ / %d", error.domain, error.code)
    } else {
        NSLog("task complete")
    }
}
}

And you can call the networking code from anywhere such as:

class MainViewController : UITableViewController {

override func tableView(_ tableView: UITableView, didSelectRowAt indexPath: IndexPath) {
    if NetworkManager.shared.isStreaming {  
       NetworkManager.shared.stopStreaming() 
    } else {
       NetworkManager.shared.startStreaming() 
    }
    self.tableView.deselectRow(at: indexPath, animated: true)
}
}

Hope this helps.

iOS dev
  • 522
  • 6
  • 14
  • I just stumbled on this very helpful answer and while it works great when my app is in the foreground it does not work when I transition to the background. Basically I communicate with a RESTful endpoint that I ask for an event stream. The event stream opens and faithfully delivers the event until the moment the app transitions to the background. I no longer get the events even though my stream is still running. I've tried a number of approaches centering around creating a URLSession with a background configuration. Any general thoughts? – QBit Jun 02 '19 at 20:40
  • While this approach works perfectly for me in iOS 12 it no longer works in iOS 13. I get an error "GET method must not have a body". But looking at the request object, the httpBody field is nil which makes no sense to me. Any thoughts? – QBit Sep 25 '19 at 23:10
  • A GET request should never have a body. Use methods PUT or POST to make a request that has a body. – Greg Ball Oct 30 '19 at 02:20
  • QBit, you don't need uploadTask, just change it to `let task = self.session.dataTask(with: request)` and it will work. – Dmitry Mar 15 '20 at 18:51
  • Thank you so much for this! I've spent hours looking at examples and couldn't understand why my delegate method wasn't called. Turns out I was using `URLSessionDelegate` instead of `URLSessionDataDelegate` as mentioned in your answer here. Four letters made all the difference :) Thanks again. – pbodsk Sep 21 '21 at 12:31
0

So, this is a lot less robust than the example with no explicit task canceling or writing to stream but if you're just YOLO listening to a Server Sent Event stream, this works as of Feb of 2023. It's based on "Use async/await with URLSession" WWDC21 session. That session also has an example for using a custom delegate.

https://developer.apple.com/videos/play/wwdc2021/10095/

    func streamReceiverTest(streamURL:URL, session:URLSession)  async throws {
        let (bytes, response) = try await session.bytes(from:streamURL)
        guard let httpResponse = response as? HTTPURLResponse else {
            throw APIngError("Not an HTTPResponse")
        } 
        guard httpResponse.statusCode == 200 else {
            throw APIngError("Not a success: \(httpResponse.statusCode)")
        }
    
        for try await line in bytes.lines {
            print(line)
            print()
        }
    }

Inspecting the request with try await streamReceiverTest(streamURL:URL(string:"https://httpbin.org/get")!, session:URLSession.shared) doesn't show that the Accepts header is set, but it seems the API I'm using does need that to offer the stream. Some servers might(?) so I'll include that version as well.

    func streamReceiverTestWithManualHeader(streamURL:URL, session:URLSession)  async throws {
        var request = URLRequest(url:streamURL)
        request.setValue("text/event-stream", forHTTPHeaderField:"Accept") 
        let (bytes, response) = try await session.bytes(for:request)
        guard let httpResponse = response as? HTTPURLResponse else {
            throw APIngError("Not an HTTPResponse")
        } 
        guard httpResponse.statusCode == 200 else {
            throw APIngError("Not a success: \(httpResponse.statusCode)")
        }
    
        for try await line in bytes.lines {
            print(line)
            print()
        }
    }
carlynorama
  • 166
  • 7