1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package io.netty.handler.codec.compression;
17
18 import io.netty.buffer.ByteBuf;
19 import io.netty.channel.ChannelHandlerContext;
20 import io.netty.handler.codec.MessageToByteEncoder;
21
22 import static io.netty.handler.codec.compression.Snappy.*;
23
24
25
26
27
28
29 public class SnappyFrameEncoder extends MessageToByteEncoder<ByteBuf> {
30
31
32
33
34
35 private static final int MIN_COMPRESSIBLE_LENGTH = 18;
36
37
38
39
40
41 private static final byte[] STREAM_START = {
42 (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
43 };
44
45 private final Snappy snappy = new Snappy();
46 private boolean started;
47
48 @Override
49 protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
50 if (!in.isReadable()) {
51 return;
52 }
53
54 if (!started) {
55 started = true;
56 out.writeBytes(STREAM_START);
57 }
58
59 int dataLength = in.readableBytes();
60 if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
61 for (;;) {
62 final int lengthIdx = out.writerIndex() + 1;
63 if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
64 ByteBuf slice = in.readSlice(dataLength);
65 writeUnencodedChunk(slice, out, dataLength);
66 break;
67 }
68
69 out.writeInt(0);
70 if (dataLength > Short.MAX_VALUE) {
71 ByteBuf slice = in.readSlice(Short.MAX_VALUE);
72 calculateAndWriteChecksum(slice, out);
73 snappy.encode(slice, out, Short.MAX_VALUE);
74 setChunkLength(out, lengthIdx);
75 dataLength -= Short.MAX_VALUE;
76 } else {
77 ByteBuf slice = in.readSlice(dataLength);
78 calculateAndWriteChecksum(slice, out);
79 snappy.encode(slice, out, dataLength);
80 setChunkLength(out, lengthIdx);
81 break;
82 }
83 }
84 } else {
85 writeUnencodedChunk(in, out, dataLength);
86 }
87 }
88
89 private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
90 out.writeByte(1);
91 writeChunkLength(out, dataLength + 4);
92 calculateAndWriteChecksum(in, out);
93 out.writeBytes(in, dataLength);
94 }
95
96 private static void setChunkLength(ByteBuf out, int lengthIdx) {
97 int chunkLength = out.writerIndex() - lengthIdx - 3;
98 if (chunkLength >>> 24 != 0) {
99 throw new CompressionException("compressed data too large: " + chunkLength);
100 }
101 out.setMediumLE(lengthIdx, chunkLength);
102 }
103
104
105
106
107
108
109
110 private static void writeChunkLength(ByteBuf out, int chunkLength) {
111 out.writeMediumLE(chunkLength);
112 }
113
114
115
116
117
118
119
120 private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
121 out.writeIntLE(calculateChecksum(slice));
122 }
123 }