Threading.h#
Multithreading tools, including abstract classes for tasks and scheduling, and an adaptation of MoodyCamel’s BlockingConcurrentQueue.
Includes#
../external/concurrent_queue/blockingconcurrentqueue.h
atomic
condition_variable
deque
mutex
sstream
thread
Included By#
Namespaces#
Classes#
Typedefs#
Source Code#
#pragma once
#include "../external/concurrent_queue/blockingconcurrentqueue.h"
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
#include <sstream>
#include <deque>
// ASYNCHRONOUS FRAMEWORK: Framework async loader base etc //
namespace pvr {
namespace async {
/// Semaphore type used by the async framework: the lightweight semaphore that ships with the
/// moodycamel concurrent queue.
using Semaphore = moodycamel::details::mpmc_sema::LightweightSemaphore;

/// Lock object implemented on top of a Semaphore that is constructed with a count of 1.
/// lock() waits on the semaphore and unlock() signals it.
class Mutex
{
public:
    Mutex() : sema(1) {}

    /// Waits on the underlying semaphore.
    void lock() { sema.wait(); }

    /// Signals the underlying semaphore.
    void unlock() { sema.signal(); }

    /// Direct access to the underlying semaphore.
    Semaphore& getSemaphore() { return sema; }

    /// Direct read-only access to the underlying semaphore.
    const Semaphore& getSemaphore() const { return sema; }

private:
    Semaphore sema;
};

/// Shared-ownership handle to a Semaphore.
using SemaphorePtr = std::shared_ptr<Semaphore>;
/// Base class for objects that expose an explicit cleanup step which runs at most once.
/// The template parameter T is not used by this class itself.
/// Note that the destructor does not call cleanup().
template<typename T>
class IFrameworkCleanupObject
{
public:
    virtual ~IFrameworkCleanupObject() {}

    /// Invokes the derived class' cleanup_() the first time it is called.
    /// Once cleanup_() has returned, further calls do nothing.
    void cleanup()
    {
        if (_destroyed) { return; }
        cleanup_();
        _destroyed = true;
    }

protected:
    IFrameworkCleanupObject() : _destroyed(false) {}

private:
    /// Cleanup hook that every concrete subclass must implement.
    virtual void cleanup_() = 0;

    bool _destroyed; // true once cleanup_() has been run to completion
};
/// Abstract handle to a value of type T that is produced asynchronously.
/// Concrete classes implement isComplete_() and get_(), plus cleanup_() from the
/// IFrameworkCleanupObject<T> base.
template<typename T>
class IFrameworkAsyncResult : public IFrameworkCleanupObject<T>
{
public:
    using ValueType = T;
    using PointerType = std::shared_ptr<IFrameworkAsyncResult<T>>;
    /// Plain function pointer invoked with a pointer to the result object.
    using Callback = void (*)(PointerType);

    /// Reports whether the result is complete. While the cached answer is still false the
    /// derived isComplete_() is queried and its answer cached; once it is true, isComplete_()
    /// is no longer called.
    bool isComplete() const
    {
        if (!_isComplete) { _isComplete = isComplete_(); }
        return _isComplete;
    }

    /// Returns whatever the derived get_() returns.
    ValueType get() { return get_(); }

    /// Returns the _successful flag. This class initialises it to false and never changes it.
    bool isSuccessful() const { return _successful; }

protected:
    Callback _completionCallback; // may be null, in which case executeCallBack() does nothing
    std::atomic<bool> _inCallback; // true only while executeCallBack() is inside the callback
    bool _successful;

    /// Stores the callback that executeCallBack() will invoke.
    void setTheCallback(Callback completionCallback) { _completionCallback = completionCallback; }

    /// Invokes the stored callback (if any) with thisPtr, raising _inCallback for the duration
    /// of the call.
    virtual void executeCallBack(PointerType thisPtr)
    {
        if (!_completionCallback) { return; }
        _inCallback = true;
        _completionCallback(thisPtr);
        _inCallback = false;
    }

    IFrameworkAsyncResult() : _completionCallback(nullptr), _inCallback(false), _successful(false), _isComplete(false) {}

private:
    mutable bool _isComplete; // cache for isComplete(); mutable because isComplete() is const

    /// Derived classes report here whether the result is complete.
    virtual bool isComplete_() const = 0;

    /// Derived classes return the result value here.
    virtual T get_() const = 0;
};
// Base class for a scheduler that owns exactly one worker thread.
// The constructor starts a std::thread executing run(). run() repeatedly takes the front item of
// _queue and passes it to the compile-time supplied function `worker`.
// Two semaphores coordinate the worker thread with the other members:
//  - _queueSemaphore is waited on / signalled around every access to _queue made by
//    getNumQueuedItems(), the destructor and run(), i.e. it is used as the queue lock.
//  - _workSemaphore is what run() waits on before it inspects the queue. The destructor signals it
//    once, with the queue untouched, to request shutdown.
// Template parameters:
//  - ValueType: only used here to define the AsyncResult pointer type.
//  - FutureType: the type of the items stored in _queue and handed to `worker`.
//  - worker: function called on the worker thread for every dequeued item.
// NOTE(review): nothing in this class adds items to _queue or signals _workSemaphore for new work.
// Presumably derived classes push under _queueSemaphore and then signal _workSemaphore - confirm.
template<typename ValueType, typename FutureType, void (*worker)(FutureType)>
class AsyncScheduler
{
public:
typedef std::shared_ptr<IFrameworkAsyncResult<ValueType>> AsyncResult;
// Returns _queue.size() WITHOUT taking _queueSemaphore, so the value is only an approximation
// while other threads are modifying the queue.
uint32_t getNumApproxQueuedItem() { return static_cast<uint32_t>(_queue.size()); }
// Returns _queue.size(), read while holding _queueSemaphore.
uint32_t getNumQueuedItems()
{
_queueSemaphore.wait();
uint32_t retval = static_cast<uint32_t>(_queue.size());
_queueSemaphore.signal();
return retval;
}
// Requests shutdown of the worker thread and joins it.
// _done is set and _workSemaphore is signalled once while _queueSemaphore is held. run() leaves
// its loop when it finds _done set and the queue empty, either at the loop condition or when it
// wakes up to an empty queue.
virtual ~AsyncScheduler()
{
bool didit = false;
_queueSemaphore.wait(); // _done is inspected and changed only while the queue semaphore is held
if (!_done)
{
didit = true;
_done = true;
_workSemaphore.signal(); // wake the worker so that it can observe _done
}
_queueSemaphore.signal();
if (didit) // Join only if this call is the one that requested the shutdown.
{ _thread.join(); } }
protected:
// Clears _done and starts the worker thread on run().
// NOTE(review): the thread is started from this base-class constructor, so run() may begin
// executing before a derived constructor body has run (e.g. before _myInfo is assigned) - confirm
// that subclasses tolerate this.
AsyncScheduler()
{
_done = false;
_thread = std::thread(&AsyncScheduler::run, this);
}
Semaphore _workSemaphore; // run() waits on this before inspecting _queue
Semaphore _queueSemaphore; // waited on / signalled around accesses to _queue; first signalled inside run()
std::deque<FutureType> _queue; // pending work; run() consumes from the front
std::string _myInfo; // prefix for every message logged by run(); never assigned by this class
private:
std::thread _thread; // the single worker thread, joined by the destructor
std::atomic_bool _done; // set by the destructor to request shutdown
// Body of the worker thread.
void run()
{
Log(LogLevel::Information,
"%s : Asynchronous Scheduler starting. "
"1 worker thread spawned. The worker thread will be sleeping as long as no work is being performed, "
"and will be released when the async sheduler is destroyed.",
_myInfo.c_str());
// The debug messages are built once, up front, so that the loop itself does no string concatenation.
std::string _str_0_unlock_ = (_myInfo + " : LOOP ENTER: Unlocking the queue.");
std::string _str_1_wait_work_ = (_myInfo + " : Queue unlocked, waiting for work to be scheduled");
std::string _str_2_queuelock_ = (_myInfo + " : Work scheduled, Obtaining queue lock");
std::string _str_3_queueLocked_ = (_myInfo + " : Queue lock obtained, testing to determine work/exit");
std::string _str_4_exit_signal_ = (_myInfo + " : Queue was signalled without items - must be the exit signal. Exiting...");
std::string _str_5_obtaining_work_ = (_myInfo + " : Queue had items: Obtaining work!");
std::string _str_6_obtained_work_ = (_myInfo + " : Work obtained, Releasing the queue.");
std::string _str_7_executing_work_ = (_myInfo + " : Queue released, Executing work.");
std::string _str_8_execution_done_ = (_myInfo + " : Work execution done, locking the queue.");
std::string _str_9_looping_up_ = (_myInfo + " : Queue locked, LOOPING!");
// Keep looping while shutdown has not been requested OR items are still queued, so work that is
// already queued when _done gets set is still executed.
// On the first evaluation this function has not touched _queueSemaphore yet; on every later
// evaluation the queue semaphore was acquired at the bottom of the previous iteration.
while (!_done || _queue.size())
{
DebugLog(_str_0_unlock_.c_str());
// First iteration: this is the first signal of the queue semaphore made by this function; it
// lets the members that wait on it (getNumQueuedItems(), the destructor) proceed.
// Subsequent iterations: releases the queue semaphore acquired at the bottom of the loop.
_queueSemaphore.signal();
DebugLog(_str_1_wait_work_.c_str());
_workSemaphore.wait(); // Sleep until work, or the shutdown request, is signalled.
DebugLog(_str_2_queuelock_.c_str());
// Woken up: take the queue semaphore, because the queue must be checked for emptiness and,
// if it is not empty, an item must be removed from it.
_queueSemaphore.wait();
DebugLog(_str_3_queueLocked_.c_str());
if (!_queue.size()) // Was this wake-up for work, or for shutdown?
{
DebugLog(_str_4_exit_signal_.c_str());
// The work semaphore was signalled but there is nothing queued: this is expected to be the
// destructor's shutdown signal, hence the assertion on _done.
assertion(_done);
break; // The queue semaphore is still held here; it is released after the loop.
}
else // There is work to do.
{
DebugLog(_str_5_obtaining_work_.c_str());
FutureType future = std::move(_queue.front()); // Take ownership of the oldest item...
_queue.pop_front(); // ...and remove its slot from the queue.
DebugLog(_str_6_obtained_work_.c_str());
_queueSemaphore.signal(); // Release the queue BEFORE executing, so it is not held during the work.
DebugLog(_str_7_executing_work_.c_str());
worker(future); // Execute the work item on this thread.
DebugLog(_str_8_execution_done_.c_str());
}
_queueSemaphore.wait(); // Re-acquire the queue semaphore so the loop condition reads _queue under it.
DebugLog(_str_9_looping_up_.c_str());
}
_queueSemaphore.signal(); // Both exit paths (condition false, or break) arrive here holding the queue semaphore.
Log(LogLevel::Information, "%s: Asynchronous asset loader closing down. Freeing workers.", _myInfo.c_str());
}
};
} // namespace async
} // namespace pvr
namespace pvr {
/// Blocking producer/consumer queue built on moodycamel::BlockingConcurrentQueue whose blocked
/// consumers can be released on demand.
///
/// The consume functions poll the underlying queue with a timeout of TIMEOUT_US microseconds.
/// Each time a poll times out, the consumer tries to take one "unblock token"; a consumer that
/// obtains a token stops waiting and returns empty-handed (false / 0).
/// Tokens are made available by unblockOne(), unblockMultiple() and done(), and are discarded
/// by reset().
template<typename T>
class LockedQueue
{
    moodycamel::BlockingConcurrentQueue<T> _queue;

    // Number of unblock tokens currently available. It is only ever decremented through a
    // compare-exchange that refuses to go below zero, so consumers never drive it negative.
    std::atomic<int32_t> _unblockingCounter;

    // Timeout, in microseconds, of a single poll of the underlying queue.
    const int TIMEOUT_US = 1000;

public:
    typedef moodycamel::ProducerToken ProducerToken;
    typedef moodycamel::ConsumerToken ConsumerToken;

    LockedQueue() : _unblockingCounter(0) {}

    /// Creates a consumer token bound to the underlying queue.
    ConsumerToken getConsumerToken() { return ConsumerToken(_queue); }

    /// Creates a producer token bound to the underlying queue.
    ProducerToken getProducerToken() { return ProducerToken(_queue); }

    /// Enqueues a copy of item.
    void produce(const T& item) { _queue.enqueue(item); }

    /// Enqueues a copy of item using the given producer token.
    void produce(ProducerToken& token, const T& item) { _queue.enqueue(token, item); }

    /// Enqueues numItems items read from the iterator items.
    template<typename iterator>
    void produceMultiple(iterator items, uint32_t numItems)
    {
        _queue.enqueue_bulk(items, numItems);
    }

    /// Enqueues numItems items read from the iterator items, using the given producer token.
    template<typename iterator>
    void produceMultiple(ProducerToken& token, iterator items, uint32_t numItems)
    {
        _queue.enqueue_bulk(token, items, numItems);
    }

    /// Tries to take one unblock token. Meant to be called once each time a timed dequeue
    /// times out.
    /// Returns true if a token was taken (the caller should stop waiting), false if none was
    /// available (the caller should keep waiting).
    inline bool tryUnblockAndReturnTrueIfUnblocked()
    {
        int32_t available = _unblockingCounter.load();
        // "Decrement if positive" as one atomic step. A token can therefore neither be lost
        // nor taken twice, and no other thread can ever observe a negative count.
        while (available > 0)
        {
            // On failure compare_exchange_weak reloads `available` with the current value.
            if (_unblockingCounter.compare_exchange_weak(available, available - 1)) { return true; }
        }
        return false;
    }

    /// Waits until an item has been dequeued into item or an unblock token has been taken.
    /// Returns true if item was filled, false if the call was unblocked without an item.
    bool consume(T& item)
    {
        bool result = false;
        while (!(result = _queue.wait_dequeue_timed(item, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Same as consume(T&), dequeuing with the given consumer token.
    bool consume(ConsumerToken& token, T& item)
    {
        bool result = false;
        while (!(result = _queue.wait_dequeue_timed(token, item, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Waits until at least one item has been dequeued or an unblock token has been taken.
    /// Up to maxItems items are written starting at firstItem.
    /// Returns the number of items dequeued, or 0 if the call was unblocked without any.
    template<typename iterator>
    size_t consumeMultiple(iterator firstItem, uint32_t maxItems)
    {
        // Must be size_t: storing the dequeued count in a bool would collapse it to 0 or 1.
        size_t result = 0;
        while (!(result = _queue.wait_dequeue_bulk_timed(firstItem, maxItems, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Same as consumeMultiple(iterator, uint32_t), dequeuing with the given consumer token.
    template<typename iterator>
    size_t consumeMultiple(ConsumerToken& token, iterator firstItem, uint32_t maxItems)
    {
        size_t result = 0;
        while (!(result = _queue.wait_dequeue_bulk_timed(token, firstItem, maxItems, TIMEOUT_US)))
        {
            if (tryUnblockAndReturnTrueIfUnblocked()) { break; }
        }
        assert(_unblockingCounter >= 0);
        return result;
    }

    /// Makes one unblock token available, releasing one waiting consumer call.
    void unblockOne()
    {
        _unblockingCounter += 1;
    }

    /// Makes numUnblock unblock tokens available.
    void unblockMultiple(uint16_t numUnblock)
    {
        _unblockingCounter += numUnblock;
    }

    /// Discards every outstanding unblock token.
    void reset()
    {
        _unblockingCounter = 0;
    }

    /// True if the approximate number of queued items is zero.
    bool isEmpty() { return itemsRemainingApprox() == 0; }

    /// Approximate number of items currently in the underlying queue.
    size_t itemsRemainingApprox() { return _queue.size_approx(); }

    /// Sleeps until the underlying queue reports itself empty, then calls done().
    void waitUntilEmpty()
    {
        while (_queue.size_approx())
        {
            while (_queue.size_approx())
            {
                // Give the consumers time to drain the queue; sleep longer the more items remain.
                std::this_thread::sleep_for(std::chrono::microseconds(10 * _queue.size_approx()));
            }
            // Short grace period before the outer loop re-checks the approximate size.
            std::this_thread::sleep_for(std::chrono::microseconds(50));
        }
        done();
    }

    /// Makes a very large number of unblock tokens available, so that consumer calls that time
    /// out are released from now on.
    void done()
    {
        // Half of the maximum leaves headroom for later unblockOne()/unblockMultiple() calls
        // without overflowing the counter.
        _unblockingCounter = (std::numeric_limits<int>::max() / 2);
    }
}; // class LockedQueue
} // namespace pvr