Software Experts
Call Us: +40 770 613 713 | EU
13th Jan 2015 | by: cetus

Event Publishing/Subscribing

The event publishing and subscribing mechanism was introduced for several reasons:

  • To allow the communication between our current applications hosted in separate servers (web application, REST API, Scheduler)
  • To allow the breaking of current sequential processes of high complexity into smaller and parallel tasks
  • To implement the distributed cache

The Published/Subscriber Broker

The broker is in fact a service which manages launched events. In our case the broker is Active MQ (http://activemq.apache.org, Windows version), installed as a Windows service.

The broker is interchangeable, being based on the message queue protocol.

The broker receives events from publishers in the form of content as bytes for a certain topic. The broker then searches for all subscribers to the given topic and forwards the message to them.

The Published/Subscriber Client

The used publisher/subscriber client is the M2MQTT library (http://m2mqtt.codeplex.com). Below you can find a simple example of publishing and subscribing to an event.

Publisher

Subscriber

The needed elements for publishing/subscribing to an event are:

  1. The server name (address) – in our case this is retrieved through the MQUtils class from the application configuration file where it is kept along with three other configuration data components:
  2. The client id – in our case the client id is generated each time as a GUID, to avoid name clashes
  3. The target topic – the topic to which we publish the event
  4. The content in bytes
  5. The qos level – how many times to publish the event
  6. The retain value – if this is true then the subscribers will receive a published event the next time they connect
  • The qos level has to be the same for both the publishers and the subscribers.
  • Avoid using the retain feature in order to keep the broker light in consumed resources.

MQ Events

An event is defined by implementing the IEntityEvent<T> interface.

The Entity property will contain the data to send in the event and the Topic property provides the topic to publish to.

Example:

The topic has to be the type of the event combined with the type of the data contained. This structure allows us to rebuild the event data on the subscriber’s side.

Publishing Events

The publishing is done through the EventPublisher service in two steps:

  1. Publishing the event directly to local subscribers, not through the broker. The subscribers are identified as classes which implement the IConsumer<T1> interface where T1 will be a class implementing the IEntity interface.

  2. Publishing the event to other applications through the broker:

The two step process is needed in order to ensure the current application treats the event as soon as possible, by avoiding the delay introduced by going through the broker.

In step 2, there two things to consider:

  1. The data is serialized so complex objects can be passed through the broker.
  2. The topic has a specific structure which will be discussed when describing the subscriber mechanism.

For ease of use there is an extension class which makes it easier to publish events, such as in the case of the EntityInserted event:

Event Subscriptions

Subscribing to events is done each time an application is started using a listener. For now there is a single listener (CacheMQListener) which handles cache clearing for various events and it will be used as an example henceforth.

The subscribing is performed in the constructor:

We will focus on the topics to which the subscriber will listen. The list of topics is automatically generated based on the current server and the current application.

A topic has the following structure:

The current application is excluded in order to avoid subscribing to its own events through the broker. The current application will treat the event itself before sending it to the broker, so it does not need to subscribe to them through the broker.

An event is received in the following method:

The event processing follows these steps:

  1. The event topic is parsed to get the server, application name, the event type and the event entity type. We check that the event was not sent from the current application (theoretically not possible, but the check is made nonetheless).

  2. Recreate the event entity object by use of reflection:

  3. Recreate the entity event by use of reflection:

  4. Get the event consumers by use of reflection:

  5. Publish the event to the consumers by use of reflection:

  • Reflection is heavily used in processing an event in order to allow working with complex objects. Avoid making changes to the methods used in the process.

Consumers

A consumer is determined by a class implementing the IConsumer interface. These classes will automatically be registered through Autofac when starting the application:

This further allows to find consumers for a certain event by use Autofac:

The IConsumer interface requires implementing a single method, which will handle the event.

Pub/Sub Showcase – Clear all cache

As a simple example for the pub/sub mechanism we have the event which clears the cache for all applications.

On launching, all applications will be subscribed to the CacheCleared entity event.

which can be used with the dummy entity class:

The consumer for this event does nothing else but clear the cache entirely for the current application:

Share This Post

About the Author: cetus

Leave a Reply

Your email address will not be published.