In the last post, I presented how to wrap std::deque to make it thread-safe. The resulting class could then be used to feed a worker thread with jobs. Now I would like to go a step further and use the very same class to write a very small (and limited) threading library. The goal is to create threads that can communicate by messages and, when receiving one, start a process according to its type. This small lib should be very easy to use from a user perspective and hide all the plumbing!

To follow this post and its associated code, you should have read my previous post first!
The code and a working example are available on my GitHub.

First implementation

Let’s get started! The TDequeConcurrent class discussed in my previous post will act as a kind of mailbox, receiving messages from other threads. I will write an abstract class, called AThread, which will host the message queue and the std::thread instance that waits for new messages and then processes them. User-defined threading classes will have to extend this abstract class in order to benefit from its facilities and will also have to provide the various processing operations. Those will be called according to the type of the message to process. I will also provide a templated class, TMessage, which will carry the actual data along with all the plumbing necessary to call the right processing function. More on that later.
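
To make the mailbox idea concrete, here is a minimal sketch of a blocking queue. It is not the TDequeConcurrent class from the previous post: the name Mailbox and its exact interface are assumptions made for illustration, keeping only the three operations this post relies on (emplace_back, emplace_front and a blocking pop_front).

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for TDequeConcurrent: a std::deque guarded by a mutex,
// with a pop_front() that blocks until an element is available.
template<typename T>
class Mailbox {
public:
    template<typename... Args>
    void emplace_back(Args&&... args) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.emplace_back(std::forward<Args>(args)...);
        }
        _notEmpty.notify_one();
    }

    template<typename... Args>
    void emplace_front(Args&&... args) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _items.emplace_front(std::forward<Args>(args)...);
        }
        _notEmpty.notify_one();
    }

    // Blocks the calling thread while the queue is empty.
    T pop_front() {
        std::unique_lock<std::mutex> lock(_mutex);
        _notEmpty.wait(lock, [this] { return !_items.empty(); });
        T item = std::move(_items.front());
        _items.pop_front();
        return item;
    }

private:
    std::deque<T> _items;
    std::mutex _mutex;
    std::condition_variable _notEmpty;
};

// Demo: a consumer thread drains three integers posted by the caller.
std::vector<int> mailbox_demo() {
    Mailbox<int> box;
    std::vector<int> received;
    std::thread consumer([&] {
        for (int i = 0; i < 3; ++i) received.push_back(box.pop_front());
    });
    box.emplace_back(1);
    box.emplace_back(2);
    box.emplace_back(3);
    consumer.join();            // join() makes the consumer's writes visible here
    assert(received.size() == 3);
    return received;
}
```

With a single producer the consumer sees the values in posting order. In the library itself the element type would be a pointer to IMessage rather than an int.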

First, here is a class diagram showing these relationships.

class diagram

Note that AThread depends on IMessage. That is because the message queue (_messages) stores instances of IMessage, and polymorphism is used to execute the right operation to process the carried data. As stated before, these operations are provided by the “threading classes” written by the user. Thus the various TMessages have to be instantiated by those classes. And is there a better place to do so than inside a send operation? This operation will:

  1. Instantiate the message and provide it the operation to process it.
  2. Emplace the message in the message queue.

send() instantiates templated messages, so it also needs to be a template. Unfortunately, this function cannot be part of the abstract class AThread: if it were, every type of message to be sent would lead to the generation of its corresponding send function, which would then be part of every derived user class. Unacceptable! But I also cannot ask the user to write this send function by hand. So send is written as a macro!
Thus, a user class must inherit from AThread and place the macro in its public section. Because the macro expands inside the user class, the call to process in the lambda resolves against that class’s own overloads.

I chose to call the macro I_AM_A_THREAD.

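#define I_AM_A_THREAD                                                        \
template<typename TDATA>                                                     \
void send(TDATA&& data) {                                                    \
    /* std::decay (<type_traits>) strips references and cv-qualifiers, */    \
    /* so an lvalue argument is copied into the message, not referenced */   \
    using TPayload = typename std::decay<TDATA>::type;                       \
    _messages.emplace_back(new TMessage<TPayload>(std::forward<TDATA>(data), \
                           [this](const TPayload& msg) {                     \
                               process(msg);                                 \
                           })                                                \
    );                                                                       \
}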

Reading the code, you can see that two parameters are provided to the message’s constructor.

  1. The data to be carried by the message (of course provided by the caller)
  2. The operation to be called when executing (processing) the message.

This operation makes a call to process. It is up to the user to write a process function for every data type that can be received and processed. If one is missing, the program will simply fail to build.
The message class itself stores the operation as a std::function member and calls it when executed.

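template<typename TDATA>
class TMessage : public IMessage
{
public:
    // Constructor omitted here: it stores the data and the processing
    // operation passed by send() in the two members below.

    // Interface to be implemented
    // Executes the message processor
    void execute(void) override {
        _processor(_data);
    }
private:
    const TDATA _data;                              // payload carried by the message
    std::function<void(const TDATA&)> _processor;   // operation that processes it
};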

Now you can see the “grand scheme”:

An IMessage is stored in the message queue. When the worker thread retrieves it, a call is made to the virtual IMessage::execute() operation, which resolves to the concrete TMessage::execute() operation, which in turn calls the processing function provided by the user.
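
The chain described above can be tried without any thread at all. The following is a minimal sketch: the TMessage constructor signature, the Receiver class and dispatch_demo are assumptions made for illustration, and a plain std::vector stands in for the message queue.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Common interface stored in the queue.
class IMessage {
public:
    virtual ~IMessage() = default;
    virtual void execute() = 0;
};

// Concrete message: owns the data and the callback that processes it.
// The constructor shown here is an assumed signature, not the repository's.
template<typename TDATA>
class TMessage : public IMessage {
public:
    TMessage(TDATA data, std::function<void(const TDATA&)> processor)
        : _data(std::move(data)), _processor(std::move(processor)) {}

    void execute() override { _processor(_data); }

private:
    const TDATA _data;
    std::function<void(const TDATA&)> _processor;
};

// Hypothetical receiver with one process() overload per data type.
struct Receiver {
    std::vector<std::string> log;
    void process(int value) { log.push_back("int:" + std::to_string(value)); }
    void process(const std::string& text) { log.push_back("string:" + text); }
};

// Queues two messages of different types, then executes them in order.
std::vector<std::string> dispatch_demo() {
    Receiver receiver;
    std::vector<std::unique_ptr<IMessage>> queue;

    queue.push_back(std::unique_ptr<IMessage>(new TMessage<int>(
        42, [&receiver](const int& v) { receiver.process(v); })));
    queue.push_back(std::unique_ptr<IMessage>(new TMessage<std::string>(
        "hello", [&receiver](const std::string& s) { receiver.process(s); })));

    // Virtual call -> TMessage<T>::execute() -> the matching process() overload.
    for (auto& message : queue) message->execute();
    return receiver.log;
}
```

The loop only knows about IMessage, while the overload of process to call was fixed at compile time, when each lambda was created.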

Sequence diagram showing a user-defined UserClass1 sending a message to UserClass2

Some enhancements

Spawning threads that can receive and process messages is nice. But it would be nicer if we had a means to stop them…

To do so, I simply test a boolean after each message has been processed to see whether the thread was asked to stop. This is a nice use for the brand new C++11 std::atomic<bool>: it guarantees that the set/test operations are thread-safe!
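
As a stand-alone illustration of the flag itself, here is a minimal sketch. It is not the library's loop: the worker below spins instead of blocking on a message queue, and the names stop_flag_demo, hasToStop and iterations are chosen for this example only.

```cpp
#include <atomic>
#include <cassert>
#include <thread>

// Runs a worker until the stop flag is raised and returns the number of
// loop iterations it performed.
long stop_flag_demo() {
    std::atomic<bool> hasToStop(false);
    std::atomic<long> iterations(0);

    std::thread worker([&] {
        while (!hasToStop) {              // atomic load, safe against the store below
            ++iterations;
            std::this_thread::yield();
        }
    });

    // Wait until the worker has run at least once, then ask it to stop.
    while (iterations == 0) std::this_thread::yield();
    hasToStop = true;                     // atomic store from another thread
    worker.join();

    assert(iterations >= 1);
    return iterations;
}
```

With a plain bool the concurrent read and write would be a data race, which is undefined behaviour in C++11.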

_mailbox = std::thread([this] {
    // Wait for a message, run it, then check whether a stop was requested
    while (!_hasToStop) {
        auto message = _messages.pop_front();
        message->execute();
    }
    _isRunning = false;
});

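The public stop operation emplaces a STOP message in the thread’s own queue! This message is placed at the front of the queue so it will be processed as soon as possible. Note that processing it also clears the queue, so any messages still pending at that point are discarded.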

struct STOP {};

void AThread::stop(void) noexcept
{
    // The STOP message jumps the queue: it is emplaced at the front
    _messages.emplace_front(new TMessage<STOP>(STOP{},
                            [this](const STOP& msg) {
                                process(msg);
                            }
    ));
}

void AThread::join(void) noexcept {
    _mailbox.join();
}

void AThread::process(STOP) noexcept {
    _hasToStop = true;
    _messages.clear();
}

Now that we can stop a thread, it can be joined afterwards, and the user has the tools to properly manage the lifetime of their threads.

To sum up, all a user wanting to perform asynchronous work on data safely carried by messages has to do is:

  • Create a class that derives from AThread
  • Put the I_AM_A_THREAD macro in its public section
  • Write a process function for each type of message (data) that can be received
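
The three steps above can be sketched end to end as follows. This is a condensed approximation, not the code from the repository: Mailbox is a hypothetical stand-in for TDequeConcurrent, the TMessage constructor and the use of std::decay in the macro are assumptions, and Logger, handled, lines and usage_demo are names invented for the example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

struct IMessage { virtual ~IMessage() = default; virtual void execute() = 0; };

template<typename TDATA>
struct TMessage : IMessage {
    TMessage(TDATA data, std::function<void(const TDATA&)> processor)
        : _data(std::move(data)), _processor(std::move(processor)) {}
    void execute() override { _processor(_data); }
    const TDATA _data;
    std::function<void(const TDATA&)> _processor;
};

// Hypothetical minimal mailbox standing in for TDequeConcurrent (takes ownership of m).
class Mailbox {
public:
    void emplace_back(IMessage* m)  { push(m, false); }
    void emplace_front(IMessage* m) { push(m, true); }
    std::unique_ptr<IMessage> pop_front() {   // blocks while the queue is empty
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [this] { return !_items.empty(); });
        std::unique_ptr<IMessage> m = std::move(_items.front());
        _items.pop_front();
        return m;
    }
    void clear() { std::lock_guard<std::mutex> lock(_mutex); _items.clear(); }
private:
    void push(IMessage* m, bool front) {
        std::unique_ptr<IMessage> owned(m);
        std::lock_guard<std::mutex> lock(_mutex);
        if (front) _items.push_front(std::move(owned));
        else       _items.push_back(std::move(owned));
        _cv.notify_one();
    }
    std::deque<std::unique_ptr<IMessage>> _items;
    std::mutex _mutex;
    std::condition_variable _cv;
};

struct STOP {};

class AThread {
public:
    AThread() : _hasToStop(false) {
        _mailbox = std::thread([this] { while (!_hasToStop) _messages.pop_front()->execute(); });
    }
    void stop() {   // STOP goes to the front so it is handled next
        _messages.emplace_front(new TMessage<STOP>(STOP{}, [this](const STOP& s) { process(s); }));
    }
    void join() { _mailbox.join(); }
protected:
    void process(STOP) { _hasToStop = true; _messages.clear(); }
    Mailbox _messages;
private:
    std::atomic<bool> _hasToStop;
    std::thread _mailbox;
};

#define I_AM_A_THREAD                                                        \
template<typename TDATA>                                                     \
void send(TDATA&& data) {                                                    \
    using TPayload = typename std::decay<TDATA>::type;                       \
    _messages.emplace_back(new TMessage<TPayload>(std::forward<TDATA>(data), \
        [this](const TPayload& msg) { process(msg); }));                     \
}

class Logger : public AThread {
public:
    I_AM_A_THREAD
    std::atomic<int> handled{0};        // lets the demo wait for the worker
    std::vector<std::string> lines;     // written by the worker, read after join()
private:
    void process(int v) { lines.push_back("int:" + std::to_string(v)); ++handled; }
    void process(const std::string& s) { lines.push_back("string:" + s); ++handled; }
};

std::vector<std::string> usage_demo() {
    Logger logger;
    logger.send(7);
    logger.send(std::string("hello"));
    while (logger.handled < 2) std::this_thread::yield();   // wait for both messages
    logger.stop();
    logger.join();
    return logger.lines;
}
```

The demo waits for both messages before calling stop because the STOP message jumps to the front of the queue and clears whatever is still pending. In this sketch the thread must be stopped and joined before the object is destroyed, since destroying a joinable std::thread terminates the program.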

That’s all folks! Quite easy to use, thanks to C++11! 🙂

A complete example involving three working threads is available on my GitHub.