14
14
15
15
package traindb .engine ;
16
16
17
+ import com .google .common .base .Joiner ;
17
18
import java .io .IOException ;
18
19
import java .nio .channels .SocketChannel ;
20
+ import java .nio .charset .StandardCharsets ;
21
+ import java .sql .ResultSet ;
22
+ import java .sql .ResultSetMetaData ;
23
+ import java .sql .SQLException ;
24
+ import java .sql .Types ;
25
+ import java .util .ArrayList ;
26
+ import java .util .List ;
27
+ import java .util .Properties ;
19
28
import java .util .Random ;
29
+ import org .apache .calcite .avatica .ConnectStringParser ;
30
+ import org .apache .calcite .config .CalciteConnectionConfig ;
31
+ import org .apache .calcite .sql .parser .SqlParser ;
32
+ import org .apache .calcite .sql .parser .SqlParserImplFactory ;
20
33
import org .apache .commons .lang3 .exception .ExceptionUtils ;
21
- import org .apache . commons . logging . Log ;
22
- import org .apache . commons . logging . LogFactory ;
34
+ import org .json . simple . JSONObject ;
35
+ import org .json . simple . parser . JSONParser ;
23
36
import traindb .catalog .CatalogContext ;
37
+ import traindb .common .TrainDBException ;
38
+ import traindb .common .TrainDBLogger ;
39
+ import traindb .engine .nio .ByteBuffers ;
40
+ import traindb .engine .nio .Message ;
41
+ import traindb .engine .nio .MessageStream ;
42
+ import traindb .jdbc .Driver ;
43
+ import traindb .jdbc .TrainDBConnectionImpl ;
44
+ import traindb .jdbc .TrainDBJdbc41Factory ;
45
+ import traindb .jdbc .TrainDBStatement ;
24
46
import traindb .schema .SchemaManager ;
47
+ import traindb .sql .TrainDBSql ;
48
+ import traindb .sql .TrainDBSqlCommand ;
49
+ import traindb .sql .calcite .TrainDBSqlCalciteParserImpl ;
25
50
26
51
public final class Session implements Runnable {
27
- private static final Log LOG = LogFactory . getLog (Session .class );
52
+ private static TrainDBLogger LOG = TrainDBLogger . getLogger (Session .class );
28
53
private static final ThreadLocal <Session > LOCAL_SESSION = new ThreadLocal <>();
29
54
private final CancelContext cancelContext ;
30
55
private final int sessionId ;
@@ -35,12 +60,15 @@ public final class Session implements Runnable {
35
60
private final CatalogContext catalogContext ;
36
61
private final SchemaManager schemaManager ;
37
62
63
+ final MessageStream messageStream ;
64
+
38
65
Session (SocketChannel clientChannel , EventHandler eventHandler ,
39
66
CatalogContext catalogContext , SchemaManager schemaManager ) {
40
67
sessionId = new Random (this .hashCode ()).nextInt ();
41
68
cancelContext = new CancelContext (this );
42
69
this .clientChannel = clientChannel ;
43
70
this .eventHandler = eventHandler ;
71
+ this .messageStream = new MessageStream (clientChannel );
44
72
45
73
this .catalogContext = catalogContext ;
46
74
this .schemaManager = schemaManager ;
@@ -71,23 +99,270 @@ public void run() {
71
99
try {
72
100
messageLoop ();
73
101
} catch (Exception e ) {
74
- LOG .fatal (ExceptionUtils .getStackTrace (e ));
102
+ LOG .error (ExceptionUtils .getStackTrace (e ));
75
103
}
76
104
LOCAL_SESSION .remove ();
77
105
78
106
close ();
79
107
}
80
108
109
+ public void sendError (Exception e ) throws IOException {
110
+ Message .Builder builder = Message .builder ('E' )
111
+ .putChar ('S' ).putCString ("ERROR" )
112
+ .putChar ('C' ).putCString ("" )
113
+ .putChar ('M' ).putCString (e .getMessage ())
114
+ .putChar ('\0' );
115
+ messageStream .putMessageAndFlush (builder .build ());
116
+ }
117
+
81
118
private void messageLoop () throws Exception {
119
+ SessionHandler sessHandler = new SessionHandler ();
120
+
82
121
while (true ) {
83
122
try {
123
+ Message msg = messageStream .getMessage ();
124
+ char type = msg .getType ();
125
+ LOG .debug ("received data type=" + msg .getType ());
126
+
84
127
// TODO: handle messages
128
+ switch (type ) {
129
+ case 'S' :
130
+ JSONParser jsonParser = new JSONParser ();
131
+ JSONObject jsonMsg = (JSONObject ) jsonParser .parse (msg .getBodyString ());
132
+ Properties info = new Properties ();
133
+ info .put ("user" , jsonMsg .get ("user" ).toString ());
134
+ info .put ("password" , jsonMsg .get ("password" ).toString ());
135
+ sessHandler .setConnection (makeConnection (jsonMsg .get ("url" ).toString (), info ));
136
+ break ;
137
+ case 'E' :
138
+ sessHandler .handleQuery (msg .getBodyString ());
139
+ break ;
140
+ default :
141
+ throw new TrainDBException ("invalid message type '" + type + "'" );
142
+ }
85
143
} catch (Exception e ) {
86
144
87
145
}
88
146
}
89
147
}
90
148
149
+ private TrainDBConnectionImpl makeConnection (String url , Properties info ) throws SQLException {
150
+ try {
151
+ String newUrl = url ;
152
+ int urlPrefixLength = 0 ;
153
+ String [] tokens = url .split (":" );
154
+ if (tokens .length >= 2 && (tokens [1 ].equalsIgnoreCase ("traindb" ))) {
155
+ List <String > newTokens = new ArrayList <>();
156
+ for (int i = 0 ; i < tokens .length ; ++i ) {
157
+ if (i != 1 ) {
158
+ newTokens .add (tokens [i ]);
159
+ }
160
+ }
161
+ newUrl = Joiner .on (":" ).join (newTokens );
162
+ String [] suffixTokens = url .split ("\\ ?" );
163
+ urlPrefixLength = suffixTokens [0 ].length ();
164
+ }
165
+ String urlSuffix = url .substring (urlPrefixLength );
166
+ Properties info2 = ConnectStringParser .parse (urlSuffix , info );
167
+
168
+ TrainDBJdbc41Factory factory = new TrainDBJdbc41Factory ();
169
+ return factory .newConnection (new Driver (), factory , newUrl , info2 , null , null , schemaManager );
170
+ } catch (Exception e ) {
171
+ throw new SQLException (e .getMessage ());
172
+ }
173
+ }
174
+
175
+
176
+ class SessionHandler {
177
+ private TrainDBConnectionImpl conn ;
178
+ private TrainDBStatement stmt ;
179
+ private TrainDBQueryEngine queryEngine ;
180
+ private SqlParser .Config parserConfig ;
181
+
182
+ SessionHandler () {
183
+ this .conn = null ;
184
+ this .stmt = null ;
185
+ this .queryEngine = null ;
186
+ }
187
+
188
+ private void checkConnection () throws TrainDBException {
189
+ if (conn == null ) {
190
+ throw new TrainDBException ("no datasource connection" );
191
+ }
192
+ }
193
+
194
+ public void setConnection (TrainDBConnectionImpl newConn ) {
195
+ try {
196
+ if (conn != null ) {
197
+ if (stmt != null ) {
198
+ stmt .close ();
199
+ }
200
+ conn .close ();
201
+ }
202
+ } catch (SQLException e ) {
203
+ LOG .debug ("old connection close error" );
204
+ }
205
+ conn = newConn ;
206
+ queryEngine = new TrainDBQueryEngine (newConn );
207
+
208
+ final CalciteConnectionConfig config = conn .config ();
209
+ parserConfig = SqlParser .config ()
210
+ .withQuotedCasing (config .quotedCasing ())
211
+ .withUnquotedCasing (config .unquotedCasing ())
212
+ .withQuoting (config .quoting ())
213
+ .withConformance (config .conformance ())
214
+ .withCaseSensitive (config .caseSensitive ());
215
+ final SqlParserImplFactory parserFactory =
216
+ config .parserFactory (SqlParserImplFactory .class , TrainDBSqlCalciteParserImpl .FACTORY );
217
+ if (parserFactory != null ) {
218
+ parserConfig = parserConfig .withParserFactory (parserFactory );
219
+ }
220
+ }
221
+
222
+ public void handleQuery (String sqlQuery ) throws TrainDBException , IOException {
223
+ checkConnection ();
224
+ LOG .debug ("handleQuery: " + sqlQuery );
225
+
226
+ try {
227
+ if (stmt == null ) {
228
+ stmt = conn .createStatement (
229
+ ResultSet .TYPE_FORWARD_ONLY , ResultSet .CONCUR_READ_ONLY , conn .getHoldability ());
230
+ }
231
+
232
+ // First check input query with TrainDB sql grammar
233
+ List <TrainDBSqlCommand > commands = null ;
234
+ try {
235
+ commands = TrainDBSql .parse (sqlQuery , parserConfig );
236
+ } catch (Exception e ) {
237
+ if (commands != null ) {
238
+ commands .clear ();
239
+ }
240
+ }
241
+
242
+ if (commands != null && commands .size () > 0
243
+ && isTrainDBStmtWithResultSet (commands .get (0 ).getType ())) { // TrainDB DDL
244
+ stmt .execute (sqlQuery );
245
+ sendCommandComplete (commands .get (0 ).getType ().toString ());
246
+ } else {
247
+ // stmt.setFetchSize(0);
248
+ ResultSet rs = stmt .executeQuery (sqlQuery );
249
+ sendRowDesc (rs .getMetaData ());
250
+ sendDataRow (rs );
251
+ sendCommandComplete ("SELECT" ); // FIXME
252
+ }
253
+ } catch (IOException ioe ) {
254
+ sendError (ioe );
255
+ } catch (SQLException se ) {
256
+ sendError (se );
257
+ }
258
+ }
259
+
260
+ }
261
+
262
+ private boolean isTrainDBStmtWithResultSet (TrainDBSqlCommand .Type type ) {
263
+ return type .toString ().startsWith ("SHOW" )|| type .toString ().startsWith ("DESCRIBE" );
264
+ }
265
+
266
+ private void sendRowDesc (ResultSetMetaData md ) {
267
+ try {
268
+ int columnCount = md .getColumnCount ();
269
+ Message .Builder msgBld = Message .builder ('T' ).putShort ((short ) columnCount );
270
+ for (int i = 1 ; i <= columnCount ; i ++) {
271
+ msgBld .putCString (md .getColumnName (i ));
272
+ int type = md .getColumnType (i );
273
+ msgBld .putInt (type );
274
+ if (type == Types .VARCHAR ) {
275
+ msgBld .putInt (md .getPrecision (i ));
276
+ msgBld .putShort ((short ) 0 ); // Field.TEXT_FORMAT
277
+ } else {
278
+ msgBld .putInt (getTypeSize (type ));
279
+ msgBld .putShort ((short ) 1 ); // Field.BINARY_FORMAT
280
+ }
281
+ }
282
+ messageStream .putMessage (msgBld .build ());
283
+ } catch (SQLException e ) {
284
+ throw new RuntimeException (e );
285
+ } catch (IOException e ) {
286
+ throw new RuntimeException (e );
287
+ }
288
+ }
289
+
290
+ private void sendDataRow (ResultSet rs ) throws IOException {
291
+ try {
292
+ ResultSetMetaData md = rs .getMetaData ();
293
+ int columnCount = md .getColumnCount ();
294
+ while (rs .next ()) {
295
+ Message .Builder msgBld = Message .builder ('D' ).putShort ((short ) columnCount );
296
+ // Column Data
297
+ for (int i = 1 ; i <= columnCount ; i ++) {
298
+ int type = md .getColumnType (i );
299
+ switch (type ) {
300
+ case Types .TINYINT :
301
+ msgBld .putInt (getTypeSize (type )).putByte (rs .getByte (i ));
302
+ break ;
303
+ case Types .SMALLINT :
304
+ msgBld .putInt (getTypeSize (type )).putShort (rs .getShort (i ));
305
+ break ;
306
+ case Types .INTEGER :
307
+ msgBld .putInt (getTypeSize (type )).putInt (rs .getInt (i ));
308
+ break ;
309
+ case Types .BIGINT :
310
+ msgBld .putInt (getTypeSize (type )).putLong (rs .getLong (i ));
311
+ break ;
312
+ case Types .FLOAT :
313
+ msgBld .putInt (getTypeSize (type )).putFloat (rs .getFloat (i ));
314
+ break ;
315
+ case Types .DOUBLE :
316
+ msgBld .putInt (getTypeSize (type )).putDouble (rs .getDouble (i ));
317
+ break ;
318
+ case Types .VARCHAR :
319
+ byte [] bytes = rs .getString (i ).getBytes (StandardCharsets .UTF_8 );
320
+ msgBld .putInt (bytes .length ).putBytes (bytes );
321
+ break ;
322
+ // TODO support more data types
323
+ default :
324
+ throw new TrainDBException ("Not supported data type: " + type );
325
+ }
326
+ }
327
+ messageStream .putMessage (msgBld .build ());
328
+ }
329
+ } catch (SQLException e ) {
330
+ sendError (e );
331
+ } catch (TrainDBException e ) {
332
+ sendError (e );
333
+ }
334
+ }
335
+
336
+ private int getTypeSize (int type ) {
337
+ switch (type ) {
338
+ case Types .TINYINT :
339
+ return ByteBuffers .BYTE_BYTES ;
340
+ case Types .SMALLINT :
341
+ return ByteBuffers .SHORT_BYTES ;
342
+ case Types .INTEGER :
343
+ return ByteBuffers .INTEGER_BYTES ;
344
+ case Types .BIGINT :
345
+ return ByteBuffers .LONG_BYTES ;
346
+ case Types .FLOAT :
347
+ return ByteBuffers .FLOAT_BYTES ;
348
+ case Types .DOUBLE :
349
+ return ByteBuffers .DOUBLE_BYTES ;
350
+ default :
351
+ return -1 ;
352
+ }
353
+ }
354
+
355
+ private void sendCommandComplete (String tag ) throws IOException {
356
+ LOG .debug ("send CommandComplete message" );
357
+ if (tag == null ) {
358
+ tag = "" ;
359
+ }
360
+ Message msg = Message .builder ('C' )
361
+ .putCString (tag )
362
+ .build ();
363
+ messageStream .putMessageAndFlush (msg );
364
+ }
365
+
91
366
void reject () {
92
367
close ();
93
368
}
0 commit comments