The following example is a composite showing both suspend/resume and the NIO streams. First we'll show the server side. Note that the following code snippets are part of a single example that makes use of nested classes.
304    /**
305     * This handler uses non-blocking streams to read POST data and echo it
306     * back to the client.
307     */
308    private static class NonBlockingEchoHandler extends HttpHandler {
309
310
311        // -------------------------------------------- Methods from HttpHandler
312
313
314        @Override
315        public void service(final Request request,
316                            final Response response) throws Exception {
317
318            final char[] buf = new char[128];
319            final NIOReader in = request.getNIOReader(); // put the stream in non-blocking mode
320            final NIOWriter out = response.getNIOWriter();
321
322            response.suspend();
323
324            // If we don't have more data to read - onAllDataRead() will be called
325            in.notifyAvailable(new ReadHandler() {
326
327                @Override
328                public void onDataAvailable() throws Exception {
329                    System.out.printf("[onDataAvailable] echoing %d bytes\n", in.readyData());
330                    echoAvailableData(in, out, buf);
331                    in.notifyAvailable(this);
332                }
333
334                @Override
335                public void onError(Throwable t) {
336                    System.out.println("[onError]" + t);
337                    response.resume();
338                }
339
340                @Override
341                public void onAllDataRead() throws Exception {
342                    System.out.printf("[onAllDataRead] length: %d\n", in.readyData());
343                    try {
344                        echoAvailableData(in, out, buf);
345                    } finally {
346                        try {
347                            in.close();
348                        } catch (IOException ignored) {
349                        }
350
351                        try {
352                            out.close();
353                        } catch (IOException ignored) {
354                        }
355
356                        response.resume();
357                    }
358                }
359            });
360
361        }
362
363        private void echoAvailableData(NIOReader in, NIOWriter out, char[] buf)
364                throws IOException {
365
366            while(in.isReady()) {
367                int len = in.read(buf);
368                out.write(buf, 0, len);
369            }
370        }
371
372    } // END NonBlockingEchoHandler
As can be gleaned from the name of the class, this HttpHandler implementation simply echoes POST data back to the client.
Let's cover the major points of this part of the example:
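line 319 - Obtains the NIOReader, which configures the request stream for non-blocking use.
line 322 - The response is suspended. The service() method will then exit, and the ReadHandler implementation will be notified as data becomes available.
lines 328-332 - The onDataAvailable callback is invoked as data is received by the server. The available data is echoed, and the handler registers itself again for the next notification.
lines 341-358 - The onAllDataRead callback is invoked when the client has finished sending the message. Any remaining content is read and echoed back to the client, both streams are closed, and the response is resumed.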
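The handler registers itself again on line 331, which suggests that a registration covers only one notification. To make that pattern concrete, here is a minimal JDK-only sketch. It is a toy model, not Grizzly's API: ToyReadHandler, ToySource, pump() and echo() are hypothetical names invented for illustration, and the one-shot behavior is an assumption of the model.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Toy model of the notify-then-register-again read pattern (not Grizzly code). */
class ReadNotificationSketch {

    /** Hypothetical callback interface with the two data callbacks discussed above. */
    interface ToyReadHandler {
        void onDataAvailable(ToySource source);
        void onAllDataRead(ToySource source);
    }

    /** Hypothetical in-memory source that delivers its chunks one at a time. */
    static final class ToySource {
        private final Queue<String> pending = new ArrayDeque<>();
        private final StringBuilder ready = new StringBuilder();
        private ToyReadHandler handler; // at most one registered handler

        ToySource(String... chunks) {
            for (String chunk : chunks) {
                pending.add(chunk);
            }
        }

        /** Assumed one-shot registration: cleared before the handler is invoked. */
        void notifyAvailable(ToyReadHandler h) {
            this.handler = h;
        }

        /** Returns and clears everything that has arrived so far. */
        String readAvailable() {
            String data = ready.toString();
            ready.setLength(0);
            return data;
        }

        /** Stands in for the I/O layer: delivers each chunk, then signals the end. */
        void pump() {
            while (!pending.isEmpty()) {
                ready.append(pending.poll());
                ToyReadHandler h = takeHandler();
                if (h != null) {
                    h.onDataAvailable(this);
                }
            }
            ToyReadHandler h = takeHandler();
            if (h != null) {
                h.onAllDataRead(this);
            }
        }

        private ToyReadHandler takeHandler() {
            ToyReadHandler h = handler;
            handler = null;
            return h;
        }
    }

    /** Echoes all chunks; if reRegister is false the handler is only notified once. */
    static String echo(final boolean reRegister, String... chunks) {
        final StringBuilder out = new StringBuilder();
        ToySource source = new ToySource(chunks);
        source.notifyAvailable(new ToyReadHandler() {
            @Override
            public void onDataAvailable(ToySource s) {
                out.append(s.readAvailable());
                if (reRegister) {
                    s.notifyAvailable(this); // ask for the next notification
                }
            }

            @Override
            public void onAllDataRead(ToySource s) {
                out.append(s.readAvailable()); // drain whatever is left
            }
        });
        source.pump();
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(echo(true, "contentA-", "contentB-", "contentC-", "contentD"));
        System.out.println(echo(false, "contentA-", "contentB-", "contentC-", "contentD"));
    }
}
```

In this model, calling echo with reRegister set to false echoes only the first chunk, because later chunks arrive with no handler registered. That is the failure mode the call on line 331 avoids.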
Now we need to create a server and install this HttpHandler:
108    public static void main(String[] args) {
109
110        // create a basic server that listens on port 8080.
111        final HttpServer server = HttpServer.createSimpleServer();
112
113        final ServerConfiguration config = server.getServerConfiguration();
114
115        // Map the path, /echo, to the NonBlockingEchoHandler
116        config.addHttpHandler(new NonBlockingEchoHandler(), "/echo");
117
118        try {
119            server.start();
120            Client client = new Client();
121            client.run();
122        } catch (IOException ioe) {
123            LOGGER.log(Level.SEVERE, ioe.toString(), ioe);
124        } finally {
125            server.stop();
126        }
127    }
This part of the example is pretty straightforward. Create the server, install the HttpHandler to service requests made to /echo, and start the server.
The client code that follows sends data slowly in order to exercise the non-blocking HttpHandler. The client code relies on the http module primitives, so there's a little more code here to get this part of the example going. Let's start with the client Filter that sends the request.
199        private static final class ClientFilter extends BaseFilter {
200
201            private static final String[] CONTENT = {
202                "contentA-",
203                "contentB-",
204                "contentC-",
205                "contentD"
206            };
207
208            private FutureImpl<String> future;
209
210            private StringBuilder sb = new StringBuilder();
211
212            // ---------------------------------------------------- Constructors
213
214
215            private ClientFilter(FutureImpl<String> future) {
216                this.future = future;
217            }
218
219
220            // ----------------------------------------- Methods from BaseFilter
221
222
223            @SuppressWarnings({"unchecked"})
224            @Override
225            public NextAction handleConnect(FilterChainContext ctx) throws IOException {
226                System.out.println("\nClient connected!\n");
227
228                HttpRequestPacket request = createRequest();
229                System.out.println("Writing request:\n");
230                System.out.println(request.toString());
231                ctx.write(request); // write the request
232
233                // for each of the content parts in CONTENT, wrap in a Buffer,
234                // create the HttpContent to wrap the buffer and write the
235                // content.
236                MemoryManager mm = ctx.getConnection().getTransport().getMemoryManager();
237                for (int i = 0, len = CONTENT.length; i < len; i++) {
238                    HttpContent.Builder contentBuilder = request.httpContentBuilder();
239                    Buffer b = Buffers.wrap(mm, CONTENT[i]);
240                    contentBuilder.content(b);
241                    HttpContent content = contentBuilder.build();
242                    System.out.printf("(Client writing: %s)\n", b.toStringContent());
243                    ctx.write(content);
244                    try {
245                        Thread.sleep(2000);
246                    } catch (InterruptedException e) {
247                        e.printStackTrace();
248                    }
249                }
250
251                // since the request created by createRequest() is chunked,
252                // we need to write the trailer to signify the end of the
253                // POST data
254                ctx.write(request.httpTrailerBuilder().build());
255
256                System.out.println("\n");
257
258                return ctx.getStopAction(); // discontinue filter chain execution
259
260            }
261
262
263            @Override
264            public NextAction handleRead(FilterChainContext ctx) throws IOException {
265
266                HttpContent c = (HttpContent) ctx.getMessage();
267                Buffer b = c.getContent();
268                if (b.hasRemaining()) {
269                    sb.append(b.toStringContent());
270                }
271
272                // Last content from the server, set the future result so
273                // the client can display the result and gracefully exit.
274                if (c.isLast()) {
275                    future.result(sb.toString());
276                }
277                return ctx.getStopAction(); // discontinue filter chain execution
278
279            }
280
281
282            // ------------------------------------------------- Private Methods
283
284
285            private HttpRequestPacket createRequest() {
286
287                HttpRequestPacket.Builder builder = HttpRequestPacket.builder();
288                builder.method("POST");
289                builder.protocol("HTTP/1.1");
290                builder.uri("/echo");
291                builder.chunked(true);
292                HttpRequestPacket packet = builder.build();
293                packet.addHeader("Host", HOST + ':' + PORT);
294                return packet;
295
296            }
297
298        }
299
300    } // END Client
High level points about this code:
lines 225-260 - When the connection is established with the server, the handleConnect() method of this Filter will be invoked. When this happens, we create an HttpRequestPacket, which represents only the message header of a POST request (not the body). The request is then written.
lines 237-249 - Write the body of the message in 2-second intervals.
line 254 - Write the trailer to signify the end of the request, since the chunked transfer encoding is being used.
lines 264-279 - Read the response from the server. Store the final result in a future to be retrieved later.
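To see why a terminating write is needed when chunked transfer encoding is in use, it helps to look at the wire format defined by HTTP/1.1: each chunk is its size in hexadecimal, CRLF, the data, CRLF, and the body ends with a zero-size chunk. The sketch below builds that framing by hand with the JDK only. It is an illustration, not Grizzly code: encodeChunked is a hypothetical helper, the content is assumed to be ASCII, and it assumes one chunk per content part, which may not match how the HTTP filter actually groups writes.

```java
import java.nio.charset.StandardCharsets;

/** Hand-rolled HTTP/1.1 chunked framing for illustration only. */
class ChunkedFramingSketch {

    private static final String CRLF = "\r\n";

    /**
     * Frames each non-empty part as one chunk and appends the terminating
     * zero-size chunk (with no trailer fields).
     */
    static String encodeChunked(String... parts) {
        StringBuilder wire = new StringBuilder();
        for (String part : parts) {
            // The chunk size counts bytes, not characters; ASCII content is assumed here.
            int size = part.getBytes(StandardCharsets.US_ASCII).length;
            if (size == 0) {
                continue; // a zero-size chunk would end the body early
            }
            wire.append(Integer.toHexString(size)).append(CRLF);
            wire.append(part).append(CRLF);
        }
        // Last chunk: size 0, followed by an empty trailer section.
        wire.append('0').append(CRLF).append(CRLF);
        return wire.toString();
    }

    public static void main(String[] args) {
        String wire = encodeChunked("contentA-", "contentB-", "contentC-", "contentD");
        // Make the line terminators visible when printing.
        System.out.println(wire.replace("\r\n", "\\r\\n"));
    }
}
```

Without the final zero-size chunk the receiver has no way to know the body is complete, which is why the server in this example only reaches onAllDataRead after the client writes the trailer.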
Lastly we need to define a client that utilizes the Filter that was just described:
133    private static final class Client {
134
135        private static final String HOST = "localhost";
136        private static final int PORT = 8080;
137
138        public void run() throws IOException {
139            final FutureImpl<String> completeFuture = SafeFutureImpl.create();
140
141            // Build HTTP client filter chain
142            FilterChainBuilder clientFilterChainBuilder = FilterChainBuilder.stateless();
143            // Add transport filter
144            clientFilterChainBuilder.add(new TransportFilter());
145
146            // Add HttpClientFilter, which transforms Buffer <-> HttpContent
147            clientFilterChainBuilder.add(new HttpClientFilter());
148            // Add ClientFilter
149            clientFilterChainBuilder.add(new ClientFilter(completeFuture));
150
151
152            // Initialize Transport
153            final TCPNIOTransport transport =
154                    TCPNIOTransportBuilder.newInstance().build();
155            // Set filterchain as a Transport Processor
156            transport.setProcessor(clientFilterChainBuilder.build());
157
158            try {
159                // start the transport
160                transport.start();
161
162                Connection connection = null;
163
164                // Connecting to a remote Web server
165                Future<Connection> connectFuture = transport.connect(HOST, PORT);
166                try {
167                    // Wait until the client connect operation will be completed
168                    // Once connection has been established, the POST will
169                    // be sent to the server.
170                    connection = connectFuture.get(10, TimeUnit.SECONDS);
171
172                    // Wait no longer than 30 seconds for the response from the
173                    // server to be complete.
174                    String result = completeFuture.get(30, TimeUnit.SECONDS);
175
176                    // Display the echoed content
177                    System.out.println("\nEchoed POST Data: " + result + '\n');
178                } catch (Exception e) {
179                    if (connection == null) {
180                        LOGGER.log(Level.WARNING, "Connection failed. Server is not listening.");
181                    } else {
182                        LOGGER.log(Level.WARNING, "Unexpected error communicating with the server.");
183                    }
184                } finally {
185                    // Close the client connection
186                    if (connection != null) {
187                        connection.close();
188                    }
189                }
190            } finally {
191                // stop the transport
192                transport.stop();
193            }
194        }
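The hand-off between the Filter and run() boils down to one thread accumulating fragments and completing a future, while another waits on that future with a timeout. Here is a rough JDK-only sketch of that idea. It uses java.util.concurrent.CompletableFuture as a stand-in for Grizzly's FutureImpl, and Collector, onFragment and awaitEcho are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrates completing a future from an I/O-style callback (not Grizzly code). */
class FutureHandoffSketch {

    /** Accumulates response fragments and completes the future on the last one. */
    static final class Collector {
        private final StringBuilder sb = new StringBuilder();
        private final CompletableFuture<String> done;

        Collector(CompletableFuture<String> done) {
            this.done = done;
        }

        void onFragment(String text, boolean last) {
            sb.append(text);
            if (last) {
                done.complete(sb.toString()); // wakes up the waiting thread
            }
        }
    }

    /**
     * Feeds the fragments from a separate thread and waits for the combined
     * result. With no fragments at all nothing completes the future, so the
     * call fails after the 30 second timeout.
     */
    static String awaitEcho(final String... fragments) {
        CompletableFuture<String> done = new CompletableFuture<>();
        final Collector collector = new Collector(done);

        Thread io = new Thread(() -> {
            for (int i = 0; i < fragments.length; i++) {
                collector.onFragment(fragments[i], i == fragments.length - 1);
            }
        });
        io.start();

        try {
            // Block the caller, but never for longer than 30 seconds.
            return done.get(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no complete response", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Echoed POST Data: " + awaitEcho("contentA-", "contentB-"));
    }
}
```

The timeout matters for the same reason it does in run(): if the last fragment never arrives, the waiting thread gets an error instead of blocking forever.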
The comments within the Client code should be sufficient to explain what's going on here. When running the complete example the output will look something like:
Sep 22, 2011 3:22:58 PM org.glassfish.grizzly.http.server.NetworkListener start
INFO: Started listener bound to [0.0.0.0:8080]
Sep 22, 2011 3:22:58 PM org.glassfish.grizzly.http.server.HttpServer start
INFO: [HttpServer] Started.

Client connected!

Writing request:

HttpRequestPacket (
   method=POST
   url=/echo
   query=null
   protocol=HTTP/1.1
   content-length=-1
   headers=[
      Host=localhost:8080]
)
(Client writing: contentA-)
[onDataAvailable] echoing 9 bytes
(delay 2 seconds)
(Client writing: contentB-)
[onDataAvailable] echoing 9 bytes
(delay 2 seconds)
(Client writing: contentC-)
[onDataAvailable] echoing 9 bytes
(delay 2 seconds)
(Client writing: contentD)
[onDataAvailable] echoing 8 bytes
[onAllDataRead] length: 0

Echoed POST Data: contentA-contentB-contentC-contentD

Sep 22, 2011 3:23:06 PM org.glassfish.grizzly.http.server.NetworkListener stop
INFO: Stopped listener bound to [0.0.0.0:8080]
A quick note about the output above: the (delay 2 seconds) lines aren't actually output. They've been added to visualize the artificial delay introduced by the Filter used by the client.
This example in its entirety is available within the samples section of the Grizzly 2.2.10 repository.