Portable C++ Timer Queue
Ramblings on Timer Queues, and a portable and self-contained C++ implementation.
Introduction
Originally posted on my blog at http://www.crazygaze.com/blog/2016/03/24/portable-c-timer-queue/
If you looked at my previous posts, you noticed I explored some of the nice things Boost Asio gives you, and how I implemented my own version.
I covered what I called callstack markers, which have more uses than it seems on the surface. I also covered strands, and why they are such a nice concept.
This time around, I’m going to focus on the equivalent of boost::asio::deadline_timer. The implementation shown here is custom made, portable C++11 and self-contained.
If I had to roughly describe what deadline_timer is for while at the same time hinting at the implementation, I would say “deadline_timer allows you to group together a bunch of handlers to be called when a timer expires“.
I think the “deadline” in deadline_timer is a bit misleading. With the definition of “deadline” being “the latest time or date by which something should be completed“, the first thing that crosses my mind is that it is used to cancel some operation that did not complete by the specified time/date, such as cancelling a connect attempt. Certainly it can do that, but it can be used for a lot more.
For simplicity, I’ll be calling it “timer(s)”, and not “deadline timer(s)”.
A bit of brainstorming
So, how do we go about implementing timers?
At its core, you need a queue for the timers, and a worker thread that dequeues and executes timers according to the specified expiry times/dates. Let’s call it “Timer Queue”. The Win32 API uses the same name for such a thing: Timer Queues.
What I explore in this post is a Timer Queue implementation. It allows one-shot timers. Higher level things, such as grouping handlers in one timer and timers that can be reused (like Boost Asio’s own implementation), can be built on top.
The trickiest thing to solve is how to correctly notify the worker thread that something happened that might change the current wait time.
For example:
Worker thread is waiting for timers to process
Main thread adds a timer A with expiry time of 10 seconds.
Worker thread detects there is a timer in the queue, and starts a wait based on that timer expiry time.
Main thread adds a timer B with expiry time 1 second
Worker thread needs to rethink the waiting time, since timer B should be executed before A.
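To make the scenario concrete, here is a minimal sketch of the ordering problem, using a min-heap keyed on expiry time. The Entry type, the EntryQueue alias and the nextToExpire function are made up for this illustration only.

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <string>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical, stripped-down timer entry used only for this illustration.
struct Entry {
    Clock::time_point end;  // Expiry time
    std::string name;
    bool operator>(const Entry& other) const { return end > other.end; }
};

// Min-heap: the entry with the earliest expiry time is always at the top.
using EntryQueue =
    std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>>;

// Returns the name of the timer the worker thread should be waiting on.
// Precondition: the queue is not empty.
std::string nextToExpire(const EntryQueue& q) {
    return q.top().name;
}
```

Pushing A (10 seconds) and then B (1 second) leaves B at the top, which is why the worker thread has to be told that the wait it started for A might be stale.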
Now, if you throw in a couple more threads adding or cancelling timers, things might get confusing. Personally, I think it’s a bad sign if you have to think too hard to verify the correctness of your multithreaded code. Sure, performance is good, but first get it right. If performance is a problem, profile it, then think as hard as you need to get it right and fast.
My first pass at this was convoluted to say the least. There was a queue for the timers, then communication with the worker thread was done with another queue where I would pass commands such as “recalculate wait time”, or “execute this timer right now”, or “shutdown”.
To add insult to injury, those two queues were locked independently.
Sure enough, as I started to write this post to share the code, I spotted another bug. Ever had that feeling of “Surely there must be a cleaner way to do this”?
To be honest, in hindsight that first pass was quite bad. Maybe it just got that bad in small increments as I was focusing on the wrong problems.
What was sabotaging my efforts was that I was in the wrong mindset. I kept thinking of the thread-to-thread communication as “work to be done“, and thus the obvious choice there was to have a command queue.
After a moment of clarity, I changed my mindset from “work to be done” to “something has changed“, and a cleaner solution formed:
The thread wakes up when something has changed (e.g. a timer was added, cancelled, or expired), and checks for work (e.g. executes handlers for expired timers). Extraneous notifications to wake up the thread have no side effects, since the thread wakes up, checks for work, recalculates the new wait time and goes back to waiting.
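The reason extra wake-ups are harmless is that the “check for work” step only acts on timers that have actually expired. A minimal sketch of that idea follows. The Item type, the ItemQueue alias and the runExpired function are hypothetical names, and the current time is passed in as a parameter only to keep the sketch deterministic.

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical timer item used only for this illustration.
struct Item {
    Clock::time_point end;          // Expiry time
    std::function<void()> handler;  // What to run on expiry
    bool operator>(const Item& other) const { return end > other.end; }
};

using ItemQueue =
    std::priority_queue<Item, std::vector<Item>, std::greater<Item>>;

// Runs every handler whose expiry time is <= now, and returns how many ran.
// Calling it when nothing has expired does nothing at all.
int runExpired(ItemQueue& q, Clock::time_point now) {
    int executed = 0;
    while (!q.empty() && q.top().end <= now) {
        Item item = q.top();  // top() returns a const reference, so this copies
        q.pop();
        item.handler();
        ++executed;
    }
    return executed;
}
```

With a single timer due in 5 seconds, calling runExpired any number of times before that point returns 0 and leaves the queue untouched. Only the first call at or after the 5 second mark runs the handler.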
Implementation
The only helper class needed is a semaphore, which can be implemented in portable C++11 with a mutex, a condition_variable and a counter.
//////////////////////////////////////////
#pragma once
#include <chrono>
#include <condition_variable>
#include <mutex>

class Semaphore {
public:
    Semaphore(unsigned int count = 0) : m_count(count) {}

    // Increments the counter and wakes up one waiting thread
    void notify() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_count++;
        m_cv.notify_one();
    }

    // Blocks until the counter is > 0, then decrements it
    void wait() {
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this]() { return m_count > 0; });
        m_count--;
    }

    // Same as wait(), but gives up at the specified time point.
    // Returns false if it timed out.
    template <class Clock, class Duration>
    bool waitUntil(const std::chrono::time_point<Clock, Duration>& point) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cv.wait_until(lock, point, [this]() { return m_count > 0; }))
            return false;
        m_count--;
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned int m_count;
};
//////////////////////////////////////////
A few more useful methods could be added to Semaphore, but were omitted to keep it short.
bool waitFor(duration) : To wait for specified duration before timeout
bool tryWait() : To check and decrement the semaphore without waiting.
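For reference, here is one way those two omitted methods could look. This is only a sketch: SemaphoreEx is a hypothetical name for a trimmed copy of the Semaphore above (notify plus the two extra methods), and the timeout is assumed to be any std::chrono duration.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical name. Trimmed copy of Semaphore with the two extra methods.
class SemaphoreEx {
public:
    explicit SemaphoreEx(unsigned int count = 0) : m_count(count) {}

    void notify() {
        std::lock_guard<std::mutex> lock(m_mtx);
        m_count++;
        m_cv.notify_one();
    }

    // Waits up to the specified duration. Returns false if it timed out.
    template <class Rep, class Period>
    bool waitFor(const std::chrono::duration<Rep, Period>& duration) {
        std::unique_lock<std::mutex> lock(m_mtx);
        if (!m_cv.wait_for(lock, duration, [this]() { return m_count > 0; }))
            return false;
        m_count--;
        return true;
    }

    // Decrements the counter if it is > 0, without ever blocking.
    bool tryWait() {
        std::lock_guard<std::mutex> lock(m_mtx);
        if (m_count == 0)
            return false;
        m_count--;
        return true;
    }

private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    unsigned int m_count;
};
```

Neither method is needed by the TimerQueue shown in this post, so they are purely optional extras.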
And the TimerQueue implementation, heavily documented to further explain the implementation…
//////////////////////////////////////////
#pragma once
#include "Semaphore.h"
#include <assert.h>
#include <stdint.h>
#include <chrono>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Timer Queue
//
// Allows execution of handlers at a specified time in the future
// Guarantees:
//  - All handlers are executed ONCE, even if cancelled (aborted parameter will
//    be set to true)
//  - If TimerQueue is destroyed, it will cancel all handlers.
//  - Handlers are ALWAYS executed in the Timer Queue worker thread.
//  - Handlers execution order is NOT guaranteed
//
class TimerQueue {
public:
    TimerQueue() {
        m_th = std::thread([this] { run(); });
    }

    ~TimerQueue() {
        cancelAll();
        // Abusing the timer queue to trigger the shutdown.
        add(0, [this](bool) { m_finish = true; });
        m_th.join();
    }

    //! Adds a new timer
    // \return
    //  Returns the ID of the new timer. You can use this ID to cancel the
    // timer
    uint64_t add(int64_t milliseconds, std::function<void(bool)> handler) {
        WorkItem item;
        item.end = Clock::now() + std::chrono::milliseconds(milliseconds);
        item.handler = std::move(handler);

        std::unique_lock<std::mutex> lk(m_mtx);
        uint64_t id = ++m_idcounter;
        item.id = id;
        m_items.push(std::move(item));
        lk.unlock();

        // Something changed, so wake up timer thread
        m_checkWork.notify();
        return id;
    }

    //! Cancels the specified timer
    // \return
    //  1 if the timer was cancelled.
    //  0 if you were too late to cancel (or the timer ID was never valid to
    // start with)
    size_t cancel(uint64_t id) {
        // Instead of removing the item from the container (thus breaking the
        // heap integrity), we set the item as having no handler, and put
        // that handler on a new item at the top for immediate execution
        // The timer thread will then ignore the original item, since it has no
        // handler.
        std::unique_lock<std::mutex> lk(m_mtx);
        for (auto&& item : m_items.getContainer()) {
            if (item.id == id && item.handler) {
                WorkItem newItem;
                // Zero time, so it stays at the top for immediate execution
                newItem.end = Clock::time_point();
                newItem.id = 0;  // Means it is a canceled item
                // Move the handler from item to newitem. A moved-from
                // std::function is not guaranteed to be empty, so clear it
                // explicitly.
                newItem.handler = std::move(item.handler);
                item.handler = nullptr;
                m_items.push(std::move(newItem));

                lk.unlock();
                // Something changed, so wake up timer thread
                m_checkWork.notify();
                return 1;
            }
        }
        return 0;
    }

    //! Cancels all timers
    // \return
    //  The number of timers cancelled
    size_t cancelAll() {
        // Setting EVERY "end" to 0 (for immediate execution) is ok, since
        // with all keys equal the heap integrity is maintained. This includes
        // the empty items left behind by cancel(), otherwise one of those
        // could end up with a later time than its children in the heap.
        std::unique_lock<std::mutex> lk(m_mtx);
        size_t ret = 0;
        for (auto&& item : m_items.getContainer()) {
            item.end = Clock::time_point();
            if (item.id && item.handler) {
                item.id = 0;
                ret++;
            }
        }
        lk.unlock();

        m_checkWork.notify();
        return ret;
    }

private:
    using Clock = std::chrono::steady_clock;
    TimerQueue(const TimerQueue&) = delete;
    TimerQueue& operator=(const TimerQueue&) = delete;

    void run() {
        while (!m_finish) {
            auto end = calcWaitTime();
            if (end.first) {
                // Timers found, so wait until it expires (or something else
                // changes)
                m_checkWork.waitUntil(end.second);
            } else {
                // No timers exist, so wait forever until something changes
                m_checkWork.wait();
            }

            // Check and execute as much work as possible, such as, all expired
            // timers
            checkWork();
        }

        // If we are shutting down, we should not have any items left,
        // since the shutdown cancels all items
        assert(m_items.size() == 0);
    }

    std::pair<bool, Clock::time_point> calcWaitTime() {
        std::lock_guard<std::mutex> lk(m_mtx);
        while (m_items.size()) {
            if (m_items.top().handler) {
                // Item present, so return the new wait time
                return std::make_pair(true, m_items.top().end);
            } else {
                // Discard empty handlers (they were cancelled)
                m_items.pop();
            }
        }

        // No items found, so return no wait time (causes the thread to wait
        // indefinitely)
        return std::make_pair(false, Clock::time_point());
    }

    void checkWork() {
        std::unique_lock<std::mutex> lk(m_mtx);
        while (m_items.size() && m_items.top().end <= Clock::now()) {
            // top() returns a const reference, so this is effectively a copy
            WorkItem item(std::move(m_items.top()));
            m_items.pop();

            // Run the handler without holding the lock, so handlers are free
            // to add or cancel timers
            lk.unlock();
            if (item.handler)
                item.handler(item.id == 0);
            lk.lock();
        }
    }

    Semaphore m_checkWork;
    std::thread m_th;
    bool m_finish = false;
    uint64_t m_idcounter = 0;

    struct WorkItem {
        Clock::time_point end;
        uint64_t id;  // id==0 means it was cancelled
        std::function<void(bool)> handler;
        bool operator>(const WorkItem& other) const {
            return end > other.end;
        }
    };

    std::mutex m_mtx;
    // Inheriting from priority_queue, so we can access the internal container
    class Queue : public std::priority_queue<WorkItem, std::vector<WorkItem>,
                                             std::greater<WorkItem>> {
    public:
        std::vector<WorkItem>& getContainer() {
            return this->c;
        }
    } m_items;
};
//////////////////////////////////////////
And a small example just adding and cancelling timers…
//////////////////////////////////////////
#include "TimerQueue.h"
#include <assert.h>
#include <stdio.h>
#include <chrono>
#include <thread>

namespace Timing {

using Clock = std::chrono::high_resolution_clock;

// Milliseconds elapsed since the first call to now()
double now() {
    static auto start = Clock::now();
    return std::chrono::duration<double, std::milli>(Clock::now() - start)
        .count();
}

void sleep(unsigned ms) {
    std::this_thread::sleep_for(std::chrono::milliseconds(ms));
}

}  // namespace Timing

int main() {
    TimerQueue q;

    // Note: the lambda init-captures ([start = ...]) used here require C++14

    // Timers 1 and 2 are cancelled by cancelAll() right after being added
    q.add(10000, [start = Timing::now()](bool aborted) {
        printf("T 1: %d, Elapsed %4.2fms\n", (int)aborted, Timing::now() - start);
    });
    q.add(10001, [start = Timing::now()](bool aborted) {
        printf("T 2: %d, Elapsed %4.2fms\n", (int)aborted, Timing::now() - start);
    });

    q.cancelAll();

    // Timer 3 is left alone, so it expires normally after 1 second
    q.add(1000, [start = Timing::now()](bool aborted) {
        printf("T 3: %d, Elapsed %4.2fms\n", (int)aborted, Timing::now() - start);
    });

    // Timer 4 is cancelled individually, using the ID returned by add()
    auto id = q.add(2000, [start = Timing::now()](bool aborted) {
        printf("T 4: %d, Elapsed %4.2fms\n", (int)aborted, Timing::now() - start);
    });

    auto ret = q.cancel(id);
    assert(ret == 1);
    (void)ret;  // Avoids an unused variable warning when asserts are disabled

    Timing::sleep(2000);
    return 0;
}
//////////////////////////////////////////
The output will be this (with varying “Elapsed” values)…
T 1: 1, Elapsed 0.01ms
T 2: 1, Elapsed 0.27ms
T 4: 1, Elapsed 0.48ms
T 3: 0, Elapsed 1000.92ms
Things to improve
The main objective of this implementation was to have something small, self-contained and portable. Therefore, a few things can be improved, especially if performance is a problem:
Use some kind of lockless queue
Cancelling a timer requires iterating through all the timers in the worst case
Add checks in TimerQueue::add to detect if we are shutting down
Try to get rid of unnecessary wake ups.
For example, for simplicity, adding or cancelling a timer makes no effort to detect if it’s really necessary to wake up the timer thread.
Using a uint64_t to identify timers works fine, but it’s error prone.
A fresh pair of eyes looking at the code, since this was a quick implementation with some basic tests.
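As an example of the “unnecessary wake ups” item, adding a timer only needs to wake up the worker thread if the new timer becomes the earliest one in the queue. A minimal sketch of that check follows, assuming it runs while holding the same mutex that protects the queue. The Pending type, the PendingQueue alias and the pushAndCheckWakeup function are hypothetical names, not part of TimerQueue.

```cpp
#include <chrono>
#include <functional>
#include <queue>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical item holding only the expiry time, which is all the check needs.
struct Pending {
    Clock::time_point end;
    bool operator>(const Pending& other) const { return end > other.end; }
};

using PendingQueue =
    std::priority_queue<Pending, std::vector<Pending>, std::greater<Pending>>;

// Pushes a new expiry time and reports whether the worker thread needs to be
// woken up. That is only the case if the queue was empty (the worker is
// waiting indefinitely), or if the new item expires before the current top
// (the worker is waiting for too long).
bool pushAndCheckWakeup(PendingQueue& q, Clock::time_point end) {
    const bool wake = q.empty() || end < q.top().end;
    q.push(Pending{end});
    return wake;
}
```

In TimerQueue::add, the returned flag would decide whether to call m_checkWork.notify(). Cancelling still needs its own notification, since it puts an item at the top for immediate execution.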
Conclusion
Although TimerQueue is useful as-is, if you are familiar with Boost Asio’s deadline_timer, you will spot a couple of differences:
TimerQueue is not in any way associated with something akin to Boost Asio’s io_service
Handlers are executed in the TimerQueue’s own thread.
Timers are one-shot
Boost Asio deadline_timer is reusable.
A close match to Boost Asio’s deadline_timer can be implemented on top of TimerQueue by having a Timer class that aggregates handlers, and puts a one-off timer on TimerQueue whenever we set an expiry time. That one-off timer will run any handlers the Timer instance owns. Even more, instead of executing the handlers in the TimerQueue thread, Timer can forward execution of the handlers to another object, thus giving the application control over when/where to execute the handlers.
While writing this, I took a quick look at how Boost Asio implements deadline timers (on Windows), to see if it was significantly different. In particular, I was curious how deadline_timer is tied to io_service: whether there is another thread involved for triggering timer related things, or if somehow everything is controlled by Windows Completion Ports.
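A rough sketch of such a Timer class could look like the following. Everything here is an assumption made for illustration: the Timer template and its expiresFromNow/asyncWait methods are hypothetical names, and ManualQueue is a made-up, single-threaded stand-in that has the same add/cancel signatures as TimerQueue but is fired by hand. There is no locking, so using this with the real TimerQueue would require protecting the handler batch with a mutex, since handlers run in the worker thread.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in with the same add/cancel signatures as TimerQueue.
// Timers never expire by themselves: fire() simulates expiry.
class ManualQueue {
public:
    std::uint64_t add(std::int64_t /*milliseconds*/,
                      std::function<void(bool)> handler) {
        m_pending[++m_idcounter] = std::move(handler);
        return m_idcounter;
    }
    std::size_t cancel(std::uint64_t id) { return finish(id, true); }
    std::size_t fire(std::uint64_t id) { return finish(id, false); }

private:
    std::size_t finish(std::uint64_t id, bool aborted) {
        auto it = m_pending.find(id);
        if (it == m_pending.end())
            return 0;
        auto handler = std::move(it->second);
        m_pending.erase(it);
        handler(aborted);
        return 1;
    }
    std::map<std::uint64_t, std::function<void(bool)>> m_pending;
    std::uint64_t m_idcounter = 0;
};

// Reusable timer. Each expiry time gets its own batch of handlers, owned by
// the one-off timer placed on the queue.
template <class Queue>
class Timer {
public:
    explicit Timer(Queue& q) : m_q(q) {}

    // Sets a new expiry time. Handlers still waiting on the previous expiry
    // are called with aborted == true.
    void expiresFromNow(std::int64_t milliseconds) {
        if (m_batch && !m_batch->done)
            m_q.cancel(m_id);
        auto batch = std::make_shared<Batch>();
        m_batch = batch;
        m_id = m_q.add(milliseconds, [batch](bool aborted) {
            batch->done = true;
            for (auto& h : batch->handlers)
                h(aborted);
        });
    }

    // Adds a handler to the current expiry. Returns false (and drops the
    // handler) if there is no pending expiry.
    bool asyncWait(std::function<void(bool)> handler) {
        if (!m_batch || m_batch->done)
            return false;
        m_batch->handlers.push_back(std::move(handler));
        return true;
    }

    std::uint64_t pendingId() const { return m_id; }

private:
    struct Batch {
        std::vector<std::function<void(bool)>> handlers;
        bool done = false;
    };
    Queue& m_q;
    std::shared_ptr<Batch> m_batch;
    std::uint64_t m_id = 0;
};
```

Because the one-off timer owns its batch through a shared_ptr, a cancelled batch can still run its handlers (with aborted set to true) after the Timer has moved on to a new expiry time, or even after the Timer itself is gone.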
From my short study of the beast’s guts, it seems it uses a somewhat similar method:
There is a thread for timer related things (struct win_iocp_io_service::timer_thread_function )
This thread just loops until shutdown, waiting on a Win32 waitable timer object for something to happen.
When this waitable timer expires, it uses PostQueuedCompletionStatus on the associated Completion Port, to trigger any handler execution in the appropriate threads (where the user is calling io_service::run)
Changes to deadline_timer instances set that Win32 waitable timer
I said somewhat similar, since my implementation executes handlers in the timer queue thread itself when a timer expires. In Boost Asio, when a timer expires, the timer thread sends a signal to the respective io_service, and the handlers are executed there, as explained above.
Feel free to comment with corrections, suggestions, or ask me to cover other areas.