// Include guard and logging include, followed by the ZmqContext constructor.
// NOTE(review): this listing has the original line numbers fused into the text
// and some lines are missing (the constructor's closing brace is not visible).
//
// The constructor creates a context with zmq_ctx_new() and enforces that the
// returned handle is non-null. It then sets ZMQ_IO_THREADS to io_threads and
// ZMQ_MAX_SOCKETS to ZMQ_MAX_SOCKETS_DFLT, enforcing after each zmq_ctx_set()
// call that the return code is 0.
1 #ifndef CAFFE2_UTILS_ZMQ_HELPER_H_ 2 #define CAFFE2_UTILS_ZMQ_HELPER_H_ 6 #include "caffe2/core/logging.h" 12 explicit ZmqContext(
int io_threads) : ptr_(zmq_ctx_new()) {
13 CAFFE_ENFORCE(ptr_ !=
nullptr,
"Failed to create zmq context.");
14 int rc = zmq_ctx_set(ptr_, ZMQ_IO_THREADS, io_threads);
15 CAFFE_ENFORCE_EQ(rc, 0);
16 rc = zmq_ctx_set(ptr_, ZMQ_MAX_SOCKETS, ZMQ_MAX_SOCKETS_DFLT);
17 CAFFE_ENFORCE_EQ(rc, 0);
// Destroys the context held in ptr_ with zmq_ctx_destroy() and enforces that
// the call returns 0.
// NOTE(review): presumably the body of the ZmqContext destructor; the enclosing
// signature is not visible in this chunk -- confirm. If it is a destructor,
// check whether a failing CAFFE_ENFORCE_EQ here is acceptable.
20 int rc = zmq_ctx_destroy(ptr_);
21 CAFFE_ENFORCE_EQ(rc, 0);
24 void* ptr() {
return ptr_; }
// Initializes the wrapped message msg_ with zmq_msg_init() and enforces that
// the call returns 0.
// NOTE(review): presumably the body of the message wrapper's constructor; the
// enclosing signature is not visible in this chunk -- confirm.
35 int rc = zmq_msg_init(&msg_);
36 CAFFE_ENFORCE_EQ(rc, 0);
// Releases the wrapped message msg_ with zmq_msg_close() and enforces that the
// call returns 0.
// NOTE(review): presumably the body of the message wrapper's destructor; the
// enclosing signature is not visible in this chunk -- confirm.
40 int rc = zmq_msg_close(&msg_);
41 CAFFE_ENFORCE_EQ(rc, 0);
44 zmq_msg_t* msg() {
return &msg_; }
46 void* data() {
return zmq_msg_data(&msg_); }
47 size_t size() {
return zmq_msg_size(&msg_); }
57 : context_(1), ptr_(zmq_socket(context_.ptr(), type)) {
58 CAFFE_ENFORCE(ptr_ !=
nullptr,
"Faild to create zmq socket.");
// Closes the socket handle ptr_ with zmq_close() and enforces that the call
// returns 0.
// NOTE(review): presumably the body of the socket wrapper's destructor; the
// enclosing signature is not visible in this chunk -- confirm.
62 int rc = zmq_close(ptr_);
63 CAFFE_ENFORCE_EQ(rc, 0);
// Binds the socket ptr_ to the endpoint string `addr` via zmq_bind() and
// enforces that the call returns 0.
// NOTE(review): the closing brace of this method is not visible in this chunk.
66 void Bind(
const string& addr) {
67 int rc = zmq_bind(ptr_, addr.c_str());
68 CAFFE_ENFORCE_EQ(rc, 0);
// Unbinds the socket ptr_ from the endpoint string `addr` via zmq_unbind() and
// enforces that the call returns 0.
// NOTE(review): the closing brace of this method is not visible in this chunk.
71 void Unbind(
const string& addr) {
72 int rc = zmq_unbind(ptr_, addr.c_str());
73 CAFFE_ENFORCE_EQ(rc, 0);
// Connects the socket ptr_ to the endpoint string `addr` via zmq_connect() and
// enforces that the call returns 0.
// NOTE(review): the closing brace of this method is not visible in this chunk.
76 void Connect(
const string& addr) {
77 int rc = zmq_connect(ptr_, addr.c_str());
78 CAFFE_ENFORCE_EQ(rc, 0);
// Disconnects the socket ptr_ from the endpoint string `addr` via
// zmq_disconnect() and enforces that the call returns 0.
// NOTE(review): the closing brace of this method is not visible in this chunk.
81 void Disconnect(
const string& addr) {
82 int rc = zmq_disconnect(ptr_, addr.c_str());
83 CAFFE_ENFORCE_EQ(rc, 0);
// Send and SendTillSuccess (fragments; several original lines are missing from
// this listing, including the first `if`, the branch bodies, the return
// statements and the opening of the retry loop).
//
// Send: passes msg's bytes (msg.c_str(), msg.size()) and `flags` to zmq_send()
// on ptr_ and stores the result in nbytes. The visible branches are an
// `else if` taken when zmq_errno() == EAGAIN and a LOG(FATAL) path whose
// message starts with "Cannot send zmq message. Error number: ".
// NOTE(review): the receive path also treats EINTR as retryable, while this
// path only checks EAGAIN -- confirm whether that asymmetry is intended.
//
// SendTillSuccess: enforces that msg.size() is non-zero ("You cannot send an
// empty message."), then assigns nbytes = Send(msg, flags) inside a loop whose
// visible tail repeats while nbytes == 0.
86 int Send(
const string& msg,
int flags) {
87 int nbytes = zmq_send(ptr_, msg.c_str(), msg.size(), flags);
90 }
else if (zmq_errno() == EAGAIN) {
93 LOG(FATAL) <<
"Cannot send zmq message. Error number: " 99 int SendTillSuccess(
const string& msg,
int flags) {
100 CAFFE_ENFORCE(msg.size(),
"You cannot send an empty message.");
103 nbytes = Send(msg, flags);
104 }
while (nbytes == 0);
// Receive path (fragments; the enclosing signatures, the first `if`, the branch
// bodies and the opening of the retry loop are not visible in this chunk).
//
// Receives into the zmq_msg_t returned by msg->msg() by calling
// zmq_msg_recv() on ptr_ with flags 0, storing the result in nbytes. The
// visible branches are an `else if` taken when zmq_errno() is EAGAIN or EINTR
// and a LOG(FATAL) path whose message starts with
// "Cannot receive zmq message. Error number: ".
// The trailing `while (nbytes == 0);` is the tail of a loop that repeats while
// nbytes == 0 -- presumably a retry-until-success wrapper; confirm against the
// full file.
109 int nbytes = zmq_msg_recv(msg->msg(), ptr_, 0);
112 }
else if (zmq_errno() == EAGAIN || zmq_errno() == EINTR) {
115 LOG(FATAL) <<
"Cannot receive zmq message. Error number: " 125 }
while (nbytes == 0);
137 #endif // CAFFE2_UTILS_ZMQ_HELPER_H_
Simple registry implementation in Caffe2 that uses static variables to register object creators during program initialization time.