1919import java .nio .ByteBuffer ;
2020import java .nio .charset .Charset ;
2121import java .nio .charset .StandardCharsets ;
22- import java .util .Objects ;
2322import java .util .function .Function ;
2423import java .util .function .IntPredicate ;
2524
2625import org .eclipse .jetty .util .BufferUtil ;
2726import org .eclipse .jetty .websocket .api .Callback ;
2827import org .eclipse .jetty .websocket .api .Session ;
29- import org .reactivestreams .Subscriber ;
30- import org .reactivestreams .Subscription ;
3128
3229import org .springframework .core .io .buffer .CloseableDataBuffer ;
3330import org .springframework .core .io .buffer .DataBuffer ;
4744 * @author Rossen Stoyanchev
4845 * @since 5.0
4946 */
50- @ SuppressWarnings ("NullAway" )
5147public class JettyWebSocketHandlerAdapter implements Session .Listener {
48+
5249private final WebSocketHandler delegateHandler ;
5350
5451private final Function <Session , JettyWebSocketSession > sessionFactory ;
5552
5653@ Nullable
5754private JettyWebSocketSession delegateSession ;
5855
56+
5957public JettyWebSocketHandlerAdapter (WebSocketHandler handler ,
6058Function <Session , JettyWebSocketSession > sessionFactory ) {
6159
@@ -67,32 +65,16 @@ public JettyWebSocketHandlerAdapter(WebSocketHandler handler,
6765
6866@ Override
6967public void onWebSocketOpen (Session session ) {
70- this .delegateSession = Objects .requireNonNull (this .sessionFactory .apply (session ));
71- this .delegateHandler .handle (this .delegateSession )
72- .subscribe (new Subscriber <>() {
73- @ Override
74- public void onSubscribe (Subscription s ) {
75- s .request (Long .MAX_VALUE );
76- }
77-
78- @ Override
79- public void onNext (Void unused ) {
80- }
81-
82- @ Override
83- public void onError (Throwable t ) {
84- delegateSession .onHandlerError (t );
85- }
86-
87- @ Override
88- public void onComplete () {
89- delegateSession .onHandleComplete ();
90- }
91- });
68+ JettyWebSocketSession delegateSession = this .sessionFactory .apply (session );
69+ this .delegateSession = delegateSession ;
70+ this .delegateHandler .handle (delegateSession )
71+ .checkpoint (session .getUpgradeRequest ().getRequestURI () + " [JettyWebSocketHandlerAdapter]" )
72+ .subscribe (unused -> {}, delegateSession ::onHandlerError , delegateSession ::onHandleComplete );
9273}
9374
9475@ Override
9576public void onWebSocketText (String message ) {
77+ Assert .state (this .delegateSession != null , "No delegate session available" );
9678byte [] bytes = message .getBytes (StandardCharsets .UTF_8 );
9779DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (bytes );
9880WebSocketMessage webSocketMessage = new WebSocketMessage (Type .TEXT , buffer );
@@ -101,37 +83,42 @@ public void onWebSocketText(String message) {
10183
10284@ Override
10385public void onWebSocketBinary (ByteBuffer byteBuffer , Callback callback ) {
86+ Assert .state (this .delegateSession != null , "No delegate session available" );
10487DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (byteBuffer );
105- buffer = new JettyDataBuffer (buffer , callback );
88+ buffer = new JettyCallbackDataBuffer (buffer , callback );
10689WebSocketMessage webSocketMessage = new WebSocketMessage (Type .BINARY , buffer );
10790this .delegateSession .handleMessage (webSocketMessage );
10891}
10992
11093@ Override
11194public void onWebSocketPong (ByteBuffer payload ) {
95+ Assert .state (this .delegateSession != null , "No delegate session available" );
11296DataBuffer buffer = this .delegateSession .bufferFactory ().wrap (BufferUtil .copy (payload ));
11397WebSocketMessage webSocketMessage = new WebSocketMessage (Type .PONG , buffer );
11498this .delegateSession .handleMessage (webSocketMessage );
11599}
116100
117101@ Override
118102public void onWebSocketClose (int statusCode , String reason ) {
103+ Assert .state (this .delegateSession != null , "No delegate session available" );
119104this .delegateSession .handleClose (CloseStatus .create (statusCode , reason ));
120105}
121106
122107@ Override
123108public void onWebSocketError (Throwable cause ) {
109+ Assert .state (this .delegateSession != null , "No delegate session available" );
124110this .delegateSession .handleError (cause );
125111}
126112
127113
128- private static final class JettyDataBuffer implements CloseableDataBuffer {
114+ private static final class JettyCallbackDataBuffer implements CloseableDataBuffer {
129115
130116private final DataBuffer delegate ;
131117
132118private final Callback callback ;
133119
134- public JettyDataBuffer (DataBuffer delegate , Callback callback ) {
120+
121+ public JettyCallbackDataBuffer (DataBuffer delegate , Callback callback ) {
135122Assert .notNull (delegate , "'delegate` must not be null" );
136123Assert .notNull (callback , "Callback must not be null" );
137124this .delegate = delegate ;
@@ -266,13 +253,13 @@ public DataBuffer write(ByteBuffer... buffers) {
266253@ Deprecated
267254public DataBuffer slice (int index , int length ) {
268255DataBuffer delegateSlice = this .delegate .slice (index , length );
269- return new JettyDataBuffer (delegateSlice , this .callback );
256+ return new JettyCallbackDataBuffer (delegateSlice , this .callback );
270257}
271258
272259@ Override
273260public DataBuffer split (int index ) {
274261DataBuffer delegateSplit = this .delegate .split (index );
275- return new JettyDataBuffer (delegateSplit , this .callback );
262+ return new JettyCallbackDataBuffer (delegateSplit , this .callback );
276263}
277264
278265@ Override
0 commit comments