PahoMqttCpp
MQTT C++ Client for POSIX and Windows
Loading...
Searching...
No Matches
thread_queue.h
Go to the documentation of this file.
1
8
9/*******************************************************************************
10 * Copyright (c) 2017-2022 Frank Pagliughi <fpagliughi@mindspring.com>
11 *
12 * All rights reserved. This program and the accompanying materials
13 * are made available under the terms of the Eclipse Public License v2.0
14 * and Eclipse Distribution License v1.0 which accompany this distribution.
15 *
16 * The Eclipse Public License is available at
17 * http://www.eclipse.org/legal/epl-v20.html
18 * and the Eclipse Distribution License is available at
19 * http://www.eclipse.org/org/documents/edl-v10.php.
20 *
21 * Contributors:
22 * Frank Pagliughi - initial implementation and documentation
23 *******************************************************************************/
24
25#ifndef __mqtt_thread_queue_h
26#define __mqtt_thread_queue_h
27
28#include <algorithm>
29#include <condition_variable>
30#include <deque>
31#include <limits>
32#include <mutex>
33#include <queue>
34#include <thread>
35
36namespace mqtt {
37
42class queue_closed : public std::runtime_error
43{
44public:
45 queue_closed() : std::runtime_error("queue is closed") {}
46};
47
49
84template <typename T, class Container = std::deque<T>>
86{
87public:
89 using container_type = Container;
91 using value_type = T;
93 using size_type = typename Container::size_type;
94
96 static constexpr size_type MAX_CAPACITY = std::numeric_limits<size_type>::max();
97
98private:
100 mutable std::mutex lock_;
102 std::condition_variable notEmptyCond_;
104 std::condition_variable notFullCond_;
108 bool closed_{false};
109
111 std::queue<T, Container> que_;
112
114 using guard = std::lock_guard<std::mutex>;
116 using unique_guard = std::unique_lock<std::mutex>;
117
119 bool is_done() const { return closed_ && que_.empty(); }
120
121public:
133 explicit thread_queue(size_t cap) : cap_(std::max<size_type>(cap, 1)) {}
139 bool empty() const {
140 guard g{lock_};
141 return que_.empty();
142 }
143
148 guard g{lock_};
149 return cap_;
150 }
151
157 void capacity(size_type cap) {
158 guard g{lock_};
159 cap_ = std::max<size_type>(cap, 1);
160 if (cap_ > que_.size())
161 notFullCond_.notify_all();
162 }
163
167 size_type size() const {
168 guard g{lock_};
169 return que_.size();
170 }
171
177 void close() {
178 guard g{lock_};
179 closed_ = true;
180 notFullCond_.notify_all();
181 notEmptyCond_.notify_all();
182 }
183
190 bool closed() const {
191 guard g{lock_};
192 return closed_;
193 }
194
200 bool done() const {
201 guard g{lock_};
202 return is_done();
203 }
204
208 void clear() {
209 guard g{lock_};
210 while (!que_.empty()) que_.pop();
211 notFullCond_.notify_all();
212 }
213
219 void put(value_type val) {
220 unique_guard g{lock_};
221 notFullCond_.wait(g, [this] { return que_.size() < cap_ || closed_; });
222 if (closed_)
223 throw queue_closed{};
224
225 que_.emplace(std::move(val));
226 notEmptyCond_.notify_one();
227 }
228
234 bool try_put(value_type val) {
235 guard g{lock_};
236 if (que_.size() >= cap_ || closed_)
237 return false;
238
239 que_.emplace(std::move(val));
240 notEmptyCond_.notify_one();
241 return true;
242 }
243
252 template <typename Rep, class Period>
253 bool try_put_for(value_type val, const std::chrono::duration<Rep, Period>& relTime) {
254 unique_guard g{lock_};
255 bool to = !notFullCond_.wait_for(g, relTime, [this] {
256 return que_.size() < cap_ || closed_;
257 });
258 if (to || closed_)
259 return false;
260
261 que_.emplace(std::move(val));
262 notEmptyCond_.notify_one();
263 return true;
264 }
265
275 template <class Clock, class Duration>
277 value_type val, const std::chrono::time_point<Clock, Duration>& absTime
278 ) {
279 unique_guard g{lock_};
280 bool to = !notFullCond_.wait_until(g, absTime, [this] {
281 return que_.size() < cap_ || closed_;
282 });
283
284 if (to || closed_)
285 return false;
286
287 que_.emplace(std::move(val));
288 notEmptyCond_.notify_one();
289 return true;
290 }
291
297 bool get(value_type* val) {
298 if (!val)
299 return false;
300
301 unique_guard g{lock_};
302 notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
303 if (que_.empty()) // We must be done
304 return false;
305
306 *val = std::move(que_.front());
307 que_.pop();
308 notFullCond_.notify_one();
309 return true;
310 }
311
318 unique_guard g{lock_};
319 notEmptyCond_.wait(g, [this] { return !que_.empty() || closed_; });
320 if (que_.empty()) // We must be done
321 throw queue_closed{};
322
323 value_type val = std::move(que_.front());
324 que_.pop();
325 notFullCond_.notify_one();
326 return val;
327 }
328
336 bool try_get(value_type* val) {
337 if (!val)
338 return false;
339
340 guard g{lock_};
341 if (que_.empty())
342 return false;
343
344 *val = std::move(que_.front());
345 que_.pop();
346 notFullCond_.notify_one();
347 return true;
348 }
349
359 template <typename Rep, class Period>
360 bool try_get_for(value_type* val, const std::chrono::duration<Rep, Period>& relTime) {
361 if (!val)
362 return false;
363
364 unique_guard g{lock_};
365 notEmptyCond_.wait_for(g, relTime, [this] { return !que_.empty() || closed_; });
366
367 if (que_.empty())
368 return false;
369
370 *val = std::move(que_.front());
371 que_.pop();
372 notFullCond_.notify_one();
373 return true;
374 }
375
385 template <class Clock, class Duration>
387 value_type* val, const std::chrono::time_point<Clock, Duration>& absTime
388 ) {
389 if (!val)
390 return false;
391
392 unique_guard g{lock_};
393 notEmptyCond_.wait_until(g, absTime, [this] { return !que_.empty() || closed_; });
394 if (que_.empty())
395 return false;
396
397 *val = std::move(que_.front());
398 que_.pop();
399 notFullCond_.notify_one();
400 return true;
401 }
402};
403
405} // namespace mqtt
406
407#endif // __mqtt_thread_queue_h
Definition thread_queue.h:43
queue_closed()
Definition thread_queue.h:45
typename Container::size_type size_type
Definition thread_queue.h:93
T value_type
Definition thread_queue.h:91
size_type size() const
Definition thread_queue.h:167
void capacity(size_type cap)
Definition thread_queue.h:157
bool done() const
Definition thread_queue.h:200
Container container_type
Definition thread_queue.h:89
bool try_put(value_type val)
Definition thread_queue.h:234
bool try_get_for(value_type *val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:360
bool try_put_for(value_type val, const std::chrono::duration< Rep, Period > &relTime)
Definition thread_queue.h:253
static constexpr size_type MAX_CAPACITY
Definition thread_queue.h:96
bool try_get(value_type *val)
Definition thread_queue.h:336
thread_queue()
Definition thread_queue.h:126
bool try_put_until(value_type val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:276
void clear()
Definition thread_queue.h:208
bool closed() const
Definition thread_queue.h:190
bool get(value_type *val)
Definition thread_queue.h:297
bool empty() const
Definition thread_queue.h:139
void close()
Definition thread_queue.h:177
size_type capacity() const
Definition thread_queue.h:147
thread_queue(size_t cap)
Definition thread_queue.h:133
void put(value_type val)
Definition thread_queue.h:219
value_type get()
Definition thread_queue.h:317
bool try_get_until(value_type *val, const std::chrono::time_point< Clock, Duration > &absTime)
Definition thread_queue.h:386
Definition async_client.h:60