1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.http;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.buffer.ByteBufHolder;
20 import io.netty.channel.ChannelHandlerContext;
21 import io.netty.channel.embedded.EmbeddedChannel;
22 import io.netty.handler.codec.MessageToMessageCodec;
23 import io.netty.handler.codec.http.HttpHeaders.Names;
24 import io.netty.handler.codec.http.HttpHeaders.Values;
25 import io.netty.util.ReferenceCountUtil;
26
27 import java.util.ArrayDeque;
28 import java.util.List;
29 import java.util.Queue;
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 public abstract class HttpContentEncoder extends MessageToMessageCodec<HttpRequest, HttpObject> {
54
55 private enum State {
56 PASS_THROUGH,
57 AWAIT_HEADERS,
58 AWAIT_CONTENT
59 }
60
61 private static final CharSequence ZERO_LENGTH_HEAD = "HEAD";
62 private static final CharSequence ZERO_LENGTH_CONNECT = "CONNECT";
63 private static final int CONTINUE_CODE = HttpResponseStatus.CONTINUE.code();
64
65 private final Queue<CharSequence> acceptEncodingQueue = new ArrayDeque<CharSequence>();
66 private CharSequence acceptEncoding;
67 private EmbeddedChannel encoder;
68 private State state = State.AWAIT_HEADERS;
69
70 @Override
71 public boolean acceptOutboundMessage(Object msg) throws Exception {
72 return msg instanceof HttpContent || msg instanceof HttpResponse;
73 }
74
75 @Override
76 protected void decode(ChannelHandlerContext ctx, HttpRequest msg, List<Object> out)
77 throws Exception {
78 CharSequence acceptedEncoding = msg.headers().get(HttpHeaders.Names.ACCEPT_ENCODING);
79 if (acceptedEncoding == null) {
80 acceptedEncoding = HttpHeaders.Values.IDENTITY;
81 }
82
83 HttpMethod meth = msg.getMethod();
84 if (meth == HttpMethod.HEAD) {
85 acceptedEncoding = ZERO_LENGTH_HEAD;
86 } else if (meth == HttpMethod.CONNECT) {
87 acceptedEncoding = ZERO_LENGTH_CONNECT;
88 }
89
90 acceptEncodingQueue.add(acceptedEncoding);
91 out.add(ReferenceCountUtil.retain(msg));
92 }
93
94 @Override
95 protected void encode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out) throws Exception {
96 final boolean isFull = msg instanceof HttpResponse && msg instanceof LastHttpContent;
97 switch (state) {
98 case AWAIT_HEADERS: {
99 ensureHeaders(msg);
100 assert encoder == null;
101
102 final HttpResponse res = (HttpResponse) msg;
103 final int code = res.getStatus().code();
104 if (code == CONTINUE_CODE) {
105
106
107 acceptEncoding = null;
108 } else {
109
110 acceptEncoding = acceptEncodingQueue.poll();
111 if (acceptEncoding == null) {
112 throw new IllegalStateException("cannot send more responses than requests");
113 }
114 }
115
116
117
118
119
120
121
122
123
124
125
126
127 if (isPassthru(code, acceptEncoding)) {
128 if (isFull) {
129 out.add(ReferenceCountUtil.retain(res));
130 } else {
131 out.add(res);
132
133 state = State.PASS_THROUGH;
134 }
135 break;
136 }
137
138 if (isFull) {
139
140 if (!((ByteBufHolder) res).content().isReadable()) {
141 out.add(ReferenceCountUtil.retain(res));
142 break;
143 }
144 }
145
146
147 final Result result = beginEncode(res, acceptEncoding.toString());
148
149
150 if (result == null) {
151 if (isFull) {
152 out.add(ReferenceCountUtil.retain(res));
153 } else {
154 out.add(res);
155
156 state = State.PASS_THROUGH;
157 }
158 break;
159 }
160
161 encoder = result.contentEncoder();
162
163
164
165 res.headers().set(Names.CONTENT_ENCODING, result.targetContentEncoding());
166
167
168 res.headers().remove(Names.CONTENT_LENGTH);
169 res.headers().set(Names.TRANSFER_ENCODING, Values.CHUNKED);
170
171
172 if (isFull) {
173
174 HttpResponse newRes = new DefaultHttpResponse(res.getProtocolVersion(), res.getStatus());
175 newRes.headers().set(res.headers());
176 out.add(newRes);
177
178 } else {
179 out.add(res);
180 state = State.AWAIT_CONTENT;
181 if (!(msg instanceof HttpContent)) {
182
183
184 break;
185 }
186
187 }
188 }
189 case AWAIT_CONTENT: {
190 ensureContent(msg);
191 if (encodeContent((HttpContent) msg, out)) {
192 state = State.AWAIT_HEADERS;
193 }
194 break;
195 }
196 case PASS_THROUGH: {
197 ensureContent(msg);
198 out.add(ReferenceCountUtil.retain(msg));
199
200 if (msg instanceof LastHttpContent) {
201 state = State.AWAIT_HEADERS;
202 }
203 break;
204 }
205 }
206 }
207
208 private static boolean isPassthru(int code, CharSequence httpMethod) {
209 return code < 200 || code == 204 || code == 304 ||
210 (httpMethod == ZERO_LENGTH_HEAD || (httpMethod == ZERO_LENGTH_CONNECT && code == 200));
211 }
212
213 private static void ensureHeaders(HttpObject msg) {
214 if (!(msg instanceof HttpResponse)) {
215 throw new IllegalStateException(
216 "unexpected message type: " +
217 msg.getClass().getName() + " (expected: " + HttpResponse.class.getSimpleName() + ')');
218 }
219 }
220
221 private static void ensureContent(HttpObject msg) {
222 if (!(msg instanceof HttpContent)) {
223 throw new IllegalStateException(
224 "unexpected message type: " +
225 msg.getClass().getName() + " (expected: " + HttpContent.class.getSimpleName() + ')');
226 }
227 }
228
229 private boolean encodeContent(HttpContent c, List<Object> out) {
230 ByteBuf content = c.content();
231
232 encode(content, out);
233
234 if (c instanceof LastHttpContent) {
235 finishEncode(out);
236 LastHttpContent last = (LastHttpContent) c;
237
238
239
240 HttpHeaders headers = last.trailingHeaders();
241 if (headers.isEmpty()) {
242 out.add(LastHttpContent.EMPTY_LAST_CONTENT);
243 } else {
244 out.add(new ComposedLastHttpContent(headers));
245 }
246 return true;
247 }
248 return false;
249 }
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265 protected abstract Result beginEncode(HttpResponse headers, String acceptEncoding) throws Exception;
266
267 @Override
268 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
269 cleanup();
270 super.handlerRemoved(ctx);
271 }
272
273 @Override
274 public void channelInactive(ChannelHandlerContext ctx) throws Exception {
275 cleanup();
276 super.channelInactive(ctx);
277 }
278
279 private void cleanup() {
280 if (encoder != null) {
281
282 if (encoder.finish()) {
283 for (;;) {
284 ByteBuf buf = (ByteBuf) encoder.readOutbound();
285 if (buf == null) {
286 break;
287 }
288
289
290 buf.release();
291 }
292 }
293 encoder = null;
294 }
295 }
296
297 private void encode(ByteBuf in, List<Object> out) {
298
299 encoder.writeOutbound(in.retain());
300 fetchEncoderOutput(out);
301 }
302
303 private void finishEncode(List<Object> out) {
304 if (encoder.finish()) {
305 fetchEncoderOutput(out);
306 }
307 encoder = null;
308 }
309
310 private void fetchEncoderOutput(List<Object> out) {
311 for (;;) {
312 ByteBuf buf = (ByteBuf) encoder.readOutbound();
313 if (buf == null) {
314 break;
315 }
316 if (!buf.isReadable()) {
317 buf.release();
318 continue;
319 }
320 out.add(new DefaultHttpContent(buf));
321 }
322 }
323
324 public static final class Result {
325 private final String targetContentEncoding;
326 private final EmbeddedChannel contentEncoder;
327
328 public Result(String targetContentEncoding, EmbeddedChannel contentEncoder) {
329 if (targetContentEncoding == null) {
330 throw new NullPointerException("targetContentEncoding");
331 }
332 if (contentEncoder == null) {
333 throw new NullPointerException("contentEncoder");
334 }
335
336 this.targetContentEncoding = targetContentEncoding;
337 this.contentEncoder = contentEncoder;
338 }
339
340 public String targetContentEncoding() {
341 return targetContentEncoding;
342 }
343
344 public EmbeddedChannel contentEncoder() {
345 return contentEncoder;
346 }
347 }
348 }