Event Publishing and Subscribing with ActiveMQ and M2MQTT

Event Publishing/Subscribing

The event publishing and subscribing mechanism was introduced for several reasons:

  • To allow communication between our applications hosted on separate servers (web application, REST API, Scheduler)
  • To allow complex sequential processes to be broken into smaller tasks that can run in parallel
  • To implement the distributed cache

The Publisher/Subscriber Broker

The broker is a service which manages the events that are published. In our case the broker is ActiveMQ (http://activemq.apache.org, Windows version), installed as a Windows service.

The broker is interchangeable: the clients talk to it over MQTT, a standard messaging protocol, so any other broker that supports MQTT can take its place.

The broker receives events from publishers as byte content addressed to a certain topic. It then looks up all subscribers to that topic and forwards the message to them.
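The routing described above can be illustrated with a small in-memory stand-in. This is only a sketch of the idea (topic lookup, then forwarding); it is not how ActiveMQ is implemented, and the InMemoryBroker class and its members are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;

namespace PubSubSketch.Routing
{
    // Hypothetical in-memory stand-in for a broker (illustration only).
    public class InMemoryBroker
    {
        // topic -> handlers of the subscribers registered for that topic
        private readonly Dictionary<string, List<Action<string, byte[]>>> _subscribers =
            new Dictionary<string, List<Action<string, byte[]>>>();

        public void Subscribe(string topic, Action<string, byte[]> handler)
        {
            List<Action<string, byte[]>> handlers;
            if (!_subscribers.TryGetValue(topic, out handlers))
            {
                handlers = new List<Action<string, byte[]>>();
                _subscribers[topic] = handlers;
            }
            handlers.Add(handler);
        }

        // Forwards the content to every subscriber of the topic and
        // returns the number of subscribers that received it.
        public int Publish(string topic, byte[] content)
        {
            List<Action<string, byte[]>> handlers;
            if (!_subscribers.TryGetValue(topic, out handlers))
                return 0;

            foreach (var handler in handlers)
                handler(topic, content);
            return handlers.Count;
        }
    }
}
```

A message published to a topic nobody subscribed to is simply dropped, which matches the behaviour we rely on when the retain feature is not used.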

The Publisher/Subscriber Client

The publisher/subscriber client we use is the M2MQTT library (http://m2mqtt.codeplex.com). Below is a simple example of publishing and subscribing to an event.

Publisher

MqttClient client = new MqttClient("server");
try
{
  //after the message is published disconnect the client
  client.MqttMsgPublished += (sender, e) =>
  {
     (sender as MqttClient).Disconnect();
  };
  client.Connect(TheClientId);
 
  //finally we publish it
  client.Publish(TargetTopic, contentBytes, MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
}
catch (Exception)
{
  //don't leave the client connected if connecting or publishing failed
  if (client.IsConnected)
    client.Disconnect();
}

Subscriber

MqttClient client = new MqttClient("server");
//always set the event handler before connecting
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
client.Connect(TheClientId);
client.Subscribe(new string[] { TargetTopic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });

The elements needed for publishing/subscribing to an event are:

  • The server name (address) – in our case this is retrieved through the MQUtils class from the application configuration file, where it is kept along with three other settings:
<add key="MQServer" value="localhost" />
<add key="MQAppName" value="WebApp" />
<add key="MQServerList" value="localhost" />
<add key="MQAppList" value="Scheduler,WebApp,Api" />
  • The client id – in our case the client id is generated each time as a GUID, to avoid name clashes
  • The target topic – the topic to which we publish the event
  • The content in bytes
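  • The qos level – the delivery guarantee for the message: at most once, at least once or exactly once
  • The retain value – if this is true, the broker keeps the last event published on the topic and delivers it to subscribers when they subscribe later, even if they were not connected at publish time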

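– Use the same qos level for publishers and subscribers. MQTT delivers each message at the lower of the publish level and the subscription level, so a mismatch silently weakens the delivery guarantee.
– Avoid using the retain feature, to keep the broker's resource consumption low.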

MQ Events

An event is defined by implementing the IEntityEvent<T> interface.

/// <summary>
/// Should be implemented by any entity event to be launched and used in the pub/sub mechanism.
/// <para>CRITICAL: Avoid changing the class definition as in the pub/sub system
/// reflection is used to process the events which are sent through the MQ broker.</para>
/// </summary>
/// <typeparam name="T"></typeparam>
public interface IEntityEvent<T> where T : BaseEntity
{
  /// <summary>
  /// The MQ topic which describes the event.
  /// Should usually be something like:
  /// "entity event type fullname/entity type full name"
  /// </summary>
  string Topic { get; }
  /// <summary>
  /// The entity for which the event was launched.
  /// </summary>
  T Entity { get; }
}

The Entity property will contain the data to send in the event and the Topic property provides the topic to publish to.

Example:

/// <summary>
/// A container for entities that have been inserted.
/// </summary>
/// <typeparam name="T"></typeparam>
[DataContract]
[Serializable]
public class EntityInserted<T> : IEntityEvent<T> where T: BaseEntity
{
  public EntityInserted(T entity)
  {
    this.Entity = entity;
  }
  public T Entity { get; private set; }
  public string Topic
  {
    get
    {
      return typeof(EntityInserted<>).FullName + MQUtils.MQ_SEPARATOR + typeof(T).FullName;
    }
  }
}

The topic must combine the type of the event with the type of the entity it carries. This structure tells the subscriber which event and entity types to rebuild from the received bytes.
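To make the shape of such a topic concrete, here is a minimal sketch that reproduces the Topic getter with stand-in types. The namespace, the Customer class and the "/" separator are assumptions made for the example; the real separator is whatever MQUtils.MQ_SEPARATOR defines.

```csharp
using System;

namespace PubSubSketch.Topics
{
    // Stand-ins for the project's types, declared only so the sketch compiles.
    public abstract class BaseEntity { }
    public class Customer : BaseEntity { }
    public class EntityInserted<T> where T : BaseEntity { }

    public static class TopicDemo
    {
        // Assumption: the separator is "/" (see MQUtils.MQ_SEPARATOR for the real value).
        public const string Separator = "/";

        // Same expression as the Topic getter of EntityInserted<T>.
        public static string TopicFor<T>() where T : BaseEntity
        {
            return typeof(EntityInserted<>).FullName + Separator + typeof(T).FullName;
        }
    }
}
```

With these stand-in types, TopicDemo.TopicFor<Customer>() yields "PubSubSketch.Topics.EntityInserted`1/PubSubSketch.Topics.Customer". The "`1" suffix is how .NET names an open generic type with one type parameter, which is why the subscriber can later close it over the entity type.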

Publishing Events

The publishing is done through the EventPublisher service in two steps:

  • Publishing the event directly to local subscribers, not through the broker. The subscribers are identified as classes which implement the IConsumer<T1> interface, where T1 is a class implementing the IEntityEvent<T2> interface and T2 is the entity type.
try
{
  var subs = _subscriptionService.GetSubscriptions<T2, T1>();
  subs.ToList().ForEach(x => PublishToConsumer(x, entityEvent));
}
catch (Exception e)
{
  _log.Error("Could not publish event {0} to current context: {1}", entityEvent.Topic, e.Message);
}
  • Publishing the event to other applications through the broker:
//publish the event to the mq broker
MqttClient client = new MqttClient(MQUtils.CurrentMQServer);
try
{
  string clientId = Guid.NewGuid().ToString();
  
  //after the message is published disconnect the client
  client.MqttMsgPublished += (sender, e) =>
  {
    (sender as MqttClient).Disconnect();
  };
 
  client.Connect(clientId);
 
  //we serialize the entity which will be sent to the mq broker
  var bytes = Utility.Serialize<T2>(entityEvent.Entity);
 
  // we get the topic to which to publish the event
  string topic = MQUtils.FullTopicName(entityEvent.Topic);
 
  //finally we publish it
  client.Publish(topic, bytes, MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE, false);
}
catch (Exception e)
{
  _log.Error("Could not publish event {0} to MQ broker: {1}", entityEvent.Topic, e.Message);
 
  //don't leave the client connected
  if (client != null && client.IsConnected)
  {
    try { client.Disconnect(); }
    catch { }
  }
}

The two-step process ensures that the current application handles the event as soon as possible, avoiding the delay introduced by going through the broker.

In step 2, there are two things to consider:

  1. The data is serialized so complex objects can be passed through the broker.
  2. The topic has a specific structure which will be discussed when describing the subscriber mechanism.
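The Utility.Serialize and Utility.Deserialize helpers are not shown on this page. As a rough sketch of what such a pair could look like, the following uses DataContractSerializer, which fits the [DataContract] attributes on our events and entities. The SerializationSketch and SampleEntity names are hypothetical, and the real helpers may use a different serializer.

```csharp
using System.IO;
using System.Runtime.Serialization;

namespace PubSubSketch.Serialization
{
    // Hypothetical entity used only for this example.
    [DataContract]
    public class SampleEntity
    {
        [DataMember] public int Id { get; set; }
        [DataMember] public string Name { get; set; }
    }

    // Assumed shape of the helpers; not the project's actual Utility class.
    public static class SerializationSketch
    {
        // Turns an object into the bytes that are sent to the broker.
        public static byte[] Serialize<T>(T value)
        {
            var serializer = new DataContractSerializer(typeof(T));
            using (var stream = new MemoryStream())
            {
                serializer.WriteObject(stream, value);
                return stream.ToArray();
            }
        }

        // Rebuilds the object from the bytes received from the broker.
        public static T Deserialize<T>(byte[] bytes)
        {
            var serializer = new DataContractSerializer(typeof(T));
            using (var stream = new MemoryStream(bytes))
            {
                return (T)serializer.ReadObject(stream);
            }
        }
    }
}
```

Keeping Deserialize generic with a single byte[] parameter matters for the subscriber side, where the method is closed over the entity type through reflection.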

For ease of use there is an extension class which makes it easier to publish events, such as in the case of the EntityInserted event:

public static void EntityInserted<T>(this IEventPublisher eventPublisher, T entity) where T : BaseEntity
{
  eventPublisher.Publish<EntityInserted<T>, T>(new EntityInserted<T>(entity));
}

Event Subscriptions

Each application subscribes to events at startup, using a listener. For now there is a single listener (CacheMQListener), which handles cache clearing for various events; it is used as the example in the rest of this section.

The subscribing is performed in the constructor:

//the listener listens to the current mq server(which is set in the configuration file of the application)
_listener = new MqttClient(MQUtils.CurrentMQServer);
//always set the event listener before connecting
_listener.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
//use a randomly generated client id
//maybe change this in the future, not sure if useful,
//but consider the option nonetheless
string clientId = Guid.NewGuid().ToString();
_listener.Connect(clientId);
//we subscribe the listener to all allowed topics
//for example, it must not subscribe to topics it publishes itself
var topics = MQUtils.GetCurrentAppListenableTopics(IoC.Resolve<ITypeFinder>());
foreach (var topic in topics)
  _listener.Subscribe(new string[] { topic }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_LEAST_ONCE });

We will focus on the topics to which the subscriber will listen. The list of topics is automatically generated based on the current server and the current application.

A topic has the following structure:

originating server/originating application/entity event type/entity type

The current application is excluded so that it does not receive its own events through the broker: it has already handled each event locally before sending it to the broker.
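The topic structure and the exclusion rule can be sketched as follows. MQUtils.FullTopicName and MQUtils.GetCurrentAppListenableTopics are not shown on this page, so the method names, parameters and the "/" separator below are assumptions; the sketch only illustrates combining the MQServerList and MQAppList settings while skipping the current server/application pair.

```csharp
using System.Collections.Generic;

namespace PubSubSketch.FullTopics
{
    // Hypothetical helper; the real logic lives in MQUtils.
    public static class TopicSketch
    {
        // Assumption: the separator is "/".
        public const string Separator = "/";

        // originating server/originating application/entity event type/entity type
        public static string FullTopicName(string server, string app, string eventTopic)
        {
            return server + Separator + app + Separator + eventTopic;
        }

        // Builds every topic the current application should listen to:
        // all server/application pairs except its own.
        public static List<string> ListenableTopics(
            string[] servers, string[] apps,
            string currentServer, string currentApp,
            string[] eventTopics)
        {
            var topics = new List<string>();
            foreach (var server in servers)
            {
                foreach (var app in apps)
                {
                    if (server == currentServer && app == currentApp)
                        continue; // own events are handled locally, not through the broker

                    foreach (var eventTopic in eventTopics)
                        topics.Add(FullTopicName(server, app, eventTopic));
                }
            }
            return topics;
        }
    }
}
```

With the configuration shown earlier (server localhost, applications Scheduler, WebApp and Api, current application WebApp), one event topic produces two subscriptions: one for the Scheduler and one for the Api.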

An event is received in the following method:

void client_MqttMsgPublishReceived(object sender, uPLibrary.Networking.M2Mqtt.Messages.MqttMsgPublishEventArgs e)

The event processing follows these steps:

  • The event topic is parsed to get the server, application name, the event type and the event entity type. We check that the event was not sent from the current application (theoretically not possible, but the check is made nonetheless).
//parse the topic of the event to get the true topic and the server/app from which it originated
string server, appName;
Type eventType, entityType;
if (!MQUtils.ParseFullTopicName(e.Topic, out server, out appName, out eventType, out entityType))
{
  _log.Error("There was an error parsing the full topic: {0}.", e.Topic);
  return;
}
//if the event originated from the current server and app
//then we ignore it because it should have already been treated
//!!!this should not happen because we only subscribe to other apps' events
if (MQUtils.IsCurrentApp(server, appName))
{
  _log.Debug("Ignoring event {0} from current app ({1}/{2}).", e.Topic, MQUtils.CurrentMQServer, MQUtils.CurrentMQAppName);
  return;
}
  • Recreate the event entity object by use of reflection:
//we deserialize the object sent through the mq broker
object entityObj = typeof(Utility)
  .GetMethod("Deserialize")
  .MakeGenericMethod(entityType)
  .Invoke(null, new object[] { e.Message }); //the entity passed in the event
  • Recreate the entity event by use of reflection:
//we create the generic type of the EntityEvent for the received event
Type entityEventGenericType = eventType.MakeGenericType(new Type[] { entityType });
//we create the entity event itself
object entityEventObj = Activator.CreateInstance(entityEventGenericType, entityObj);
  • Get the event consumers by use of reflection:
//we get the consumers subscribed to the received entity event
var method = _subscriptionService
  .GetType()
  .GetMethod("GetSubscriptions")
  .MakeGenericMethod(entityType, entityEventGenericType);
object[] consumers = (object[])method.Invoke(_subscriptionService, null);
  • Publish the event to the consumers by use of reflection:
//we get the publishing method
var pubMethod = this
  .GetType()
  .GetMethod("PublishToConsumer")
  .MakeGenericMethod(entityEventGenericType);
//we publish the entity event to its proper consumers
consumers.ToList().ForEach(x =>
{
  pubMethod.Invoke(this, new object[] { x, entityEventObj });
});

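Event processing relies heavily on reflection so that it can work with arbitrary entity types. The methods involved (Deserialize, GetSubscriptions, PublishToConsumer) are looked up by name at run time, so avoid renaming them or changing their signatures.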
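The parsing and rebuilding steps can be tried in isolation with the sketch below. It uses stand-in types and a hypothetical ReflectionSketch class; MQUtils.ParseFullTopicName is not shown on this page, so the parsing here (four parts separated by "/", types resolved from the sketch's own assembly) is an assumption rather than the actual implementation.

```csharp
using System;

namespace PubSubSketch.Rebuild
{
    // Stand-ins for the project's types, declared only so the sketch compiles.
    public abstract class BaseEntity { }
    public class Customer : BaseEntity { public string Name { get; set; } }

    public class EntityInserted<T> where T : BaseEntity
    {
        public EntityInserted(T entity) { Entity = entity; }
        public T Entity { get; private set; }
    }

    public static class ReflectionSketch
    {
        // Splits "server/app/event type/entity type" and resolves both type names.
        // Assumption: the types live in this assembly; real code may need to search others.
        public static bool TryParseFullTopic(string fullTopic,
            out string server, out string app, out Type eventType, out Type entityType)
        {
            server = null; app = null; eventType = null; entityType = null;

            var parts = fullTopic.Split('/');
            if (parts.Length != 4)
                return false;

            server = parts[0];
            app = parts[1];
            var assembly = typeof(ReflectionSketch).Assembly;
            eventType = assembly.GetType(parts[2]);   // open generic, e.g. EntityInserted`1
            entityType = assembly.GetType(parts[3]);
            return eventType != null && entityType != null;
        }

        // Closes the open generic event type over the entity type and
        // calls its single-argument constructor with the entity.
        public static object RebuildEvent(Type eventType, Type entityType, object entity)
        {
            Type closedEventType = eventType.MakeGenericType(entityType);
            return Activator.CreateInstance(closedEventType, entity);
        }
    }
}
```

Note that Activator.CreateInstance finds the constructor by its argument types, which is one reason the event classes must keep a public constructor taking the entity.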

Consumers

A consumer is any class implementing the IConsumer<T> interface. These classes are registered automatically through Autofac when the application starts:

//Register event consumers
var consumers = typeFinder.FindClassesOfType(typeof(IConsumer<>)).ToList();
 
foreach (var consumer in consumers)
{
  builder.RegisterType(consumer).As(consumer.FindInterfaces((type, criteria) =>
  {
    var isMatch = type.IsGenericType && ((Type)criteria).IsAssignableFrom(type.GetGenericTypeDefinition());
    return isMatch;
  }, typeof(IConsumer<>)))
  .InstancePerHttpRequest();
}

This also makes it possible to find the consumers for a given event through Autofac:

EngineContext.Current.ResolveAll<IConsumer<EntityInserted<Customer>>>();

The IConsumer interface requires implementing a single method, HandleEvent, which handles the event.
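The interface definition itself is not reproduced on this page. Judging from the HandleEvent method of the consumer in the showcase section, it presumably looks like the sketch below; the SampleEvent and SampleEventConsumer classes are hypothetical and exist only to show an implementation.

```csharp
using System.Collections.Generic;

namespace PubSubSketch.Consumers
{
    // Assumed shape of the interface, inferred from the HandleEvent
    // method implemented by CacheClearedEventConsumer.
    public interface IConsumer<T>
    {
        void HandleEvent(T eventMessage);
    }

    // Hypothetical event type used only for this example.
    public class SampleEvent
    {
        public string Name { get; set; }
    }

    // A consumer handles exactly one event type, given by the type argument.
    public class SampleEventConsumer : IConsumer<SampleEvent>
    {
        public readonly List<string> Handled = new List<string>();

        public void HandleEvent(SampleEvent eventMessage)
        {
            Handled.Add(eventMessage.Name);
        }
    }
}
```

A class can consume several events by implementing IConsumer<T> once per event type; the Autofac registration above then exposes it under each of those interfaces.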

Pub/Sub Showcase – Clear all cache

As a simple example of the pub/sub mechanism, consider the event which clears the cache in all applications.

At startup, every application subscribes to the CacheCleared entity event.

/// <summary>
/// Publish this event to clear the cache in all applications.
/// </summary>
[DataContract]
[Serializable]
public class CacheCleared<T> : IEntityEvent<T> where T : BaseEntity
{
  public CacheCleared(T entity)
  {
    this.Entity = entity;
  }
 
  public T Entity { get; private set; }
 
  public string Topic
  {
    get
    {
      return typeof(CacheCleared<>).FullName + MQUtils.MQ_SEPARATOR + typeof(T).FullName;
    }
  }
}

It can be used with the following dummy entity class:

/// <summary>
/// Dummy entity class for the CacheCleared event.
/// </summary>
[DataContract]
[KnownType(typeof(ClearCacheEntity))]
public class ClearCacheEntity : BaseEntity
{
  public override int GetId()
  {
    return 0;
  }
}

The consumer for this event does nothing but clear the entire cache of the current application:

public class CacheClearedEventConsumer : CacheKeys, IConsumer<CacheCleared<ClearCacheEntity>>
{
  public void HandleEvent(CacheCleared<ClearCacheEntity> eventMessage)
  {
    new MemoryCacheManager().Clear();
  }
}
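To trigger the clearing, the event only needs to be published once from any application. The sketch below mirrors the EntityInserted extension method shown earlier. The ClearAllCaches helper and the RecordingPublisher class are hypothetical, and the other types are simplified stand-ins for the project's own (including the generic constraints on Publish, which are assumed).

```csharp
using System.Collections.Generic;

namespace PubSubSketch.ClearCache
{
    // Simplified stand-ins for the project's types, declared only so the sketch compiles.
    public abstract class BaseEntity { public abstract int GetId(); }

    public interface IEntityEvent<T> where T : BaseEntity
    {
        string Topic { get; }
        T Entity { get; }
    }

    public interface IEventPublisher
    {
        // Assumed signature: T1 is the event type, T2 the entity type.
        void Publish<T1, T2>(T1 entityEvent) where T1 : IEntityEvent<T2> where T2 : BaseEntity;
    }

    public class ClearCacheEntity : BaseEntity
    {
        public override int GetId() { return 0; }
    }

    public class CacheCleared<T> : IEntityEvent<T> where T : BaseEntity
    {
        public CacheCleared(T entity) { Entity = entity; }
        public T Entity { get; private set; }
        public string Topic
        {
            // Assumption: "/" stands in for MQUtils.MQ_SEPARATOR.
            get { return typeof(CacheCleared<>).FullName + "/" + typeof(T).FullName; }
        }
    }

    public static class CacheEventExtensions
    {
        // Hypothetical convenience method, in the style of the EntityInserted extension.
        public static void ClearAllCaches(this IEventPublisher eventPublisher)
        {
            eventPublisher.Publish<CacheCleared<ClearCacheEntity>, ClearCacheEntity>(
                new CacheCleared<ClearCacheEntity>(new ClearCacheEntity()));
        }
    }

    // Publisher that only records topics, so the call can be tried without a broker.
    public class RecordingPublisher : IEventPublisher
    {
        public readonly List<string> Topics = new List<string>();

        public void Publish<T1, T2>(T1 entityEvent) where T1 : IEntityEvent<T2> where T2 : BaseEntity
        {
            Topics.Add(entityEvent.Topic);
        }
    }
}
```

In the real applications the call would go through the EventPublisher service, which first notifies the local CacheClearedEventConsumer and then forwards the event to the other applications through the broker.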