I've been struggling with a problem for a few days now and couldn't find an explanation by debugging or searching the internet, so as a last resort I'm asking here.

I have a simple named pipe server written in C++ and a NamedPipeClientStream in C# running on Windows. Before building a protocol that puts the message size in bytes in front of each message, I wanted to try using message mode to separate individual messages.
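For comparison, the size-prefix alternative mentioned above could look roughly like the following. This is only a minimal sketch under stated assumptions: the helper names encode_frame and try_decode_frame and the 4-byte little-endian header are hypothetical choices made for illustration, not something taken from the code in this question.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical helper: prepend a 4-byte little-endian length to a payload.
std::vector<std::uint8_t> encode_frame(const std::string& payload)
{
    // The header is 32 bits wide, so larger payloads cannot be represented.
    assert(payload.size() <= 0xFFFFFFFFu);
    const std::uint32_t size = static_cast<std::uint32_t>(payload.size());

    std::vector<std::uint8_t> frame;
    frame.reserve(4 + payload.size());
    for (int shift = 0; shift < 32; shift += 8)
        frame.push_back(static_cast<std::uint8_t>((size >> shift) & 0xFF));
    frame.insert(frame.end(), payload.begin(), payload.end());
    return frame;
}

// Hypothetical helper: remove one complete frame from the front of a receive
// buffer and store its payload in `payload`. Returns false, leaving the
// buffer untouched, if the buffer does not yet hold a whole frame.
bool try_decode_frame(std::vector<std::uint8_t>& buffer, std::string& payload)
{
    if (buffer.size() < 4)
        return false; // header not complete yet

    std::uint32_t size = 0;
    for (int i = 0; i < 4; ++i)
        size |= static_cast<std::uint32_t>(buffer[i]) << (8 * i);

    if (buffer.size() - 4 < size)
        return false; // body not complete yet

    payload.assign(buffer.begin() + 4, buffer.begin() + 4 + size);
    buffer.erase(buffer.begin(), buffer.begin() + 4 + size);
    return true;
}
```

With this kind of framing the reader appends whatever bytes arrive to a buffer and calls try_decode_frame until it returns false, so the pipe itself can stay in byte mode.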

The relevant code fragments are:

Creation of the C++ named pipe:

hOutputPipe = CreateNamedPipeA(
    lOutputPipeName,               // pipe name
    PIPE_ACCESS_OUTBOUND,         // only write access
    PIPE_TYPE_MESSAGE |         // message type pipe
        PIPE_READMODE_MESSAGE | // message-read mode
        PIPE_WAIT,              // blocking mode
    PIPE_UNLIMITED_INSTANCES,   // max. instances
    1024,                    // output buffer size
    1024,                    // input buffer size
    0,                          // client time-out
    NULL);                      // default security attribute
if (hOutputPipe == INVALID_HANDLE_VALUE)
{
    std::cout << "CreateNamedPipe failed, GLE=" << GetLastError() << std::endl;
    return -1;
}

// Wait for the client to connect. ConnectNamedPipe returns nonzero on
// success. If it returns zero and GetLastError returns ERROR_PIPE_CONNECTED,
// the client connected before the call, which also counts as success.

BOOL fConnected = ConnectNamedPipe(hOutputPipe, NULL) ? TRUE : (GetLastError() == ERROR_PIPE_CONNECTED);

if (!fConnected)
{
    // The client could not connect, so close the pipe.
    CloseHandle(hOutputPipe);
    return -1;
}

Creation of C# NamedPipeClientStream:

fromagent_pipe = new NamedPipeClientStream(".", pipeName + "_output", PipeDirection.In);
// timeout after 2 seconds to prevent blocking
fromagent_pipe.Connect(2000);

fromagent_pipe.ReadMode = PipeTransmissionMode.Message;

C++ function called in a while(true) loop:

bool writeOutputMessage(HANDLE hPipe, const CubesExample::OutputMessage& outputMessage)
{
    const size_t messageBytes = outputMessage.ByteSizeLong();
    // std::vector (from <vector>) instead of a variable-length array,
    // which is a compiler extension rather than standard C++.
    std::vector<char> buffer(messageBytes);
    DWORD bytesWritten = 0;
    outputMessage.SerializeToArray(buffer.data(), static_cast<int>(messageBytes));

    std::cout << "Writing " << messageBytes << " bytes" << std::endl;

    BOOL fSuccess = WriteFile(
        hPipe,                             // handle to pipe
        buffer.data(),                     // buffer to write from
        static_cast<DWORD>(messageBytes),  // number of bytes to write
        &bytesWritten,                     // number of bytes written
        NULL);                             // not overlapped I/O
    if (!fSuccess || bytesWritten != messageBytes)
    {
        std::cout << "InstanceThread WriteFile failed, GLE=" << GetLastError() << std::endl;
        return false;
    }

    return true;
}

C# method for reading a full message from the pipe and returning the byte[]:

public byte[] readOutputMessage()
{        
    int offset = 0;
    int readBytes = 0;
    do{
        readBytes = fromagent_pipe.Read(inputBuffer, offset, 1);
        offset++;
        Debug.Log("Reading from output pipe! isComplete: " + fromagent_pipe.IsMessageComplete + " readBytes: " + readBytes);
    }while(!fromagent_pipe.IsMessageComplete && readBytes > 0);

    Debug.Log("Read " + offset + " bytes from agent pipe");

    byte[] buffer = new byte[offset];
    Array.Copy(inputBuffer, buffer, offset);
    return buffer;
}

The C# method above runs in a Task<byte[]> so the main thread isn't blocked while waiting for PipeStream.Read(). The inputBuffer is a field of the class containing this code and has a size of 4096, so I don't have to allocate it before every read.

The problem is that fromagent_pipe.IsMessageComplete is never set to true, no matter how many bytes it reads. For reference, the message I'm sending is 7 bytes, so I would expect the do-while loop to iterate 7 times, and after the 7th byte is read IsMessageComplete should be set to true, right? I'm new to named pipes and IPC so I might be missing something obvious, but it's driving me crazy because I set every possible flag from the docs and use the IsMessageComplete flag just like other people on the internet, yet mine never seems to switch to true.

Another piece of information: the C++ server runs much faster than the C# loop, so the client consumes data more slowly than the pipe receives it. I will eventually discard all messages delivered between single reads, but for now I can't even get one message read.

PS: Before someone points it out, yes, the inputBuffer does overflow after it reads 4096 bytes. I want to get the IsMessageComplete problem out of the way before dealing with resetting it.
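One way around the fixed 4096-byte limit is to let the buffer grow with each chunk until the message is reported complete. The sketch below shows only that accumulation loop. It does not touch a real pipe: ChunkResult, ReadChunk, read_message and make_fake_reader are hypothetical names introduced here, and the fake reader merely stands in for whatever read call reports bytes read plus a "message complete" flag.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical result of one read call: how many bytes were delivered and
// whether the current message has now been fully consumed.
struct ChunkResult
{
    std::size_t bytes;
    bool messageComplete;
};

// Hypothetical stand-in for a pipe read: writes up to `capacity` bytes to `dst`.
using ReadChunk = std::function<ChunkResult(char* dst, std::size_t capacity)>;

// Reads one whole message, growing the buffer by `chunkSize` per read
// instead of relying on a fixed-size array.
std::vector<char> read_message(const ReadChunk& readChunk, std::size_t chunkSize)
{
    assert(chunkSize > 0);
    std::vector<char> message;
    std::size_t used = 0;
    for (;;)
    {
        message.resize(used + chunkSize); // make room for the next chunk
        const ChunkResult r = readChunk(message.data() + used, chunkSize);
        used += r.bytes;
        if (r.messageComplete || r.bytes == 0)
            break; // message finished, or the source has no more data
    }
    message.resize(used); // trim the unused tail
    return message;
}

// Demonstration only: a fake reader that hands out `data` as a single message.
ReadChunk make_fake_reader(std::string data)
{
    std::size_t pos = 0;
    return [data, pos](char* dst, std::size_t capacity) mutable {
        const std::size_t n = std::min(capacity, data.size() - pos);
        std::copy(data.begin() + pos, data.begin() + pos + n, dst);
        pos += n;
        return ChunkResult{n, pos == data.size()};
    };
}
```

The early exit on a zero-byte read is a design choice for the sketch: it keeps the loop from spinning forever if the completion flag never becomes true.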

Thanks in advance to anyone reading, and please send help.

user2466076

1 Answer

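I was able to get message mode to work by setting the pipe direction to duplex on both ends. This accepted answer explains why I had to do that. In short, as I understand it, changing the read mode of a client handle requires write-attribute access to the pipe, which a handle opened as inbound-only may not have.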

This is the quick example I whipped up. I've provided both an asynchronous and synchronous example on the C# side. I did not provide an asynchronous example using OVERLAPPED on the C++ side.

Server

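#include <windows.h>

#include <cstdlib>
#include <iostream>
#include <string_view>
#include <system_error>

int main( ) 
{
    std::cout << "Press enter to begin\n";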
    std::cin.ignore( );

    const auto print_error{ [ ]( DWORD code ) 
    { 
        const std::error_code ec{ static_cast<int>( code ),
            std::system_category( ) };

        std::cerr << ec.message( ) << '\n';
    } };

    const auto pipe{ CreateNamedPipeA( 
        R"(\\.\pipe\testpipe)",
        PIPE_ACCESS_DUPLEX,       
        PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE | PIPE_WAIT,              
        PIPE_UNLIMITED_INSTANCES,  
        1024,                    
        1024,                    
        0,                          
        nullptr ) };

    if ( pipe == INVALID_HANDLE_VALUE ) 
    {
        print_error( GetLastError( ) );
        return EXIT_FAILURE;
    }

    if ( !ConnectNamedPipe( pipe, nullptr ) ) 
    {        
        if ( const auto ec{ GetLastError( ) }; ec != ERROR_PIPE_CONNECTED ) 
        {
            print_error( ec );
            CloseHandle( pipe );
            return EXIT_FAILURE;
        }        
    }

    std::string_view message{ "Hello friend!" };
    const auto length{ static_cast<DWORD>( message.size( ) ) };

    std::cout << "Sending message\n";

    DWORD sent{ 0 };
    const auto result{ WriteFile(
        pipe, message.data( ), length , &sent, nullptr ) };

    if ( !result || sent != length ) 
    {
        print_error( GetLastError( ) );
        CloseHandle( pipe );
        return EXIT_FAILURE;
    }

    std::cout << "Message sent, press enter to exit\n";
    std::cin.ignore( );

    std::cout << "Closing handle\n";
    CloseHandle( pipe );
}

Synchronous Client

static void Main( string[ ] args )
{
    Console.WriteLine( "Press any key to begin" );
    Console.ReadKey( );

    using var client = new NamedPipeClientStream( ".", "testpipe", PipeDirection.InOut );
    Console.WriteLine( "Connecting to server" );

    client.Connect( 5000 );
    client.ReadMode = PipeTransmissionMode.Message;

    Console.WriteLine( "Connected" );

    var buffer = new byte[ 1024 ];
    var nBytes = 0;

    do
    {
        nBytes += client.Read( buffer.AsSpan( )[ nBytes.. ] );

    } while ( !client.IsMessageComplete );


    var message = Encoding.ASCII.GetString( buffer.AsSpan( )[ ..nBytes ] );
    Console.WriteLine( $"Received message: {message}" );

    Console.WriteLine( "Press any key to exit" );
    Console.ReadKey( );
}

Asynchronous Client

static async Task Main( string[ ] args )
{
    Console.WriteLine( "Press any key to begin" );
    Console.ReadKey( );

    using var client = new NamedPipeClientStream( ".", "testpipe", PipeDirection.InOut );
    Console.WriteLine( "Connecting to server" );

    using ( var cts = new CancellationTokenSource( TimeSpan.FromSeconds( 5 ) ) )
    {
        await client.ConnectAsync( cts.Token ).ConfigureAwait( false );
        client.ReadMode = PipeTransmissionMode.Message;
    }
        
    Console.WriteLine( "Connected" );

    var buffer = new byte[ 1024 ];
    var nBytes = 0;

    do
    {                
        nBytes += await client.ReadAsync( 
            buffer.AsMemory( )[ nBytes.. ] ).ConfigureAwait( false );

    } while ( !client.IsMessageComplete );


    var message = Encoding.ASCII.GetString( buffer.AsSpan( )[ ..nBytes ] );
    Console.WriteLine( $"Received message: {message}" );

    Console.WriteLine( "Press any key to exit" );
    Console.ReadKey( );
}
WBuck