Modern C++’s new concurrency API

Yesterday I decided to play a bit with the new concurrency API offered by C++11 and later versions, a.k.a. “Modern C++”. This API sits just a bit above pthreads, but is low-level enough to let us fine-tune our programs for our target.

Although I mostly work on embedded systems day to day, I’m not used to writing multi-threaded programs with such a low-level API. I either write my unit-test programs using frameworks such as Qt, which come with very high-level abstractions, or I rely on real-time OSes, which usually provide APIs that hide the ugly truth of inter-task synchronization on bare-bone microcontrollers.

Playing with C++11 concurrency API is a good opportunity to learn something new, so here we go!
You can grab the code presented here on my GitHub, along with a tester.

First, I need a good way for a master thread to provide work to a worker thread. A perfect fit is a queue acting as a FIFO. But I would like to be able to put higher-priority work at the front of the queue at any time, so a simple queue won’t be enough. Thus I will use some kind of deque, which allows adding new items at both ends of the collection.
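As a quick illustration, a plain (and still thread-unsafe) std::deque already offers both insertion ends; the job names here are just placeholders:

```cpp
#include <deque>
#include <string>

// A plain std::deque gives both insertion ends, but no thread safety yet.
std::string next_job_demo()
{
    std::deque<std::string> jobs;
    jobs.push_back("job A");       // normal priority: appended at the back
    jobs.push_back("job B");
    jobs.push_front("urgent job"); // high priority: inserted at the front
    return jobs.front();           // the urgent job is now first in line
}
```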

As the STL collections are not thread-safe, let’s build a wrapper around std::deque!

Wrapping std::deque

The first thing to do before coding a class shared between threads is to design its interface. The tighter, the better!
According to my wishes, I need:

  • An operation to put an item on the back of the collection.
    The normal way to add more jobs.
  • An operation to put an item on the front of the collection.
    The way to add a higher-priority job.
  • An operation to “pop” (read and erase) the next item; in my case, popping from the front of the collection.

The writing operations will be called in the producer (master) thread, while the operation to pop the next element will be used in the consumer (worker) thread.

I chose to wrap the emplace operations of the STL collection. The use of variadic templates will allow me to forward them an arbitrary number of parameters while keeping the same interface.

template<typename T>
class TQueueConcurrent
{
public:
    template<typename... Args>
    void emplace_front( Args&&... args );

    template<typename... Args>
    void emplace_back( Args&&... args );

    T pop_front( void ) noexcept;
};

The integrity of the storage has to be guaranteed, so I’ll have to lock a mutex before modifying the data.

The pop_front operation will return the front element and remove it right after, so it will wrap std::deque::front and std::deque::pop_front. The mutex will have to be locked before reading the content, but more is required. Indeed, if the collection is empty, calling the wrapped functions is undefined behavior. Thus I will use a condition variable to guarantee that this never happens: if the deque is empty, the function will wait (block) on the condition until an emplace operation notifies that new data is available.

T pop_front( void ) noexcept
{
    std::unique_lock<std::mutex> lock{_mutex}; /* acquiring the mutex */
    while (_collection.empty()) {
        _condNewData.wait(lock);  /* waiting to be notified of new data */
    }

    auto elem = std::move(_collection.front()); /* moving the front element */
    _collection.pop_front();                    /* which is immediately deleted */

    return elem;
}

Note that std::condition_variable::wait() will suspend the calling thread and unlock the mutex until the condition is notified; during the wait, other threads can of course acquire the mutex. The while loop re-checks the emptiness on wakeup, which also protects against spurious wakeups.
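As an aside, the predicate overload of wait() folds that while loop into the call itself, and guards against spurious wakeups in exactly the same way. A minimal free-standing sketch (the names mirror the class members, but this is not the class code):

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Free-standing sketch of the predicate overload of wait().
std::mutex              _mutex;
std::condition_variable _condNewData;
std::deque<int>         _collection;

int pop_front_with_predicate()
{
    std::unique_lock<std::mutex> lock{_mutex};
    // Equivalent to: while (_collection.empty()) _condNewData.wait(lock);
    _condNewData.wait(lock, [] { return !_collection.empty(); });

    int elem = _collection.front();
    _collection.pop_front();
    return elem;
}
```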

The following writing operations are kinda symmetric. After locking the mutex, the element is “emplaced” then the condition is notified. This will allow pop_front to resume if it was waiting in the consumer thread.

template<typename... Args>
void emplace_back( Args&&... args )
{
    std::unique_lock<std::mutex> lock{ _mutex };             /* Acquires the mutex */
    _collection.emplace_back( std::forward<Args>(args)... ); /* Forwards the params to the wrapped collection */
    lock.unlock();                                           /* Releases the lock before notifying */
    _condNewData.notify_one();                               /* Notifies that a new item has been stored */
}

emplace_front will be very similar to emplace_back. The only line to change is the call to _collection.emplace_back() that will get replaced by a call to _collection.emplace_front().

But duplicating code is never a good thing. So here comes a bit of factorization!

The idea is to write a function template containing all the redundant code. Since the only difference is the call to the non-thread-safe writing operation, that call will be passed as a parameter.

template<class F>
void addData_protected(F&& addData_unprotected)
{
    std::unique_lock<std::mutex> lock{ _mutex };
    addData_unprotected();
    lock.unlock();
    _condNewData.notify_one();
}

Concerning the new version of emplace_back, I will use a lambda to provide the addData_unprotected parameter.

template<typename... Args>
void emplace_back( Args&&... args )
{
    addData_protected( [&] {
        _collection.emplace_back(std::forward<Args>(args)...);
    } );
}

Note: Using [&] as the capture clause allows me to access _collection from within the lambda expression.

Way better! And now we have the possibility to wrap any unprotected operation of std::deque! The only thing to modify is the call inside the lambda!

It will also be easy to reuse with any of the STL collections!

Don’t miss the next post, where I discuss building a simple threading library upon this collection!

All the code

#include <deque>
#include <mutex>
#include <condition_variable>

/** @brief  A templated *thread-safe* collection based on std::deque.
            pop_front() waits for the notification of a filling method if the collection is empty.
            The various "emplace" operations are factorized through the generic "addData_protected",
            which takes the concrete operation to use, typically passed as a lambda.
**/
template< typename T >
class TQueueConcurrent {

    using const_iterator = typename std::deque<T>::const_iterator;

public:
    /** @brief Emplaces a new instance of T in front of the deque **/
    template<typename... Args>
    void emplace_front( Args&&... args )
    {
        addData_protected( [&] {
            _collection.emplace_front(std::forward<Args>(args)...);
        } );
    }

    /** @brief Emplaces a new instance of T at the back of the deque **/
    template<typename... Args>
    void emplace_back( Args&&... args )
    {
        addData_protected( [&] {
            _collection.emplace_back(std::forward<Args>(args)...);
        } );
    }

    /** @brief  Returns the front element and removes it from the collection.
                No exception is ever thrown, as we guarantee that the deque is not empty
                before trying to return data.
    **/
    T pop_front( void ) noexcept
    {
        std::unique_lock<std::mutex> lock{_mutex};
        while (_collection.empty()) {
            _condNewData.wait(lock);
        }
        auto elem = std::move(_collection.front());
        _collection.pop_front();
        return elem;
    }

private:

    /** @brief  Protects the deque, calls the provided function and notifies the presence of new data
        @param  The concrete operation to be used. It MUST be an operation which will add data to the deque,
                as it will notify that new data are available!
    **/
    template<class F>
    void addData_protected(F&& fct)
    {
        std::unique_lock<std::mutex> lock{ _mutex };
        fct();
        lock.unlock();
        _condNewData.notify_one();
    }

    std::deque<T> _collection;                     ///< Concrete, not thread safe, storage.

    std::mutex   _mutex;                    ///< Mutex protecting the concrete storage

    std::condition_variable _condNewData;   ///< Condition used to notify that new data are available.

};
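To see the class in action, here is a minimal producer/consumer sketch, in the spirit of the tester on GitHub but not identical to it. The class body is repeated (condensed) so the snippet compiles on its own:

```cpp
#include <condition_variable>
#include <deque>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// The class from the post, condensed so this example is self-contained.
template<typename T>
class TQueueConcurrent {
public:
    template<typename... Args>
    void emplace_front(Args&&... args)
    {
        addData_protected([&] { _collection.emplace_front(std::forward<Args>(args)...); });
    }

    template<typename... Args>
    void emplace_back(Args&&... args)
    {
        addData_protected([&] { _collection.emplace_back(std::forward<Args>(args)...); });
    }

    T pop_front() noexcept
    {
        std::unique_lock<std::mutex> lock{_mutex};
        while (_collection.empty())
            _condNewData.wait(lock);
        auto elem = std::move(_collection.front());
        _collection.pop_front();
        return elem;
    }

private:
    template<class F>
    void addData_protected(F&& fct)
    {
        std::unique_lock<std::mutex> lock{_mutex};
        fct();
        lock.unlock();
        _condNewData.notify_one();
    }

    std::deque<T>           _collection;
    std::mutex              _mutex;
    std::condition_variable _condNewData;
};

// One producer (the current thread) feeds jobs to one consumer thread.
void demo()
{
    TQueueConcurrent<std::string> jobs;

    std::thread consumer([&jobs] {
        for (int i = 0; i < 3; ++i)
            std::cout << jobs.pop_front() << '\n'; // blocks while the queue is empty
    });

    jobs.emplace_back("regular job");
    jobs.emplace_back("another regular job");
    jobs.emplace_front("urgent job"); // may overtake jobs not yet consumed, depending on timing
    consumer.join();
}
```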