The following example is a composite showing both the suspend/resume as well as the NIO streams. First we'll show the server side. Note that the following code snippets are part of a single example that makes use nested classes.
304 /**
305 * This handler using non-blocking streams to read POST data and echo it
306 * back to the client.
307 */
308 private static class NonBlockingEchoHandler extends HttpHandler {
309
310
311 // -------------------------------------------- Methods from HttpHandler
312
313
314 @Override
315 public void service(final Request request,
316 final Response response) throws Exception {
317
318 final char[] buf = new char[128];
319 final NIOReader in = request.getNIOReader(); // put the stream in non-blocking mode
320 final NIOWriter out = response.getNIOWriter();
321
322 response.suspend();
323
324 // If we don't have more data to read - onAllDataRead() will be called
325 in.notifyAvailable(new ReadHandler() {
326
327 @Override
328 public void onDataAvailable() throws Exception {
329 System.out.printf("[onDataAvailable] echoing %d bytes\n", in.readyData());
330 echoAvailableData(in, out, buf);
331 in.notifyAvailable(this);
332 }
333
334 @Override
335 public void onError(Throwable t) {
336 System.out.println("[onError]" + t);
337 response.resume();
338 }
339
340 @Override
341 public void onAllDataRead() throws Exception {
342 System.out.printf("[onAllDataRead] length: %d\n", in.readyData());
343 try {
344 echoAvailableData(in, out, buf);
345 } finally {
346 try {
347 in.close();
348 } catch (IOException ignored) {
349 }
350
351 try {
352 out.close();
353 } catch (IOException ignored) {
354 }
355
356 response.resume();
357 }
358 }
359 });
360
361 }
362
363 private void echoAvailableData(NIOReader in, NIOWriter out, char[] buf)
364 throws IOException {
365
366 while(in.isReady()) {
367 int len = in.read(buf);
368 out.write(buf, 0, len);
369 }
370 }
371
372 } // END NonBlockingEchoHandlerAs can be gleened from the name of the class, this HttpHandler implementation simply echoes POST data back to the client.
Let's cover the major points of this part of the example:
line 318 - Configures the stream for non-blocking use cases.
lines 321 - Response is suspended. service() method will exit. ReadHandler implementation will be notified as data becomes available.
line 327-330 - onDataAvailable callback invoked as data is received by the server
lines 338-356 - onAllDataRead callback invoked when client has finished message. All content is read and echoed back to the client.
Now we need to create a server and install this HttpHandler:
108 public static void main(String[] args) {
109
110 // create a basic server that listens on port 8080.
111 final HttpServer server = HttpServer.createSimpleServer();
112
113 final ServerConfiguration config = server.getServerConfiguration();
114
115 // Map the path, /echo, to the NonBlockingEchoHandler
116 config.addHttpHandler(new NonBlockingEchoHandler(), "/echo");
117
118 try {
119 server.start();
120 Client client = new Client();
121 client.run();
122 } catch (IOException ioe) {
123 LOGGER.log(Level.SEVERE, ioe.toString(), ioe);
124 } finally {
125 server.stop();
126 }
127 }This part of the example is pretty straight forward. Create the server, install the HttpHandler to service requests made to /echo, and start the server.
The client code which will follow will be sending data slowly to exercise the non-blocking HttpHandler. The client code relies on the http module primitives, so there's a little more code here to get this part of the example going. Let's start with the client Filter that sends request.
199 private static final class ClientFilter extends BaseFilter {
200
201 private static final String[] CONTENT = {
202 "contentA-",
203 "contentB-",
204 "contentC-",
205 "contentD"
206 };
207
208 private FutureImpl<String> future;
209
210 private StringBuilder sb = new StringBuilder();
211
212 // ---------------------------------------------------- Constructors
213
214
215 private ClientFilter(FutureImpl<String> future) {
216 this.future = future;
217 }
218
219
220 // ----------------------------------------- Methods from BaseFilter
221
222
223 @SuppressWarnings({"unchecked"})
224 @Override
225 public NextAction handleConnect(FilterChainContext ctx) throws IOException {
226 System.out.println("\nClient connected!\n");
227
228 HttpRequestPacket request = createRequest();
229 System.out.println("Writing request:\n");
230 System.out.println(request.toString());
231 ctx.write(request); // write the request
232
233 // for each of the content parts in CONTENT, wrap in a Buffer,
234 // create the HttpContent to wrap the buffer and write the
235 // content.
236 MemoryManager mm = ctx.getConnection().getTransport().getMemoryManager();
237 for (int i = 0, len = CONTENT.length; i < len; i++) {
238 HttpContent.Builder contentBuilder = request.httpContentBuilder();
239 Buffer b = Buffers.wrap(mm, CONTENT[i]);
240 contentBuilder.content(b);
241 HttpContent content = contentBuilder.build();
242 System.out.printf("(Client writing: %s)\n", b.toStringContent());
243 ctx.write(content);
244 try {
245 Thread.sleep(2000);
246 } catch (InterruptedException e) {
247 e.printStackTrace();
248 }
249 }
250
251 // since the request created by createRequest() is chunked,
252 // we need to write the trailer to signify the end of the
253 // POST data
254 ctx.write(request.httpTrailerBuilder().build());
255
256 System.out.println("\n");
257
258 return ctx.getStopAction(); // discontinue filter chain execution
259
260 }
261
262
263 @Override
264 public NextAction handleRead(FilterChainContext ctx) throws IOException {
265
266 HttpContent c = (HttpContent) ctx.getMessage();
267 Buffer b = c.getContent();
268 if (b.hasRemaining()) {
269 sb.append(b.toStringContent());
270 }
271
272 // Last content from the server, set the future result so
273 // the client can display the result and gracefully exit.
274 if (c.isLast()) {
275 future.result(sb.toString());
276 }
277 return ctx.getStopAction(); // discontinue filter chain execution
278
279 }
280
281
282 // ------------------------------------------------- Private Methods
283
284
285 private HttpRequestPacket createRequest() {
286
287 HttpRequestPacket.Builder builder = HttpRequestPacket.builder();
288 builder.method("POST");
289 builder.protocol("HTTP/1.1");
290 builder.uri("/echo");
291 builder.chunked(true);
292 HttpRequestPacket packet = builder.build();
293 packet.addHeader("Host", HOST + ':' + PORT);
294 return packet;
295
296 }
297
298 }
299
300 } // END ClientHigh level points about this code:
lines 226-261 - When the connection is established with the server, the handleConnect() method of this Filter will be invoked. When this happens we create a HttpRequestPacket, which contains only message part of a POST request. wThe request is then written.
lines 237-249 writes the body of the message in 2 second intervals.
line 255 - Write the trailer to signify the end of the request since the chunked transfer encoding is being used.
lines 265-280 - Read the response from the server. Store the final result in a future to be retrieved later.
Lastly we need to define a client that utilizes the Filter that was just described:
133 private static final class Client {
134
135 private static final String HOST = "localhost";
136 private static final int PORT = 8080;
137
138 public void run() throws IOException {
139 final FutureImpl<String> completeFuture = SafeFutureImpl.create();
140
141 // Build HTTP client filter chain
142 FilterChainBuilder clientFilterChainBuilder = FilterChainBuilder.stateless();
143 // Add transport filter
144 clientFilterChainBuilder.add(new TransportFilter());
145
146 // Add HttpClientFilter, which transforms Buffer <-> HttpContent
147 clientFilterChainBuilder.add(new HttpClientFilter());
148 // Add ClientFilter
149 clientFilterChainBuilder.add(new ClientFilter(completeFuture));
150
151
152 // Initialize Transport
153 final TCPNIOTransport transport =
154 TCPNIOTransportBuilder.newInstance().build();
155 // Set filterchain as a Transport Processor
156 transport.setProcessor(clientFilterChainBuilder.build());
157
158 try {
159 // start the transport
160 transport.start();
161
162 Connection connection = null;
163
164 // Connecting to a remote Web server
165 Future<Connection> connectFuture = transport.connect(HOST, PORT);
166 try {
167 // Wait until the client connect operation will be completed
168 // Once connection has been established, the POST will
169 // be sent to the server.
170 connection = connectFuture.get(10, TimeUnit.SECONDS);
171
172 // Wait no longer than 30 seconds for the response from the
173 // server to be complete.
174 String result = completeFuture.get(30, TimeUnit.SECONDS);
175
176 // Display the echoed content
177 System.out.println("\nEchoed POST Data: " + result + '\n');
178 } catch (Exception e) {
179 if (connection == null) {
180 LOGGER.log(Level.WARNING, "Connection failed. Server is not listening.");
181 } else {
182 LOGGER.log(Level.WARNING, "Unexpected error communicating with the server.");
183 }
184 } finally {
185 // Close the client connection
186 if (connection != null) {
187 connection.close();
188 }
189 }
190 } finally {
191 // stop the transport
192 transport.stop();
193 }
194 }The comments within the Client code should be sufficient to explain what's going on here. When running the complete example the output will look something like:
Sep 22, 2011 3:22:58 PM org.glassfish.grizzly.http.server.NetworkListener start
INFO: Started listener bound to [0.0.0.0:8080]
Sep 22, 2011 3:22:58 PM org.glassfish.grizzly.http.server.HttpServer start
INFO: [HttpServer] Started.
Client connected!
Writing request:
HttpRequestPacket (
method=POST
url=/echo
query=null
protocol=HTTP/1.1
content-length=-1
headers=[
Host=localhost:8080]
)
(Client writing: contentA-)
[onDataAvailable] echoing 9 bytes
(delay 2 seconds)
(Client writing: contentB-)
[onDataAvailable] echoing 9 bytes
(delay 2 seconds)
(Client writing: contentC-)
[onDataAvailable] echoing 9 bytes
(delay 2 seconds)
(Client writing: contentD)
[onDataAvailable] echoing 8 bytes
[onAllDataRead] length: 0
Echoed POST Data: contentA-contentB-contentC-contentD
Sep 22, 2011 3:23:06 PM org.glassfish.grizzly.http.server.NetworkListener stop
INFO: Stopped listener bound to [0.0.0.0:8080]A quick note about the output above, the (delay 2 seconds) isn't actually output. It's been added to visualize the artificial delay added by the Filter used by the client.
This example in its entirety is available within the samples section of the Grizzly 2.2.10 repository.