Threading.h#

Multithreading tools, including abstract classes for tasks and scheduling, and an adaptation of moodycamel's BlockingConcurrentQueue.

Includes#

  • ../external/concurrent_queue/blockingconcurrentqueue.h

  • atomic

  • condition_variable

  • deque

  • mutex

  • sstream

  • thread

Included By#

Namespaces#

Classes#

Typedefs#

Source Code#

#pragma once
#include "../external/concurrent_queue/blockingconcurrentqueue.h"

#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <sstream>
#include <deque>

// ASYNCHRONOUS FRAMEWORK: base classes for the framework's asynchronous loaders (result objects, scheduler, etc.) //
namespace pvr {
namespace async {
/// Semaphore type used throughout the async framework; taken from the bundled moodycamel queue header.
using Semaphore = moodycamel::details::mpmc_sema::LightweightSemaphore;

class Mutex
{
    Semaphore sema;

public:
    Mutex() : sema(1) {}
    void lock() { sema.wait(); }
    void unlock() { sema.signal(); }
    Semaphore& getSemaphore() { return sema; }
    const Semaphore& getSemaphore() const { return sema; }
};

/// Shared-ownership handle to a Semaphore.
using SemaphorePtr = std::shared_ptr<Semaphore>;

/// Base class that gives derived types a run-once cleanup() entry point.
/// Derived classes implement cleanup_(); cleanup() forwards to it on the first call and is a
/// no-op on every later call. The destructor itself does not call cleanup().
template<typename T>
class IFrameworkCleanupObject
{
public:
    virtual ~IFrameworkCleanupObject() {}

    /// Run the derived cleanup logic unless it has already been run through this function.
    void cleanup()
    {
        if (_destroyed) { return; }
        cleanup_();
        // Marked only after cleanup_() has returned.
        _destroyed = true;
    }

protected:
    IFrameworkCleanupObject() : _destroyed(false) {}

private:
    /// Actual cleanup work, supplied by the derived class.
    virtual void cleanup_() = 0;

    bool _destroyed; // true once cleanup_() has completed via cleanup()
};

/// Interface for the result of an asynchronous operation producing a value of type T.
/// Derived classes supply isComplete_() and get_(); this base caches a positive completion
/// answer and can invoke an optional completion callback.
template<typename T>
class IFrameworkAsyncResult : public IFrameworkCleanupObject<T>
{
public:
    using ValueType = T;
    using PointerType = std::shared_ptr<IFrameworkAsyncResult<T>>;
    using Callback = void (*)(PointerType);

    /// Returns true once the operation has completed.
    /// A true answer from isComplete_() is cached, so the derived class is not asked again afterwards.
    bool isComplete() const
    {
        if (!_isComplete) { _isComplete = isComplete_(); }
        return _isComplete;
    }

    /// Returns the value produced by the derived class's get_().
    ValueType get() { return get_(); }

    /// Returns the success flag (false until a derived class sets _successful).
    bool isSuccessful() const { return _successful; }

protected:
    IFrameworkAsyncResult()
        : _completionCallback(nullptr)
        , _inCallback(false)
        , _successful(false)
        , _isComplete(false)
    {}

    /// Store the function to call from executeCallBack(); nullptr means "no callback".
    void setTheCallback(Callback completionCallback) { _completionCallback = completionCallback; }

    /// Invoke the stored callback, if any, passing thisPtr through.
    /// _inCallback is true for the duration of the call.
    virtual void executeCallBack(PointerType thisPtr)
    {
        if (!_completionCallback) { return; }
        _inCallback = true;
        _completionCallback(thisPtr);
        _inCallback = false;
    }

    Callback _completionCallback;
    std::atomic<bool> _inCallback;
    bool _successful;

private:
    /// Derived-class completion query.
    virtual bool isComplete_() const = 0;

    /// Derived-class value accessor.
    virtual T get_() const = 0;

    mutable bool _isComplete; // cached result of isComplete_(); mutable so the const query can update it
};

/// Base class for a scheduler that owns a single worker thread.
/// The thread is started in the constructor and runs run(), which repeatedly takes the front
/// item of _queue and passes it to the `worker` function given as a template argument.
/// Two semaphores coordinate access:
///  - _queueSemaphore guards _queue (and the _done flag in the destructor);
///  - _workSemaphore is what the worker thread sleeps on between items.
/// Nothing in this class enqueues work; presumably derived classes push to _queue and signal
/// _workSemaphore once per item - confirm against the derived schedulers.
template<typename ValueType, typename FutureType, void (*worker)(FutureType)>
class AsyncScheduler
{
public:
    typedef std::shared_ptr<IFrameworkAsyncResult<ValueType>> AsyncResult;

    /// Returns the queue size read WITHOUT taking _queueSemaphore, so the value may be stale.
    uint32_t getNumApproxQueuedItem() { return static_cast<uint32_t>(_queue.size()); }

    /// Returns the queue size read while holding _queueSemaphore.
    uint32_t getNumQueuedItems()
    {
        _queueSemaphore.wait();
        uint32_t retval = static_cast<uint32_t>(_queue.size());
        _queueSemaphore.signal();
        return retval;
    }

    /// Requests shutdown and joins the worker thread.
    /// _done is set and _workSemaphore is signalled once so that a sleeping worker wakes up;
    /// items still in the queue keep the run() loop going until they are processed or an empty
    /// queue is observed after a wake-up.
    virtual ~AsyncScheduler()
    {
        bool didit = false;
        _queueSemaphore.wait(); // hold the queue lock while inspecting and updating _done
        if (!_done)
        {
            didit = true;
            _done = true;
            _workSemaphore.signal();
        }
        _queueSemaphore.signal();
        if (didit) // join only if this call was the one that requested the shutdown
        { _thread.join(); } }

protected:
    /// Clears the shutdown flag and starts the worker thread on run().
    /// NOTE(review): the thread starts before any derived-class constructor body runs, so run()
    /// may read _myInfo before a derived class has assigned it - confirm how _myInfo is set.
    AsyncScheduler()
    {
        _done = false;
        _thread = std::thread(&AsyncScheduler::run, this);
    }

    Semaphore _workSemaphore; // the worker thread waits on this between items
    Semaphore _queueSemaphore; // guards _queue; first signalled by run() itself
    std::deque<FutureType> _queue; // pending work, consumed from the front
    std::string _myInfo; // prefix used in every log line emitted by run()

private:
    std::thread _thread; // the single worker thread
    std::atomic_bool _done; // set by the destructor to request shutdown

    /// Worker-thread body.
    /// Each loop iteration: release the queue lock, sleep on _workSemaphore, re-take the queue
    /// lock, then either exit (queue empty) or pop one item, release the lock and run `worker`
    /// on it. The queue lock is re-taken before the loop condition is evaluated again.
    void run()
    {
        Log(LogLevel::Information,
            "%s : Asynchronous Scheduler starting. "
            "1 worker thread spawned. The worker thread will be sleeping as long as no work is being performed, "
            "and will be released when the async sheduler is destroyed.",
            _myInfo.c_str());

        // Debug messages are built once up front so that the loop itself does no string concatenation.
        std::string _str_0_unlock_ = (_myInfo + " : LOOP ENTER: Unlocking the queue.");
        std::string _str_1_wait_work_ = (_myInfo + " : Queue unlocked, waiting for work to be scheduled");
        std::string _str_2_queuelock_ = (_myInfo + " : Work scheduled, Obtaining queue lock");
        std::string _str_3_queueLocked_ = (_myInfo + " : Queue lock obtained, testing to determine work/exit");
        std::string _str_4_exit_signal_ = (_myInfo + " : Queue was signalled without items - must be the exit signal. Exiting...");
        std::string _str_5_obtaining_work_ = (_myInfo + " : Queue had items: Obtaining work!");
        std::string _str_6_obtained_work_ = (_myInfo + " : Work obtained, Releasing the queue.");
        std::string _str_7_executing_work_ = (_myInfo + " : Queue released, Executing work.");
        std::string _str_8_execution_done_ = (_myInfo + " : Work execution done, locking the queue.");
        std::string _str_9_looping_up_ = (_myInfo + " : Queue locked, LOOPING!");

        // Loop condition: keep going while shutdown has not been requested, or while items remain.
        // a) The queue will be empty on the first iteration.
        // b) Even if the queue is empty, consumers may be blocked on results.
        // c) Even if done, consumers may be blocked on results.
        while (!_done || _queue.size())
        {
            DebugLog(_str_0_unlock_.c_str());
            _queueSemaphore.signal(); // First iteration: primes the queue semaphore so that other threads can start
            // taking it (this assumes the semaphore starts unavailable - confirm).
            // Subsequent iterations: releases the lock taken at the bottom of the previous iteration.
            DebugLog(_str_1_wait_work_.c_str());
            _workSemaphore.wait(); // Sleep until signalled: either work was queued or shutdown was requested
            DebugLog(_str_2_queuelock_.c_str());
            _queueSemaphore.wait(); // Take the queue lock before inspecting or popping the queue
            DebugLog(_str_3_queueLocked_.c_str());
            if (!_queue.size()) // Woken with nothing queued: treat it as the shutdown signal
            {
                DebugLog(_str_4_exit_signal_.c_str());
                // Someone signalled the work semaphore without putting items in. This means we are finished.
                assertion(_done); // an empty wake-up is only expected after shutdown was requested
                break;
            }
            else // There is work to do
            {
                DebugLog(_str_5_obtaining_work_.c_str());

                FutureType future = std::move(_queue.front()); // Move the front item out of the queue...
                _queue.pop_front(); // ...and remove its slot
                DebugLog(_str_6_obtained_work_.c_str());
                _queueSemaphore.signal(); // Release the queue lock so it is not held while the work executes
                DebugLog(_str_7_executing_work_.c_str());
                worker(future); // Execute the work item on this thread
                DebugLog(_str_8_execution_done_.c_str());
            }
            _queueSemaphore.wait(); // Re-take the queue lock so the loop condition reads _queue under the lock
            DebugLog(_str_9_looping_up_.c_str());
        }
        _queueSemaphore.signal(); // Release the queue lock held on exit (both the break and the loop-condition paths)
        Log(LogLevel::Information, "%s: Asynchronous asset loader closing down. Freeing workers.", _myInfo.c_str());
    }
};
} // namespace async
} // namespace pvr

namespace pvr {
/// Blocking producer/consumer queue built on moodycamel::BlockingConcurrentQueue, with the
/// added ability to release consumers that are blocked on an empty queue.
///
/// Consumers wait in short timed slices (TIMEOUT_US). Each time a slice expires without an
/// item, the consumer tries to spend one "unblock token"; if it gets one it stops waiting and
/// returns empty-handed. Tokens are handed out with unblockOne() / unblockMultiple() / done()
/// and cleared with reset().
template<typename T>
class LockedQueue
{
    moodycamel::BlockingConcurrentQueue<T> _queue;
    // Number of unblock tokens currently available. Never negative: tokens are only removed by a
    // compare-exchange that first checks the count is positive. done() sets it to a very large
    // value so that, in practice, every waiting consumer is released.
    std::atomic<int32_t> _unblockingCounter;
    // Length of one timed wait slice, handed to the queue's timed dequeue calls (microseconds).
    const int TIMEOUT_US = 1000;

public:
    typedef moodycamel::ProducerToken ProducerToken;
    typedef moodycamel::ConsumerToken ConsumerToken;

    LockedQueue() : _unblockingCounter(0) {}

    /// Create a consumer token bound to this queue.
    ConsumerToken getConsumerToken() { return ConsumerToken(_queue); }

    /// Create a producer token bound to this queue.
    ProducerToken getProducerToken() { return ProducerToken(_queue); }

    /// Enqueue a copy of item.
    void produce(const T& item) { _queue.enqueue(item); }

    /// Enqueue a copy of item using a producer token.
    void produce(ProducerToken& token, const T& item) { _queue.enqueue(token, item); }

    /// Enqueue numItems items read from the iterator items.
    template<typename iterator>
    void produceMultiple(iterator items, uint32_t numItems)
    {
        _queue.enqueue_bulk(items, numItems);
    }

    /// Enqueue numItems items read from the iterator items, using a producer token.
    template<typename iterator>
    void produceMultiple(ProducerToken& token, iterator items, uint32_t numItems)
    {
        _queue.enqueue_bulk(token, items, numItems);
    }

    /// Try to spend one unblock token. Called by the consume functions whenever a timed dequeue
    /// expires without an item.
    /// @return true if a token was spent (the caller should stop waiting), false if none was available.
    inline bool tryUnblockAndReturnTrueIfUnblocked()
    {
        int32_t tokens = _unblockingCounter.load();
        while (tokens > 0)
        {
            // Take a token only if the count is still what we observed. On failure
            // compare_exchange_weak refreshes `tokens` with the current value and the loop re-tests
            // it, so the counter can never be driven below zero and no token is lost or spent twice.
            if (_unblockingCounter.compare_exchange_weak(tokens, tokens - 1)) { return true; }
        }
        return false;
    }

    /// Block until an item is dequeued into item, or until this thread spends an unblock token.
    /// @return true if an item was dequeued, false if the wait was ended by an unblock token.
    bool consume(T& item)
    {
        bool result = false;
        // Wait in timed slices; after each empty slice, check whether we have been asked to unblock.
        while (!(result = _queue.wait_dequeue_timed(item, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Same as consume(T&), using a consumer token.
    /// @return true if an item was dequeued, false if the wait was ended by an unblock token.
    bool consume(ConsumerToken& token, T& item)
    {
        bool result = false;
        // Wait in timed slices; after each empty slice, check whether we have been asked to unblock.
        while (!(result = _queue.wait_dequeue_timed(token, item, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Block until at least one item (and at most maxItems) is dequeued into the range starting
    /// at firstItem, or until this thread spends an unblock token.
    /// @return the number of items dequeued; 0 if the wait was ended by an unblock token.
    template<typename iterator>
    size_t consumeMultiple(iterator firstItem, uint32_t maxItems)
    {
        // size_t, not bool: the bulk dequeue reports HOW MANY items it wrote, and that count is
        // what this function returns (matching the token-based overload below).
        size_t result = 0;
        // Wait in timed slices; after each empty slice, check whether we have been asked to unblock.
        while (!(result = _queue.wait_dequeue_bulk_timed(firstItem, maxItems, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Same as consumeMultiple(iterator, uint32_t), using a consumer token.
    /// @return the number of items dequeued; 0 if the wait was ended by an unblock token.
    template<typename iterator>
    size_t consumeMultiple(ConsumerToken& token, iterator firstItem, uint32_t maxItems)
    {
        size_t result = 0;
        // Wait in timed slices; after each empty slice, check whether we have been asked to unblock.
        while (!(result = _queue.wait_dequeue_bulk_timed(token, firstItem, maxItems, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Make one unblock token available, releasing one waiting consumer.
    void unblockOne() { _unblockingCounter += 1; }

    /// Make numUnblock unblock tokens available.
    void unblockMultiple(uint16_t numUnblock) { _unblockingCounter += numUnblock; }

    /// Discard all outstanding unblock tokens (also undoes the effect of done()).
    void reset() { _unblockingCounter = 0; }

    /// True if the approximate item count is zero.
    bool isEmpty() { return itemsRemainingApprox() == 0; }

    /// Approximate number of items currently in the queue.
    size_t itemsRemainingApprox() { return _queue.size_approx(); }

    /// Poll (sleeping between checks) until the queue's approximate size reads zero, then call done().
    void waitUntilEmpty()
    {
        while (_queue.size_approx())
        {
            while (_queue.size_approx())
            {
                // Give the consumers time to drain; sleep longer the more items remain.
                std::this_thread::sleep_for(std::chrono::microseconds(10 * _queue.size_approx()));
            }
            // Short extra pause, then the outer loop re-checks that the queue is still empty.
            std::this_thread::sleep_for(std::chrono::microseconds(50));
        }
        done();
    }

    /// Release all consumers: sets the token count to half the maximum of int32_t.
    /// Only half, so that later unblockOne()/unblockMultiple() calls cannot overflow the counter.
    void done() { _unblockingCounter = (std::numeric_limits<int32_t>::max() / 2); }
}; // class LockedQueue
} // namespace pvr