Sunday, October 17, 2010

Publish-Subscribe Communication

Summary: A simple example showing how to implement a communication scenario where one application sends notifications to other applications.


Introduction
The scenario where one application needs to inform other applications about some particular events is very common.
The application that wants to be informed about events subscribes to get notification messages from the publishing application. Then, when the event occurs, the publishing application notifies all subscribed applications.
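Before bringing in the framework, the subscribe/notify idea can be sketched in a few lines of plain C#. This is only an in-memory illustration: TinyBroker and its methods are hypothetical names invented for this sketch, not part of Eneter, and the subscribers are simple callbacks instead of separate applications.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory broker, for illustration only (not the Eneter API).
public class TinyBroker
{
    // Maps an event id to the callbacks of the subscribers interested in it.
    private readonly Dictionary<string, List<Action<string>>> mySubscribers =
        new Dictionary<string, List<Action<string>>>();

    // Registers a subscriber for one event id.
    public void Subscribe(string eventId, Action<string> subscriber)
    {
        if (!mySubscribers.TryGetValue(eventId, out List<Action<string>> aList))
        {
            aList = new List<Action<string>>();
            mySubscribers[eventId] = aList;
        }
        aList.Add(subscriber);
    }

    // Removes a previously registered subscriber (does nothing if unknown).
    public void Unsubscribe(string eventId, Action<string> subscriber)
    {
        if (mySubscribers.TryGetValue(eventId, out List<Action<string>> aList))
        {
            aList.Remove(subscriber);
        }
    }

    // Notifies every subscriber of the event and returns how many were notified.
    public int Publish(string eventId, string message)
    {
        if (!mySubscribers.TryGetValue(eventId, out List<Action<string>> aList))
        {
            return 0;
        }

        // Iterate over a copy so a callback may unsubscribe itself safely.
        Action<string>[] aSnapshot = aList.ToArray();
        foreach (Action<string> aSubscriber in aSnapshot)
        {
            aSubscriber(message);
        }
        return aSnapshot.Length;
    }
}
```

A subscriber that calls Subscribe("MyNotifyMsg1", callback) receives every message published under that id until it calls Unsubscribe; messages published under other ids never reach it.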



The example below shows how to implement this scenario using the Eneter Messaging Framework.
 (The full, unrestricted version of the framework, free for non-commercial use, can be downloaded from www.eneter.net. The online help for developers can be found at http://www.eneter.net/OnlineHelp/EneterMessagingFramework/Index.html)

Publisher
The publishing application in our example is very simple. It publishes just three notification messages, each sent when the corresponding button is clicked.


To publish these events the publisher uses the Broker component. The broker component receives messages via the input channel and forwards them to all subscribed receivers.
In our scenario, we want the broker to receive a message when the button is clicked and then forward it to the subscribed receivers.
Therefore, the broker must be able to communicate in two ways:
  • Internally - to receive internal messages (from the same process) when the 'Notify' button is clicked.
  • Interprocess - to register/unregister subscribers and to forward notifications to subscribers.
To enable the communication via 2 different channels, we can use the Dispatcher component.
The dispatcher receives messages and forwards them to all attached receivers. In our case the dispatcher receives messages from two channels (internal messages and interprocess messages) and forwards them to only one receiver - Broker.
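The fan-in role of the dispatcher can be pictured with a small sketch. It assumes nothing about how Eneter implements the component: TinyDispatcher, CreateInput and AddReceiver are hypothetical names, and the "channels" are plain delegates. Two inputs (internal and TCP) feed a single receiver that stands in for the broker.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical dispatcher, for illustration only (not the Eneter API).
public class TinyDispatcher
{
    // Receivers get the name of the input the message came from and the message itself.
    private readonly List<Action<string, string>> myReceivers =
        new List<Action<string, string>>();

    // Registers a receiver that gets every message from every input.
    public void AddReceiver(Action<string, string> receiver)
    {
        myReceivers.Add(receiver);
    }

    // Creates an input; each message sent to it is forwarded to all receivers.
    public Action<string> CreateInput(string inputName)
    {
        return message =>
        {
            foreach (Action<string, string> aReceiver in myReceivers)
            {
                aReceiver(inputName, message);
            }
        };
    }
}

public static class DispatcherDemo
{
    // Wires two inputs to one receiver and returns what the receiver saw.
    public static List<string> Run()
    {
        var aBrokerLog = new List<string>();
        var aDispatcher = new TinyDispatcher();

        // The single receiver plays the role of the broker.
        aDispatcher.AddReceiver((input, message) => aBrokerLog.Add(input + ": " + message));

        Action<string> anInternalInput = aDispatcher.CreateInput("internal");
        Action<string> aTcpInput = aDispatcher.CreateInput("tcp");

        anInternalInput("MyNotifyMsg1");        // e.g. a button click in the publisher
        aTcpInput("subscribe MyNotifyMsg1");    // e.g. a request from a subscriber

        return aBrokerLog;
    }
}
```

The receiver does not need to know how many inputs exist, which is why the broker in this article can stay unaware of whether a message arrived internally or over TCP.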


The whole implementation is very simple:

using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.Infrastructure.ConnectionProvider;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.SynchronousMessagingSystem;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;
using Eneter.Messaging.Nodes.Dispatcher;

namespace Publisher
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create internal messaging system that will be used for the internal communication.
            IMessagingSystemFactory anInternalMessaging = new SynchronousMessagingSystemFactory();


            //*******************************************************************************
            // Create communication components that will model our communication scenario
            // -> DuplexBroker: to forward notification messages to subscribers
            // -> DuplexDispatcher: to connect the broker with messaging systems: Tcp, Internal
            // -> BrokerClient: to send notifications from this application to the broker
            //*******************************************************************************

            // Create broker responsible for forwarding notification messages.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory(new XmlStringSerializer());
            myBroker = aBrokerFactory.CreateBroker();
            
            // Create broker client used to send notification messages from this application.
            myBrokerClient = aBrokerFactory.CreateBrokerClient();

            // Create Dispatcher that will receive messages from Tcp messaging and also from the internal
            // messaging and send them to the broker.
            // Note: Internal messages will come from myBrokerClient
            //       Tcp messages will come from other applications subscribed for notifications.
            IDuplexDispatcherFactory aDispatcherFactory = new DuplexDispatcherFactory(anInternalMessaging);
            myDispatcher = aDispatcherFactory.CreateDuplexDispatcher();


            //*******************************************************************************
            // "Click" communication components together
            // -> connect the broker with the dispatcher
            // -> connect the dispatcher with the 'broker client'
            // -> connect the dispatcher to Tcp messaging
            //*******************************************************************************

            // Create helper to create connections for 'internal messaging'.
            // Note: It internally creates input/output channels and connects them to components.
            IConnectionProviderFactory aConnectionProviderFactory = new ConnectionProviderFactory();
            IConnectionProvider aConnectionProvider = aConnectionProviderFactory.CreateConnectionProvider(anInternalMessaging);

            // Connect the broker to the internal messaging via the duplex input channel.
            aConnectionProvider.Attach(myBroker, "MyBrokerChannelId");

            // Tell dispatcher to send messages to the broker.
            myDispatcher.AddDuplexOutputChannel("MyBrokerChannelId");

            // Connect dispatcher with the 'broker client'.
            aConnectionProvider.Connect(myDispatcher, myBrokerClient, "MyInternalDispatcherChannelId");

            // Create Tcp messaging for the communication with subscribed applications.
            IMessagingSystemFactory aMessaging = new TcpMessagingSystemFactory();

            // Attach Tcp duplex input channel to the dispatcher and start listening.
            IDuplexInputChannel aTcpInputChannel = aMessaging.CreateDuplexInputChannel("127.0.0.1:8091");
            myDispatcher.AttachDuplexInputChannel(aTcpInputChannel);
        }

        // Correctly close listening to Tcp messages.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            myDispatcher.DetachDuplexInputChannel();
        }

        // Send NotifyMsg1
        private void Notify1Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg1 aMsg = new NotifyMsg1();
            aMsg.CurrentTime = DateTime.Now;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg1>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg1", aSerializedMsg);
        }

        // Send NotifyMsg2
        private void Notify2Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg2 aMsg = new NotifyMsg2();
            aMsg.Number = 12345;

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg2>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg2", aSerializedMsg);
        }

        // Send NotifyMsg3
        private void Notify3Btn_Click(object sender, EventArgs e)
        {
            NotifyMsg3 aMsg = new NotifyMsg3();
            aMsg.TextMessage = "My notifying text message.";

            object aSerializedMsg = mySerializer.Serialize<NotifyMsg3>(aMsg);

            myBrokerClient.SendMessage("MyNotifyMsg3", aSerializedMsg);
        }


        // Broker used to forward messages to subscribers.
        private IDuplexBroker myBroker;

        private IDuplexDispatcher myDispatcher;

        // Broker client is used to send messages to the broker,
        // that forwards messages to subscribers.
        private IDuplexBrokerClient myBrokerClient;

        // Serializer used to serialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by subscribers too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}
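The notification classes are plain public types with public properties, which is what XML serialization needs. As a rough illustration of the round trip a message makes, the sketch below uses the standard .NET XmlSerializer as a stand-in. It is an assumption that XmlStringSerializer behaves similarly; the helper names ToXml and FromXml are made up for this example.

```csharp
using System;
using System.IO;
using System.Xml.Serialization;

// Same shape as the NotifyMsg2 class used in the publisher.
public class NotifyMsg2
{
    public int Number { get; set; }
}

// Hypothetical helpers built on the standard .NET XmlSerializer.
public static class XmlRoundTrip
{
    // Serializes a message into an XML string.
    public static string ToXml<T>(T message)
    {
        var aSerializer = new XmlSerializer(typeof(T));
        using (var aWriter = new StringWriter())
        {
            aSerializer.Serialize(aWriter, message);
            return aWriter.ToString();
        }
    }

    // Rebuilds a message from its XML string.
    public static T FromXml<T>(string xml)
    {
        var aSerializer = new XmlSerializer(typeof(T));
        using (var aReader = new StringReader(xml))
        {
            return (T)aSerializer.Deserialize(aReader);
        }
    }
}
```

Because only the XML text crosses the process boundary, the receiving side just needs a class with the same name and properties; it does not have to share a compiled assembly with the sender.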

Subscriber
The subscribing application in this example is very simple too. It can subscribe and unsubscribe via the button and simply displays the incoming notification messages.


To subscribe, unsubscribe and receive messages the subscriber uses the BrokerClient component.



The whole implementation is very simple:
using System;
using System.Windows.Forms;
using Eneter.Messaging.DataProcessing.Serializing;
using Eneter.Messaging.MessagingSystems.MessagingSystemBase;
using Eneter.Messaging.MessagingSystems.TcpMessagingSystem;
using Eneter.Messaging.Nodes.Broker;

namespace Subscriber
{
    public partial class Form1 : Form
    {
        // Notification message 1
        public class NotifyMsg1
        {
            public DateTime CurrentTime { get; set; }
        }

        // Notification message 2
        public class NotifyMsg2
        {
            public int Number { get; set; }
        }

        // Notification message 3
        public class NotifyMsg3
        {
            public string TextMessage { get; set; }
        }

        public Form1()
        {
            InitializeComponent();

            // Create the broker client that will receive notification messages.
            IDuplexBrokerFactory aBrokerFactory = new DuplexBrokerFactory(new XmlStringSerializer());
            myBrokerClient = aBrokerFactory.CreateBrokerClient();
            myBrokerClient.BrokerMessageReceived += OnNotificationMessageReceived;

            // Create the Tcp messaging for the communication with the publisher.
            // Note: For the interprocess communication you can use: Tcp, NamedPipes and Http.
            IMessagingSystemFactory aMessagingFactory = new TcpMessagingSystemFactory();

            // Create duplex output channel for the communication with the publisher.
            // Note: The duplex output channel can send requests and receive responses.
            //       In our case, the broker client will send requests to subscribe/unsubscribe
            //       and receive notifications as response messages.
            IDuplexOutputChannel anOutputChannel = aMessagingFactory.CreateDuplexOutputChannel("127.0.0.1:8091");

            // Attach the output channel to the broker client
            myBrokerClient.AttachDuplexOutputChannel(anOutputChannel);
        }

        // Correctly close the communication.
        // Note: If the communication is not correctly closed, the thread listening to
        //       response messages will not be closed.
        private void Form1_FormClosed(object sender, FormClosedEventArgs e)
        {
            myBrokerClient.DetachDuplexOutputChannel();
        }

        // Method processing notification messages from the publisher.
        private void OnNotificationMessageReceived(object sender, BrokerMessageReceivedEventArgs e)
        {
            // The notification event does not come in UI thread.
            // Therefore, if we want to work with UI controls we must execute it in the UI thread.
            InvokeInUIThread(() =>
                {
                    if (e.ReceivingError == null)
                    {
                        if (e.MessageTypeId == "MyNotifyMsg1")
                        {
                            NotifyMsg1 aDeserializedMsg = mySerializer.Deserialize<NotifyMsg1>(e.Message);
                            Received1TextBox.Text = aDeserializedMsg.CurrentTime.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg2")
                        {
                            NotifyMsg2 aDeserializedMsg = mySerializer.Deserialize<NotifyMsg2>(e.Message);
                            Received2TextBox.Text = aDeserializedMsg.Number.ToString();
                        }
                        else if (e.MessageTypeId == "MyNotifyMsg3")
                        {
                            NotifyMsg3 aDeserializedMsg = mySerializer.Deserialize<NotifyMsg3>(e.Message);
                            Received3TextBox.Text = aDeserializedMsg.TextMessage;
                        }
                    }
                });
        }

        // Subscribe to notification message 1
        private void Subscribe1Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg1");
        }

        // Unsubscribe from notification message 1
        private void Unsubscribe1Btn_Click(object sender, EventArgs e)
        {
            Received1TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg1");
        }

        // Subscribe to notification message 2
        private void Subscribe2Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg2");
        }

        // Unsubscribe from notification message 2
        private void Unsubscribe2Btn_Click(object sender, EventArgs e)
        {
            Received2TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg2");
        }

        // Subscribe to notification message 3
        private void Subscribe3Btn_Click(object sender, EventArgs e)
        {
            myBrokerClient.Subscribe("MyNotifyMsg3");
        }

        // Unsubscribe from notification message 3
        private void Unsubscribe3Btn_Click(object sender, EventArgs e)
        {
            Received3TextBox.Text = "";
            myBrokerClient.Unsubscribe("MyNotifyMsg3");
        }

        // Helper method to invoke some functionality in UI thread.
        private void InvokeInUIThread(Action uiMethod)
        {
            // If we are not in the UI thread then we must synchronize via the invoke mechanism.
            if (InvokeRequired)
            {
                Invoke(uiMethod);
            }
            else
            {
                uiMethod();
            }
        }


        // BrokerClient provides the communication with the broker.
        private IDuplexBrokerClient myBrokerClient;


        // Serializer used to deserialize notification messages.
        // Note: It is possible to use BinarySerializer too.
        //       In that case the messages would have to be declared in a
        //       separate library that would be linked by the publisher too.
        private XmlStringSerializer mySerializer = new XmlStringSerializer();
    }
}
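The if/else chain in OnNotificationMessageReceived grows with every new message type. One possible alternative, sketched here under the assumption that each message type has exactly one handler, is a lookup table from message type id to handler. NotificationRouter, Register and Route are hypothetical names, not part of Eneter.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical router, for illustration only (not the Eneter API).
public class NotificationRouter
{
    // Maps a message type id to the handler of its serialized payload.
    private readonly Dictionary<string, Action<string>> myHandlers =
        new Dictionary<string, Action<string>>();

    // Registers (or replaces) the handler for one message type id.
    public void Register(string messageTypeId, Action<string> handler)
    {
        myHandlers[messageTypeId] = handler;
    }

    // Invokes the matching handler; returns false if the type id is unknown.
    public bool Route(string messageTypeId, string serializedMessage)
    {
        if (myHandlers.TryGetValue(messageTypeId, out Action<string> aHandler))
        {
            aHandler(serializedMessage);
            return true;
        }
        return false;
    }
}
```

In the subscriber, each handler would deserialize its own message type and update the matching text box, and the event handler would shrink to a single Route call made inside InvokeInUIThread.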

You can run several subscribers at once and subscribe or unsubscribe each of them for the desired events.


I hope you found the article helpful. If you have any questions about the Eneter Messaging Framework, feel free to ask.

6 comments:

  1. Antonis Kleanthous, email ask@logos.cy.net, April 25, 2012 at 10:54 AM

    I have tried to use this Publish-Subscribe Communication with MonitoredMessagingFactory. It does send the messages ok but does not reconnect in case of failure. Any samples on this?

  2. Hi Antonis,
    Here is the example with the reconnect:
    http://www.eneter.net/Downloads/Examples/PublishSubscribeWithReconnect.zip

    Reconnecting is a bit tricky where the broker is concerned.
    The point is that the broker is not stateless. It keeps track of which notification messages each client is subscribed to.
    In case of a disconnection, this information can be lost (e.g. if the broker crashed).
    So, after the reconnect, the client must recover from the disconnection and subscribe for messages again.

    The modified example at the link above uses MonitoredMessaging to detect the disconnection early. It then uses the Reconnecter component to establish the connection again. When reconnected, the client subscribes again.
    Please refer to the link above for more details.

  3. Hi Ondrej,

    Is it possible to make bidirectional communication between Publish and Subscribe?

    I mean, can a subscriber send a message to the publisher? If it is possible, please send me an example.

    Thank you.

    Replies
    1. Hi Adel,
      You can implement this by having the publisher subscribe to a specific event in the broker too.
      E.g. on the publisher side you can subscribe:
      myBrokerClient.Subscribe("Response_For_Publisher_xyz_Notified");

      Then, when a subscriber wants to notify the publisher, it can send the message to the publisher with:
      myBrokerClient.SendMessage("Response_For_Publisher_xyz_Notified", aSerializedMessage);

      Also, please note that the example in this article uses an older version of Eneter. The latest version also allows publishing events directly using DuplexBroker.
      E.g. instead of using the Dispatcher (as in this article) you can use DuplexBroker (myBroker) directly to publish and subscribe to events.
      myBroker.Subscribe(...)
      myBroker.SendMessage(...)

      I hope my response will help.

    2. Hi Ondrej,

      Thank you so much for your quick reply. I will implement this example using the new DuplexBroker functionality, and I will post the source once I finish so you can take a look.

      Thank you Ondrej.

  4. Hi Ondrej,
    I am trying to use your Eneter reconnect example program (mentioned above) in Visual Studio 2015. Everything is OK, except that it still reports that ConnectionProvider does not exist in Eneter.Messaging.Infrastructure. Is there any way I can fix it? Thank you for your reply. Best regards, Michal S.
