25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
29#include <condition_variable>
84template <
typename T,
class Container = std::deque<T>>
/// Mutex that the visible put/get code locks (through unique_guard) before
/// touching the queue or waiting on the condition variables. Declared
/// mutable, which permits locking it from const member functions.
100 mutable std::mutex lock_;
/// Condition waited on with the predicate `!que_.empty() || closed_`;
/// notified after an item has been emplaced into que_.
102 std::condition_variable notEmptyCond_;
/// Condition waited on with the predicate `que_.size() < cap_ || closed_`;
/// notified after the front item has been moved out, after the queue has
/// been emptied, and when cap_ is set above the current size.
104 std::condition_variable notFullCond_;
/// The underlying FIFO of items, a std::queue adapter over `Container`.
111 std::queue<T, Container> que_;
/// Simple scoped lock on the mutex.
114 using guard = std::lock_guard<std::mutex>;
/// Lock type handed to the condition variable wait calls.
116 using unique_guard = std::unique_lock<std::mutex>;
119 bool is_done()
const {
return closed_ && que_.empty(); }
159 cap_ = std::max<size_type>(cap, 1);
160 if (cap_ > que_.size())
161 notFullCond_.notify_all();
180 notFullCond_.notify_all();
181 notEmptyCond_.notify_all();
210 while (!que_.empty()) que_.pop();
211 notFullCond_.notify_all();
220 unique_guard g{lock_};
221 notFullCond_.wait(g, [
this] {
return que_.size() < cap_ || closed_; });
225 que_.emplace(std::move(val));
226 notEmptyCond_.notify_one();
236 if (que_.size() >= cap_ || closed_)
239 que_.emplace(std::move(val));
240 notEmptyCond_.notify_one();
252 template <
typename Rep,
class Period>
254 unique_guard g{lock_};
255 bool to = !notFullCond_.wait_for(g, relTime, [
this] {
256 return que_.size() < cap_ || closed_;
261 que_.emplace(std::move(val));
262 notEmptyCond_.notify_one();
275 template <
class Clock,
class Duration>
277 value_type val,
const std::chrono::time_point<Clock, Duration>& absTime
279 unique_guard g{lock_};
280 bool to = !notFullCond_.wait_until(g, absTime, [
this] {
281 return que_.size() < cap_ || closed_;
287 que_.emplace(std::move(val));
288 notEmptyCond_.notify_one();
301 unique_guard g{lock_};
302 notEmptyCond_.wait(g, [
this] {
return !que_.empty() || closed_; });
306 *val = std::move(que_.front());
308 notFullCond_.notify_one();
318 unique_guard g{lock_};
319 notEmptyCond_.wait(g, [
this] {
return !que_.empty() || closed_; });
325 notFullCond_.notify_one();
344 *val = std::move(que_.front());
346 notFullCond_.notify_one();
359 template <
typename Rep,
class Period>
364 unique_guard g{lock_};
365 notEmptyCond_.wait_for(g, relTime, [
this] {
return !que_.empty() || closed_; });
370 *val = std::move(que_.front());
372 notFullCond_.notify_one();
385 template <
class Clock,
class Duration>
387 value_type* val,
const std::chrono::time_point<Clock, Duration>& absTime
392 unique_guard g{lock_};
393 notEmptyCond_.wait_until(g, absTime, [
this] {
return !que_.empty() || closed_; });
397 *val = std::move(que_.front());
399 notFullCond_.notify_one();
Definition thread_queue.h:43
queue_closed()
Definition thread_queue.h:45
typename Container::size_type size_type
Definition thread_queue.h:93
T value_type
Definition thread_queue.h:91
size_type size() const
Definition thread_queue.h:167
void capacity(size_type cap)
Definition thread_queue.h:157
bool done() const
Definition thread_queue.h:200
Container container_type
Definition thread_queue.h:89
bool try_put(value_type val)
Definition thread_queue.h:234
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:360
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:253
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:96
bool try_get(value_type *val)
Definition thread_queue.h:336
thread_queue()
Definition thread_queue.h:126
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:276
void clear()
Definition thread_queue.h:208
bool closed() const
Definition thread_queue.h:190
bool get(value_type *val)
Definition thread_queue.h:297
bool empty() const
Definition thread_queue.h:139
void close()
Definition thread_queue.h:177
size_type capacity() const
Definition thread_queue.h:147
thread_queue(size_t cap)
Definition thread_queue.h:133
void put(value_type val)
Definition thread_queue.h:219
value_type get()
Definition thread_queue.h:317
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:386
Definition async_client.h:60