MQTT, DDS, ZMQ Matrix board application in C++

authors:Sam Laan
date:Jun 2020

Description

The matrix board C++ directory can be found in src/demonstrators/ComparedToOtherProtocols/Multiple/MatrixBoard/C++/. This directory contains the matrix board application in C++.

This matrix board application contains a similar implementation compared to the poc3 of the C implementation (Matrix Board Communication in C). This is therefore a dynamic implementation of the matrix board application.

The application was written for usage with real-time posix threads. It is also possible to run it on a non real-time posix system.

Dependencies

The dependencies necessary for successful compilation of the matrix board application in C++ are the following:

  • CMake
  • C++17 compiler (Gcc 7+ for example)
  • paho.mqtt.cpp library (installation: MQTT C++)
  • cppzmq library (installation: ZMQ C++)
  • Cyclone DDS library (installation: Setup Guide for CycloneDDS)
  • MQTT server compatible with MQTT 5 (mosquitto(installation: MQTT C++) for example)

Building

To build the application run the following commands in the matrix board application source directory.

mkdir build && cd build
cmake ..
make

These commands will compile the source code and generate an executable.

To build the ZMQ proxy run the following commands in the matrix board application source directory.

cd zmqproxy
mkdir build && cd build
cmake ..
make

These commands will compile the source code and generate an executable for the proxy.

Execution

Todo

Fix MQTT, DDS, ZMQ Matrix board application in C++.

The matrix board application sends out messages each 24 milliseconds, and is also receiving messages from two other boards at this pace. Each time a message is sent or received, the message and a timestamp are written to a file. This is done by creating a thread so that the main applications performance will be affected minimally by writing these messages. The application may crash after a while due to the high number of threads created to write measurements. An error will occur because no further threads can be started at that point. If you think you can go ahead i wish you the best of luck!

The application needs to be started as super user, otherwise the starting of the posix threads whill fail due to their attributes. The first parameter of the program determines which protocol to use. The value of this parameter can be either DDS, ZMQ or MQTT. The number of parameters after the first one differs per protocol. To start the application with DDS the following parameter structure needs to be followed:

sudo ./rt_matrixboard DDS <Matrixboard ID>

In practice this means that matrix board one can be started by running the following command:

sudo ./rt_matrixboard DDS 1

When starting the application with ZMQ more parameters are needed. This can be done using the following parameter structure.

sudo ./rt_matrixboard ZMQ <Matrixboard ID> <Server sub address> <Server pub address> <Own address>

In practice this means that matrix board one for example, can be started by running the following command:

sudo ./rt_matrixboard ZMQ 1 tcp://192.168.178.32:5550 tcp://192.168.178.32:5551 tcp://192.168.178.39:5552

Besides this, for the communication to work using ZMQ the proxy should be started. The zmq proxy can be started form the build folder of the zmqproxy folder. Connect the socket according to the ports and address used for the proxy. The broxy can be started by running the following command:

./matrixboardproxy

When starting the application with MQTT more parameters are needed. This can be done using the following parameter structure.

sudo ./rt_matrixboard MQTT <Matrixboard ID> <Broker address> <QoS>

In practice this means that matrix board one for example, can be started by running the following command:

sudo ./rt_matrixboard MQTT 1 tcp://192.168.178.32:1883 0

For the MQTT implementation to work a broker must have been started. Connect the matrix board application according to the network address of the broker. When using the mosquitto broker the following terminal command can be used to start it:

mosquitto

This terminal will show connecting clients. If no clients pop up when starting the matrix board application but the communication still works please check if there is another mosquitto instance running. If the clients don’t pop up and the communication does not work check the address of the broker with the address parameter of the application.

A more thorough description of the parameters can be found when executing the application with a wrong number of parameters or can be read from the source of main.cpp.

Implementation

The figure below shows the design of the application in a class diagram.

sudo ./rt_matrixboard DDS <Matrixboard ID>

MatrixBoard "1" *-- "1" Display
MatrixBoard "1" *-- "1" Sensor
MatrixBoard "1" *-- "4" Light
MatrixBoard "1" --> "1" Communication
Communication "1" --> "1" Test
Communication <|-- ZMQCommunication
Communication <|-- MQTTCommunication
Communication <|-- DDSCommunication

class Test {
    - _measurements_file_name: string
    - _write_mutex: mutex
    + PerformanceMeasurements()
    + void initFile(): void
    + void WriteMeasurement(measurements): void
    + void WriteToFile(string): void
    + getMiliFromTime(): std::string
    + WriteThread(): void
}
class MatrixBoard {
    - _id : int
    - _next_boards : std::map<int, int>
    - _last_message_time_board1 : time_point
    - _last_message_time_board2 : time_point
    + MatrixBoard(communication, id)
    + run(): void
    + addBoard(id, address): void
    + deleteBoard(id): void
    + determineSubscribtion(): void
    + determineSign(): void
    + trafficMessagerThread(thread_arg): static void *
    + handleNetworkMessageThread(thread_arg): static void *
    + handleTrafficMessageThread(thread_arg): static void *
    + livelinessCheckerThread(thread_arg): static void *
}
class Communication {
    - _message_id: atomic<int>
    + sendNetworkMessafe(Network_Message): void
    + sendTrafficMessafe(Traffic_Message): void
    + receive():  message
    + subscribe():  void
    + unsubscribe():  void
    + getMessageID(): int
}
class Display {
    - _sign: Sign
    + setSign(sign): void
    + getSign(): Sign
}
class Sensor {
    + measure(): int
}
class Light {
    - _blinking: atomic<bool>
    + on() : void
    + off() : void
    + blink(): void
    + stopBlink(): void
}

class DDSCommunication {
    - _dds_participant : dds_entity_t
    - _writer : dds_entity_t
    - _network_writer : dds_entity_t
    - _board1_reader : dds_entity_t
    - _board2_reader : dds_entity_t
    - _network_reader : dds_entity_t
    - _topic_network : dds_entity_t
    - _topic_write : dds_entity_t
    - _topic_board1 : dds_entity_t
    - _topic_board2 : dds_entity_t
    - _topic_board1_name : string
    - _topic_board2_name : string
    + sendTrafficMessage(Traffic_Message): void
    + sendNetworkMessage(Network_Message): Message
    + connectToNetwork(Network_Message, id): void
    + subscribe(topic) : void
    + unsubscribe(topic) : void
    + receive(): message
}

class MQTTCommunication {
    - _qos : int
    - _client : mqtt::async_client
    - _topic_network : mqtt::topic
    - _topic_write : mqtt::topic
    + sendTrafficMessage(Traffic_Message): void
    + sendNetworkMessage(Network_Message): Message
    + connectToNetwork(Network_Message, id): void
    + receive(): message
    + stringToTrafficStruct(string) : Traffic_Message
    + stringToNetworkStruct(string) : Network_Message
    + trafficStructToString(Traffic_Message) : string
    + networkStructToString(Network_Message) : string
    + subscribe(topic) : void
    + unsubscribe(topic) : void
}

class ZMQCommunication {
    - _server_address : string
    - _server_publish_address : string
    - _own_address : string
    - _topic_write : string
    - _topic_network : string
    - _context : zmq::context_t
    - _publish_socket_network : zmq::socket_t
    - _publish_socket : zmq::socket_t
    - _subscribe_socket : zmq::socket_t
    + sendTrafficMessage(Traffic_Message): void
    + sendNetworkMessage(Network_Message): Message
    + connectToNetwork(Network_Message, id): void
    + subscribe(topic) : void
    + unsubscribe(topic) : void
    + receive(): message
    + generateMessageFromString(string) : zmq::message_t
    + generateMessageFromStruct(messageStruct) : zmq::message_t
}

The communication of the boards is done through a few topics. There is one network topic, each matrix board is subscribed to this topic. All messages concerning the connection of matrix boards are posted on this topic. This makes it possible for the boards to know which other boards are within the network so that they may subscribe accordingly. For publishing traffic intensity values each matrix board has a topic equal to its ID. To receive the traffic intensity values of a board other boards will subscribe to this topic. To receive the messages the matrix board makes use of the communication interface class. furthermore, this is also used to subscribe and unsubscribe from topics. The protocol which the matrix board is going to use is determined before its initialization. Based on a variable passed in the application either a DDS, MQTT or a ZMQ communication object will be initialised. This will be passed to the matrix board, which now uses this protocol for its communication.

Each protocol object implements the communication interface functions. However, there is, of course, some difference amongst the implementation. For connecting to the network ZMQ connects to the proxy, this proxy is used as an intermediary which all networking messages will be posted to. This allows ZMQ to detect other endpoints in the network to which they can connect. Other than ZMQ, MQTT connects to a broker, this is also an intermediary but all the MQTT messages go to this intermediary, therefore it only has to connect to this broker. Next DDS, has endpoint discovery built-in and connects directly to its subscriptions, it does need a unique reader and writer for each topic to read or write from, these are implemented. This also means that the DDS implementation has no single point of failure as it does not depend on any intermediary. On the contrary, if the broker breaks the MQTT implementation will not be able to communicate anymore. Next, in the ZMQ implementation discovery will be impossible if the proxy breaks down. However, sockets already connected can still communicate. Another thing taken from this is the di erence of connecting, while the MQTT connects to a broker, ZMQ connects through sockets and DDS connects through the protocol itself. Besides this difference, there is also a difference in the type of message sent. While DDS takes structs as messages the others do not. To keep the output of the communication the same struct is used, as DDS uses structs naturally, the used structs were the structs generated for DDS using an IDL fi le. This IDL file can be found in the source directory of the application. However, the other protocols implement functions to turn these structs into the types they need to send messages and the other way around for the application.

The matrix board class implements all the functionality of the matrix board application. It starts a thread to send out its traffic intensity and a thread to check the liveliness of its subscriptions. The traffic messages are sent with a minimal gap of 24 milliseconds. For the liveliness, subscriptions which the board has not received messages from for two seconds or longer are deemed disconnected. The board will notify the rest of the system of this disconnection by sending a message to a common topic.

The main thread will handle the received messages. If a network message is received a thread will be started to handle the message, the main thread can receive a new message while the network message is still being processed. In addition, the same is true for traffic messages. The network message processing thread determines whether a subscription should be added, deleted or if there is no action to be taken based on the contents of the message and the current subscriptions. On the other hand, the traffic thread updates the traffic intensity values of the subscriptions, it also determines which sign should be shown based on the updated value. To set the displayed sign, it makes use of the display object. If there is a board to display, this will be set and the lights will blink, this is done using the light objects.

The application uses the test class to write their measurements to a file. These measurements are all timing measurements, the high-resolution clock is used to get the most precise measurements.