TrinityCore
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Modules Pages
Socket.h
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2008-2016 TrinityCore <http://www.trinitycore.org/>
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License as published by the
6  * Free Software Foundation; either version 2 of the License, or (at your
7  * option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11  * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12  * more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program. If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #ifndef __SOCKET_H__
19 #define __SOCKET_H__
20 
21 #include "MessageBuffer.h"
22 #include "Log.h"
23 #include <atomic>
24 #include <queue>
25 #include <memory>
26 #include <functional>
27 #include <type_traits>
28 #include <boost/asio/ip/tcp.hpp>
29 
30 using boost::asio::ip::tcp;
31 
32 #define READ_BLOCK_SIZE 4096
33 #ifdef BOOST_ASIO_HAS_IOCP
34 #define TC_SOCKET_USE_IOCP
35 #endif
36 
64 template<class T, class Stream = tcp::socket>
65 class Socket : public std::enable_shared_from_this<T>
66 {
67 public:
68  explicit Socket(tcp::socket&& socket) : _socket(std::move(socket)), _remoteAddress(_socket.remote_endpoint().address()),
70  {
72  }
73 
74  virtual ~Socket()
75  {
76  _closed = true;
77  boost::system::error_code error;
78  _socket.close(error);
79  }
80 
81  virtual void Start() = 0;
82 
83  virtual bool Update()
84  {
85  if (_closed)
86  return false;
87 
88 #ifndef TC_SOCKET_USE_IOCP
89  if (_isWritingAsync || (_writeQueue.empty() && !_closing))
90  return true;
91 
92  for (; HandleQueue();)
93  ;
94 #endif
95 
96  return true;
97  }
98 
99  boost::asio::ip::address GetRemoteIpAddress() const
100  {
101  return _remoteAddress;
102  }
103 
105  {
106  return _remotePort;
107  }
108 
109  void AsyncRead()
110  {
111  if (!IsOpen())
112  return;
113 
116  _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
117  std::bind(&Socket<T, Stream>::ReadHandlerInternal, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
118  }
119 
120  void AsyncReadWithCallback(void (T::*callback)(boost::system::error_code, std::size_t))
121  {
122  if (!IsOpen())
123  return;
124 
127  _socket.async_read_some(boost::asio::buffer(_readBuffer.GetWritePointer(), _readBuffer.GetRemainingSpace()),
128  std::bind(callback, this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
129  }
130 
131  void QueuePacket(MessageBuffer&& buffer)
132  {
133  _writeQueue.push(std::move(buffer));
134 
135 #ifdef TC_SOCKET_USE_IOCP
137 #endif
138  }
139 
140  bool IsOpen() const { return !_closed && !_closing; }
141 
142  void CloseSocket()
143  {
144  if (_closed.exchange(true))
145  return;
146 
147  boost::system::error_code shutdownError;
148  _socket.shutdown(boost::asio::socket_base::shutdown_send, shutdownError);
149  if (shutdownError)
150  TC_LOG_DEBUG("network", "Socket::CloseSocket: %s errored when shutting down socket: %i (%s)", GetRemoteIpAddress().to_string().c_str(),
151  shutdownError.value(), shutdownError.message().c_str());
152 
153  OnClose();
154  }
155 
157  void DelayedCloseSocket() { _closing = true; }
158 
160 
161 protected:
162  virtual void OnClose() { }
163 
164  virtual void ReadHandler() = 0;
165 
167  {
168  if (_isWritingAsync)
169  return false;
170 
171  _isWritingAsync = true;
172 
173 #ifdef TC_SOCKET_USE_IOCP
174  MessageBuffer& buffer = _writeQueue.front();
175  _socket.async_write_some(boost::asio::buffer(buffer.GetReadPointer(), buffer.GetActiveSize()), std::bind(&Socket<T, Stream>::WriteHandler,
176  this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
177 #else
178  _socket.async_write_some(boost::asio::null_buffers(), std::bind(&Socket<T, Stream>::WriteHandlerWrapper,
179  this->shared_from_this(), std::placeholders::_1, std::placeholders::_2));
180 #endif
181 
182  return false;
183  }
184 
185  void SetNoDelay(bool enable)
186  {
187  boost::system::error_code err;
188  _socket.set_option(tcp::no_delay(enable), err);
189  if (err)
190  TC_LOG_DEBUG("network", "Socket::SetNoDelay: failed to set_option(boost::asio::ip::tcp::no_delay) for %s - %d (%s)",
191  GetRemoteIpAddress().to_string().c_str(), err.value(), err.message().c_str());
192  }
193 
195  {
196  return _socket;
197  }
198 
199 private:
200  void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes)
201  {
202  if (error)
203  {
204  CloseSocket();
205  return;
206  }
207 
208  _readBuffer.WriteCompleted(transferredBytes);
209  ReadHandler();
210  }
211 
212 #ifdef TC_SOCKET_USE_IOCP
213 
214  void WriteHandler(boost::system::error_code error, std::size_t transferedBytes)
215  {
216  if (!error)
217  {
218  _isWritingAsync = false;
219  _writeQueue.front().ReadCompleted(transferedBytes);
220  if (!_writeQueue.front().GetActiveSize())
221  _writeQueue.pop();
222 
223  if (!_writeQueue.empty())
225  else if (_closing)
226  CloseSocket();
227  }
228  else
229  CloseSocket();
230  }
231 
232 #else
233 
234  void WriteHandlerWrapper(boost::system::error_code /*error*/, std::size_t /*transferedBytes*/)
235  {
236  _isWritingAsync = false;
237  HandleQueue();
238  }
239 
240  bool HandleQueue()
241  {
242  if (_writeQueue.empty())
243  return false;
244 
245  MessageBuffer& queuedMessage = _writeQueue.front();
246 
247  std::size_t bytesToSend = queuedMessage.GetActiveSize();
248 
249  boost::system::error_code error;
250  std::size_t bytesSent = _socket.write_some(boost::asio::buffer(queuedMessage.GetReadPointer(), bytesToSend), error);
251 
252  if (error)
253  {
254  if (error == boost::asio::error::would_block || error == boost::asio::error::try_again)
255  return AsyncProcessQueue();
256 
257  _writeQueue.pop();
258  if (_closing && _writeQueue.empty())
259  CloseSocket();
260  return false;
261  }
262  else if (bytesSent == 0)
263  {
264  _writeQueue.pop();
265  if (_closing && _writeQueue.empty())
266  CloseSocket();
267  return false;
268  }
269  else if (bytesSent < bytesToSend) // now n > 0
270  {
271  queuedMessage.ReadCompleted(bytesSent);
272  return AsyncProcessQueue();
273  }
274 
275  _writeQueue.pop();
276  if (_closing && _writeQueue.empty())
277  CloseSocket();
278  return !_writeQueue.empty();
279  }
280 
281 #endif
282 
283  Stream _socket;
284 
285  boost::asio::ip::address _remoteAddress;
287 
289  std::queue<MessageBuffer> _writeQueue;
290 
291  std::atomic<bool> _closed;
292  std::atomic<bool> _closing;
293 
295 };
296 
297 #endif // __SOCKET_H__
Definition: Socket.h:65
void WriteHandlerWrapper(boost::system::error_code, std::size_t)
Definition: Socket.h:234
Socket(tcp::socket &&socket)
Definition: Socket.h:68
void AsyncReadWithCallback(void(T::*callback)(boost::system::error_code, std::size_t))
Definition: Socket.h:120
size_type GetRemainingSpace() const
Definition: MessageBuffer.h:68
boost::asio::ip::address _remoteAddress
Definition: Socket.h:285
bool _isWritingAsync
Definition: Socket.h:294
virtual bool Update()
Definition: Socket.h:83
boost::asio::ip::address GetRemoteIpAddress() const
Definition: Socket.h:99
STL namespace.
#define false
Definition: CascPort.h:18
#define TC_LOG_DEBUG(filterType__,...)
Definition: Log.h:198
bool AsyncProcessQueue()
Definition: Socket.h:166
MessageBuffer _readBuffer
Definition: Socket.h:288
uint8 * GetWritePointer()
Definition: MessageBuffer.h:60
bool HandleQueue()
Definition: Socket.h:240
void SetNoDelay(bool enable)
Definition: Socket.h:185
void WriteCompleted(size_type bytes)
Definition: MessageBuffer.h:64
bool IsOpen() const
Definition: Socket.h:140
virtual ~Socket()
Definition: Socket.h:74
void EnsureFreeSpace()
Definition: MessageBuffer.h:85
void ReadCompleted(size_type bytes)
Definition: MessageBuffer.h:62
void DelayedCloseSocket()
Marks the socket for closing after write buffer becomes empty.
Definition: Socket.h:157
uint16 _remotePort
Definition: Socket.h:286
uint16_t uint16
Definition: Define.h:151
void QueuePacket(MessageBuffer &&buffer)
Definition: Socket.h:131
#define READ_BLOCK_SIZE
Definition: Socket.h:32
uint16 GetRemotePort() const
Definition: Socket.h:104
size_type GetActiveSize() const
Definition: MessageBuffer.h:66
virtual void Start()=0
void Resize(size_type bytes)
Definition: MessageBuffer.h:51
std::atomic< bool > _closing
Definition: Socket.h:292
std::queue< MessageBuffer > _writeQueue
Definition: Socket.h:289
virtual void ReadHandler()=0
Stream _socket
Definition: Socket.h:283
MessageBuffer & GetReadBuffer()
Definition: Socket.h:159
void Normalize()
Definition: MessageBuffer.h:73
void AsyncRead()
Definition: Socket.h:109
virtual void OnClose()
Definition: Socket.h:162
uint8 * GetReadPointer()
Definition: MessageBuffer.h:58
std::atomic< bool > _closed
Definition: Socket.h:291
void CloseSocket()
Definition: Socket.h:142
Stream & underlying_stream()
Definition: Socket.h:194
Definition: MessageBuffer.h:24
void ReadHandlerInternal(boost::system::error_code error, size_t transferredBytes)
Definition: Socket.h:200