Samples

The following example is a composite showing both the suspend/resume as well as the NIO streams. First we'll show the server side. Note that the following code snippets are part of a single example that makes use nested classes.

304 /**
305  * This handler using non-blocking streams to read POST data and echo it
306  * back to the client.
307  */
308 private static class NonBlockingEchoHandler extends HttpHandler {
309 
310 
311     // -------------------------------------------- Methods from HttpHandler
312 
313 
314     @Override
315     public void service(final Request request,
316                         final Response response) throws Exception {
317 
318         final char[] buf = new char[128];
319         final NIOReader in = request.getNIOReader(); // put the stream in non-blocking mode
320         final NIOWriter out = response.getNIOWriter();
321 
322         response.suspend();
323 
324         // If we don't have more data to read - onAllDataRead() will be called
325         in.notifyAvailable(new ReadHandler() {
326 
327             @Override
328             public void onDataAvailable() throws Exception {
329                 System.out.printf("[onDataAvailable] echoing %d bytes\n", in.readyData());
330                 echoAvailableData(in, out, buf);
331                 in.notifyAvailable(this);
332             }
333 
334             @Override
335             public void onError(Throwable t) {
336                 System.out.println("[onError]" + t);
337                 response.resume();
338             }
339 
340             @Override
341             public void onAllDataRead() throws Exception {
342                 System.out.printf("[onAllDataRead] length: %d\n", in.readyData());
343                 try {
344                     echoAvailableData(in, out, buf);
345                 } finally {
346                     try {
347                         in.close();
348                     } catch (IOException ignored) {
349                     }
350 
351                     try {
352                         out.close();
353                     } catch (IOException ignored) {
354                     }
355 
356                     response.resume();
357                 }
358             }
359         });
360 
361     }
362 
363     private void echoAvailableData(NIOReader in, NIOWriter out, char[] buf)
364             throws IOException {
365             
366         while(in.isReady()) {
367             int len = in.read(buf);
368             out.write(buf, 0, len);
369         }
370     }
371 
372 } // END NonBlockingEchoHandler

As can be gleened from the name of the class, this HttpHandler implementation simply echoes POST data back to the client.

Let's cover the major points of this part of the example:

Now we need to create a server and install this HttpHandler:

108 public static void main(String[] args) {
109 
110     // create a basic server that listens on port 8080.
111     final HttpServer server = HttpServer.createSimpleServer();
112 
113     final ServerConfiguration config = server.getServerConfiguration();
114 
115     // Map the path, /echo, to the NonBlockingEchoHandler
116     config.addHttpHandler(new NonBlockingEchoHandler(), "/echo");
117 
118     try {
119         server.start();
120         Client client = new Client();
121         client.run();
122     } catch (IOException ioe) {
123         LOGGER.log(Level.SEVERE, ioe.toString(), ioe);
124     } finally {
125         server.stop();
126     }
127 }

This part of the example is pretty straight forward. Create the server, install the HttpHandler to service requests made to /echo, and start the server.

The client code which will follow will be sending data slowly to exercise the non-blocking HttpHandler. The client code relies on the http module primitives, so there's a little more code here to get this part of the example going. Let's start with the client Filter that sends request.

199 private static final class ClientFilter extends BaseFilter {
200 
201             private static final String[] CONTENT = {
202                 "contentA-",
203                 "contentB-",
204                 "contentC-",
205                 "contentD"
206             };
207 
208             private FutureImpl<String> future;
209 
210             private StringBuilder sb = new StringBuilder();
211 
212             // ---------------------------------------------------- Constructors
213 
214 
215             private ClientFilter(FutureImpl<String> future) {
216                 this.future = future;                
217             }
218 
219 
220             // ----------------------------------------- Methods from BaseFilter
221 
222 
223             @SuppressWarnings({"unchecked"})
224             @Override
225             public NextAction handleConnect(FilterChainContext ctx) throws IOException {
226                 System.out.println("\nClient connected!\n");
227 
228                 HttpRequestPacket request = createRequest();
229                 System.out.println("Writing request:\n");
230                 System.out.println(request.toString());
231                 ctx.write(request); // write the request
232 
233                 // for each of the content parts in CONTENT, wrap in a Buffer,
234                 // create the HttpContent to wrap the buffer and write the
235                 // content.
236                 MemoryManager mm = ctx.getConnection().getTransport().getMemoryManager();
237                 for (int i = 0, len = CONTENT.length; i < len; i++) {
238                     HttpContent.Builder contentBuilder = request.httpContentBuilder();
239                     Buffer b = Buffers.wrap(mm, CONTENT[i]);
240                     contentBuilder.content(b);
241                     HttpContent content = contentBuilder.build();
242                     System.out.printf("(Client writing: %s)\n", b.toStringContent());
243                     ctx.write(content);
244                     try {
245                         Thread.sleep(2000);
246                     } catch (InterruptedException e) {
247                         e.printStackTrace(); 
248                     }
249                 }
250 
251                 // since the request created by createRequest() is chunked,
252                 // we need to write the trailer to signify the end of the
253                 // POST data
254                 ctx.write(request.httpTrailerBuilder().build());
255 
256                 System.out.println("\n");
257 
258                 return ctx.getStopAction(); // discontinue filter chain execution
259 
260             }
261 
262 
263             @Override
264             public NextAction handleRead(FilterChainContext ctx) throws IOException {
265 
266                 HttpContent c = (HttpContent) ctx.getMessage();
267                 Buffer b = c.getContent();
268                 if (b.hasRemaining()) {
269                     sb.append(b.toStringContent());
270                 }
271 
272                 // Last content from the server, set the future result so
273                 // the client can display the result and gracefully exit.
274                 if (c.isLast()) {
275                     future.result(sb.toString());
276                 }
277                 return ctx.getStopAction(); // discontinue filter chain execution
278 
279             }
280 
281 
282             // ------------------------------------------------- Private Methods
283 
284 
285             private HttpRequestPacket createRequest() {
286 
287                 HttpRequestPacket.Builder builder = HttpRequestPacket.builder();
288                 builder.method("POST");
289                 builder.protocol("HTTP/1.1");
290                 builder.uri("/echo");
291                 builder.chunked(true);
292                 HttpRequestPacket packet = builder.build();
293                 packet.addHeader("Host", HOST + ':' + PORT);
294                 return packet;
295 
296             }
297 
298         }
299 
300     } // END Client

High level points about this code:

Lastly we need to define a client that utilizes the Filter that was just described:

133 private static final class Client {
134 
135     private static final String HOST = "localhost";
136     private static final int PORT = 8080;
137 
138     public void run() throws IOException {
139         final FutureImpl<String> completeFuture = SafeFutureImpl.create();
140 
141         // Build HTTP client filter chain
142         FilterChainBuilder clientFilterChainBuilder = FilterChainBuilder.stateless();
143         // Add transport filter
144         clientFilterChainBuilder.add(new TransportFilter());
145 
146         // Add HttpClientFilter, which transforms Buffer <-> HttpContent
147         clientFilterChainBuilder.add(new HttpClientFilter());
148         // Add ClientFilter
149         clientFilterChainBuilder.add(new ClientFilter(completeFuture));
150 
151 
152         // Initialize Transport
153         final TCPNIOTransport transport =
154                TCPNIOTransportBuilder.newInstance().build();
155         // Set filterchain as a Transport Processor
156         transport.setProcessor(clientFilterChainBuilder.build());
157 
158         try {
159             // start the transport
160             transport.start();
161 
162             Connection connection = null;
163 
164             // Connecting to a remote Web server
165             Future<Connection> connectFuture = transport.connect(HOST, PORT);
166             try {
167                 // Wait until the client connect operation will be completed
168                 // Once connection has been established, the POST will
169                 // be sent to the server.
170                 connection = connectFuture.get(10, TimeUnit.SECONDS);
171 
172                 // Wait no longer than 30 seconds for the response from the
173                 // server to be complete.
174                 String result = completeFuture.get(30, TimeUnit.SECONDS);
175 
176                 // Display the echoed content
177                 System.out.println("\nEchoed POST Data: " + result + '\n');
178             } catch (Exception e) {
179                 if (connection == null) {
180                     LOGGER.log(Level.WARNING, "Connection failed.  Server is not listening.");
181                 } else {
182                     LOGGER.log(Level.WARNING, "Unexpected error communicating with the server.");
183                 }
184             } finally {
185                 // Close the client connection
186                 if (connection != null) {
187                     connection.close();
188                 }
189             }
190         } finally {
191             // stop the transport
192             transport.stop();
193         }
194     }

The comments within the Client code should be sufficient to explain what's going on here. When running the complete example the output will look something like:

Sep 22, 2011 3:22:58 PM org.glassfish.grizzly.http.server.NetworkListener start
INFO: Started listener bound to [0.0.0.0:8080]
Sep 22, 2011 3:22:58 PM org.glassfish.grizzly.http.server.HttpServer start
INFO: [HttpServer] Started.

Client connected!

Writing request:

HttpRequestPacket (
   method=POST
   url=/echo
   query=null
   protocol=HTTP/1.1
   content-length=-1
   headers=[
      Host=localhost:8080]
)
(Client writing: contentA-)
[onDataAvailable] echoing 9 bytes

(delay 2 seconds)

(Client writing: contentB-)
[onDataAvailable] echoing 9 bytes

(delay 2 seconds)

(Client writing: contentC-)
[onDataAvailable] echoing 9 bytes

(delay 2 seconds)

(Client writing: contentD)
[onDataAvailable] echoing 8 bytes


[onAllDataRead] length: 0

Echoed POST Data: contentA-contentB-contentC-contentD

Sep 22, 2011 3:23:06 PM org.glassfish.grizzly.http.server.NetworkListener stop
INFO: Stopped listener bound to [0.0.0.0:8080]

A quick note about the output above, the (delay 2 seconds) isn't actually output. It's been added to visualize the artificial delay added by the Filter used by the client.

This example in its entirety is available within the samples section of the Grizzly 2.2.10 repository.