1212
1313public  class  CompressOutputStream  extends  OutputStream  {
1414 private  static  final  int  MIN_COMPRESSION_SIZE  = 1536 ; // TCP-IP single packet 
15-  private  static  final  int  SMALL_BUFFER_SIZE  = 8192 ;
16-  private  static  final  int  MEDIUM_BUFFER_SIZE  = 128  * 1024 ;
17-  private  static  final  int  LARGE_BUFFER_SIZE  = 1024  * 1024 ;
18-  private  static  final  int  MAX_PACKET_LENGTH  = 0x00ffffff  + 7 ;
19-  private  int  maxPacketLength  = MAX_PACKET_LENGTH ;
2015 private  final  OutputStream  out ;
2116 private  final  MutableInt  sequence ;
22-  private  byte [] buf  = new  byte [SMALL_BUFFER_SIZE ];
23-  private  int  pos  = 7 ;
17+  private  final  byte [] header  = new  byte [7 ];
2418
2519 public  CompressOutputStream (OutputStream  out , MutableInt  compressionSequence ) {
2620 this .out  = out ;
2721 this .sequence  = compressionSequence ;
2822 }
2923
30-  public  void  setMaxAllowedPacket (int  maxAllowedPacket ) {
31-  maxPacketLength  = Math .min (MAX_PACKET_LENGTH , maxAllowedPacket  + 7 );
32-  }
33- 
34-  /** 
35-  * buf growing use 4 size only to avoid creating/copying that are expensive operations. possible 
36-  * size 
37-  * 
38-  * <ol> 
39-  * <li>SMALL_buf_SIZE = 8k (default) 
40-  * <li>MEDIUM_buf_SIZE = 128k 
41-  * <li>LARGE_buf_SIZE = 1M 
42-  * <li>getMaxPacketLength = 16M (+ 4 if using no compression) 
43-  * </ol> 
44-  * 
45-  * @param len length to add 
46-  */ 
47-  private  void  growBuffer (int  len ) {
48-  int  bufLength  = buf .length ;
49-  int  newCapacity ;
50-  if  (bufLength  == SMALL_BUFFER_SIZE ) {
51-  if  (len  + pos  < MEDIUM_BUFFER_SIZE ) {
52-  newCapacity  = MEDIUM_BUFFER_SIZE ;
53-  } else  if  (len  + pos  < LARGE_BUFFER_SIZE ) {
54-  newCapacity  = LARGE_BUFFER_SIZE ;
55-  } else  {
56-  newCapacity  = maxPacketLength ;
57-  }
58-  } else  if  (bufLength  == MEDIUM_BUFFER_SIZE ) {
59-  if  (len  + pos  < LARGE_BUFFER_SIZE ) {
60-  newCapacity  = LARGE_BUFFER_SIZE ;
61-  } else  {
62-  newCapacity  = maxPacketLength ;
63-  }
64-  } else  {
65-  newCapacity  = maxPacketLength ;
66-  }
67- 
68-  byte [] newBuf  = new  byte [newCapacity ];
69-  System .arraycopy (buf , 0 , newBuf , 0 , pos );
70-  buf  = newBuf ;
71-  }
72- 
7324 /** 
7425 * Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> 
7526 * to this output stream. The general contract for <code>write(b, off, len)</code> is that some 
@@ -95,77 +46,47 @@ private void growBuffer(int len) {
9546 */ 
9647 @ Override 
9748 public  void  write (byte [] b , int  off , int  len ) throws  IOException  {
98-  if  (pos  + len  >= buf .length ) growBuffer (len );
99-  if  (pos  + len  >= buf .length ) {
100-  // fill buffer, then flush 
101-  while  (len  > buf .length  - pos ) {
102-  int  writeLen  = buf .length  - pos ;
103-  System .arraycopy (b , off , buf , pos , writeLen );
104-  pos  += writeLen ;
105-  writeSocket (false );
106-  off  += writeLen ;
107-  len  -= writeLen ;
108-  }
109-  }
110-  // enough place in buffer. 
111-  System .arraycopy (b , off , buf , pos , len );
112-  pos  += len ;
113-  }
114- 
115-  private  void  writeSocket (boolean  end ) throws  IOException  {
116-  if  (pos  > 7 ) {
117-  if  (pos  < MIN_COMPRESSION_SIZE ) {
118-  // ******************************************************************************* 
119-  // small packet, no compression 
120-  // ******************************************************************************* 
121- 
122-  buf [0 ] = (byte ) (pos  - 7 );
123-  buf [1 ] = (byte ) ((pos  - 7 ) >>> 8 );
124-  buf [2 ] = 0 ;
125-  buf [3 ] = sequence .incrementAndGet ();
126-  buf [4 ] = 0 ;
127-  buf [5 ] = 0 ;
128-  buf [6 ] = 0 ;
129- 
130-  out .write (buf , 0 , pos );
49+  if  (len  < MIN_COMPRESSION_SIZE ) {
50+  // ******************************************************************************* 
51+  // small packet, no compression 
52+  // ******************************************************************************* 
53+ 
54+  header [0 ] = (byte ) len ;
55+  header [1 ] = (byte ) (len  >>> 8 );
56+  header [2 ] = 0 ;
57+  header [3 ] = sequence .incrementAndGet ();
58+  header [4 ] = 0 ;
59+  header [5 ] = 0 ;
60+  header [6 ] = 0 ;
61+  out .write (header , 0 , 7 );
62+  out .write (b , off , len );
13163
132-  } else  {
133- 
134-  // ******************************************************************************* 
135-  // compressing packet 
136-  // ******************************************************************************* 
137-  try  (ByteArrayOutputStream  baos  = new  ByteArrayOutputStream ()) {
138-  try  (DeflaterOutputStream  deflater  = new  DeflaterOutputStream (baos )) {
139-  deflater .write (buf , 7 , pos  - 7 );
140-  deflater .finish ();
141-  }
64+  } else  {
14265
143-  byte [] compressedBytes  = baos .toByteArray ();
66+  // ******************************************************************************* 
67+  // compressing packet 
68+  // ******************************************************************************* 
69+  try  (ByteArrayOutputStream  baos  = new  ByteArrayOutputStream ()) {
70+  try  (DeflaterOutputStream  deflater  = new  DeflaterOutputStream (baos )) {
71+  deflater .write (b , off , len );
72+  deflater .finish ();
73+  }
14474
145-    int   compressLen  =  compressedBytes . length ;
75+  byte []  compressedBytes  =  baos . toByteArray () ;
14676
147-  byte [] header  = new  byte [7 ];
148-  header [0 ] = (byte ) compressLen ;
149-  header [1 ] = (byte ) (compressLen  >>> 8 );
150-  header [2 ] = (byte ) (compressLen  >>> 16 );
151-  header [3 ] = sequence .incrementAndGet ();
152-  header [4 ] = (byte ) (pos  - 7 );
153-  header [5 ] = (byte ) ((pos  - 7 ) >>> 8 );
154-  header [6 ] = (byte ) ((pos  - 7 ) >>> 16 );
77+  int  compressLen  = compressedBytes .length ;
15578
156-  out .write (header , 0 , 7 );
157-  out .write (compressedBytes , 0 , compressLen );
158-  }
159-  }
79+  header [0 ] = (byte ) compressLen ;
80+  header [1 ] = (byte ) (compressLen  >>> 8 );
81+  header [2 ] = (byte ) (compressLen  >>> 16 );
82+  header [3 ] = sequence .incrementAndGet ();
83+  header [4 ] = (byte ) len ;
84+  header [5 ] = (byte ) (len  >>> 8 );
85+  header [6 ] = (byte ) (len  >>> 16 );
16086
161-  if  (end ) {
162-  // if buf is big, and last query doesn't use at least half of it, resize buf to default 
163-  // value 
164-  if  (buf .length  > SMALL_BUFFER_SIZE  && pos  * 2  < buf .length ) {
165-  buf  = new  byte [SMALL_BUFFER_SIZE ];
166-  }
87+  out .write (header , 0 , 7 );
88+  out .write (compressedBytes , 0 , compressLen );
16789 }
168-  pos  = 7 ;
16990 }
17091 }
17192
@@ -186,7 +107,6 @@ private void writeSocket(boolean end) throws IOException {
186107 */ 
187108 @ Override 
188109 public  void  flush () throws  IOException  {
189-  writeSocket (true );
190110 out .flush ();
191111 sequence .set ((byte ) -1 );
192112 }
0 commit comments