28 #include <boost/function.hpp>
30 #include <graphlab/util/stl_util.hpp>
31 #include <graphlab/parallel/pthread_tools.hpp>
32 #include <graphlab/rpc/distributed_event_log.hpp>
33 #include <graphlab/rpc/get_last_dc_procid.hpp>
35 #include <graphlab/ui/mongoose/mongoose.h>
36 #include <graphlab/ui/metrics_server.hpp>
38 #include <graphlab/macros_def.hpp>
// Handle of the running mongoose server. Stays NULL until mg_start()
// succeeds in the launch code below; the stop/wait code checks it for NULL
// before acting.
44 static mg_context* metric_context = NULL;
// Accessor for the reader/writer lock that the rest of this file takes
// (readlock/rdunlock, writelock/wrunlock) around uses of callbacks().
// The body of this function is not visible in this excerpt.
46 static rwlock& callback_lock() {
// Accessor for the table mapping a URL path (without the leading character
// of the request URI) to its handler. The map is a function-local static,
// so it is constructed the first time this function is called. The return
// statement is not visible in this excerpt.
52 static std::map<std::string, http_redirect_callback_type>& callbacks() {
53 static std::map<std::string, http_redirect_callback_type> cback;
// Mongoose event callback (handed to mg_start). For MG_NEW_REQUEST events it
// turns the request URI into a handler key, collects the query-string
// variables, invokes the matching handler from callbacks() and writes the
// handler's (content type, body) pair back on the connection. If no handler
// matches, the handler registered under "404" (if any) supplies the body of a
// 404 response. The handling of other events and the return values are not
// visible in this excerpt.
60 static void* process_request(
enum mg_event event,
61 struct mg_connection* conn,
62 const struct mg_request_info* info) {
63 if (event == MG_NEW_REQUEST) {
// The request URI becomes the lookup key; url keeps its initial value when
// uri is NULL (its declaration is not visible in this excerpt).
67 if (info->uri != NULL) url = info->uri;
// Drop the first character of the URI (presumably the leading '/'), so that
// a request for "/echo" looks up "echo".
69 if (url.length() >= 1) url = url.substr(1, url.length() - 1);
// Query-string variables, key -> value, passed to the handler.
71 std::map<std::string, std::string> variable_map;
72 if (info->query_string != NULL) {
73 std::string qs = info->query_string;
// Split the query string into "key=value" terms on '&'.
74 std::vector<std::string> terms =
strsplit(qs,
"&",
true);
76 foreach(std::string& term, terms) {
// Only the key is taken from the split; the value is fetched separately.
78 std::vector<std::string> key_val =
strsplit(term,
"=",
true);
79 if (key_val.size() > 0) {
82 std::string key = key_val[0];
// The value is re-extracted from the whole query string with mg_get_var
// (presumably because it also URL-decodes the value - confirm against the
// mongoose documentation). Values are limited by the fixed 8192-byte buffer;
// a negative return means the key is skipped.
83 char val_target[8192];
84 int ret = mg_get_var(qs.c_str(), qs.length(),
85 key.c_str(), val_target, 8192);
86 if (ret >= 0) variable_map[key] = val_target;
// The read lock is held across the table lookup AND the handler call, and is
// released right after the handler returns, before the response is written.
// NOTE(review): index_page() takes the same read lock itself, so for that
// handler the read lock is acquired twice on this thread - confirm that
// rwlock tolerates nested read locks while a writer is waiting.
90 callback_lock().readlock();
92 std::map<std::string, http_redirect_callback_type>::iterator iter =
93 callbacks().find(url);
// A handler is registered for this URL: run it and reply with 200.
95 if (iter != callbacks().end()) {
96 std::pair<std::string, std::string> returnval = iter->second(variable_map);
98 callback_lock().rdunlock();
// returnval.first is used as the Content-Type header, returnval.second as the
// response body; Content-Length is the body length cast to int.
100 std::string ctype = returnval.first;
101 std::string body = returnval.second;
103 "HTTP/1.1 200 OK\r\n"
104 "Access-Control-Allow-Origin: *\r\n"
105 "Access-Control-Allow-Methods: GET\r\n"
106 "Content-Type: %s\r\n"
107 "Content-Length: %d\r\n"
109 ctype.c_str(), (int) body.length());
110 mg_write(conn, body.c_str(), body.length());
// No handler matched: use the handler registered under "404" when there is
// one; otherwise returnval stays default-constructed, i.e. an empty content
// type and an empty body.
113 std::map<std::string, http_redirect_callback_type>::iterator iter404 =
114 callbacks().find(
"404");
115 std::pair<std::string, std::string> returnval;
116 if (iter404 != callbacks().end()) returnval = iter404->second(variable_map);
118 callback_lock().rdunlock();
120 std::string ctype = returnval.first;
121 std::string body = returnval.second;
124 "HTTP/1.1 404 Not Found\r\n"
125 "Access-Control-Allow-Origin: *\r\n"
126 "Content-Type: %s\r\n"
127 "Content-Length: %d\r\n"
129 ctype.c_str(), (int)body.length());
130 mg_write(conn, body.c_str(), body.length());
/**
 * Built-in handler registered under "404"; process_request() falls back to
 * it when no handler matches the requested URL.
 *
 * \param varmap  Request variables; not read.
 * \return (content type, body): always ("text/html", "Page Not Found").
 */
std::pair<std::string, std::string>
four_oh_four(std::map<std::string, std::string>& varmap) {
  const std::string content_type("text/html");
  const std::string page_body("Page Not Found");
  return std::pair<std::string, std::string>(content_type, page_body);
}
/**
 * Writes \p text to \p out with the HTML metacharacters & < > " '
 * replaced by character references, so that the text is rendered literally
 * by a browser instead of being interpreted as markup.
 */
static void echo_append_html_escaped(std::ostream& out,
                                     const std::string& text) {
  for (std::string::size_type i = 0; i < text.length(); ++i) {
    switch (text[i]) {
      case '&':  out << "&amp;";  break;
      case '<':  out << "&lt;";   break;
      case '>':  out << "&gt;";   break;
      case '"':  out << "&quot;"; break;
      case '\'': out << "&#39;";  break;
      default:   out << text[i];  break;
    }
  }
}

/**
 * Built-in handler registered under "echo": renders every request variable
 * as one "key = value<br>" line, in the map's (key-sorted) order.
 *
 * The keys and values come straight from the HTTP query string and the
 * result is served as text/html, so both are HTML-escaped before being
 * written; otherwise a crafted URL could inject markup or script into the
 * page.
 *
 * \param varmap  Request variables to echo back; not modified.
 * \return (content type, body): "text/html" and the generated listing
 *         (empty body when \p varmap is empty).
 */
std::pair<std::string, std::string>
echo(std::map<std::string, std::string>& varmap) {
  std::stringstream ret;
  for (std::map<std::string, std::string>::const_iterator iter = varmap.begin();
       iter != varmap.end(); ++iter) {
    echo_append_html_escaped(ret, iter->first);
    ret << " = ";
    echo_append_html_escaped(ret, iter->second);
    ret << "<br>\n";
  }
  return std::make_pair(std::string("text/html"), ret.str());
}
168 std::pair<std::string, std::string>
169 index_page(std::map<std::string, std::string>& varmap) {
170 std::stringstream ret;
172 ret <<
"<h3>Registered Handlers:</h3>\n";
173 callback_lock().readlock();
174 std::map<std::string, http_redirect_callback_type>::const_iterator iter =
176 while (iter != callbacks().end()) {
178 if (iter->first !=
"") {
179 ret << iter->first <<
"<br>\n";
183 callback_lock().rdunlock();
186 return std::make_pair(std::string(
"text/html"), ret.str());
190 static void fill_builtin_callbacks() {
191 callbacks()[
"404"] = four_oh_four;
192 callbacks()[
"echo"] = echo;
193 callbacks()[
""] = index_page;
194 callbacks()[
"index.html"] = index_page;
// Register `callback` as the handler for `page`, replacing any handler already
// stored under that key. The write lock excludes request threads that are
// reading the table. (The enclosing function's signature is not visible in
// this excerpt.)
200 callback_lock().writelock();
201 callbacks()[page] = callback;
202 callback_lock().wrunlock();
// Start the HTTP server only in the process for which
// dc_impl::get_last_dc_procid() is 0 (presumably the root process of the
// distributed job - confirm). The enclosing function's signature is not
// visible in this excerpt.
206 if (dc_impl::get_last_dc_procid() == 0) {
// Mongoose option list (NULL-terminated): listen on port 8090.
207 const char *options[] = {
"listening_ports",
"8090", NULL};
// process_request is the event callback; the address of the handler table is
// passed as the user-data pointer.
208 metric_context = mg_start(process_request, (
void*)(&(callbacks())), options);
// mg_start returned NULL: report that the server could not be started.
209 if(metric_context == NULL) {
210 logstream(
LOG_ERROR) <<
"Unable to launch metrics server on port 8090. "
211 <<
"Metrics server will not be available" << std::endl;
// NOTE(review): the built-in handlers are installed after mg_start() and
// without callback_lock(); a request served in between could read the table
// while it is being modified. Consider filling the table before starting the
// server, or under the write lock. (Lines between the error log and this call
// are not visible here and may affect this.)
214 fill_builtin_callbacks();
// Announce the URL using the local host name; strhostname stays empty if
// gethostname() fails. The hostname buffer is declared in a line not visible
// in this excerpt.
217 std::string strhostname;
218 if (gethostname(hostname, 1024) == 0) strhostname = hostname;
219 logstream(
LOG_EMPH) <<
"Metrics server now listening on "
220 <<
"http://" << strhostname <<
":8090" << std::endl;
// Stop the server only where it was started: last DC procid 0 and a non-NULL
// mongoose context. Whether metric_context is reset afterwards is not visible
// in this excerpt.
225 if (dc_impl::get_last_dc_procid() == 0 && metric_context != NULL) {
226 std::cout <<
"Metrics server stopping." << std::endl;
227 mg_stop(metric_context);
// Only acts in the process that runs the server (last DC procid 0 and a
// non-NULL mongoose context).
232 if (dc_impl::get_last_dc_procid() == 0 && metric_context != NULL) {
235 logstream(
LOG_EMPH) <<
"Hit Ctrl-D to stop the metrics server" << std::endl;
// Block until stdin reaches end-of-file (or a read error): input is read in
// chunks of at most 127 characters and discarded. The buff declaration and
// whatever follows the loop are not visible in this excerpt.
236 while (fgets(buff, 128, stdin) != NULL );