1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import com.jcraft.jzlib.Deflater;
19 import com.jcraft.jzlib.JZlib;
20 import io.netty.buffer.ByteBuf;
21 import io.netty.buffer.Unpooled;
22 import io.netty.channel.ChannelFuture;
23 import io.netty.channel.ChannelFutureListener;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.channel.ChannelPromise;
26 import io.netty.channel.ChannelPromiseNotifier;
27 import io.netty.util.concurrent.EventExecutor;
28 import io.netty.util.internal.EmptyArrays;
29 import io.netty.util.internal.OneTimeTask;
30
31 import java.util.concurrent.TimeUnit;
32
33
34
35
36 public class JZlibEncoder extends ZlibEncoder {
37
38 private final int wrapperOverhead;
39 private final Deflater z = new Deflater();
40 private volatile boolean finished;
41 private volatile ChannelHandlerContext ctx;
42
43
44
45
46
47
48
49
50 public JZlibEncoder() {
51 this(6);
52 }
53
54
55
56
57
58
59
60
61
62
63
64
65
66 public JZlibEncoder(int compressionLevel) {
67 this(ZlibWrapper.ZLIB, compressionLevel);
68 }
69
70
71
72
73
74
75
76
77 public JZlibEncoder(ZlibWrapper wrapper) {
78 this(wrapper, 6);
79 }
80
81
82
83
84
85
86
87
88
89
90
91
92
93 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel) {
94 this(wrapper, compressionLevel, 15, 8);
95 }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119 public JZlibEncoder(ZlibWrapper wrapper, int compressionLevel, int windowBits, int memLevel) {
120
121 if (compressionLevel < 0 || compressionLevel > 9) {
122 throw new IllegalArgumentException(
123 "compressionLevel: " + compressionLevel +
124 " (expected: 0-9)");
125 }
126 if (windowBits < 9 || windowBits > 15) {
127 throw new IllegalArgumentException(
128 "windowBits: " + windowBits + " (expected: 9-15)");
129 }
130 if (memLevel < 1 || memLevel > 9) {
131 throw new IllegalArgumentException(
132 "memLevel: " + memLevel + " (expected: 1-9)");
133 }
134 if (wrapper == null) {
135 throw new NullPointerException("wrapper");
136 }
137 if (wrapper == ZlibWrapper.ZLIB_OR_NONE) {
138 throw new IllegalArgumentException(
139 "wrapper '" + ZlibWrapper.ZLIB_OR_NONE + "' is not " +
140 "allowed for compression.");
141 }
142
143 int resultCode = z.init(
144 compressionLevel, windowBits, memLevel,
145 ZlibUtil.convertWrapperType(wrapper));
146 if (resultCode != JZlib.Z_OK) {
147 ZlibUtil.fail(z, "initialization failure", resultCode);
148 }
149
150 wrapperOverhead = ZlibUtil.wrapperOverhead(wrapper);
151 }
152
153
154
155
156
157
158
159
160
161
162
163
164 public JZlibEncoder(byte[] dictionary) {
165 this(6, dictionary);
166 }
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183 public JZlibEncoder(int compressionLevel, byte[] dictionary) {
184 this(compressionLevel, 15, 8, dictionary);
185 }
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212 public JZlibEncoder(int compressionLevel, int windowBits, int memLevel, byte[] dictionary) {
213 if (compressionLevel < 0 || compressionLevel > 9) {
214 throw new IllegalArgumentException("compressionLevel: " + compressionLevel + " (expected: 0-9)");
215 }
216 if (windowBits < 9 || windowBits > 15) {
217 throw new IllegalArgumentException(
218 "windowBits: " + windowBits + " (expected: 9-15)");
219 }
220 if (memLevel < 1 || memLevel > 9) {
221 throw new IllegalArgumentException(
222 "memLevel: " + memLevel + " (expected: 1-9)");
223 }
224 if (dictionary == null) {
225 throw new NullPointerException("dictionary");
226 }
227 int resultCode;
228 resultCode = z.deflateInit(
229 compressionLevel, windowBits, memLevel,
230 JZlib.W_ZLIB);
231 if (resultCode != JZlib.Z_OK) {
232 ZlibUtil.fail(z, "initialization failure", resultCode);
233 } else {
234 resultCode = z.deflateSetDictionary(dictionary, dictionary.length);
235 if (resultCode != JZlib.Z_OK) {
236 ZlibUtil.fail(z, "failed to set the dictionary", resultCode);
237 }
238 }
239
240 wrapperOverhead = ZlibUtil.wrapperOverhead(ZlibWrapper.ZLIB);
241 }
242
243 @Override
244 public ChannelFuture close() {
245 return close(ctx().channel().newPromise());
246 }
247
248 @Override
249 public ChannelFuture close(final ChannelPromise promise) {
250 ChannelHandlerContext ctx = ctx();
251 EventExecutor executor = ctx.executor();
252 if (executor.inEventLoop()) {
253 return finishEncode(ctx, promise);
254 } else {
255 final ChannelPromise p = ctx.newPromise();
256 executor.execute(new OneTimeTask() {
257 @Override
258 public void run() {
259 ChannelFuture f = finishEncode(ctx(), p);
260 f.addListener(new ChannelPromiseNotifier(promise));
261 }
262 });
263 return p;
264 }
265 }
266
267 private ChannelHandlerContext ctx() {
268 ChannelHandlerContext ctx = this.ctx;
269 if (ctx == null) {
270 throw new IllegalStateException("not added to a pipeline");
271 }
272 return ctx;
273 }
274
275 @Override
276 public boolean isClosed() {
277 return finished;
278 }
279
280 @Override
281 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
282 if (finished) {
283 out.writeBytes(in);
284 return;
285 }
286
287 int inputLength = in.readableBytes();
288 if (inputLength == 0) {
289 return;
290 }
291
292 try {
293
294 boolean inHasArray = in.hasArray();
295 z.avail_in = inputLength;
296 if (inHasArray) {
297 z.next_in = in.array();
298 z.next_in_index = in.arrayOffset() + in.readerIndex();
299 } else {
300 byte[] array = new byte[inputLength];
301 in.getBytes(in.readerIndex(), array);
302 z.next_in = array;
303 z.next_in_index = 0;
304 }
305 int oldNextInIndex = z.next_in_index;
306
307
308 int maxOutputLength = (int) Math.ceil(inputLength * 1.001) + 12 + wrapperOverhead;
309 out.ensureWritable(maxOutputLength);
310 z.avail_out = maxOutputLength;
311 z.next_out = out.array();
312 z.next_out_index = out.arrayOffset() + out.writerIndex();
313 int oldNextOutIndex = z.next_out_index;
314
315
316 int resultCode;
317 try {
318 resultCode = z.deflate(JZlib.Z_SYNC_FLUSH);
319 } finally {
320 in.skipBytes(z.next_in_index - oldNextInIndex);
321 }
322
323 if (resultCode != JZlib.Z_OK) {
324 ZlibUtil.fail(z, "compression failure", resultCode);
325 }
326
327 int outputLength = z.next_out_index - oldNextOutIndex;
328 if (outputLength > 0) {
329 out.writerIndex(out.writerIndex() + outputLength);
330 }
331 } finally {
332
333
334
335
336 z.next_in = null;
337 z.next_out = null;
338 }
339 }
340
341 @Override
342 public void close(
343 final ChannelHandlerContext ctx,
344 final ChannelPromise promise) {
345 ChannelFuture f = finishEncode(ctx, ctx.newPromise());
346 f.addListener(new ChannelFutureListener() {
347 @Override
348 public void operationComplete(ChannelFuture f) throws Exception {
349 ctx.close(promise);
350 }
351 });
352
353 if (!f.isDone()) {
354
355 ctx.executor().schedule(new OneTimeTask() {
356 @Override
357 public void run() {
358 ctx.close(promise);
359 }
360 }, 10, TimeUnit.SECONDS);
361 }
362 }
363
364 private ChannelFuture finishEncode(ChannelHandlerContext ctx, ChannelPromise promise) {
365 if (finished) {
366 promise.setSuccess();
367 return promise;
368 }
369 finished = true;
370
371 ByteBuf footer;
372 try {
373
374 z.next_in = EmptyArrays.EMPTY_BYTES;
375 z.next_in_index = 0;
376 z.avail_in = 0;
377
378
379 byte[] out = new byte[32];
380 z.next_out = out;
381 z.next_out_index = 0;
382 z.avail_out = out.length;
383
384
385 int resultCode = z.deflate(JZlib.Z_FINISH);
386 if (resultCode != JZlib.Z_OK && resultCode != JZlib.Z_STREAM_END) {
387 promise.setFailure(ZlibUtil.deflaterException(z, "compression failure", resultCode));
388 return promise;
389 } else if (z.next_out_index != 0) {
390 footer = Unpooled.wrappedBuffer(out, 0, z.next_out_index);
391 } else {
392 footer = Unpooled.EMPTY_BUFFER;
393 }
394 } finally {
395 z.deflateEnd();
396
397
398
399
400
401 z.next_in = null;
402 z.next_out = null;
403 }
404 return ctx.writeAndFlush(footer, promise);
405 }
406
407 @Override
408 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
409 this.ctx = ctx;
410 }
411 }