-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathzmq_client.cpp
More file actions
76 lines (68 loc) · 1.93 KB
/
Copy pathzmq_client.cpp
File metadata and controls
76 lines (68 loc) · 1.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include "zmq_client.hpp"
/// Default constructor: initialises _context with the argument 1
/// (presumably the I/O thread count, if _context is a zmq::context_t — confirm
/// against the header) and creates the ZMQ_REQ socket `client` on that context.
Client::Client()
    : _context(1),
      client(_context, ZMQ_REQ)
{
}
/**
 * @brief Configures the socket and connects it to every registered sender point.
 *
 * Applies the receive/send timeouts (timRcvMsec / timSnsMsec), the optional
 * receive/send high-water marks (only when the corresponding queue size is
 * non-zero) and then connects to each entry of senderPoints.
 *
 * @param[out] errStr Receives e.what() when any socket call throws; left
 *                    untouched on success.
 * @return true when every option was applied and every connect succeeded,
 *         false as soon as one of those calls throws.
 *
 * @note _stop and _stopped are both cleared regardless of the outcome.
 */
bool Client::start(std::string &errStr){
    bool result = true;
    _stop = false;
    try{
        // The timeout options live inside the try block so that a failing
        // setsockopt is reported through errStr/false like every other
        // failure here, instead of escaping to the caller as an exception.
        client.setsockopt(ZMQ_RCVTIMEO, timRcvMsec);
        client.setsockopt(ZMQ_SNDTIMEO, timSnsMsec);

        // A queue size of zero means "leave the socket's default in place".
        if(_in_queue_size) client.setsockopt(ZMQ_RCVHWM, _in_queue_size);
        if(_out_queue_size) client.setsockopt(ZMQ_SNDHWM, _out_queue_size);

        for(const auto &its: senderPoints)
            client.connect(its);
    }
    catch(const std::exception &e){
        errStr = e.what();
        result = false;
    }
    _stopped = false;
    return result;
}
bool Client::stop(){
_stop = true;
bool result = true;
for(const auto &its: senderPoints){
try{client.disconnect(its);}
catch(std::exception &e){result = false;}
}
_stopped = true;
return result;
}
/**
 * @brief Tries to receive one message, retrying until it succeeds, the effort
 *        budget is used up, or _stop is set.
 *
 * Each attempt calls client.recv with flags 0 when _block is set and
 * ZMQ_DONTWAIT otherwise; no recv is attempted while _stopped is set.
 * Unsuccessful attempts are separated by a 3 ms pause.
 *
 * @param[out] message Destination message; must not be null.
 * @param[out] errStr  Receives a description when message is null or when a
 *                     socket call throws; untouched otherwise (including when
 *                     the attempts are simply exhausted).
 * @param efforts      Maximum number of attempts; 0 means no limit (retry
 *                     until _stop is set).
 * @return true when a message was received, false otherwise.
 */
bool Client::receive(zmq::message_t *message, std::string &errStr, uint8_t efforts){
    // Guard against handing a null pointer down to the socket layer.
    if(message == nullptr){
        errStr = "receive: message pointer is null";
        return false;
    }
    try{
        uint8_t attempts = 0;
        do{
            if(!_stopped && client.recv(message, _block ? 0 : ZMQ_DONTWAIT))
                return true;
            // Check the budget before pausing so the final failed attempt
            // returns immediately instead of sleeping for nothing.
            if(efforts > 0 && ++attempts >= efforts)
                break;
            std::this_thread::sleep_for(std::chrono::milliseconds(3));
        }
        while(!_stop);
    }
    catch(const std::exception &e){
        errStr = e.what();
    }
    return false;
}
/**
 * @brief Tries to send a message, retrying every 3 ms until it succeeds or
 *        _stop is set.
 *
 * Each attempt calls client.send with flags 0 when _block is set and
 * ZMQ_DONTWAIT otherwise; no send is attempted while _stopped is set.
 * There is no attempt limit: the loop ends only on success, on _stop, or when
 * a socket call throws.
 *
 * @param message      Message handed to client.send.
 * @param[out] errStr  Receives e.what() when a socket call throws; untouched
 *                     otherwise.
 * @return true when client.send reported success, false otherwise.
 */
bool Client::send(zmq::message_t &message, std::string &errStr){
    const auto retryPause = std::chrono::milliseconds(3);
    try{
        for(;;){
            // _block is re-read on every attempt, so a change takes effect
            // on the next retry.
            const int flags = _block ? 0 : ZMQ_DONTWAIT;
            const bool delivered = !_stopped && client.send(message, flags);
            if(delivered)
                return true;

            std::this_thread::sleep_for(retryPause);
            if(_stop)
                break;
        }
    }
    catch(const std::exception &e){
        errStr = std::string(e.what());
    }
    return false;
}
/**
 * @brief Registers an endpoint in senderPoints.
 *
 * The endpoint is only stored here; start() connects to the stored entries
 * and stop() disconnects from them.
 *
 * @param sp Endpoint string appended to senderPoints.
 */
void Client::add_sender_point(const std::string& sp)
{
    senderPoints.push_back(sp);
}