@@ -15,6 +15,7 @@ public class CompressOutputStream extends OutputStream {
1515 private  final  OutputStream  out ;
1616 private  final  MutableInt  sequence ;
1717 private  final  byte [] header  = new  byte [7 ];
18+  private  byte [] longPacketBuffer  = null ;
1819
1920 public  CompressOutputStream (OutputStream  out , MutableInt  compressionSequence ) {
2021 this .out  = out ;
@@ -46,11 +47,26 @@ public CompressOutputStream(OutputStream out, MutableInt compressionSequence) {
4647 */ 
4748 @ Override 
4849 public  void  write (byte [] b , int  off , int  len ) throws  IOException  {
49-  if  (len  < MIN_COMPRESSION_SIZE ) {
50+  if  (len  + (( longPacketBuffer  !=  null ) ?  longPacketBuffer . length  :  0 )  < MIN_COMPRESSION_SIZE ) {
5051 // ******************************************************************************* 
5152 // small packet, no compression 
5253 // ******************************************************************************* 
5354
55+  if  (longPacketBuffer  != null ) {
56+  header [0 ] = (byte ) (len  + longPacketBuffer .length );
57+  header [1 ] = (byte ) ((len  + longPacketBuffer .length ) >>> 8 );
58+  header [2 ] = 0 ;
59+  header [3 ] = sequence .incrementAndGet ();
60+  header [4 ] = 0 ;
61+  header [5 ] = 0 ;
62+  header [6 ] = 0 ;
63+  out .write (header , 0 , 7 );
64+  out .write (longPacketBuffer , 0 , longPacketBuffer .length );
65+  out .write (b , off , len );
66+  longPacketBuffer  = null ;
67+  return ;
68+  }
69+ 
5470 header [0 ] = (byte ) len ;
5571 header [1 ] = (byte ) (len  >>> 8 );
5672 header [2 ] = 0 ;
@@ -62,31 +78,51 @@ public void write(byte[] b, int off, int len) throws IOException {
6278 out .write (b , off , len );
6379
6480 } else  {
81+  // ******************************************************************************* 
82+  // compressing packet 
83+  // ******************************************************************************* 
84+  int  sent  = 0 ;
85+  try  (ByteArrayOutputStream  baos  = new  ByteArrayOutputStream ()) {
86+  try  (DeflaterOutputStream  deflater  = new  DeflaterOutputStream (baos )) {
6587
66-  // ******************************************************************************* 
67-  // compressing packet 
68-  // ******************************************************************************* 
69-  try  (ByteArrayOutputStream  baos  = new  ByteArrayOutputStream ()) {
70-  try  (DeflaterOutputStream  deflater  = new  DeflaterOutputStream (baos )) {
71-  deflater .write (b , off , len );
72-  deflater .finish ();
73-  }
88+  /** 
89+  * For multi packet, len will be 0x00ffffff + 4 bytes for header. 
90+  * but compression can only compress up to 0x00ffffff bytes (header initial length size cannot be > 3 bytes) 
91+  * so, for this specific case, a buffer will be 
92+  */ 
7493
75-  byte [] compressedBytes  = baos .toByteArray ();
94+  if  (longPacketBuffer  != null ) {
95+  deflater .write (longPacketBuffer , 0 , longPacketBuffer .length );
96+  sent  = longPacketBuffer .length ;
97+  longPacketBuffer  = null ;
98+  }
99+  if  ( len  + sent  > 0x00ffffff ) {
100+  int  remaining  = len  + sent  - 0x00ffffff ;
101+  longPacketBuffer  = new  byte [remaining ];
102+  System .arraycopy (b , off  + 0x00ffffff  - sent , longPacketBuffer , 0 , remaining );
103+  }
76104
77-  int  compressLen  = compressedBytes .length ;
105+  int  bufLenSent  = Math .min (0x00ffffff  - sent , len );
106+  deflater .write (b , off , bufLenSent );
107+  sent  += bufLenSent ;
108+  deflater .finish ();
109+  }
78110
79-  header [0 ] = (byte ) compressLen ;
80-  header [1 ] = (byte ) (compressLen  >>> 8 );
81-  header [2 ] = (byte ) (compressLen  >>> 16 );
82-  header [3 ] = sequence .incrementAndGet ();
83-  header [4 ] = (byte ) len ;
84-  header [5 ] = (byte ) (len  >>> 8 );
85-  header [6 ] = (byte ) (len  >>> 16 );
111+  byte [] compressedBytes  = baos .toByteArray ();
86112
87-  out .write (header , 0 , 7 );
88-  out .write (compressedBytes , 0 , compressLen );
89-  }
113+  int  compressLen  = compressedBytes .length ;
114+ 
115+  header [0 ] = (byte ) compressLen ;
116+  header [1 ] = (byte ) (compressLen  >>> 8 );
117+  header [2 ] = (byte ) (compressLen  >>> 16 );
118+  header [3 ] = sequence .incrementAndGet ();
119+  header [4 ] = (byte ) sent ;
120+  header [5 ] = (byte ) (sent  >>> 8 );
121+  header [6 ] = (byte ) (sent  >>> 16 );
122+ 
123+  out .write (header , 0 , 7 );
124+  out .write (compressedBytes , 0 , compressLen );
125+  }
90126 }
91127 }
92128
@@ -107,6 +143,11 @@ public void write(byte[] b, int off, int len) throws IOException {
107143 */ 
108144 @ Override 
109145 public  void  flush () throws  IOException  {
146+  if  (longPacketBuffer  != null ) {
147+  byte [] b  = longPacketBuffer ;
148+  longPacketBuffer  = null ;
149+  write (b , 0 , b .length );
150+  }
110151 out .flush ();
111152 sequence .set ((byte ) -1 );
112153 }
0 commit comments