I'm trying to stream data to a client over HTTP. To achieve this I'm using a WCF service with a WebHttpBinding. The problem is that the System.IO.Stream that my operation returns is closed before I can write anything to it. I'd like to keep the stream open until data needs to be written to it. Usually that wait is no more than half a minute.

In the service request method I create a new instance of System.IO.MemoryStream, add it to a collection of all open streams, and return it as the function output. Later, when audio data is available, I write to every stream in the collection. But by then all the requests have been closed. When I go to the endpoint URL, I get the browser's standard player completely greyed out. I also tested with a REST client, which showed that the request closes immediately after the return statement.

The background is that we use the Libspotify SDK to retrieve music, which sends 8192 bytes of PCM data per cycle. We would like users to be able to play their music on a Chromecast device. The Chromecast doesn't support PCM data, so we convert it to MP3 with libmp3lame and then send it through the output stream to the Chromecast. For this approach to work, the connection needs to stay alive even while no data is being sent through the stream.

The Libspotify music delivery callback can be found here.

This is how I set up the service:

/// <summary>
/// The WCF service host.
/// </summary>
private ServiceHost ServiceHost;

/// <summary>
/// Start the HTTP WCF service.
/// </summary>
public void startListening()
{
    if (ServiceHost == null)
    {
        ServiceHost = new ServiceHost(typeof(StreamingService));

        var binding = new WebHttpBinding(WebHttpSecurityMode.None);
        binding.TransferMode = TransferMode.StreamedResponse;

        var endpoint = ServiceHost.AddServiceEndpoint(typeof(StreamingContract), binding, new Uri(streamAddress));
        endpoint.EndpointBehaviors.Add(new WebHttpBehavior());

        ServiceHost.Open();
    }
}

This is the Service implementation:

[ServiceContract(Name="StreamingContract")]
interface StreamingContract
{
    [WebGet(UriTemplate="audio")]
    [OperationContract()]
    Stream Audio();
}

[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single,
                 IncludeExceptionDetailInFaults = true)]
public class StreamingService : StreamingContract
{
    public System.IO.Stream Audio()
    {
        var stream = new System.IO.MemoryStream();
        App.Logic.streaming.streams.Add(stream);

        WebOperationContext.Current.OutgoingResponse.ContentType = "audio/mp3";
        WebOperationContext.Current.OutgoingResponse.ContentLength = 1000;

        return stream;
    }
}

I also tried setting [OperationBehavior(AutoDisposeParameters = false)] on Audio() in the ServiceContract, but that only threw a System.InvalidOperationException. I also suspected that the unknown content length might be the problem, but setting it didn't help either.

Feanaro
  • See http://stackoverflow.com/questions/7469955/return-stream-from-wcf-service-using-sqlfilestream?rq=1, http://stackoverflow.com/questions/4748963/data-sent-to-a-wcf-rest-service-wouldnt-go-to-the-stream-input-parameter?rq=1, and http://stackoverflow.com/questions/23604828/how-can-i-stream-a-response-from-wcf-without-buffering?rq=1 for some ideas (from the "Related" panel to the right of your question). – John Saunders Apr 10 '15 at 00:30

2 Answers

Your service is doing exactly what it should - returning an empty stream and then closing the connection.

It sounds like you want to wait for that stream to be filled asynchronously. To do that, you'll have to implement some kind of callback. You should look into the Task.Run() method, as that is the standard way of running work asynchronously in .NET.
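One way to picture the problem: an empty MemoryStream returns 0 from Read() straight away, and 0 is the end-of-stream signal, so whoever reads the returned stream has nothing to wait for. Below is a minimal, untested-against-WCF sketch of a stream whose Read() blocks until a producer writes to it. BlockingStream, Complete() and the Demo class are hypothetical names invented for this illustration; they are not part of WCF or the .NET Framework. Whether WCF and the Chromecast behave well with such a stream is an assumption you would need to verify.

```csharp
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

// Hypothetical helper: Read() blocks until a producer has written data
// or has called Complete(); only then does it return 0 (end of stream).
public sealed class BlockingStream : Stream
{
    private readonly BlockingCollection<byte[]> _chunks = new BlockingCollection<byte[]>();
    private byte[] _current;   // chunk currently being drained by Read()
    private int _offset;       // read position inside _current

    // Producer side: queue a copy of the bytes (e.g. one encoded MP3 block).
    // Throws InvalidOperationException if called after Complete().
    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count <= 0) return;
        var chunk = new byte[count];
        Buffer.BlockCopy(buffer, offset, chunk, 0, count);
        _chunks.Add(chunk);
    }

    // Producer side: signal that no more data will follow.
    public void Complete() { _chunks.CompleteAdding(); }

    // Consumer side: wait for the next chunk instead of returning 0 early.
    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_current == null || _offset == _current.Length)
        {
            // TryTake returns false only once Complete() was called and the queue is empty.
            if (!_chunks.TryTake(out _current, Timeout.Infinite)) return 0;
            _offset = 0;
        }
        int n = Math.Min(count, _current.Length - _offset);
        Buffer.BlockCopy(_current, _offset, buffer, offset, n);
        _offset += n;
        return n;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position
    {
        get { throw new NotSupportedException(); }
        set { throw new NotSupportedException(); }
    }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}

public static class Demo
{
    public static void Main()
    {
        var stream = new BlockingStream();

        // Stand-in for the encoder callback: data arrives after a delay.
        var producer = new Thread(() =>
        {
            Thread.Sleep(200);
            stream.Write(new byte[] { 1, 2, 3 }, 0, 3);
            stream.Complete();
        });
        producer.Start();

        var buffer = new byte[16];
        Console.WriteLine(stream.Read(buffer, 0, buffer.Length)); // blocks, then prints 3
        Console.WriteLine(stream.Read(buffer, 0, buffer.Length)); // prints 0 after Complete()
        producer.Join();
    }
}
```

In the question's Audio() method, the idea would be to return a stream like this in place of the MemoryStream and to leave ContentLength unset, since the total length is not known in advance. The sketch does not handle a client that disconnects mid-stream, so the producer would still need a way to drop dead streams from its collection.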

John Saunders
Dan Field
  • I've looked into that. The problem is that I want to write to the output stream the moment there is new data incoming. So that the client will have the incoming chunk after the server has written it to the stream. When I create an instance of `MemoryStream` I have no way to safely return it without the connection closing. – Feanaro Apr 10 '15 at 01:18
  • You could try triggering your callback as soon as you start writing data, but you'd need some fancy wrangling of your MemoryStream to both read and write in separate threads. You'd also have to worry about whether your consumer was outpacing your producer. Does your write process really take that long? – Dan Field Apr 10 '15 at 01:29
  • Well, the writing process is not the problem. We are building a music player and would like to support the Chromecast, so we'd like to stream MP3 data over an HTTP endpoint to it. We send a LOAD request to the Chromecast and start loading the external stream, which takes about a second. Then data flows from the incoming stream through the lame encoder into the output stream, which sends it to the Chromecast. But even if the music is already running and data is being written to the output stream, it still closes. – Feanaro Apr 10 '15 at 01:37
  • I'd try just having a static byte array in your method, loading that into a memory stream, and returning it to see if your plan can work at all. Then go from there to find out where the problem is. – Dan Field Apr 10 '15 at 02:00
  • So, you're suggesting writing some zero-bytes to the stream first? Because the data isn't known before the external stream has started. – Feanaro Apr 10 '15 at 02:02
  • That only wrote my zero-bytes and then closed the stream. So sadly that doesn't work either :/ At least I can rule out the possibility that I couldn't write. – Feanaro Apr 10 '15 at 02:14
  • No, I mean just try to write your raw audio data immediately to the stream and see if it works at all. – Dan Field Apr 10 '15 at 02:22
  • I've tested the output from the lame encoder by writing it to a file, and that checked out. After that I tried passing `Encoding.UTF8.GetBytes("Incoming")` as the `Byte[]` constructor argument for the `MemoryStream`. Then when I loaded the URL, that worked perfectly. – Feanaro Apr 10 '15 at 02:24

Hope you can use this sample as an answer. It shows how to PUT a stream to the server asynchronously. The solution is tested and verified.

Here is an example of an HTTP WCF service (server) that hosts the service with an async setup:

using System;
using System.ServiceModel;
using System.ServiceModel.Description;

class Program
{
    static void Main(string[] args)
    {
        Uri baseAddress = new Uri("http://localhost:8000/Service1/");

        // Step 2 Create a ServiceHost instance to host the service.
        // Arguments: the type that implements the service contract, and the base address of the service.
        using (ServiceHost selfHost = new ServiceHost(typeof(Service1), baseAddress))
        {
            try
            {
                WebHttpBinding binding = new WebHttpBinding();
                binding.TransferMode = TransferMode.Streamed;
                binding.MaxReceivedMessageSize = int.MaxValue;
                binding.ReceiveTimeout = new TimeSpan(1, 0, 0); // 01:00:00
                binding.SendTimeout = new TimeSpan(1, 0, 0);    // 01:00:00

                // Step 3 Add a service endpoint to the host. An endpoint consists of an address, a binding and a service contract.
                // Note: this is optional in Framework 4.0 and upward, which generates default endpoints.
                // Arguments: service contract interface, binding, address.
                selfHost.AddServiceEndpoint(typeof(IService1), binding, "").EndpointBehaviors.Add(new WebHttpBehavior());

                // Step 5 Start the service.
                // Open the host to listen for incoming messages.
                selfHost.Open();
                Console.WriteLine("The service is ready.");
                Console.WriteLine("Press <ENTER> to terminate service.");
                Console.WriteLine();
                Console.ReadLine();

                // Close the ServiceHostBase to shut down the service.
                selfHost.Close();
            }
            catch (CommunicationException ce)
            {
                Console.WriteLine("An exception occurred: {0}", ce.Message);
                selfHost.Abort();
            }
        }
    }
}

Here is the actual service contract interface:

[ServiceContract]
//[ServiceBehavior(InstanceContextMode = InstanceContextMode.PerCall)]
public interface IService1
{    

    /// <summary>
    /// An asynchronous service side upload operation.
    /// </summary>
    /// <param name="data">The data being uploaded.</param>
    /// <param name="callback">Callback for async pattern, client does not pass this.</param>
    /// <param name="asyncState">User state for async pattern, client does not pass this.</param>
    /// <remarks>
    /// The <paramref name="data"/> parameter is the request body, the file being uploaded.
    /// </remarks>
    /// <returns>The <see cref="IAsyncResult"/> that is later passed to <see cref="EndAsyncUpload"/>.</returns>
    [OperationContract(AsyncPattern = true)]
    [WebInvoke(Method = "PUT", UriTemplate = "asyncupload/")]
    IAsyncResult BeginAsyncUpload(Stream data, AsyncCallback callback, object asyncState);

    /// <summary>
    /// Ends the asynchronous operation initiated by the call to <see cref="BeginAsyncUpload"/>.
    /// </summary>
    /// <remarks>
    /// This is called by the WCF framework service side.  NOTE:  There is no <see cref="OperationContractAttribute"/> decorating
    /// this method.
    /// </remarks>
    /// <param name="ar"></param>
    void EndAsyncUpload(IAsyncResult ar);
}

And the implementation:

public class Service1 : IService1
{
    /// <summary>
    /// <see cref="IService1.BeginAsyncUpload"/>
    /// </summary>
    /// <param name="data">Data being uploaded.</param>
    /// <param name="callback">Async callback.</param>
    /// <param name="asyncState">Async user state.</param>
    public IAsyncResult BeginAsyncUpload(Stream data, AsyncCallback callback, object asyncState)
    {
        // Completes synchronously; the stream itself is read in EndAsyncUpload.
        return new CompletedAsyncResult<Stream>(data);
    }

    /// <summary>
    /// <see cref="IService1.EndAsyncUpload"/>
    /// </summary>
    public void EndAsyncUpload(IAsyncResult ar)
    {
        Stream data = ((CompletedAsyncResult<Stream>)ar).Data;
        _streamToFile(data);
    }


    /// <summary>
    /// Writes the uploaded stream to a file.
    /// </summary>
    /// <remarks>
    /// This function only serves as a test. It simply saves the uploaded data into a file named "upload.xml" in a subdirectory
    /// whose name is a generated GUID.
    /// </remarks>
    private static void _streamToFile(Stream data)
    {
        // create name of subdirectory
        string subDir = Guid.NewGuid().ToString("N");

        // get full path to and create the directory to save file in
        string uploadDir = Path.Combine(Path.GetDirectoryName(typeof(Service1).Assembly.Location), subDir);
        Directory.CreateDirectory(uploadDir);

        // 64 KiB buffer
        byte[] buff = new byte[0x10000];

        // save the file in chunks
        using (FileStream fs = new FileStream(Path.Combine(uploadDir, "upload.xml"), FileMode.Create))
        {
            int bytesRead = data.Read(buff, 0, buff.Length);
            while (bytesRead > 0)
            {
                fs.Write(buff, 0, bytesRead);
                bytesRead = data.Read(buff, 0, buff.Length);
            }
        }
    }
}

In addition, add a class to this project with the following content:

internal class CompletedAsyncResult<T> : IAsyncResult
{
    T data;

    public CompletedAsyncResult(T data)
    { this.data = data; }

    public T Data
    { get { return data; } }

    #region IAsyncResult Members
    public object AsyncState
    { get { return (object)data; } }

    public WaitHandle AsyncWaitHandle
    { get { throw new Exception("The method or operation is not implemented."); } }

    public bool CompletedSynchronously
    { get { return true; } }

    public bool IsCompleted
    { get { return true; } }
    #endregion
}


internal class AsyncResultNoResult : IAsyncResult
{
    // Fields set at construction which never change while 
    // operation is pending
    private readonly AsyncCallback m_AsyncCallback;
    private readonly Object m_AsyncState;

    // Fields set at construction which do change after 
    // operation completes
    private const Int32 c_StatePending = 0;
    private const Int32 c_StateCompletedSynchronously = 1;
    private const Int32 c_StateCompletedAsynchronously = 2;
    private Int32 m_CompletedState = c_StatePending;

    // Field that may or may not get set depending on usage
    private ManualResetEvent m_AsyncWaitHandle;

    // Fields set when operation completes
    private Exception m_exception;

    public AsyncResultNoResult(AsyncCallback asyncCallback, Object state)
    {
        m_AsyncCallback = asyncCallback;
        m_AsyncState = state;
    }

    public void SetAsCompleted(
       Exception exception, Boolean completedSynchronously)
    {
        // Passing null for exception means no error occurred. 
        // This is the common case
        m_exception = exception;

        // The m_CompletedState field MUST be set prior to calling the callback
        Int32 prevState = Interlocked.Exchange(ref m_CompletedState,
           completedSynchronously ? c_StateCompletedSynchronously :
           c_StateCompletedAsynchronously);
        if (prevState != c_StatePending)
            throw new InvalidOperationException(
                "You can set a result only once");

        // If the event exists, set it
        if (m_AsyncWaitHandle != null) m_AsyncWaitHandle.Set();

        // If a callback method was set, call it
        if (m_AsyncCallback != null) m_AsyncCallback(this);
    }

    public void EndInvoke()
    {
        // This method assumes that only 1 thread calls EndInvoke 
        // for this object
        if (!IsCompleted)
        {
            // If the operation isn't done, wait for it
            AsyncWaitHandle.WaitOne();
            AsyncWaitHandle.Close();
            m_AsyncWaitHandle = null;  // Allow early GC
        }

        // Operation is done: if an exception occurred, throw it
        if (m_exception != null) throw m_exception;
    }

    #region Implementation of IAsyncResult
    public Object AsyncState { get { return m_AsyncState; } }

    public Boolean CompletedSynchronously
    {
        get
        {
            return Thread.VolatileRead(ref m_CompletedState) ==
                c_StateCompletedSynchronously;
        }
    }

    public WaitHandle AsyncWaitHandle
    {
        get
        {
            if (m_AsyncWaitHandle == null)
            {
                Boolean done = IsCompleted;
                ManualResetEvent mre = new ManualResetEvent(done);
                if (Interlocked.CompareExchange(ref m_AsyncWaitHandle,
                   mre, null) != null)
                {
                    // Another thread created this object's event; dispose 
                    // the event we just created
                    mre.Close();
                }
                else
                {
                    if (!done && IsCompleted)
                    {
                        // If the operation wasn't done when we created 
                        // the event but now it is done, set the event
                        m_AsyncWaitHandle.Set();
                    }
                }
            }
            return m_AsyncWaitHandle;
        }
    }

    public Boolean IsCompleted
    {
        get
        {
            return Thread.VolatileRead(ref m_CompletedState) !=
                c_StatePending;
        }
    }
    #endregion
}

internal class AsyncResult<TResult> : AsyncResultNoResult
{
    // Field set when operation completes
    private TResult m_result = default(TResult);

    public AsyncResult(AsyncCallback asyncCallback, Object state) :
        base(asyncCallback, state) { }

    public void SetAsCompleted(TResult result,
       Boolean completedSynchronously)
    {
        // Save the asynchronous operation's result
        m_result = result;

        // Tell the base class that the operation completed 
        // successfully (no exception)
        base.SetAsCompleted(null, completedSynchronously);
    }

    new public TResult EndInvoke()
    {
        base.EndInvoke(); // Wait until operation has completed 
        return m_result;  // Return the result (if above didn't throw)
    }
}

Then the client implementation:

try
{
    string txtFileName = "Invoice_50000.xml";

    // Create the REST request. The base URL already ends with a slash,
    // so the relative path must not start with another one.
    string url = "http://localhost:8000/Service1/"; // or ConfigurationManager.AppSettings["serviceUrl"]

    /* Asynchronous */
    string requestUrl = string.Format("{0}asyncupload/", url);

    HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(requestUrl);

    using (FileStream inputStream = File.Open(txtFileName, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
    {
        request.SendChunked = true;
        request.AllowWriteStreamBuffering = false;
        request.Method = "PUT";
        request.ContentType = "application/octet-stream";
        request.ContentLength = inputStream.Length;

        /* BEGIN: Solution with chunks */
        // 64 KiB buffer
        byte[] chunkBuffer = new byte[0x10000];

        // As the file is streamed up in chunks, the server will be processing the file.
        using (Stream st = request.GetRequestStream())
        {
            int bytesRead = inputStream.Read(chunkBuffer, 0, chunkBuffer.Length);
            while (bytesRead > 0)
            {
                st.Write(chunkBuffer, 0, bytesRead);
                bytesRead = inputStream.Read(chunkBuffer, 0, chunkBuffer.Length);
            }
        }
    }

    try
    {
        HttpWebResponse resp = (HttpWebResponse)request.GetResponse();
        Console.WriteLine("HTTP/{0} {1} {2}", resp.ProtocolVersion, (int)resp.StatusCode, resp.StatusDescription);
        resp.Close();
    }
    catch (System.Exception)
    {
        //TODO: error handling here.
    }
    /* END: Solution with chunks */
}
catch (Exception ex)
{
    // Report file or network failures that happen before the response is read.
    Console.WriteLine("Upload failed: {0}", ex.Message);
}
Naha
  • This is an interesting approach. The problem is that I don't know anything beforehand; I want to gradually flow data into the stream as it comes in from our external source, in this case libspotify, which sends 8192 bytes of PCM data per cycle. That data has to be converted to MP3 and then written to the output stream. This is the data delivery callback from libspotify: https://developer.spotify.com/docs/libspotify/12.1.51/structsp__session__callbacks.html#a33a31478b8de1882ad7847ad033fbaeb – Feanaro Apr 10 '15 at 15:32