35 #include <boost/iostreams/filter/gzip.hpp>
36 #include <boost/exception/info.hpp>
43 #include <sys/sendfile.h>
44 #include <netinet/in.h>
45 #include <netinet/tcp.h>
49 #if defined(_WIN32) || defined(__WIN32__) || defined (WIN32)
51 # undef INADDR_BROADCAST
58 typedef int socklen_t;
60 # include <sys/types.h>
61 # include <sys/socket.h>
67 # elif defined(HAVE_SYS_POLL_H)
69 # include <sys/poll.h>
73 # ifdef HAVE_SYS_SELECT_H
74 # include <sys/select.h>
76 # include <sys/time.h>
77 # include <sys/types.h>
83 #pragma GCC diagnostic ignored "-Wold-style-cast"
86 #define DBG_NW LOG_STREAM(debug, log_network)
87 #define LOG_NW LOG_STREAM(info, log_network)
88 #define ERR_NW LOG_STREAM(err, log_network)
104 size_t min_threads = 0;
105 size_t max_threads = 0;
107 size_t get_shard(TCPsocket sock) {
return reinterpret_cast<uintptr_t
>(sock)%
NUM_SHARDS; }
110 explicit buffer(TCPsocket sock) :
119 mutable config config_buf;
121 std::ostringstream
stream;
128 std::vector<char> raw_buffer;
132 bool managed =
false, raw_data_only =
false;
133 typedef std::vector< buffer* > buffer_set;
137 typedef std::vector<TCPsocket> receive_list;
140 typedef std::deque<buffer*> received_queue;
141 received_queue received_data_queue;
143 enum SOCKET_STATE { SOCKET_READY, SOCKET_LOCKED, SOCKET_ERRORED, SOCKET_INTERRUPT };
144 typedef std::map<TCPsocket,SOCKET_STATE> socket_state_map;
145 typedef std::map<TCPsocket, std::pair<network::statistics,network::statistics> > socket_stats_map;
148 socket_stats_map transfer_stats;
156 std::map<Uint32,threading::thread*> threads[
NUM_SHARDS];
159 int system_send_buffer_size = 0;
161 bool network_use_system_sendfile =
false;
163 int receive_bytes(TCPsocket
s,
char*
buf,
size_t nbytes)
165 #ifdef NETWORK_USE_RAW_SOCKETS
166 _TCPsocket* sock =
reinterpret_cast<_TCPsocket*
>(
s);
170 res = recv(sock->channel, buf, nbytes, MSG_DONTWAIT);
171 }
while(errno == EINTR);
175 return SDLNet_TCP_Recv(s, buf, nbytes);
180 void check_send_buffer_size(TCPsocket& s)
182 if (system_send_buffer_size)
184 _TCPsocket* sock =
reinterpret_cast<_TCPsocket*
>(
s);
185 socklen_t
len =
sizeof(system_send_buffer_size);
187 getsockopt(sock->channel, SOL_SOCKET, SO_RCVBUF,reinterpret_cast<char*>(&system_send_buffer_size), &len);
189 getsockopt(sock->channel, SOL_SOCKET, SO_RCVBUF,&system_send_buffer_size, &len);
191 --system_send_buffer_size;
192 DBG_NW <<
"send buffer size: " << system_send_buffer_size <<
"\n";
195 bool receive_with_timeout(TCPsocket s,
char* buf,
size_t nbytes,
196 bool update_stats=
false,
int idle_timeout_ms=30000,
197 int total_timeout_ms=300000)
199 #if !defined(USE_POLL) && !defined(USE_SELECT)
200 int startTicks = SDL_GetTicks();
203 int timeout_ms = idle_timeout_ms;
205 const int bytes_read = receive_bytes(s, buf, nbytes);
206 if(bytes_read == 0) {
208 }
else if(bytes_read < 0) {
209 #if defined(EAGAIN) && !defined(_WIN32)
211 #elif defined(_WIN32) && defined(WSAEWOULDBLOCK)
213 if(WSAGetLastError() == WSAEWOULDBLOCK)
214 #elif defined(EWOULDBLOCK)
215 if(errno == EWOULDBLOCK)
222 struct pollfd fd = {
reinterpret_cast<_TCPsocket*
>(
s)->
channel, POLLIN, 0 };
228 const int poll_timeout = std::min(timeout_ms, 100);
230 poll_res = poll(&fd, 1, poll_timeout);
233 timeout_ms -= poll_timeout;
234 total_timeout_ms -= poll_timeout;
235 if(timeout_ms <= 0 || total_timeout_ms <= 0) {
242 const size_t shard = get_shard(s);
245 assert(lock_it != sockets_locked[shard].
end());
246 if(lock_it->second == SOCKET_INTERRUPT) {
251 }
while(poll_res == 0 || (poll_res == -1 && errno == EINTR));
255 #elif defined(USE_SELECT)
257 const int select_timeout = std::min(timeout_ms, 100);
261 FD_SET(((_TCPsocket*)s)->
channel, &readfds);
263 tv.tv_sec = select_timeout/1000;
264 tv.tv_usec = select_timeout % 1000 * 1000;
265 retval = select(((_TCPsocket*)s)->
channel + 1, &readfds,
nullptr,
nullptr, &tv);
266 DBG_NW <<
"select retval: " << retval <<
", timeout idle " << timeout_ms
267 <<
" total " << total_timeout_ms <<
" (ms)\n";
269 timeout_ms -= select_timeout;
270 total_timeout_ms -= select_timeout;
271 if(timeout_ms <= 0 || total_timeout_ms <= 0) {
278 const size_t shard = get_shard(s);
281 assert(lock_it != sockets_locked[shard].
end());
282 if(lock_it->second == SOCKET_INTERRUPT) {
286 }
while(retval == 0 || (retval == -1 && errno == EINTR));
293 time_used = SDL_GetTicks() - startTicks;
294 if(time_used >= timeout_ms) {
303 timeout_ms = idle_timeout_ms;
305 if(update_stats && !raw_data_only) {
307 transfer_stats[
s].second.transfer(static_cast<size_t>(bytes_read));
310 if(bytes_read > static_cast<int>(nbytes)) {
313 nbytes -= bytes_read;
315 #if !defined(USE_POLL) && !defined(USE_SELECT)
316 startTicks = SDL_GetTicks();
320 const size_t shard = get_shard(s);
323 assert(lock_it != sockets_locked[shard].
end());
324 if(lock_it->second == SOCKET_INTERRUPT) {
336 static void output_to_buffer(TCPsocket ,
const config& cfg, std::ostringstream& compressor)
342 static void make_network_buffer(
const char*
input,
int len, std::vector<char>& buf)
345 SDLNet_Write32(len, &buf[0]);
346 memcpy(&buf[4], input, len);
349 static SOCKET_STATE send_buffer(TCPsocket sock, std::vector<char>& buf,
int in_size = -1)
353 size_t size = buf.size();
361 transfer_stats[sock].first.fresh_current(size);
366 const size_t shard = get_shard(sock);
369 if(sockets_locked[shard][sock] != SOCKET_LOCKED)
371 return SOCKET_ERRORED;
374 send_len =
static_cast<int>(size - upto);
375 const int res = SDLNet_TCP_Send(sock, &buf[upto],send_len);
378 if( res == send_len) {
382 transfer_stats[sock].first.transfer(static_cast<size_t>(res));
387 if(WSAGetLastError() == WSAEWOULDBLOCK)
388 #elif defined(EAGAIN)
390 #elif defined(EWOULDBLOCK)
391 if(errno == EWOULDBLOCK)
395 upto +=
static_cast<size_t>(
res);
399 transfer_stats[sock].first.transfer(static_cast<size_t>(res));
403 struct pollfd fd = { (
reinterpret_cast<_TCPsocket*
>(sock))->channel, POLLOUT, 0 };
406 poll_res = poll(&fd, 1, 60000);
407 }
while(poll_res == -1 && errno == EINTR);
412 #elif defined(USE_SELECT)
415 FD_SET(((_TCPsocket*)sock)->
channel, &writefds);
422 retval = select(((_TCPsocket*)sock)->
channel + 1,
nullptr, &writefds,
nullptr, &tv);
423 }
while(retval == -1 && errno == EINTR);
430 return SOCKET_ERRORED;
438 cork_setter(
int socket) : cork_(1), socket_(socket)
440 setsockopt(socket_, IPPROTO_TCP, TCP_CORK, &cork_,
sizeof(cork_));;
445 setsockopt(socket_, IPPROTO_TCP, TCP_CORK, &cork_,
sizeof(cork_));
459 void operator()(
int fd)
const { close(fd); }
470 LOG_NW <<
"send_file use system sendfile: " << (network_use_system_sendfile?
"yes":
"no") <<
"\n";
471 if (network_use_system_sendfile)
475 SDLNet_Write32(filesize,&buffer[0]);
476 int socket =
reinterpret_cast<_TCPsocket*
>(buf->sock)->
channel;
477 const scoped_fd in_file(open(buf->config_error.c_str(), O_RDONLY));
478 cork_setter set_socket_cork(socket);
480 struct pollfd fd = {socket, POLLOUT, 0 };
482 poll_res = poll(&fd, 1, 600000);
483 }
while(poll_res == -1 && errno == EINTR);
487 result = send_buffer(buf->sock, buffer, 4);
489 result = SOCKET_ERRORED;
492 if (result != SOCKET_READY)
496 result = SOCKET_READY;
502 poll_res = poll(&fd, 1, 600000);
503 }
while(poll_res == -1 && errno == EINTR);
507 result = SOCKET_ERRORED;
512 int bytes = ::sendfile(socket, in_file, 0, filesize);
518 result = SOCKET_ERRORED;
524 if (upto == filesize)
537 buf->raw_buffer.resize(std::min<size_t>(1024*8, filesize));
538 SDLNet_Write32(filesize,&buf->raw_buffer[0]);
540 SOCKET_STATE result = send_buffer(buf->sock, buf->raw_buffer, 4);
542 if (!file_stream->good()) {
543 ERR_NW <<
"send_file: Couldn't open file " << buf->config_error << std::endl;
545 if (result != SOCKET_READY)
549 while (file_stream->good())
552 file_stream->read(&buf->raw_buffer[0], buf->raw_buffer.size());
553 send_size = file_stream->gcount();
556 result = send_buffer(buf->sock, buf->raw_buffer, send_size);
557 if (result != SOCKET_READY)
561 if (upto == filesize)
567 if (upto != filesize && !file_stream->good()) {
568 ERR_NW <<
"send_file failed because the stream from file '"
569 << buf->config_error <<
"' is not good. Sent up to: " << upto
570 <<
" of file size: " << filesize <<
"\n";
575 static SOCKET_STATE receive_buf(TCPsocket sock, std::vector<char>& buf)
581 bool res = receive_with_timeout(sock,num_buf.buf,4,
false);
584 return SOCKET_ERRORED;
587 const int len = SDLNet_Read32(&num_buf);
590 return SOCKET_ERRORED;
595 const char*
const end = beg +
len;
600 transfer_stats[sock].second.fresh_current(len);
603 res = receive_with_timeout(sock, beg, end - beg,
true);
605 return SOCKET_ERRORED;
611 inline void check_socket_result(TCPsocket& sock,
SOCKET_STATE& result)
613 const size_t shard = get_shard(sock);
616 assert(lock_it != sockets_locked[shard].
end());
618 if(result == SOCKET_ERRORED) {
619 ++socket_errors[shard];
623 static int process_queue(
void* shard_num)
625 size_t shard =
reinterpret_cast<uintptr_t
>(shard_num);
626 DBG_NW <<
"thread started...\n";
632 TCPsocket sock =
nullptr;
633 buffer* sent_buf = 0;
637 while(managed && !to_clear[shard].empty()) {
638 Uint32 tmp = to_clear[shard].back();
639 to_clear[shard].pop_back();
641 threads[shard].erase(tmp);
645 if(min_threads && waiting_threads[shard] >= min_threads) {
646 DBG_NW <<
"worker thread exiting... not enough jobs\n";
650 waiting_threads[shard]++;
654 for(; itor != itor_end; ++
itor) {
656 assert(lock_it != sockets_locked[shard].
end());
657 if(lock_it->second == SOCKET_READY) {
658 lock_it->second = SOCKET_LOCKED;
660 sock = sent_buf->sock;
661 outgoing_bufs[shard].erase(itor);
666 if(sock ==
nullptr) {
668 for(; itor != itor_end; ++
itor) {
670 assert(lock_it != sockets_locked[shard].
end());
671 if(lock_it->second == SOCKET_READY) {
672 lock_it->second = SOCKET_LOCKED;
674 pending_receives[shard].erase(itor);
680 if(sock !=
nullptr) {
684 if(managed ==
false) {
685 DBG_NW <<
"worker thread exiting...\n";
686 waiting_threads[shard]--;
691 cond[shard]->
wait(*shard_mutexes[shard]);
693 waiting_threads[shard]--;
695 if(!waiting_threads[shard] && managed ==
true) {
697 if(!max_threads || max_threads >threads[shard].
size()) {
699 threads[shard][tmp->
get_id()] =tmp;
706 DBG_NW <<
"thread found a buffer...\n";
709 std::vector<char>
buf;
713 if(!sent_buf->config_error.empty())
718 if(sent_buf->raw_buffer.empty()) {
720 make_network_buffer(value.c_str(), value.size(), sent_buf->raw_buffer);
723 result = send_buffer(sent_buf->sock, sent_buf->raw_buffer);
727 result = receive_buf(sock,buf);
731 if(result != SOCKET_READY || buf.empty())
733 check_socket_result(sock,result);
737 buffer* received_data =
new buffer(sock);
740 received_data->raw_buffer.swap(buf);
743 std::istringstream
stream(buffer);
746 }
catch(boost::iostreams::gzip_error&) {
747 received_data->config_error =
"Malformed compressed data";
749 received_data->config_error = e.
message;
756 received_data_queue.push_back(received_data);
758 check_socket_result(sock,result);
779 min_threads = p_min_threads;
780 max_threads = p_max_threads;
782 for(
size_t shard = 0; shard !=
NUM_SHARDS; ++shard) {
784 for(
size_t n = 0;
n != p_min_threads; ++
n) {
786 threads[shard][tmp->
get_id()] = tmp;
797 for(
size_t shard = 0; shard !=
NUM_SHARDS; ++shard) {
800 socket_errors[shard] = 0;
805 for(std::map<Uint32,threading::thread*>::const_iterator
i = threads[shard].begin();
i != threads[shard].end(); ++
i) {
807 DBG_NW <<
"waiting for thread " <<
i->first <<
" to exit...\n";
809 DBG_NW <<
"thread exited...\n";
817 threads[shard].clear();
819 to_clear[shard].clear();
821 cond[shard] =
nullptr;
822 delete shard_mutexes[shard];
823 shard_mutexes[shard] =
nullptr;
827 delete received_mutex;
832 sockets_locked[
i].clear();
834 transfer_stats.clear();
836 DBG_NW <<
"exiting manager::~manager()\n";
845 for(
size_t shard = 0; shard !=
NUM_SHARDS; ++shard) {
848 for(buffer_set::const_iterator
i = outgoing_bufs[shard].begin();
i != outgoing_bufs[shard].end(); ++
i) {
858 raw_data_only =
true;
863 network_use_system_sendfile = use;
869 const size_t shard = get_shard(sock);
871 pending_receives[shard].push_back(sock);
873 socket_state_map::const_iterator
i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
874 if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
882 assert(!raw_data_only);
885 if(sock !=
nullptr) {
886 for(; itor != received_data_queue.end(); ++
itor) {
887 if((*itor)->sock == sock) {
893 if(itor == received_data_queue.end()) {
895 }
else if (!(*itor)->config_error.empty()){
899 TCPsocket err_sock = (*itor)->sock;
900 received_data_queue.erase(itor);
904 cfg.
swap((*itor)->config_buf);
905 const TCPsocket res = (*itor)->sock;
908 received_data_queue.erase(itor);
916 assert(raw_data_only);
918 if(received_data_queue.empty()) {
922 buffer* buf = received_data_queue.front();
923 received_data_queue.pop_front();
924 out.swap(buf->raw_buffer);
925 const TCPsocket res = buf->sock;
932 const size_t shard = get_shard(sock);
934 outgoing_bufs[shard].push_back(queued_buf);
935 socket_state_map::const_iterator
i = sockets_locked[shard].insert(std::pair<TCPsocket,SOCKET_STATE>(sock,SOCKET_READY)).first;
936 if(i->second == SOCKET_READY || i->second == SOCKET_ERRORED) {
944 buffer* queued_buf =
new buffer(sock);
946 make_network_buffer(buf, len, queued_buf->raw_buffer);
953 buffer* queued_buf =
new buffer(sock);
954 queued_buf->config_error = filename;
960 DBG_NW <<
"queuing data...\n";
962 buffer* queued_buf =
new buffer(sock);
963 output_to_buffer(sock, buf, queued_buf->stream);
964 const size_t size = queued_buf->stream.str().size();
975 void remove_buffers(TCPsocket sock)
978 const size_t shard = get_shard(sock);
980 if ((*i)->sock == sock)
983 i = outgoing_bufs[shard].erase(
i);
997 if((*j)->sock == sock) {
999 j = received_data_queue.erase(j);
1011 const size_t shard = get_shard(sock);
1014 if (lock_it == sockets_locked[shard].
end())
return false;
1015 return (lock_it->second == SOCKET_LOCKED);
1021 const size_t shard = get_shard(sock);
1024 pending_receives[shard].erase(
std::remove(pending_receives[shard].begin(),pending_receives[shard].
end(),sock),pending_receives[shard].
end());
1027 if(lock_it == sockets_locked[shard].
end()) {
1028 remove_buffers(sock);
1031 if (!(lock_it->second == SOCKET_LOCKED || lock_it->second == SOCKET_INTERRUPT)) {
1032 sockets_locked[shard].erase(lock_it);
1033 remove_buffers(sock);
1036 lock_it->second = SOCKET_INTERRUPT;
1047 for(
size_t shard = 0; shard !=
NUM_SHARDS; ++shard) {
1049 if(socket_errors[shard] > 0) {
1051 if(
i->second == SOCKET_ERRORED) {
1052 --socket_errors[shard];
1053 const TCPsocket sock =
i->first;
1054 sockets_locked[shard].erase(
i++);
1055 pending_receives[shard].erase(
std::remove(pending_receives[shard].begin(),pending_receives[shard].
end(),sock),pending_receives[shard].
end());
1056 remove_buffers(sock);
1066 socket_errors[shard] = 0;
1075 return transfer_stats[sock];
void receive_data(TCPsocket sock)
Function to asynchronously received data to the given socket.
static l_noret error(LoadState *S, const char *why)
void queue_raw_data(TCPsocket sock, const char *buf, int len)
GLenum GLenum GLenum input
static lg::log_domain log_network("network")
scoped_resource: class template, functions, helper policies etc. for resource management.
void read_gz(config &cfg, std::istream &file, abstract_validator *validator)
might throw a std::ios_base::failure especially a gzip_error
size_t queue_data(TCPsocket sock, const config &buf, const std::string &packet_type)
bool is_locked(const TCPsocket sock)
manager(size_t min_threads, size_t max_threads)
TCPsocket get_received_data(TCPsocket sock, config &cfg, network::bandwidth_in_ptr &bandwidth_in)
void send_file(const std::string &filename, connection connection_num, const std::string &packet_type)
#define ALIGN_4
Aligns a variable on a 4 byte boundary.
#define SOCKET
Network worker handles data transfers in threads Remember to use mutexs as little as possible All glo...
Class for writing a config out to a file in pieces.
void queue_file(TCPsocket sock, const std::string &filename)
GLsizei const GLfloat * value
std::istream * istream_file(const std::string &fname, bool treat_failure_as_error=true)
A class template, scoped_resource, designed to implement the Resource Acquisition Is Initialization (...
GLenum GLuint GLsizei const char * buf
network::pending_statistics get_pending_stats()
Some defines: VERSION, PACKAGE, MIN_SAVEGAME_VERSION.
std::map< std::string, tfilter >::iterator itor
Declarations for File-IO.
static int writer(lua_State *L, const void *b, size_t size, void *B)
static void queue_buffer(TCPsocket sock, buffer *queued_buf)
int file_size(const std::string &fname)
Returns the size of a file, or -1 if the file doesn't exist.
boost::uint32_t get_current_thread_id()
static int cond(LexState *ls)
Standard logging facilities (interface).
void set_use_system_sendfile(bool use)
const std::string remove
remove directive
boost::error_info< struct tag_tcpsocket, TCPsocket > tcpsocket_info
A config object defines a single node in a WML file, with access to child nodes.
bool close_socket(TCPsocket sock)
GLsizei const GLcharARB ** string
bool wait(const mutex &m)
std::pair< network::statistics, network::statistics > get_current_transfer_stats(TCPsocket sock)
void add_bandwidth_out(const std::string &packet_type, size_t len)