3

I have two apps, app1.cs and app2.cs (codes below). In addition I also have a dll I extracted from refer.cs(code below). When I compile app1.cs(which sends a measurement object) I get the following exception:

Unhandled Exception: RabbitMQ.Client.Exceptions.OperationInterruptioedException

I can't see how the connection is interrupted. Do you see where the problem is caused at?

Regards, Demi

//refer.cs from which refer.dll is created

using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;

namespace refer
{
    //start alternate serialization
    public static class AltSerialization
    {
        public static byte[] AltSerialize(Measurement m)
        {
         using (var ms = new MemoryStream())
            {
                var bf = new BinaryFormatter();
                bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
                bf.Serialize(ms, m);
                return ms.GetBuffer();
            }
        }

        public static Measurement AltDeSerialize(byte[] seriM)   
        {
        using (var stream = new MemoryStream( seriM ))
            {
                BinaryFormatter bf = new BinaryFormatter();
                bf.AssemblyFormat = System.Runtime.Serialization.Formatters.FormatterAssemblyStyle.Simple;
                return (Measurement)bf.Deserialize(stream);           
            }
        }
    }
    //end alternte serialization

    [Serializable] //This attribute sets class to be serialized
    public class Measurement : ISerializable
    {             
        [NonSerialized] public int id;
        public int time; //timestamp
        public double value;

        public Measurement()
        {
            id = 1;
            time = 12;
            value = 0.01;
        }

        public Measurement(int _id, int _time, double _value)
        {
            id = _id;
            time = _time;
            value = _value;
        }

        //Deserialization constructor   
        public Measurement(SerializationInfo info, StreamingContext ctxt)
        {
            //Assign the values from info to the approporiate properties   
            Console.WriteLine("DeSerialization construtor called.");
            time = (int)info.GetValue("MeasurementTime", typeof(int));
            value = (double)info.GetValue("MeasurementValue", typeof(double));
        }

       //Serialization function   
        public void GetObjectData(SerializationInfo info, StreamingContext ctxt)
        {
            // Custom name-value pair
            // Values must be read with the same name they're written       
            info.AddValue("MeasurementTime", time);
            info.AddValue("MeasurementValue", value);
        }
    }
}

//MB1.cs

using System;
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;

public interface IMessageBus
{    
string MsgSys       // Property 1
{
    get;
    set;
}

void write (Measurement m1);
Measurement read();
void publish(string queue);   
void subscribe(string queue);   
}

public class Rabbit : IMessageBus
{   
// Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();


public void write ( Measurement m1 )
{
    byte[] body = Measurement.AltSerialize( m1 );

    IConnection connection = factory.CreateConnection();
    IModel channel = connection.CreateModel();

    foreach (string queue in publishQ) 
    {
        channel.BasicPublish("", queue, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", queue);
    }
}

public void publish(string queueName)
{       
    channel.QueueDeclare(queueName, true, false, false, null); //durable=true
    publishQ.Add(queueName); //and, add it the list of queue names to publish to
}

public Measurement read() 
{
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    foreach (string queue in subscribeQ) 
    {
        channel.BasicConsume(queue, true, consumer);
    }   
    System.Console.WriteLine(" [*] Waiting for messages." +
                            "To exit press CTRL+C");
    BasicDeliverEventArgs ea = 
        (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    return Measurement.AltDeSerialize(ea.Body);
}

public void subscribe(string queueName)
{
    channel.QueueDeclare(queueName, true, false, false, null);
    subscribeQ.Add(queueName);
}

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

public Rabbit(string _msgSys) //Constructor
{
    ConnectionFactory factory = new ConnectionFactory();
    factory.HostName = "localhost"; 

    System.Console.WriteLine("\nMsgSys: RabbitMQ");
    MsgSys = _msgSys;
}
}

public class Zmq : IMessageBus
{
public void write ( Measurement m1 )
{
    //
}
public Measurement read() 
{
    //
    return null;
}
public void publish(string queue)
{
//
}
public void subscribe(string queue)
{
//      
}   

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

// Implementation of methods for Zmq class go here
public Zmq(string _msgSys) //Constructor
{
    System.Console.WriteLine("ZMQ");
    MsgSys = _msgSys;
}
} 

public class MessageBusFactory
{
public static IMessageBus GetMessageBus(string MsgSysName)
{
    switch ( MsgSysName )
    {
        case "Zmq":
            return new Zmq(MsgSysName);
        case "Rabbit":
            return new Rabbit(MsgSysName);
        default:
            throw new ArgumentException("Messaging type " +
                MsgSysName + " not supported." );
    }
}
}

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Create a new Measurement message
    Measurement m1 = new Measurement(2, 2345, 23.456);

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    System.Console.WriteLine("With Test message:\n    ID: {0}", m1.id);
    System.Console.WriteLine("    Time: {0}", m1.time);
    System.Console.WriteLine("    Value: {0}", m1.value);

    // Ask queue name and store it
    System.Console.WriteLine("Enter a queue name to publish the message to: ");
    string QueueName = (System.Console.ReadLine()).ToString();
    obj1.publish( QueueName );

    System.Console.WriteLine("Enter another queue name: ");
    QueueName = (System.Console.ReadLine()).ToString();
    obj1.publish( QueueName );

    // Write message to the queue
    obj1.write( m1 ); 

}
}

//MB2.cs

using System; 
using System.IO;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.Runtime.Serialization.Formatters.Binary;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using UtilityMeasurement;

public interface IMessageBus
{    
string MsgSys       // Property 1
{
    get;
    set;
}

void write (Measurement m1);
Measurement read();
void publish(string queue);   
void subscribe(string queue);   
}

public class Rabbit : IMessageBus
{   
// Implementation of methods for Rabbit class go here
private List<string> publishQ = new List<string>();
private List<string> subscribeQ = new List<string>();


public void write ( Measurement m1 )
{
    byte[] body = Measurement.AltSerialize( m1 );

    IConnection connection = factory.CreateConnection();
    IModel channel = connection.CreateModel();

    foreach (string queue in publishQ) 
    {
        channel.BasicPublish("", queue, null, body);
        Console.WriteLine("\n  [x] Sent to queue {0}.", queue);
    }
}

public void publish(string queueName)
{       
    channel.QueueDeclare(queueName, true, false, false, null); //durable=true
    publishQ.Add(queueName); //and, add it the list of queue names to publish to
}

public Measurement read() 
{
    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
    foreach (string queue in subscribeQ) 
    {
        channel.BasicConsume(queue, true, consumer);
    }   
    System.Console.WriteLine(" [*] Waiting for messages." +
                            "To exit press CTRL+C");
    BasicDeliverEventArgs ea = 
        (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    return Measurement.AltDeSerialize(ea.Body);
}

public void subscribe(string queueName)
{
    channel.QueueDeclare(queueName, true, false, false, null);
    subscribeQ.Add(queueName);
}

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

public Rabbit(string _msgSys) //Constructor
{
    ConnectionFactory factory = new ConnectionFactory();
    factory.HostName = "localhost"; 

    System.Console.WriteLine("\nMsgSys: RabbitMQ");
    MsgSys = _msgSys;
}
}


public class Zmq : IMessageBus
{
public void write ( Measurement m1 )
{
    //
}
public Measurement read() 
{
    //
    return null;
}
public void publish(string queue)
{
//
}
public void subscribe(string queue)
{
//      
}   

public static string MsgSysName;
public string MsgSys
{
    get 
    { 
        return MsgSysName;
    }
    set
    {
        MsgSysName = value;
    }
}

// Implementation of methods for Zmq class go here
public Zmq(string _msgSys) //Constructor
{
    System.Console.WriteLine("ZMQ");
    MsgSys = _msgSys;
}
} 

public class MessageBusFactory
{
public static IMessageBus GetMessageBus(string MsgSysName)
{
    switch ( MsgSysName )
    {
        case "Zmq":
            return new Zmq(MsgSysName);
        case "Rabbit":
            return new Rabbit(MsgSysName);
        default:
            throw new ArgumentException("Messaging type " +
                MsgSysName + " not supported." );
    }
}
}

public class MainClass
{
    public static void Main()
    {
    //Asks for the message system
    System.Console.WriteLine("\nEnter name of messageing system: ");
    System.Console.WriteLine("Usage: [Rabbit] [Zmq]");
    string MsgSysName = (System.Console.ReadLine()).ToString();

    //Declare an IMessageBus instance:
    //Here, an object of the corresponding Message System
        // (ex. Rabbit, Zmq, etc) is instantiated
    IMessageBus obj1 = MessageBusFactory.GetMessageBus(MsgSysName);

    System.Console.WriteLine("\nA {0} object is now created.", MsgSysName);

    System.Console.WriteLine("Enter a queue to subscribe to: ");
    string QueueName = (System.Console.ReadLine()).ToString();
    obj1.subscribe( QueueName );

    //Create a new Measurement object m2
    Measurement m2 = new Measurement(); 

    //Read message into m2
    m2 = obj1.read();
    m2.id = 11;
    System.Console.WriteLine("\nMessage received from queue {0}:\n    ID: {1}",QueueName, m2.id);
    System.Console.WriteLine("    Time: {0}", m2.time);
    System.Console.WriteLine("    Value: {0}", m2.value);
}
}
Demi
  • 318
  • 1
  • 4
  • 13

1 Answers1

2

I just created a vanilla C# VS2010 Console application project with the Refer.cs and App1.cs in the same project.

I made the following changes:

  • Added RabbitMQ.Client.dll
  • Removed the AssemblyVersion attributes
  • Added string[] args to the Main method in App1.cs

Also, I changed:

factory.HostName = "localhost";

To this:

factory.HostName = "192.168.56.101";

Which is the ip address to my VirtualBox Ubuntu VM running rabbitmq-server. There was no exception thrown, and the message successfully was received on the server.

All signs point to server configuration with what is given. My guess is either your rabbitmq-server is not running at all, it's not running on localhost, or there is some kind of connectivity issue with port 5672.

karlgrz
  • 14,485
  • 12
  • 47
  • 58
  • @KG I removed the AssemblyVersion attributes and it worked. I add the RabbitMQ.Client dll when I compile both app1.cs and app2.cs(I am working on command line). So, thanks much! But, I don't see why that could be a problem? – Demi Jul 12 '11 at 15:03
  • @Demi the project AssemblyInfo already has an AssemblyVersion attribute in it. You shouldn't even be able to compile that if it exists in two places. If your project --doesn't-- have AssemblyInfo.cs, you wouldn't have a conflict. Are you using Visual Studio? – karlgrz Jul 12 '11 at 17:56
  • @KG Will do. On a similar note, I encapsulated the send and receive codes into a class, created a constructor and put the declaration for factory, connection, channel in this constructor hoping that the object that will be created out of app1 and app2 each to work on that connection. However, when I try to use factory, connection and channel in the subscribe and publish methods, I got an error saying it doesn't know those attributes. How can you make connection, hostname etc public to the class so that methods in the class exploit this declaration? – Demi Jul 12 '11 at 19:14
  • @KG I just did. I left refer.cs just for the record. Thanks for looking into my code in advance. – Demi Jul 12 '11 at 19:47