View Javadoc

1   /*
2    * Copyright 2012 The Netty Project
3    *
4    * The Netty Project licenses this file to you under the Apache License,
5    * version 2.0 (the "License"); you may not use this file except in compliance
6    * with the License. You may obtain a copy of the License at:
7    *
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12   * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13   * License for the specific language governing permissions and limitations
14   * under the License.
15   */
16  package io.netty.handler.codec.compression;
17  
18  import io.netty.buffer.ByteBuf;
19  import io.netty.channel.ChannelHandlerContext;
20  import io.netty.handler.codec.MessageToByteEncoder;
21  
22  import static io.netty.handler.codec.compression.Snappy.*;
23  
24  /**
25   * Compresses a [email protected] ByteBuf} using the Snappy framing format.
26   *
27   * See http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
28   */
29  public class SnappyFrameEncoder extends MessageToByteEncoder<ByteBuf> {
30      /**
31       * The minimum amount that we'll consider actually attempting to compress.
32       * This value is preamble + the minimum length our Snappy service will
33       * compress (instead of just emitting a literal).
34       */
35      private static final int MIN_COMPRESSIBLE_LENGTH = 18;
36  
37      /**
38       * All streams should start with the "Stream identifier", containing chunk
39       * type 0xff, a length field of 0x6, and 'sNaPpY' in ASCII.
40       */
41      private static final byte[] STREAM_START = {
42          (byte) 0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59
43      };
44  
45      private final Snappy snappy = new Snappy();
46      private boolean started;
47  
48      @Override
49      protected void encode(ChannelHandlerContext ctx, ByteBuf in, ByteBuf out) throws Exception {
50          if (!in.isReadable()) {
51              return;
52          }
53  
54          if (!started) {
55              started = true;
56              out.writeBytes(STREAM_START);
57          }
58  
59          int dataLength = in.readableBytes();
60          if (dataLength > MIN_COMPRESSIBLE_LENGTH) {
61              for (;;) {
62                  final int lengthIdx = out.writerIndex() + 1;
63                  if (dataLength < MIN_COMPRESSIBLE_LENGTH) {
64                      ByteBuf slice = in.readSlice(dataLength);
65                      writeUnencodedChunk(slice, out, dataLength);
66                      break;
67                  }
68  
69                  out.writeInt(0);
70                  if (dataLength > Short.MAX_VALUE) {
71                      ByteBuf slice = in.readSlice(Short.MAX_VALUE);
72                      calculateAndWriteChecksum(slice, out);
73                      snappy.encode(slice, out, Short.MAX_VALUE);
74                      setChunkLength(out, lengthIdx);
75                      dataLength -= Short.MAX_VALUE;
76                  } else {
77                      ByteBuf slice = in.readSlice(dataLength);
78                      calculateAndWriteChecksum(slice, out);
79                      snappy.encode(slice, out, dataLength);
80                      setChunkLength(out, lengthIdx);
81                      break;
82                  }
83              }
84          } else {
85              writeUnencodedChunk(in, out, dataLength);
86          }
87      }
88  
89      private static void writeUnencodedChunk(ByteBuf in, ByteBuf out, int dataLength) {
90          out.writeByte(1);
91          writeChunkLength(out, dataLength + 4);
92          calculateAndWriteChecksum(in, out);
93          out.writeBytes(in, dataLength);
94      }
95  
96      private static void setChunkLength(ByteBuf out, int lengthIdx) {
97          int chunkLength = out.writerIndex() - lengthIdx - 3;
98          if (chunkLength >>> 24 != 0) {
99              throw new CompressionException("compressed data too large: " + chunkLength);
100         }
101         out.setMediumLE(lengthIdx, chunkLength);
102     }
103 
104     /**
105      * Writes the 2-byte chunk length to the output buffer.
106      *
107      * @param out The buffer to write to
108      * @param chunkLength The length to write
109      */
110     private static void writeChunkLength(ByteBuf out, int chunkLength) {
111         out.writeMediumLE(chunkLength);
112     }
113 
114     /**
115      * Calculates and writes the 4-byte checksum to the output buffer
116      *
117      * @param slice The data to calculate the checksum for
118      * @param out The output buffer to write the checksum to
119      */
120     private static void calculateAndWriteChecksum(ByteBuf slice, ByteBuf out) {
121         out.writeIntLE(calculateChecksum(slice));
122     }
123 }