forked from khuttun/PolyM
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathQueue.cpp
More file actions
134 lines (107 loc) · 3.01 KB
/
Queue.cpp
File metadata and controls
134 lines (107 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
#include "Queue.hpp"
#include <chrono>
#include <condition_variable>
#include <queue>
#include <map>
#include <mutex>
#include <utility>
namespace PolyM {
/**
 * Private implementation of Queue.
 *
 * Holds the FIFO of pending Msgs and a map of temporary response Queues used
 * by request()/respondTo(). Every access to queue_ happens with queueMutex_
 * held; insertions, lookups and erasures on responseMap_ happen with
 * responseMapMutex_ held.
 */
class Queue::Impl
{
public:
    Impl() = default;

    /**
     * Append a message to the queue and wake one thread waiting in get().
     *
     * @param msg Message to enqueue; the object returned by msg.move() is
     *            what gets stored.
     */
    void put(Msg&& msg)
    {
        {
            std::lock_guard<std::mutex> lock(queueMutex_);
            queue_.push(msg.move());
        }
        // Notify only after the mutex has been released.
        queueCond_.notify_one();
    }

    /**
     * Remove and return the oldest message in the queue.
     *
     * @param timeoutMillis If <= 0, block until a message is available.
     *                      Otherwise wait at most this many milliseconds.
     * @return The dequeued message, or a Msg constructed with MSG_TIMEOUT if
     *         the timeout elapsed while the queue was still empty.
     */
    std::unique_ptr<Msg> get(int timeoutMillis)
    {
        std::unique_lock<std::mutex> lock(queueMutex_);
        const auto hasMsg = [this] { return !queue_.empty(); };

        if (timeoutMillis <= 0)
        {
            queueCond_.wait(lock, hasMsg);
        }
        else if (!queueCond_.wait_for(lock, std::chrono::milliseconds(timeoutMillis), hasMsg))
        {
            // wait_for returns false when the timeout expired with the
            // predicate still false, i.e. the queue is still empty. Hand the
            // timeout message straight to the caller: it never needs to pass
            // through the queue, and the raw pointer is owned by a unique_ptr
            // immediately.
            return std::unique_ptr<Msg>(new Msg(MSG_TIMEOUT));
        }

        auto msg = queue_.front()->move();
        queue_.pop();
        return msg;
    }

    /**
     * Enqueue a request message and block until a response to it arrives.
     *
     * A temporary response Queue is registered under the request's unique id,
     * the request is put to this queue, and the caller then waits on the
     * temporary Queue until respondTo() delivers a message to it.
     *
     * @param msg Request message to enqueue.
     * @return The message obtained from the temporary response Queue.
     */
    std::unique_ptr<Msg> request(Msg&& msg)
    {
        // Construct an ad hoc Queue to handle the response Msg
        std::unique_lock<std::mutex> lock(responseMapMutex_);
        auto it = responseMap_.emplace(
            msg.getUniqueId(), std::unique_ptr<Queue>(new Queue)).first;
        lock.unlock();

        put(std::move(msg));

        // Block until a response is put to the response Queue. The map mutex
        // is not held here, so respondTo() can look the Queue up meanwhile.
        auto response = it->second->get();

        // Delete the response Queue; the lock is released when it goes out of scope.
        lock.lock();
        responseMap_.erase(it);
        return response;
    }

    /**
     * Deliver a response to the request() call waiting on the given id.
     *
     * Does nothing if no response Queue is registered for reqUid.
     *
     * @param reqUid      Unique id of the request being answered.
     * @param responseMsg Response message to deliver.
     */
    void respondTo(MsgUID reqUid, Msg&& responseMsg)
    {
        std::lock_guard<std::mutex> lock(responseMapMutex_);
        // Single lookup; unlike operator[], find can never insert an entry.
        const auto it = responseMap_.find(reqUid);
        if (it != responseMap_.end())
            it->second->put(std::move(responseMsg));
    }

    /**
     * @return Number of messages currently held in the queue.
     */
    int size()
    {
        std::lock_guard<std::mutex> lock(queueMutex_);
        // The interface returns int; make the narrowing from size_t explicit.
        return static_cast<int>(queue_.size());
    }

private:
    // Queue for the Msgs
    std::queue<std::unique_ptr<Msg>> queue_;

    // Mutex to protect access to the queue
    std::mutex queueMutex_;

    // Condition variable to wait for when getting Msgs from the queue
    std::condition_variable queueCond_;

    // Map to keep track of which response handler queues are associated with which request Msgs
    std::map<MsgUID, std::unique_ptr<Queue>> responseMap_;

    // Mutex to protect access to response map
    std::mutex responseMapMutex_;
};
/// Create a Queue together with its private implementation object.
Queue::Queue() : impl_(new Impl) {}
/// Destructor, defined in this translation unit where Impl is a complete type.
/// NOTE(review): presumably impl_ is an owning smart pointer that releases the
/// Impl here - confirm against Queue.hpp.
Queue::~Queue() = default;
/// Enqueue a message by handing it over to the implementation object.
/// @param msg Message to put to the queue.
void Queue::put(Msg&& msg)
{
    Impl& impl = *impl_;
    impl.put(std::move(msg));
}
std::unique_ptr<Msg> Queue::get(int timeoutMillis)
{
return impl_->get(timeoutMillis);
}
/// Put a request message to the queue and block until a response to it has
/// been delivered through respondTo().
/// @param msg Request message.
/// @return The response message.
std::unique_ptr<Msg> Queue::request(Msg&& msg)
{
    Impl& impl = *impl_;
    return impl.request(std::move(msg));
}
/// Deliver a response to the request identified by reqUid. Nothing happens
/// if no request with that id is currently waiting for a response.
/// @param reqUid      Unique id of the request being answered.
/// @param responseMsg Response message to deliver.
void Queue::respondTo(MsgUID reqUid, Msg&& responseMsg)
{
    Impl& impl = *impl_;
    impl.respondTo(reqUid, std::move(responseMsg));
}
int Queue::size()
{
return impl_->size();
}
}