My previous question on the same topic: "C#: Asynchronous NamedPipeServerStream understanding". Now I have the following:

private void StartListeningPipes()
{
    try
    {
        isPipeWorking = true;
        namedPipeServerStream = new NamedPipeServerStream(PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, BUFFERSIZE, BUFFERSIZE);
        Console.Write("Waiting for client connection...");
        while(isPipeWorking)
        {
            IAsyncResult asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback, null);
            Thread.Sleep(3*1000);
        }
    }
    //// Catch the IOException that is raised if the pipe is broken or disconnected.
    catch (IOException e)
    {
        Console.WriteLine("IOException: {0}. Restart pipe server...", e.Message);
        StopListeningPipes();
        StartListeningPipes();
    }
    //// Catch ObjectDisposedException if server was stopped. Then do nothing.
    catch (ObjectDisposedException)
    {
    }
}

private void WaitForConnectionAsyncCallback(IAsyncResult result)
{
    try
    {
        namedPipeServerStream.EndWaitForConnection(result);
        Console.WriteLine("Client connected.");
        namedPipeServerStream.WaitForPipeDrain();
        byte[] buff = new byte[BUFFERSIZE];
        namedPipeServerStream.Read(buff, 0, BUFFERSIZE);
        string recStr = TrimNulls(buff);
        Array.Clear(buff, 0, buff.Length);
        Console.WriteLine();
        Console.WriteLine("'"+recStr+"'");
    }
    catch (Exception e)
    {
        Console.WriteLine("Error: " + e.Message);
    }
}

But I get a "The pipe is being closed" exception every time I receive a message from the client.

Why?

My client:

using (NamedPipeClientStream pipeStream = new NamedPipeClientStream(General.PIPENAME))
{
    try
    {
        byte[] bytes = General.Iso88591Encoding.GetBytes(sendingMessage);
        pipeStream.Write(bytes, 0, bytes.Length);
        pipeStream.Flush();
        pipeStream.WaitForPipeDrain();
    }
    catch (TimeoutException)
    {
        Console.WriteLine("Timeout error!");
    }
    catch (Exception e)
    {
        // The format string needs a {0} placeholder, otherwise e.Message is never printed.
        Console.WriteLine(string.Format("Error! {0}", e.Message));
    }
}

Final code at the moment is:

        /// <summary>
        /// Create new NamedPipeServerStream for listening to pipe client connection
        /// </summary>
        private void ListenForPipeClients()
        {
            if (!this.isListeningToClients)
                return;

            try
            {
                PipeSecurity ps = new PipeSecurity();
                PipeAccessRule par = new PipeAccessRule("Everyone", PipeAccessRights.ReadWrite, System.Security.AccessControl.AccessControlType.Allow);
                ps.AddAccessRule(par);
                pipeClientConnection = new NamedPipeServerStream(General.PIPENAME, PipeDirection.InOut, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous, General.BUFFERSIZE, General.BUFFERSIZE, ps);
                Console.Write("Waiting for client connection...");
                /*namedPipeServerStream.WaitForConnection();
                OnPipeConnected(namedPipeServerStream);*/
                IAsyncResult result = pipeClientConnection.BeginWaitForConnection(OnPipeConnected, pipeClientConnection);
            }
            catch (ObjectDisposedException)
            {
                //// Catch ObjectDisposedException if server was stopped. Then do nothing.
            }
            catch (Exception e)
            {
                Console.WriteLine("Error occurred: {0}. Restart pipe server...", e.Message);
                this.logger.Add(LogLevel.Warning, string.Format("Error occurred: {0}. Restart pipe server...", e.Message));
                ListenForPipeClients();
            }
        }

        /// <summary>
        /// Async callback on client connected action
        /// </summary>
        /// <param name="asyncResult">Async result</param>
        private void OnPipeConnected(IAsyncResult asyncResult)
        {
            using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
            {
                try
                {
                    conn.EndWaitForConnection(asyncResult);
                    Console.WriteLine("Client connected.");
                    PipeClientConnection clientConnection = new PipeClientConnection(conn, notifierSenderCache, defaultStorageTime);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                    this.logger.Add(LogLevel.Warning, e.Message);
                }
            }

            ListenForPipeClients();
        }
Ksice
  • Where do you get that exception? Btw, you are calling BeginWaitForConnection every three seconds which basically is a memory leak if connections come in less frequently. – usr Jul 12 '12 at 11:19
  • Good suggestion! Thanks! I think I'll use WaitForConnection() instead of the async callback – Ksice Jul 12 '12 at 11:26
  • Btw, if I want to use BeginWaitForConnection(), what do I have to do to prevent the waste of memory? – Ksice Jul 12 '12 at 11:27
  • @Ksice: see my updated answer for using BeginWaitForConnection(). – ShdNx Jul 13 '12 at 12:26

2 Answers

It appears that you need a separate NamedPipeServerStream for each client. (Note that I was not the one to discover this, see the other answers.) I'd imagine the working server-side would look something like this (draft code):

while(this.isServerRunning)
{
     var pipeClientConnection = new NamedPipeServerStream(...);

     try
     {
         pipeClientConnection.WaitForConnection();
     }
     catch(...)
     {
         ...
         continue;
     }

     ThreadPool.QueueUserWorkItem(state =>
          {
               // we need a separate variable here, so as not to make the lambda capture the pipeClientConnection variable, which is not recommended in multi-threaded scenarios
               using(var pipeClientConn = (NamedPipeServerStream)state)
               {
                    // do stuff
                    ...
               }
          }, pipeClientConnection);
}
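
For reference, here is a minimal self-contained sketch of the blocking variant with the elided parts filled in by assumption. The pipe name, buffer size, the ISO-8859-1 decoding (chosen to match the client in the question) and the per-client work are all placeholders, not the asker's real code. It passes NamedPipeServerStream.MaxAllowedServerInstances rather than 1, so that a new instance can be created while an earlier one is still serving its client.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Text;
using System.Threading;

class BlockingPipeServer
{
    // Hypothetical values for illustration only.
    private const string PipeName = "demo_pipe";
    private const int BufferSize = 4096;

    private volatile bool isServerRunning = true;

    public void Run()
    {
        while (isServerRunning)
        {
            // A fresh server-side stream for every client.
            var pipeClientConnection = new NamedPipeServerStream(
                PipeName, PipeDirection.InOut,
                NamedPipeServerStream.MaxAllowedServerInstances,
                PipeTransmissionMode.Byte, PipeOptions.None,
                BufferSize, BufferSize);

            try
            {
                // Blocks until a client connects to this instance.
                pipeClientConnection.WaitForConnection();
            }
            catch (IOException e)
            {
                Console.WriteLine("Connection failed: {0}", e.Message);
                pipeClientConnection.Dispose();
                continue;
            }

            // Hand the connected stream to a pool thread and go back to listening.
            ThreadPool.QueueUserWorkItem(state =>
            {
                using (var conn = (NamedPipeServerStream)state)
                {
                    try
                    {
                        var buffer = new byte[BufferSize];
                        int read = conn.Read(buffer, 0, buffer.Length);
                        string text = Encoding.GetEncoding("ISO-8859-1").GetString(buffer, 0, read);
                        Console.WriteLine("'{0}'", text);
                    }
                    catch (IOException e)
                    {
                        Console.WriteLine("Client error: {0}", e.Message);
                    }
                }
            }, pipeClientConnection);
        }
    }

    static void Main()
    {
        new BlockingPipeServer().Run();
    }
}
```

Note that the sketch uses the number of bytes returned by Read instead of trimming nulls from the buffer, and that each stream is disposed by the worker that owns it, never by the listening loop.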

As a side note, as was pointed out in a comment on your question, you're wasting memory by calling BeginWaitForConnection in a loop: every 3 seconds you initiate a new async call, regardless of whether the last one is still pending or has completed. (The only case where this wouldn't waste memory is when new connections arrive at intervals shorter than 3 seconds, but I doubt you can know this for sure.) Furthermore, it once again does not take into account that you need a separate NamedPipeServerStream for each client.

To fix this issue, you need to eliminate the loop and "chain" the BeginWaitForConnection calls using the callback method. This is a pattern you'll see quite often in async I/O in .NET. Draft code:

private void StartListeningPipes()
{
    if(!this.isServerRunning)
    {
        return;
    }

    var pipeClientConnection = new NamedPipeServerStream(...);

    try
    {
        pipeClientConnection.BeginWaitForConnection(asyncResult =>
            {
                // note that the body of the lambda is not part of the outer try... catch block!
                using(var conn = (NamedPipeServerStream)asyncResult.AsyncState)
                {
                    try
                    {
                        conn.EndWaitForConnection(asyncResult);
                    }
                    catch(...)
                    {
                        ...
                    }

                    // we have a connection established, time to wait for new ones while this thread does its business with the client
                    // this may look like a recursive call, but it is not: remember, we're in a lambda expression
                    // if this bothers you, just export the lambda into a named private method, like you did in your question
                    StartListeningPipes();

                    // do business with the client
                    conn.WaitForPipeDrain();
                    ...
                }
            }, pipeClientConnection);
    }
    catch(...)
    {
        ...
    }
}

The control flow will be something like this:

  • [main thread] StartListeningPipes(): created NamedPipeServerStream, initiated BeginWaitForConnection()
  • [threadpool thread 1] client #1 connecting, BeginWaitForConnection() callback: EndWaitForConnection() then StartListeningPipes()
  • [threadpool thread 1] StartListeningPipes(): created new NamedPipeServerStream, BeginWaitForConnection() call
  • [threadpool thread 1] back to the BeginWaitForConnection() callback: getting down to business with the connected client (#1)
  • [threadpool thread 2] client #2 connecting, BeginWaitForConnection() callback: ...
  • ...

I think that this is a lot more difficult and a lot more confusing than using blocking I/O. In fact, I'm not quite certain I got it right, so please point out any mistakes you see.

To pause the server in either example, you would set the this.isServerRunning flag to false.
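
One detail the async draft leaves inside the `...` of the constructor is the maxNumberOfServerInstances argument. The code in the question passes 1, and with that value creating the next NamedPipeServerStream while the previous one is still open fails with an "All pipe instances are busy" IOException. Below is a minimal sketch of the chained pattern, assuming NamedPipeServerStream.MaxAllowedServerInstances is acceptable for your server. The pipe name, buffer size, ISO-8859-1 decoding and the per-client work are placeholders of mine, not part of the original code.

```csharp
using System;
using System.IO;
using System.IO.Pipes;
using System.Text;

class AsyncPipeServer
{
    // Hypothetical values for illustration only.
    private const string PipeName = "demo_pipe";
    private const int BufferSize = 4096;

    private volatile bool isServerRunning = true;

    public void StartListeningPipes()
    {
        if (!isServerRunning)
        {
            return;
        }

        // More than one instance must be allowed, because the previous
        // instance is still open while this one starts waiting.
        var pipeClientConnection = new NamedPipeServerStream(
            PipeName, PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous,
            BufferSize, BufferSize);

        // Exactly one pending wait per instance; the callback chains the next one.
        pipeClientConnection.BeginWaitForConnection(OnPipeConnected, pipeClientConnection);
    }

    private void OnPipeConnected(IAsyncResult asyncResult)
    {
        using (var conn = (NamedPipeServerStream)asyncResult.AsyncState)
        {
            try
            {
                conn.EndWaitForConnection(asyncResult);
            }
            catch (IOException e)
            {
                Console.WriteLine("Connection failed: {0}", e.Message);
                StartListeningPipes();
                return;
            }

            // Start waiting for the next client before serving this one.
            StartListeningPipes();

            try
            {
                var buffer = new byte[BufferSize];
                int read = conn.Read(buffer, 0, buffer.Length);
                string text = Encoding.GetEncoding("ISO-8859-1").GetString(buffer, 0, read);
                Console.WriteLine("'{0}'", text);
            }
            catch (IOException e)
            {
                Console.WriteLine("Client error: {0}", e.Message);
            }
        }
    }

    public void Stop()
    {
        isServerRunning = false;
    }

    static void Main()
    {
        var server = new AsyncPipeServer();
        server.StartListeningPipes();
        Console.WriteLine("Listening; press Enter to stop.");
        Console.ReadLine();
        server.Stop();
    }
}
```

If you need to cap the number of simultaneous clients, a specific limit greater than 1 could be passed instead of MaxAllowedServerInstances; the sketch does not handle the IOException that would then be thrown once the cap is reached.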

jgillich
ShdNx
  • The reason I'm actually using the async callback is that I will probably do some long operation inside WaitForConnectionAsyncCallback, and I don't want clients to wait until this operation completes – Ksice Jul 12 '12 at 11:31
  • -1 There should be a different NamedPipeServerStream instance for each client. Calling BeginWaitForConnection repeatedly on the same instance, wherever you do it, is wrong. – Chris Dickson Jul 12 '12 at 13:01
  • Removed the -1, following your edit, which gives a good template, though I would name the NamedPipeServerStream variables differently - they don't refer to the pipe server (which is the entire server program), they refer to a read/write stream wrapped around the server end of a single pipe instance connected to one client, so perhaps "clientConnection". – Chris Dickson Jul 13 '12 at 10:31
  • @ChrisDickson: thank you, you're right, I modified my answer. – ShdNx Jul 13 '12 at 11:49
  • Thanks for the good example. But with this 'recursive' StartListeningPipes function I get an "All pipe instances are busy." IOException when creating the new NamedPipeServerStream. Any ideas what to do about this? It looks like I need to create a copy of the existing NamedPipeServerStream and work with it... – Ksice Jul 16 '12 at 07:34
  • See my addition in the question text above. – Ksice Jul 16 '12 at 14:01
  • Now it works. But not asynchronously, obviously. If I do as you said and place ListenForPipeClients(); in OnPipeConnected(IAsyncResult asyncResult) before creating the new PipeClientConnection, then this "All pipe instances are busy." error happens – Ksice Jul 16 '12 at 14:04
  • I.e. while I'm working with my named pipe in PipeClientConnection, I can't create a new NamedPipeServerStream. So it seems. – Ksice Jul 16 '12 at 14:05
OK, silly me. There should be one NamedPipeServerStream for each client. So once the async operation has completed, the NamedPipeServerStream has to be recreated. Thanks to this: "Multithreaded NamePipeServer in C#".

Should be:

while(isPipeWorking)
{
    IAsyncResult asyncResult = namedPipeServerStream.BeginWaitForConnection(this.WaitForConnectionAsyncCallback, null);
    Thread.Sleep(3*1000);
    if (asyncResult.IsCompleted)
    {
        RestartPipeServer();
        break;
    }
}
Ksice