The Battle for Wesnoth  1.13.4+dev
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
network_worker.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2003 - 2016 by David White <[email protected]>
3  Part of the Battle for Wesnoth Project http://www.wesnoth.org/
4 
5  This program is free software; you can redistribute it and/or modify
6  it under the terms of the GNU General Public License as published by
7  the Free Software Foundation; either version 2 of the License, or
8  (at your option) any later version.
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY.
11 
12  See the COPYING file for more details.
13 */
14 
/**
 * Network worker handles data transfers in threads.
 * Remember to use mutexes as little as possible.
 * All global variables should be accessed under a mutex.
 * FIXME: @todo All code which holds a mutex should run in O(1) time
 * for scalability. Implement read/write locks.
 * (postponed for 1.5)
 */
23 
24 #include "global.hpp"
25 
26 #include "scoped_resource.hpp"
27 #include "log.hpp"
28 #include "network_worker.hpp"
29 #include "filesystem.hpp"
30 #include "thread.hpp"
32 #include "serialization/parser.hpp"
33 #include "wesconfig.h"
34 
35 #include <boost/iostreams/filter/gzip.hpp>
36 #include <boost/exception/info.hpp>
37 
38 #include <cerrno>
39 #include <deque>
40 #include <sstream>
41 
42 #ifdef HAVE_SENDFILE
43 #include <sys/sendfile.h>
44 #include <netinet/in.h>
45 #include <netinet/tcp.h>
46 #endif
47 
48 
// Platform-specific socket plumbing: Windows uses winsock via <windows.h>
// and select(); POSIX systems prefer poll() when available, falling back
// to select() otherwise.
#if defined(_WIN32) || defined(__WIN32__) || defined (WIN32)
// <windows.h> redefines these INADDR_* constants; drop any prior definitions first.
# undef INADDR_ANY
# undef INADDR_BROADCAST
# undef INADDR_NONE
# ifndef NOMINMAX
# define NOMINMAX  // keep <windows.h> from defining min()/max() macros
# endif
# include <windows.h>
# define USE_SELECT 1
typedef int socklen_t;  // winsock does not provide socklen_t
#else
# include <sys/types.h>
# include <sys/socket.h>
# include <fcntl.h>
# define SOCKET int
# ifdef HAVE_POLL_H
# define USE_POLL 1
# include <poll.h>
# elif defined(HAVE_SYS_POLL_H)
# define USE_POLL 1
# include <sys/poll.h>
# endif
# ifndef USE_POLL
# define USE_SELECT 1
# ifdef HAVE_SYS_SELECT_H
# include <sys/select.h>
# else
# include <sys/time.h>
# include <sys/types.h>
# include <unistd.h>
# endif
# endif
#endif

// Old-style casts below come from poking into SDL_net internals.
#pragma GCC diagnostic ignored "-Wold-style-cast"

static lg::log_domain log_network("network");
#define DBG_NW LOG_STREAM(debug, log_network)
#define LOG_NW LOG_STREAM(info, log_network)
#define ERR_NW LOG_STREAM(err, log_network)
89 
90 namespace {
91 struct _TCPsocket {
92  int ready;
94  IPaddress remoteAddress;
95  IPaddress localAddress;
96  int sflag;
97 };
98 
#ifndef NUM_SHARDS
#define NUM_SHARDS 1  // number of independent shards (each with its own mutex + queues)
#endif

// Number of worker threads currently idle (blocked on the condition), per shard.
unsigned int waiting_threads[NUM_SHARDS];
// Thread-pool bounds set by manager::manager(); max_threads == 0 means unlimited.
size_t min_threads = 0;
size_t max_threads = 0;

// Map a socket to its shard by hashing the handle's pointer value.
size_t get_shard(TCPsocket sock) { return reinterpret_cast<uintptr_t>(sock)%NUM_SHARDS; }
/** One unit of work: data queued for sending to, or received from, a single socket. */
struct buffer {
	explicit buffer(TCPsocket sock) :
		sock(sock),
		config_buf(),
		config_error(""),
		stream(),
		raw_buffer()
	{}

	TCPsocket sock;            // socket this buffer belongs to
	mutable config config_buf; // parsed WML payload for received data
	// Doubles as the parse-error description for received data and as the
	// filename when the buffer represents a file to send (see queue_file()).
	std::string config_error;
	std::ostringstream stream; // serialized outgoing config data

	/**
	 * This field is used if we're sending a raw buffer instead of through a
	 * config object. It will contain the entire contents of the buffer being
	 * sent.
	 */
	std::vector<char> raw_buffer;
};
130 
131 
// 'managed' is true while a manager instance is alive; workers exit when it
// goes false. 'raw_data_only' disables WML parsing of received payloads.
bool managed = false, raw_data_only = false;
typedef std::vector< buffer* > buffer_set;
// Buffers queued for sending, per shard. Guarded by shard_mutexes[shard].
buffer_set outgoing_bufs[NUM_SHARDS];

/** a queue of sockets that we are waiting to receive on */
typedef std::vector<TCPsocket> receive_list;
receive_list pending_receives[NUM_SHARDS];

typedef std::deque<buffer*> received_queue;
received_queue received_data_queue; // receive_mutex

// Lifecycle of a socket inside the worker pool: READY (idle, may be picked
// up), LOCKED (a worker is transferring), ERRORED (transfer failed),
// INTERRUPT (close_socket() asked the worker to abort).
enum SOCKET_STATE { SOCKET_READY, SOCKET_LOCKED, SOCKET_ERRORED, SOCKET_INTERRUPT };
typedef std::map<TCPsocket,SOCKET_STATE> socket_state_map;
typedef std::map<TCPsocket, std::pair<network::statistics,network::statistics> > socket_stats_map;

// Per-shard socket states; guarded by shard_mutexes[shard].
socket_state_map sockets_locked[NUM_SHARDS];
socket_stats_map transfer_stats; // stats_mutex

// Count of sockets in SOCKET_ERRORED state, per shard; guarded by shard_mutexes[shard].
int socket_errors[NUM_SHARDS];
threading::mutex* shard_mutexes[NUM_SHARDS];
threading::mutex* stats_mutex = nullptr;
threading::mutex* received_mutex = nullptr;

// Worker threads per shard, keyed by thread id, plus ids of exited threads
// awaiting cleanup. Guarded by shard_mutexes[shard].
std::map<Uint32,threading::thread*> threads[NUM_SHARDS];
std::vector<Uint32> to_clear[NUM_SHARDS];
#if 0
int system_send_buffer_size = 0;
#endif
// Use the OS sendfile() fast path in send_file() when available and enabled.
bool network_use_system_sendfile = false;
162 
/**
 * Read up to @a nbytes from socket @a s into @a buf.
 * @returns the recv() result: >0 bytes read, 0 on orderly peer shutdown,
 *          <0 on error (including would-block).
 */
int receive_bytes(TCPsocket s, char* buf, size_t nbytes)
{
#ifdef NETWORK_USE_RAW_SOCKETS
	_TCPsocket* sock = reinterpret_cast<_TCPsocket*>(s);
	int res = 0;
	do {
		errno = 0;
		// Non-blocking read; retry when interrupted by a signal.
		res = recv(sock->channel, buf, nbytes, MSG_DONTWAIT);
	} while(errno == EINTR);
	sock->ready = 0;
	return res;
#else
	return SDLNet_TCP_Recv(s, buf, nbytes);
#endif
}
178 
#if 0
// Disabled: query (once) the kernel buffer size for the socket and cache it
// in system_send_buffer_size.
// NOTE(review): despite the name, this queries SO_RCVBUF, not SO_SNDBUF —
// confirm intent before re-enabling.
void check_send_buffer_size(TCPsocket& s)
{
	if (system_send_buffer_size)
		return; // already cached
	_TCPsocket* sock = reinterpret_cast<_TCPsocket*>(s);
	socklen_t len = sizeof(system_send_buffer_size);
#ifdef _WIN32
	getsockopt(sock->channel, SOL_SOCKET, SO_RCVBUF,reinterpret_cast<char*>(&system_send_buffer_size), &len);
#else
	getsockopt(sock->channel, SOL_SOCKET, SO_RCVBUF,&system_send_buffer_size, &len);
#endif
	--system_send_buffer_size;
	DBG_NW << "send buffer size: " << system_send_buffer_size << "\n";
}
#endif
/**
 * Read exactly @a nbytes bytes from @a s into @a buf, waiting for data with
 * a poll()/select() loop (or a sleep loop when neither is available).
 *
 * @param update_stats     when true (and not in raw mode) record received
 *                         bytes in transfer_stats.
 * @param idle_timeout_ms  give up when no data arrives for this long.
 * @param total_timeout_ms give up when the whole transfer takes longer.
 * @returns true when all bytes were read; false on timeout, error, peer
 *          disconnect, over-long read, or SOCKET_INTERRUPT (close requested).
 */
bool receive_with_timeout(TCPsocket s, char* buf, size_t nbytes,
		bool update_stats=false, int idle_timeout_ms=30000,
		int total_timeout_ms=300000)
{
#if !defined(USE_POLL) && !defined(USE_SELECT)
	int startTicks = SDL_GetTicks();
	int time_used = 0;
#endif
	int timeout_ms = idle_timeout_ms;
	while(nbytes > 0) {
		const int bytes_read = receive_bytes(s, buf, nbytes);
		if(bytes_read == 0) {
			// Orderly shutdown by the peer.
			return false;
		} else if(bytes_read < 0) {
			// Only would-block is recoverable; anything else fails the receive.
#if defined(EAGAIN) && !defined(_WIN32)
			if(errno == EAGAIN)
#elif defined(_WIN32) && defined(WSAEWOULDBLOCK)
			//it seems like 'errno == EWOULDBLOCK' compiles on msvc2010, but doesn't work properly at runtime.
			if(WSAGetLastError() == WSAEWOULDBLOCK)
#elif defined(EWOULDBLOCK)
			if(errno == EWOULDBLOCK)
#else
			// assume non-recoverable error.
			if(false)
#endif
			{
#ifdef USE_POLL
				struct pollfd fd = { reinterpret_cast<_TCPsocket*>(s)->channel, POLLIN, 0 };
				int poll_res;

				//we timeout of the poll every 100ms. This lets us check to
				//see if we have been disconnected, in which case we should
				//abort the receive.
				const int poll_timeout = std::min(timeout_ms, 100);
				do {
					poll_res = poll(&fd, 1, poll_timeout);

					if(poll_res == 0) {
						timeout_ms -= poll_timeout;
						total_timeout_ms -= poll_timeout;
						if(timeout_ms <= 0 || total_timeout_ms <= 0) {
							//we've been waiting too long; abort the receive
							//as having failed due to timeout.
							return false;
						}

						//check to see if we've been interrupted
						const size_t shard = get_shard(s);
						const threading::lock lock(*shard_mutexes[shard]);
						socket_state_map::iterator lock_it = sockets_locked[shard].find(s);
						assert(lock_it != sockets_locked[shard].end());
						if(lock_it->second == SOCKET_INTERRUPT) {
							return false;
						}
					}

				} while(poll_res == 0 || (poll_res == -1 && errno == EINTR));

				if (poll_res < 1)
					return false;
#elif defined(USE_SELECT)
				int retval;
				const int select_timeout = std::min(timeout_ms, 100);
				do {
					fd_set readfds;
					FD_ZERO(&readfds);
					FD_SET(((_TCPsocket*)s)->channel, &readfds);
					struct timeval tv;
					tv.tv_sec = select_timeout/1000;
					tv.tv_usec = select_timeout % 1000 * 1000;
					retval = select(((_TCPsocket*)s)->channel + 1, &readfds, nullptr, nullptr, &tv);
					DBG_NW << "select retval: " << retval << ", timeout idle " << timeout_ms
					       << " total " << total_timeout_ms << " (ms)\n";
					if(retval == 0) {
						timeout_ms -= select_timeout;
						total_timeout_ms -= select_timeout;
						if(timeout_ms <= 0 || total_timeout_ms <= 0) {
							//we've been waiting too long; abort the receive
							//as having failed due to timeout.
							return false;
						}

						//check to see if we've been interrupted
						const size_t shard = get_shard(s);
						const threading::lock lock(*shard_mutexes[shard]);
						socket_state_map::iterator lock_it = sockets_locked[shard].find(s);
						assert(lock_it != sockets_locked[shard].end());
						if(lock_it->second == SOCKET_INTERRUPT) {
							return false;
						}
					}
				} while(retval == 0 || (retval == -1 && errno == EINTR));

				if (retval < 1) {
					return false;
				}
#else
				//TODO: consider replacing this with a select call
				time_used = SDL_GetTicks() - startTicks;
				if(time_used >= timeout_ms) {
					return false;
				}
				SDL_Delay(20);
#endif
			} else {
				return false;
			}
		} else {
			// Progress: reset the idle timeout and advance the cursor.
			timeout_ms = idle_timeout_ms;
			buf += bytes_read;
			if(update_stats && !raw_data_only) {
				const threading::lock lock(*stats_mutex);
				transfer_stats[s].second.transfer(static_cast<size_t>(bytes_read));
			}

			// Defensive: never trust recv to return more than requested.
			if(bytes_read > static_cast<int>(nbytes)) {
				return false;
			}
			nbytes -= bytes_read;
			// We got some data from server so reset start time so slow connection won't timeout.
#if !defined(USE_POLL) && !defined(USE_SELECT)
			startTicks = SDL_GetTicks();
#endif
		}
		{
			// Abort promptly when close_socket() flagged this socket.
			const size_t shard = get_shard(s);
			const threading::lock lock(*shard_mutexes[shard]);
			socket_state_map::iterator lock_it = sockets_locked[shard].find(s);
			assert(lock_it != sockets_locked[shard].end());
			if(lock_it->second == SOCKET_INTERRUPT) {
				return false;
			}
		}
	}

	return true;
}
332 
333 /**
334  * @todo See if the TCPsocket argument should be removed.
335  */
336 static void output_to_buffer(TCPsocket /*sock*/, const config& cfg, std::ostringstream& compressor)
337 {
338  config_writer writer(compressor, true);
339  writer.write(cfg);
340 }
341 
342 static void make_network_buffer(const char* input, int len, std::vector<char>& buf)
343 {
344  buf.resize(4 + len);
345  SDLNet_Write32(len, &buf[0]);
346  memcpy(&buf[4], input, len);
347 }
348 
/**
 * Send @a buf (or its first @a in_size bytes when in_size != -1) over
 * @a sock, handling partial sends and would-block conditions by waiting
 * for writability with poll()/select() (60s per wait).
 *
 * @returns SOCKET_READY on success; SOCKET_ERRORED on send failure, wait
 *          timeout, or when the socket leaves SOCKET_LOCKED state (e.g.
 *          interrupted by close_socket()).
 */
static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf, int in_size = -1)
{
//	check_send_buffer_size(sock);
	size_t upto = 0;
	size_t size = buf.size();
	if (in_size != -1)
		size = in_size;
	int send_len = 0;

	if (!raw_data_only)
	{
		const threading::lock lock(*stats_mutex);
		transfer_stats[sock].first.fresh_current(size);
	}

	while(true) {
		{
			const size_t shard = get_shard(sock);
			// check if the socket is still locked
			const threading::lock lock(*shard_mutexes[shard]);
			if(sockets_locked[shard][sock] != SOCKET_LOCKED)
			{
				return SOCKET_ERRORED;
			}
		}
		send_len = static_cast<int>(size - upto);
		const int res = SDLNet_TCP_Send(sock, &buf[upto],send_len);


		if( res == send_len) {
			// Entire remainder went out; record the bytes and report success.
			if (!raw_data_only)
			{
				const threading::lock lock(*stats_mutex);
				transfer_stats[sock].first.transfer(static_cast<size_t>(res));
			}
			return SOCKET_READY;
		}
		// Partial send: only recoverable when the socket would block.
#if defined(_WIN32)
		if(WSAGetLastError() == WSAEWOULDBLOCK)
#elif defined(EAGAIN)
		if(errno == EAGAIN)
#elif defined(EWOULDBLOCK)
		if(errno == EWOULDBLOCK)
#endif
		{
			// update how far we are
			upto += static_cast<size_t>(res);
			if (!raw_data_only)
			{
				const threading::lock lock(*stats_mutex);
				transfer_stats[sock].first.transfer(static_cast<size_t>(res));
			}

#ifdef USE_POLL
			// Wait (up to 60s) for the socket to become writable again.
			struct pollfd fd = { (reinterpret_cast<_TCPsocket*>(sock))->channel, POLLOUT, 0 };
			int poll_res;
			do {
				poll_res = poll(&fd, 1, 60000);
			} while(poll_res == -1 && errno == EINTR);


			if(poll_res > 0)
				continue;
#elif defined(USE_SELECT)
			fd_set writefds;
			FD_ZERO(&writefds);
			FD_SET(((_TCPsocket*)sock)->channel, &writefds);
			int retval;
			struct timeval tv;
			tv.tv_sec = 60;
			tv.tv_usec = 0;

			do {
				retval = select(((_TCPsocket*)sock)->channel + 1, nullptr, &writefds, nullptr, &tv);
			} while(retval == -1 && errno == EINTR);

			if(retval > 0)
				continue;
#endif
		}

		// Hard error or writability wait failed.
		return SOCKET_ERRORED;
	}
}
433 
#ifdef HAVE_SENDFILE

#ifdef TCP_CORK
/**
 * RAII guard that sets TCP_CORK on construction and clears it on
 * destruction, so the length header and the sendfile() payload are
 * coalesced into full TCP segments.
 */
struct cork_setter {
	cork_setter(int socket) : cork_(1), socket_(socket)
	{
		// (fixed: stray double semicolon removed)
		setsockopt(socket_, IPPROTO_TCP, TCP_CORK, &cork_, sizeof(cork_));
	}
	~cork_setter()
	{
		cork_ = 0;
		setsockopt(socket_, IPPROTO_TCP, TCP_CORK, &cork_, sizeof(cork_));
	}
private:
	int cork_;   // current cork flag value passed to setsockopt
	int socket_; // raw descriptor being corked
};
#else
/** No-op stand-in when TCP_CORK is not available on this platform. */
struct cork_setter
{
	cork_setter(int) {}
};
#endif

/** Functor closing a file descriptor; used with scoped_resource for RAII. */
struct close_fd {
	void operator()(int fd) const { close(fd); }
};
typedef util::scoped_resource<int, close_fd> scoped_fd;
#endif
463 
464 static SOCKET_STATE send_file(buffer* buf)
465 {
466  size_t upto = 0;
467  size_t filesize = filesystem::file_size(buf->config_error);
468 #ifdef HAVE_SENDFILE
469  // implements linux sendfile support
470  LOG_NW << "send_file use system sendfile: " << (network_use_system_sendfile?"yes":"no") << "\n";
471  if (network_use_system_sendfile)
472  {
473  std::vector<char> buffer;
474  buffer.resize(4);
475  SDLNet_Write32(filesize,&buffer[0]);
476  int socket = reinterpret_cast<_TCPsocket*>(buf->sock)->channel;
477  const scoped_fd in_file(open(buf->config_error.c_str(), O_RDONLY));
478  cork_setter set_socket_cork(socket);
479  int poll_res;
480  struct pollfd fd = {socket, POLLOUT, 0 };
481  do {
482  poll_res = poll(&fd, 1, 600000);
483  } while(poll_res == -1 && errno == EINTR);
484 
486  if (poll_res > 0)
487  result = send_buffer(buf->sock, buffer, 4);
488  else
489  result = SOCKET_ERRORED;
490 
491 
492  if (result != SOCKET_READY)
493  {
494  return result;
495  }
496  result = SOCKET_READY;
497 
498  while (true)
499  {
500 
501  do {
502  poll_res = poll(&fd, 1, 600000);
503  } while(poll_res == -1 && errno == EINTR);
504 
505  if (poll_res <= 0 )
506  {
507  result = SOCKET_ERRORED;
508  break;
509  }
510 
511 
512  int bytes = ::sendfile(socket, in_file, 0, filesize);
513 
514  if (bytes == -1)
515  {
516  if (errno == EAGAIN)
517  continue;
518  result = SOCKET_ERRORED;
519  break;
520  }
521  upto += bytes;
522 
523 
524  if (upto == filesize)
525  {
526  break;
527  }
528  }
529 
530  return result;
531  }
532 #endif
533  // default sendfile implementation
534  // if no system implementation is enabled
535  int send_size = 0;
536  // reserve 1024*8 bytes buffer
537  buf->raw_buffer.resize(std::min<size_t>(1024*8, filesize));
538  SDLNet_Write32(filesize,&buf->raw_buffer[0]);
539  filesystem::scoped_istream file_stream = filesystem::istream_file(buf->config_error);
540  SOCKET_STATE result = send_buffer(buf->sock, buf->raw_buffer, 4);
541 
542  if (!file_stream->good()) {
543  ERR_NW << "send_file: Couldn't open file " << buf->config_error << std::endl;
544  }
545  if (result != SOCKET_READY)
546  {
547  return result;
548  }
549  while (file_stream->good())
550  {
551  // read data
552  file_stream->read(&buf->raw_buffer[0], buf->raw_buffer.size());
553  send_size = file_stream->gcount();
554  upto += send_size;
555  // send data to socket
556  result = send_buffer(buf->sock, buf->raw_buffer, send_size);
557  if (result != SOCKET_READY)
558  {
559  break;
560  }
561  if (upto == filesize)
562  {
563  break;
564  }
565 
566  }
567  if (upto != filesize && !file_stream->good()) {
568  ERR_NW << "send_file failed because the stream from file '"
569  << buf->config_error << "' is not good. Sent up to: " << upto
570  << " of file size: " << filesize << "\n";
571  }
572  return result;
573 }
574 
/**
 * Receive one length-prefixed packet from @a sock into @a buf.
 * Reads a 4-byte big-endian length header, sanity-checks it, then reads
 * exactly that many payload bytes.
 * @returns SOCKET_READY on success, SOCKET_ERRORED on any failure.
 */
static SOCKET_STATE receive_buf(TCPsocket sock, std::vector<char>& buf)
{
	// Header bytes aliased with a Uint32 so SDLNet_Read32 gets aligned storage.
	union {
		char buf[4] ALIGN_4;
		Uint32 num;
	} num_buf;
	bool res = receive_with_timeout(sock,num_buf.buf,4,false);

	if(!res) {
		return SOCKET_ERRORED;
	}

	const int len = SDLNet_Read32(&num_buf);

	// Reject nonsensical or abusive packet sizes (max 100 MB).
	if(len < 1 || len > 100000000) {
		return SOCKET_ERRORED;
	}

	buf.resize(len);
	char* beg = &buf[0];
	const char* const end = beg + len;

	if (!raw_data_only)
	{
		const threading::lock lock(*stats_mutex);
		transfer_stats[sock].second.fresh_current(len);
	}

	// The payload read updates receive statistics (update_stats == true).
	res = receive_with_timeout(sock, beg, end - beg, true);
	if(!res) {
		return SOCKET_ERRORED;
	}

	return SOCKET_READY;
}
610 
611 inline void check_socket_result(TCPsocket& sock, SOCKET_STATE& result)
612 {
613  const size_t shard = get_shard(sock);
614  const threading::lock lock(*shard_mutexes[shard]);
615  socket_state_map::iterator lock_it = sockets_locked[shard].find(sock);
616  assert(lock_it != sockets_locked[shard].end());
617  lock_it->second = result;
618  if(result == SOCKET_ERRORED) {
619  ++socket_errors[shard];
620  }
621 }
622 
/**
 * Worker thread entry point. Loops forever: under the shard mutex it reaps
 * exited threads, possibly shrinks/grows the pool, and picks one job — an
 * outgoing buffer to send or a socket with a pending receive — whose socket
 * is SOCKET_READY (marking it SOCKET_LOCKED). Outside the lock it performs
 * the transfer, queues any received packet, and publishes the result via
 * check_socket_result().
 *
 * @param shard_num shard index smuggled through the void* thread argument.
 * @returns 0 when the thread exits (pool shutdown or pool shrink).
 */
static int process_queue(void* shard_num)
{
	size_t shard = reinterpret_cast<uintptr_t>(shard_num);
	DBG_NW << "thread started...\n";
	for(;;) {

		//if we find a socket to send data to, sent_buf will be non-nullptr. If we find a socket
		//to receive data from, sent_buf will be nullptr. 'sock' will always refer to the socket
		//that data is being sent to/received from
		TCPsocket sock = nullptr;
		buffer* sent_buf = 0;

		{
			const threading::lock lock(*shard_mutexes[shard]);
			// Reap threads that exited since the last pass.
			while(managed && !to_clear[shard].empty()) {
				Uint32 tmp = to_clear[shard].back();
				to_clear[shard].pop_back();
				threading::thread *zombie = threads[shard][tmp];
				threads[shard].erase(tmp);
				delete zombie;

			}
			// Shrink the pool when enough threads are already idle.
			if(min_threads && waiting_threads[shard] >= min_threads) {
				DBG_NW << "worker thread exiting... not enough jobs\n";
				to_clear[shard].push_back(threading::get_current_thread_id());
				return 0;
			}
			waiting_threads[shard]++;
			// Hunt for a job, sleeping on the condition variable when idle.
			for(;;) {

				// Prefer sends: find a queued buffer whose socket is READY.
				buffer_set::iterator itor = outgoing_bufs[shard].begin(), itor_end = outgoing_bufs[shard].end();
				for(; itor != itor_end; ++itor) {
					socket_state_map::iterator lock_it = sockets_locked[shard].find((*itor)->sock);
					assert(lock_it != sockets_locked[shard].end());
					if(lock_it->second == SOCKET_READY) {
						lock_it->second = SOCKET_LOCKED;
						sent_buf = *itor;
						sock = sent_buf->sock;
						outgoing_bufs[shard].erase(itor);
						break;
					}
				}

				// Otherwise look for a socket with a pending receive.
				if(sock == nullptr) {
					receive_list::iterator itor = pending_receives[shard].begin(), itor_end = pending_receives[shard].end();
					for(; itor != itor_end; ++itor) {
						socket_state_map::iterator lock_it = sockets_locked[shard].find(*itor);
						assert(lock_it != sockets_locked[shard].end());
						if(lock_it->second == SOCKET_READY) {
							lock_it->second = SOCKET_LOCKED;
							sock = *itor;
							pending_receives[shard].erase(itor);
							break;
						}
					}
				}

				if(sock != nullptr) {
					break;
				}

				if(managed == false) {
					DBG_NW << "worker thread exiting...\n";
					waiting_threads[shard]--;
					to_clear[shard].push_back(threading::get_current_thread_id());
					return 0;
				}

				cond[shard]->wait(*shard_mutexes[shard]); // temporarily release the mutex and wait for a buffer
			}
			waiting_threads[shard]--;
			// if we are the last thread in the pool, create a new one
			if(!waiting_threads[shard] && managed == true) {
				// max_threads of 0 is unlimited
				if(!max_threads || max_threads >threads[shard].size()) {
					threading::thread * tmp = new threading::thread(process_queue,shard_num);
					threads[shard][tmp->get_id()] =tmp;
				}
			}
		}

		assert(sock);

		DBG_NW << "thread found a buffer...\n";

		SOCKET_STATE result = SOCKET_READY;
		std::vector<char> buf;

		if(sent_buf) {

			if(!sent_buf->config_error.empty())
			{
				// We have file to send over net
				result = send_file(sent_buf);
			} else {
				// Lazily wrap the serialized config in a length-prefixed packet.
				if(sent_buf->raw_buffer.empty()) {
					const std::string &value = sent_buf->stream.str();
					make_network_buffer(value.c_str(), value.size(), sent_buf->raw_buffer);
				}

				result = send_buffer(sent_buf->sock, sent_buf->raw_buffer);
			}
			delete sent_buf;
		} else {
			result = receive_buf(sock,buf);
		}


		// Sends leave 'buf' empty, so this also covers completed sends.
		if(result != SOCKET_READY || buf.empty())
		{
			check_socket_result(sock,result);
			continue;
		}
		//if we received data, add it to the queue
		buffer* received_data = new buffer(sock);

		if(raw_data_only) {
			received_data->raw_buffer.swap(buf);
		} else {
			// Decompress and parse the payload into a config object; parse
			// failures are recorded in config_error and surfaced to the
			// caller by get_received_data().
			std::string buffer(buf.begin(), buf.end());
			std::istringstream stream(buffer);
			try {
				read_gz(received_data->config_buf, stream);
			} catch(boost::iostreams::gzip_error&) {
				received_data->config_error = "Malformed compressed data";
			} catch(config::error &e) {
				received_data->config_error = e.message;
			}
		}

		{
			// Now add data
			const threading::lock lock_received(*received_mutex);
			received_data_queue.push_back(received_data);
		}
		check_socket_result(sock,result);
	}
	// unreachable
}
762 
763 } //anonymous namespace
764 
766 {
767 
/**
 * Starts the worker pool. Only the first live manager is "active_" (nested
 * managers are no-ops): it creates the per-shard mutexes and condition
 * variables, the stats/receive mutexes, and spawns p_min_threads worker
 * threads per shard.
 */
manager::manager(size_t p_min_threads,size_t p_max_threads) : active_(!managed)
{
	if(active_) {
		managed = true;
		for(int i = 0; i != NUM_SHARDS; ++i) {
			shard_mutexes[i] = new threading::mutex();
			cond[i] = new threading::condition();
		}
		stats_mutex = new threading::mutex();
		received_mutex = new threading::mutex();

		min_threads = p_min_threads;
		max_threads = p_max_threads;

		for(size_t shard = 0; shard != NUM_SHARDS; ++shard) {
			const threading::lock lock(*shard_mutexes[shard]);
			for(size_t n = 0; n != p_min_threads; ++n) {
				threading::thread * tmp = new threading::thread(process_queue,reinterpret_cast<void*>(shard));
				threads[shard][tmp->get_id()] = tmp;
			}
		}
	}
}
791 
// ~manager() body: shuts the pool down when this instance was the active one.
{
	if(active_) {
		// Signal workers to exit; they test 'managed' whenever they wake.
		managed = false;

		for(size_t shard = 0; shard != NUM_SHARDS; ++shard) {
			{
				const threading::lock lock(*shard_mutexes[shard]);
				socket_errors[shard] = 0;
			}

			// Wake every worker so it can observe managed == false.
			cond[shard]->notify_all();

			for(std::map<Uint32,threading::thread*>::const_iterator i = threads[shard].begin(); i != threads[shard].end(); ++i) {

				DBG_NW << "waiting for thread " << i->first << " to exit...\n";
				delete i->second; // thread destructor joins the thread
				DBG_NW << "thread exited...\n";
			}

			// Condition variables must be deleted first as
			// they make reference to mutexes. If the mutexes
			// are destroyed first, the condition variables
			// will access memory already freed by way of
			// stale mutex. Bad things will follow. ;)
			threads[shard].clear();
			// Have to clean up to_clear so no bogus clearing of threads
			to_clear[shard].clear();
			delete cond[shard];
			cond[shard] = nullptr;
			delete shard_mutexes[shard];
			shard_mutexes[shard] = nullptr;
		}

		delete stats_mutex;
		delete received_mutex;
		stats_mutex = 0;
		received_mutex = 0;

		for(int i = 0; i != NUM_SHARDS; ++i) {
			sockets_locked[i].clear();
		}
		transfer_stats.clear();

		DBG_NW << "exiting manager::~manager()\n";
	}
}
839 
// get_pending_stats() body: tally buffers still queued for sending across
// all shards (count and total raw bytes), under each shard's mutex.
{
	stats.npending_sends = 0;
	stats.nbytes_pending_sends = 0;
	for(size_t shard = 0; shard != NUM_SHARDS; ++shard) {
		const threading::lock lock(*shard_mutexes[shard]);
		stats.npending_sends += outgoing_bufs[shard].size();
		for(buffer_set::const_iterator i = outgoing_bufs[shard].begin(); i != outgoing_bufs[shard].end(); ++i) {
			stats.nbytes_pending_sends += (*i)->raw_buffer.size();
		}
	}

	return stats;
}
855 
// set_raw_data_only() body: switch the pool to raw-byte mode — received
// payloads are queued verbatim instead of being gzip/WML parsed.
{
	raw_data_only = true;
}
860 
// set_use_system_sendfile() body: toggle the OS sendfile() fast path used
// by send_file() when HAVE_SENDFILE is compiled in.
{
	network_use_system_sendfile = use;
}
865 
866 void receive_data(TCPsocket sock)
867 {
868  {
869  const size_t shard = get_shard(sock);
870  const threading::lock lock(*shard_mutexes[shard]);
871  pending_receives[shard].push_back(sock);
872 
873  socket_state_map::const_iterator i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
874  if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
875  cond[shard]->notify_one();
876  }
877  }
878 }
879 
/**
 * Fetch one parsed packet from the received queue — the oldest overall, or
 * the oldest from @a sock when sock is non-null — swapping its payload into
 * @a cfg and reporting its compressed size via @a bandwidth_in.
 *
 * @returns the source socket, or nullptr when nothing matching is queued.
 * @throws config::error (annotated with tcpsocket_info) when the worker
 *         thread recorded a decompress/parse error for the packet.
 */
TCPsocket get_received_data(TCPsocket sock, config& cfg, network::bandwidth_in_ptr& bandwidth_in)
{
	assert(!raw_data_only);
	const threading::lock lock_received(*received_mutex);
	received_queue::iterator itor = received_data_queue.begin();
	if(sock != nullptr) {
		// Restrict to packets coming from the requested socket.
		for(; itor != received_data_queue.end(); ++itor) {
			if((*itor)->sock == sock) {
				break;
			}
		}
	}

	if(itor == received_data_queue.end()) {
		return nullptr;
	} else if (!(*itor)->config_error.empty()){
		// throw the error in parent thread
		std::string error = (*itor)->config_error;
		buffer* buf = *itor;
		TCPsocket err_sock = (*itor)->sock;
		received_data_queue.erase(itor);
		delete buf;
		throw config::error(error) << network::tcpsocket_info(err_sock);
	} else {
		cfg.swap((*itor)->config_buf);
		const TCPsocket res = (*itor)->sock;
		buffer* buf = *itor;
		// Report the raw (compressed) size for bandwidth accounting.
		bandwidth_in.reset(new network::bandwidth_in((*itor)->raw_buffer.size()));
		received_data_queue.erase(itor);
		delete buf;
		return res;
	}
}
913 
914 TCPsocket get_received_data(std::vector<char>& out)
915 {
916  assert(raw_data_only);
917  const threading::lock lock_received(*received_mutex);
918  if(received_data_queue.empty()) {
919  return nullptr;
920  }
921 
922  buffer* buf = received_data_queue.front();
923  received_data_queue.pop_front();
924  out.swap(buf->raw_buffer);
925  const TCPsocket res = buf->sock;
926  delete buf;
927  return res;
928 }
929 
930 static void queue_buffer(TCPsocket sock, buffer* queued_buf)
931 {
932  const size_t shard = get_shard(sock);
933  const threading::lock lock(*shard_mutexes[shard]);
934  outgoing_bufs[shard].push_back(queued_buf);
935  socket_state_map::const_iterator i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
936  if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
937  cond[shard]->notify_one();
938  }
939 
940 }
941 
942 void queue_raw_data(TCPsocket sock, const char* buf, int len)
943 {
944  buffer* queued_buf = new buffer(sock);
945  assert(*buf == 31);
946  make_network_buffer(buf, len, queued_buf->raw_buffer);
947  queue_buffer(sock, queued_buf);
948 }
949 
950 
951 void queue_file(TCPsocket sock, const std::string& filename)
952 {
953  buffer* queued_buf = new buffer(sock);
954  queued_buf->config_error = filename;
955  queue_buffer(sock, queued_buf);
956 }
957 
958 size_t queue_data(TCPsocket sock,const config& buf, const std::string& packet_type)
959 {
960  DBG_NW << "queuing data...\n";
961 
962  buffer* queued_buf = new buffer(sock);
963  output_to_buffer(sock, buf, queued_buf->stream);
964  const size_t size = queued_buf->stream.str().size();
965 
966  network::add_bandwidth_out(packet_type, size);
967  queue_buffer(sock, queued_buf);
968  return size;
969 }
970 
971 namespace
972 {
973 
974 /** Caller has to make sure to own the mutex for this shard */
975 void remove_buffers(TCPsocket sock)
976 {
977  {
978  const size_t shard = get_shard(sock);
979  for(buffer_set::iterator i = outgoing_bufs[shard].begin(); i != outgoing_bufs[shard].end();) {
980  if ((*i)->sock == sock)
981  {
982  buffer* buf = *i;
983  i = outgoing_bufs[shard].erase(i);
984  delete buf;
985  }
986  else
987  {
988  ++i;
989  }
990  }
991  }
992 
993  {
994  const threading::lock lock_receive(*received_mutex);
995 
996  for(received_queue::iterator j = received_data_queue.begin(); j != received_data_queue.end(); ) {
997  if((*j)->sock == sock) {
998  buffer *buf = *j;
999  j = received_data_queue.erase(j);
1000  delete buf;
1001  } else {
1002  ++j;
1003  }
1004  }
1005  }
1006 }
1007 
1008 } // anonymous namespace
1009 
1010 bool is_locked(const TCPsocket sock) {
1011  const size_t shard = get_shard(sock);
1012  const threading::lock lock(*shard_mutexes[shard]);
1013  const socket_state_map::iterator lock_it = sockets_locked[shard].find(sock);
1014  if (lock_it == sockets_locked[shard].end()) return false;
1015  return (lock_it->second == SOCKET_LOCKED);
1016 }
1017 
1018 bool close_socket(TCPsocket sock)
1019 {
1020  {
1021  const size_t shard = get_shard(sock);
1022  const threading::lock lock(*shard_mutexes[shard]);
1023 
1024  pending_receives[shard].erase(std::remove(pending_receives[shard].begin(),pending_receives[shard].end(),sock),pending_receives[shard].end());
1025 
1026  const socket_state_map::iterator lock_it = sockets_locked[shard].find(sock);
1027  if(lock_it == sockets_locked[shard].end()) {
1028  remove_buffers(sock);
1029  return true;
1030  }
1031  if (!(lock_it->second == SOCKET_LOCKED || lock_it->second == SOCKET_INTERRUPT)) {
1032  sockets_locked[shard].erase(lock_it);
1033  remove_buffers(sock);
1034  return true;
1035  } else {
1036  lock_it->second = SOCKET_INTERRUPT;
1037  return false;
1038  }
1039 
1040  }
1041 
1042 
1043 }
1044 
/**
 * Scan all shards for a socket in SOCKET_ERRORED state. The first one found
 * is removed from all bookkeeping (state map, pending receives, queued
 * buffers) and returned so the caller can handle the failure.
 * @returns the errored socket, or 0 when none exists.
 */
TCPsocket detect_error()
{
	for(size_t shard = 0; shard != NUM_SHARDS; ++shard) {
		const threading::lock lock(*shard_mutexes[shard]);
		if(socket_errors[shard] > 0) {
			for(socket_state_map::iterator i = sockets_locked[shard].begin(); i != sockets_locked[shard].end();) {
				if(i->second == SOCKET_ERRORED) {
					--socket_errors[shard];
					const TCPsocket sock = i->first;
					// Post-increment keeps the iterator valid across erase().
					sockets_locked[shard].erase(i++);
					pending_receives[shard].erase(std::remove(pending_receives[shard].begin(),pending_receives[shard].end(),sock),pending_receives[shard].end());
					remove_buffers(sock);
					return sock;
				}
				else
				{
					++i;
				}
			}
		}

		// Nothing errored (or counter was stale); reset it for this shard.
		socket_errors[shard] = 0;
	}

	return 0;
}
1071 
/**
 * Thread-safe snapshot of the (send, receive) statistics for @a sock.
 * Note: operator[] creates a zeroed entry for sockets not seen before.
 */
std::pair<network::statistics,network::statistics> get_current_transfer_stats(TCPsocket sock)
{
	const threading::lock lock(*stats_mutex);
	return transfer_stats[sock];
}
1077 
1078 } // network_worker_pool namespace
1079 
1080 
void receive_data(TCPsocket sock)
Function to asynchronously received data to the given socket.
IPaddress remoteAddress
Definition: network.cpp:391
static l_noret error(LoadState *S, const char *why)
Definition: lundump.cpp:29
void queue_raw_data(TCPsocket sock, const char *buf, int len)
#define ERR_NW
GLenum GLenum GLenum input
Definition: glew.h:10668
static lg::log_domain log_network("network")
#define LOG_NW
scoped_resource: class template, functions, helper policies etc. for resource management.
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
might throw a std::ios_base::failure especially a gzip_error
Definition: parser.cpp:454
int ready
Definition: network.cpp:389
size_t queue_data(TCPsocket sock, const config &buf, const std::string &packet_type)
bool is_locked(const TCPsocket sock)
GLuint GLuint stream
Definition: glew.h:5239
manager(size_t min_threads, size_t max_threads)
TCPsocket get_received_data(TCPsocket sock, config &cfg, network::bandwidth_in_ptr &bandwidth_in)
void send_file(const std::string &filename, connection connection_num, const std::string &packet_type)
Definition: network.cpp:1071
void swap(config &cfg)
Definition: config.cpp:1518
#define ALIGN_4
Aligns a variable on a 4 byte boundary.
SOCKET_STATE
GLuint GLuint end
Definition: glew.h:1221
GLuint64EXT * result
Definition: glew.h:10727
#define SOCKET
Network worker handles data transfers in threads Remember to use mutexs as little as possible All glo...
Class for writing a config out to a file in pieces.
void queue_file(TCPsocket sock, const std::string &filename)
GLsizei const GLfloat * value
Definition: glew.h:1817
GLenum GLsizei len
Definition: glew.h:5662
std::istream * istream_file(const std::string &fname, bool treat_failure_as_error=true)
A class template, scoped_resource, designed to implement the Resource Acquisition Is Initialization (...
GLuint num
Definition: glew.h:2552
GLenum GLuint GLsizei const char * buf
Definition: glew.h:2498
GLuint buffer
Definition: glew.h:1648
#define NUM_SHARDS
network::pending_statistics get_pending_stats()
Some defines: VERSION, PACKAGE, MIN_SAVEGAME_VERSION.
GLuint res
Definition: glew.h:9258
#define DBG_NW
std::map< std::string, tfilter >::iterator itor
Definition: filter.cpp:199
size_t i
Definition: function.cpp:1057
Declarations for File-IO.
static int writer(lua_State *L, const void *b, size_t size, void *B)
Definition: lstrlib.cpp:166
static void queue_buffer(TCPsocket sock, buffer *queued_buf)
int file_size(const std::string &fname)
Returns the size of a file, or -1 if the file doesn't exist.
GLsizeiptr size
Definition: glew.h:1649
IPaddress localAddress
Definition: network.cpp:392
GLclampd n
Definition: glew.h:5903
boost::uint32_t get_current_thread_id()
Definition: thread.cpp:30
boost::uint32_t get_id()
Definition: thread.cpp:28
static int cond(LexState *ls)
Definition: lparser.cpp:1168
Standard logging facilities (interface).
GLenum condition
Definition: glew.h:9969
std::string message
Definition: exceptions.hpp:29
void set_use_system_sendfile(bool use)
const std::string remove
remove directive
#define e
boost::error_info< struct tag_tcpsocket, TCPsocket > tcpsocket_info
Definition: network.hpp:270
std::string::const_iterator iterator
Definition: tokenizer.hpp:21
A config object defines a single node in a WML file, with access to child nodes.
Definition: config.hpp:83
GLdouble s
Definition: glew.h:1358
int sflag
Definition: network.cpp:393
bool close_socket(TCPsocket sock)
GLsizei const GLcharARB ** string
Definition: glew.h:4503
bool wait(const mutex &m)
Definition: thread.cpp:108
std::pair< network::statistics, network::statistics > get_current_transfer_stats(TCPsocket sock)
void add_bandwidth_out(const std::string &packet_type, size_t len)
Definition: network.cpp:1052
channel
Definition: utils.hpp:250