Reactive Oracle Client

The Reactive Oracle Client is a client for Oracle with a straightforward API focusing on scalability and low overhead.

Features

  • Event driven

  • Built-in connection pooling

  • Java 8 Date and Time

  • SSL/TLS

  • RxJava API

  • Cursor

  • Row streaming

Not supported yet

  • Prepared queries caching

  • Stored Procedures

Usage

To use the Reactive Oracle Client add the following dependency to the dependencies section of your build descriptor:

  • Maven (in your pom.xml):

<dependency>  <groupId>io.vertx</groupId>  <artifactId>vertx-oracle-client</artifactId>  <version>5.0.4</version> </dependency>
  • Gradle (in your build.gradle file):

dependencies {  compile 'io.vertx:vertx-oracle-client:5.0.4' }

Getting started

Here is the simplest way to connect, query and disconnect

OracleConnectOptions connectOptions = new OracleConnectOptions()  .setPort(1521)  .setHost("the-host")  .setDatabase("the-db")  .setUser("user")  .setPassword("secret");  // Pool options PoolOptions poolOptions = new PoolOptions()  .setMaxSize(5);  // Create the client pool Pool client = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .build();  // A simple query client  .query("SELECT * FROM users WHERE id='julien'")  .execute()  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> result = ar.result();  System.out.println("Got " + result.size() + " rows ");  } else {  System.out.println("Failure: " + ar.cause().getMessage());  }   // Now close the pool  client.close();  });

Connecting to Oracle

Most of the time you will use a pool to connect to Oracle:

OracleConnectOptions connectOptions = new OracleConnectOptions()  .setPort(1521)  .setHost("the-host")  .setDatabase("the-db")  .setUser("user")  .setPassword("secret");  // Pool options PoolOptions poolOptions = new PoolOptions()  .setMaxSize(5);  // Create the pooled client Pool client = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .using(vertx)  .build();

The pooled client uses a connection pool and any operation will borrow a connection from the pool to execute the operation and release it to the pool.

If you are running with Vert.x you can pass it your Vertx instance:

OracleConnectOptions connectOptions = new OracleConnectOptions()  .setPort(1521)  .setHost("the-host")  .setDatabase("the-db")  .setUser("user")  .setPassword("secret");  // Pool options PoolOptions poolOptions = new PoolOptions()  .setMaxSize(5); // Create the pooled client Pool client = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .using(vertx)  .build();

You need to release the pool when you don’t need it anymore:

pool.close();

When you need to execute several operations on the same connection, you need to use a client connection.

You can easily get one from the pool:

OracleConnectOptions connectOptions = new OracleConnectOptions()  .setPort(1521)  .setHost("the-host")  .setDatabase("the-db")  .setUser("user")  .setPassword("secret");  // Pool options PoolOptions poolOptions = new PoolOptions()  .setMaxSize(5);  // Create the pooled client Pool client = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .using(vertx)  .build();  // Get a connection from the pool client.getConnection().compose(conn -> {  System.out.println("Got a connection from the pool");   // All operations execute on the same connection  return conn  .query("SELECT * FROM users WHERE id='julien'")  .execute()  .compose(res -> conn  .query("SELECT * FROM users WHERE id='emad'")  .execute())  .onComplete(ar -> {  // Release the connection to the pool  conn.close();  }); }).onComplete(ar -> {  if (ar.succeeded()) {   System.out.println("Done");  } else {  System.out.println("Something went wrong " + ar.cause().getMessage());  } });

Once you are done with the connection you must close it to release it to the pool, so it can be reused.

Pool sharing

You can share an pool between multiple verticles or instances of the same verticle. Such pool should be created outside a verticle otherwise it will be closed when the verticle that created it is undeployed

You can also create a shared pool in each verticle:

The first time a shared pool is created it will create the resources for the pool. Subsequent calls will reuse this pool and create a lease to this pool. The resources are disposed after all leases have been closed.

By default, a pool reuses the current event-loop when it needs to create a TCP connection. The shared pool will therefore randomly use event-loops of verticles using it.

You can assign a number of event loop a pool will use independently of the context using it

Configuration

There are several alternatives for you to configure the client.

Data Object

A simple way to configure the client is to specify a OracleConnectOptions data object.

OracleConnectOptions connectOptions = new OracleConnectOptions()  .setPort(1521)  .setHost("the-host")  .setDatabase("the-db")  .setUser("user")  .setPassword("secret");  // Pool Options PoolOptions poolOptions = new PoolOptions().setMaxSize(5);  // Create the pool from the data object Pool pool = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .using(vertx)  .build();  pool  .getConnection()  .onComplete(ar -> {  // Handling your connection  });

Connection URI

Apart from configuring with a OracleConnectOptions data object, we also provide you with an alternative way to connect when you want to configure with a connection URI.

Using the EZConnect format:

String connectionUri = "oracle:thin:@mydbhost1:5521/mydbservice?connect_timeout=10sec";  // Connect options OracleConnectOptions connectOptions = OracleConnectOptions.fromUri(connectionUri)  .setUser("user")  .setPassword("secret");  // Pool Options PoolOptions poolOptions = new PoolOptions().setMaxSize(5);  // Create the pool from the connection URI Pool pool = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .using(vertx)  .build();

Or, using the TNS Alias format:

String connectionUri = "oracle:thin:@prod_db?TNS_ADMIN=/work/tns/";  // Connect options OracleConnectOptions connectOptions = OracleConnectOptions.fromUri(connectionUri)  .setUser("user")  .setPassword("secret");  // Pool Options PoolOptions poolOptions = new PoolOptions().setMaxSize(5);  // Create the pool from the connection URI Pool pool = OracleBuilder.pool()  .with(poolOptions)  .connectingTo(connectOptions)  .using(vertx)  .build();

Connect retries

You can configure the client to retry when a connection fails to be established.

options  .setReconnectAttempts(2)  .setReconnectInterval(1000);

Running queries

When you don’t need a transaction or run single queries, you can run queries directly on the pool; the pool will use one of its connection to run the query and return the result to you.

Here is how to run simple queries:

client  .query("SELECT * FROM users WHERE id='julien'")  .execute()  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> result = ar.result();  System.out.println("Got " + result.size() + " rows ");  } else {  System.out.println("Failure: " + ar.cause().getMessage());  } });

Prepared queries

You can do the same with prepared queries.

The SQL string can refer to parameters by position, using the database syntax `?`

client  .preparedQuery("SELECT * FROM users WHERE id=?")  .execute(Tuple.of("julien"))  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> rows = ar.result();  System.out.println("Got " + rows.size() + " rows ");  } else {  System.out.println("Failure: " + ar.cause().getMessage());  } });

Query methods provides an asynchronous RowSet instance that works for SELECT queries

client  .preparedQuery("SELECT first_name, last_name FROM users")  .execute()  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> rows = ar.result();  for (Row row : rows) {  System.out.println("User " + row.getString(0) + " " + row.getString(1));  }  } else {  System.out.println("Failure: " + ar.cause().getMessage());  } });

or UPDATE/INSERT queries:

client  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")  .execute(Tuple.of("Julien", "Viet"))  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> rows = ar.result();  System.out.println(rows.rowCount());  } else {  System.out.println("Failure: " + ar.cause().getMessage());  } });

The Row gives you access to your data by index

System.out.println("User " + row.getString(0) + " " + row.getString(1));
Column indexes start at 0, not at 1.

Alternatively, data can be retrieved by name:

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

The client will not do any magic here and the column name is identified with the name in the table regardless of how your SQL text is.

You can access a wide variety of of types

String firstName = row.getString("first_name"); Boolean male = row.getBoolean("male"); Integer age = row.getInteger("age");

You can use cached prepared statements to execute one-shot prepared queries:

connectOptions.setCachePreparedStatements(true); client  .preparedQuery("SELECT * FROM users WHERE id = ?")  .execute(Tuple.of("julien"))  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> rows = ar.result();  System.out.println("Got " + rows.size() + " rows ");  } else {  System.out.println("Failure: " + ar.cause().getMessage());  }  });

You can create a PreparedStatement and manage the lifecycle by yourself.

sqlConnection  .prepare("SELECT * FROM users WHERE id = ?")  .onComplete(ar -> {  if (ar.succeeded()) {  PreparedStatement preparedStatement = ar.result();  preparedStatement.query()  .execute(Tuple.of("julien"))  .onComplete(ar2 -> {  if (ar2.succeeded()) {  RowSet<Row> rows = ar2.result();  System.out.println("Got " + rows.size() + " rows ");  preparedStatement.close();  } else {  System.out.println("Failure: " + ar2.cause().getMessage());  }  });  } else {  System.out.println("Failure: " + ar.cause().getMessage());  }  });

Batches

You can execute prepared batch

List<Tuple> batch = new ArrayList<>(); batch.add(Tuple.of("julien", "Julien Viet")); batch.add(Tuple.of("emad", "Emad Alblueshi"));  // Execute the prepared batch client  .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")  .executeBatch(batch)  .onComplete(res -> {  if (res.succeeded()) {   // Process rows  RowSet<Row> rows = res.result();  } else {  System.out.println("Batch failed " + res.cause());  } });

Retrieving generated key values

When executing INSERT queries, you can retrieve the generated key values.

The values are returned as a Row instance. This instance is accessible by invoking SqlResult.property(kind) using the OracleClient.GENERATED_KEYS property kind.

The key values can be retrieved by column name:

String sql = "INSERT INTO EntityWithIdentity (name, position) VALUES (?, ?)";  // Retrieve generated key column value by name OraclePrepareOptions options = new OraclePrepareOptions()  .setAutoGeneratedKeysIndexes(new JsonArray().add("ID"));  client.preparedQuery(sql, options)  .execute(Tuple.of("john", 3))  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> result = ar.result();   Row generated = result.property(OracleClient.GENERATED_KEYS);  Long id = generated.getLong("ID");  }  });

Or, they can be retrieved by column index:

String sql = "INSERT INTO EntityWithIdentity (name, position) VALUES (?, ?)";  // Retrieve generated key column value by index OraclePrepareOptions options = new OraclePrepareOptions()  .setAutoGeneratedKeysIndexes(new JsonArray().add("1"));  client.preparedQuery(sql, options)  .execute(Tuple.of("john", 3))  .onComplete(ar -> {  if (ar.succeeded()) {  RowSet<Row> result = ar.result();   Row generated = result.property(OracleClient.GENERATED_KEYS);  Long id = generated.getLong("ID");  }  });

Using connections

Getting a connection

When you need to execute sequential queries (without a transaction), you can create a new connection or borrow one from the pool. Remember that between acquiring the connection from the pool and returning it to the pool, you should take care of the connection because it might be closed by the server for some reason such as an idle time out.

pool  .getConnection()  .compose(connection ->  connection  .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")  .executeBatch(Arrays.asList(  Tuple.of("Julien", "Viet"),  Tuple.of("Emad", "Alblueshi")  ))  .compose(res -> connection  // Do something with rows  .query("SELECT COUNT(*) FROM Users")  .execute()  .map(rows -> rows.iterator().next().getInteger(0)))  // Return the connection to the pool  .eventually(() -> connection.close())  ).onSuccess(count -> {  System.out.println("Insert users, now the number of users is " + count); });

Prepared queries can be created:

connection  .prepare("SELECT * FROM users WHERE first_name LIKE ?")  .compose(pq ->  pq.query()  .execute(Tuple.of("Julien"))  .eventually(() -> pq.close())  ).onSuccess(rows -> {  // All rows });

Simplified connection API

When you use a pool, you can call withConnection to pass it a function executed within a connection.

It borrows a connection from the pool and calls the function with this connection.

The function must return a future of an arbitrary result.

After the future completes, the connection is returned to the pool and the overall result is provided.

pool.withConnection(connection ->  connection  .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")  .executeBatch(Arrays.asList(  Tuple.of("Julien", "Viet"),  Tuple.of("Emad", "Alblueshi")  ))  .compose(res -> connection  // Do something with rows  .query("SELECT COUNT(*) FROM Users")  .execute()  .map(rows -> rows.iterator().next().getInteger(0))) ).onSuccess(count -> {  System.out.println("Insert users, now the number of users is " + count); });

Using transactions

Transactions with connections

You can execute transaction using SQL BEGIN/COMMIT/ROLLBACK, if you do so you must use a SqlConnection and manage it yourself.

Or you can use the transaction API of SqlConnection:

pool.getConnection()  // Transaction must use a connection  .onSuccess(conn -> {  // Begin the transaction  conn.begin()  .compose(tx -> conn  // Various statements  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")  .execute()  .compose(res2 -> conn  .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")  .execute())  // Commit the transaction  .compose(res3 -> tx.commit()))  // Return the connection to the pool  .eventually(() -> conn.close())  .onSuccess(v -> System.out.println("Transaction succeeded"))  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));  });

When the database server reports the current transaction is failed (e.g the infamous current transaction is aborted, commands ignored until end of transaction block), the transaction is rollbacked and the completion future is failed with a TransactionRollbackException:

tx.completion()  .onFailure(err -> {  System.out.println("Transaction failed => rolled back");  });

Simplified transaction API

When you use a pool, you can call withTransaction to pass it a function executed within a transaction.

It borrows a connection from the pool, begins the transaction and calls the function with a client executing all operations in the scope of this transaction.

The function must return a future of an arbitrary result:

  • when the future succeeds the client will commit the transaction

  • when the future fails the client will rollback the transaction

After the transaction completes, the connection is returned to the pool and the overall result is provided.

pool.withTransaction(client -> client  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")  .execute()  .flatMap(res -> client  .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")  .execute()  // Map to a message result  .map("Users inserted")))  .onSuccess(v -> System.out.println("Transaction succeeded"))  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

Cursors and streaming

By default, prepared query execution fetches all rows, you can use a Cursor to control the amount of rows you want to read:

connection  .prepare("SELECT * FROM users WHERE age > ?")  .onComplete(ar1 -> {  if (ar1.succeeded()) {  PreparedStatement pq = ar1.result();   // Create a cursor  Cursor cursor = pq.cursor(Tuple.of(18));   // Read 50 rows  cursor  .read(50)  .onComplete(ar2 -> {  if (ar2.succeeded()) {  RowSet<Row> rows = ar2.result();   // Check for more ?  if (cursor.hasMore()) {  // Repeat the process...  } else {  // No more rows - close the cursor  cursor.close();  }  }  });  } });

Cursors shall be closed when they are released prematurely:

cursor  .read(50)  .onComplete(ar2 -> {  if (ar2.succeeded()) {  // Close the cursor  cursor.close();  } });

A stream API is also available for cursors, which can be more convenient, specially with the Rxified version.

connection  .prepare("SELECT * FROM users WHERE age > ?")  .onComplete(ar1 -> {  if (ar1.succeeded()) {  PreparedStatement pq = ar1.result();   // Fetch 50 rows at a time  RowStream<Row> stream = pq.createStream(50, Tuple.of(18));   // Use the stream  stream.exceptionHandler(err -> {  System.out.println("Error: " + err.getMessage());  });  stream.endHandler(v -> {  System.out.println("End of stream");  });  stream.handler(row -> {  System.out.println("User: " + row.getString("last_name"));  });  } });

The stream read the rows by batch of 50 and stream them, when the rows have been passed to the handler, a new batch of 50 is read and so on.

The stream can be resumed or paused, the loaded rows will remain in memory until they are delivered and the cursor will stop iterating.

Data types supported

Currently, the client supports the following Oracle data types:

  • CHAR/VARCHAR2(java.lang.String)

  • NCHAR/NVARCHAR2(java.lang.String)

  • NUMBER(BigDecimal)

  • FLOAT(java.lang.Double)

  • DATE(java.time.LocalDate)

  • TIMESTAMP(java.time.LocalDateTime)

  • RAW(io.vertx.core.buffer.Buffer)

Tuple decoding uses the above types when storing values.

BLOB data type is also supported with one caveat: it must be represented by an instance of Blob when writing or filtering. However, when reading BLOB data, the client returns a Buffer.

client.preparedQuery("INSERT INTO images (name, data) VALUES (?, ?)")  // Use io.vertx.oracleclient.data.Blob when inserting  .execute(Tuple.of("beautiful-sunset.jpg", Blob.copy(imageBuffer)))  .onComplete(ar -> {  // Do something  });  client.preparedQuery("SELECT data FROM images WHERE id = ?")  .execute(Tuple.of(id))  .onComplete(ar -> {  if (ar.succeeded()) {  Row row = ar.result().iterator().next();   // Use io.vertx.core.buffer.Buffer when reading  Buffer data = row.getBuffer("data");  }  });

Tracing queries

The SQL client can trace query execution when Vert.x has tracing enabled.

The client reports the following client spans:

  • Query operation name

  • tags

  • db.system: the database management system product

  • db.user: the database username

  • db.instance: the database instance

  • db.statement: the SQL query

  • db.type: sql

The default tracing policy is PROPAGATE, the client will only create a span when involved in an active trace.

You can change the client policy with setTracingPolicy, e.g you can set ALWAYS to always report a span:

options.setTracingPolicy(TracingPolicy.ALWAYS);

Collector queries

You can use Java collectors with the query API:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(  row -> row.getLong("id"),  row -> row.getString("last_name"));  // Run the query with the collector client.query("SELECT * FROM users")  .collecting(collector)  .execute()  .onComplete(ar -> {  if (ar.succeeded()) {  SqlResult<Map<Long, String>> result = ar.result();   // Get the map created by the collector  Map<Long, String> map = result.value();  System.out.println("Got " + map);  } else {  System.out.println("Failure: " + ar.cause().getMessage());  }  });

The collector processing must not keep a reference on the Row as there is a single row used for processing the entire set.

The Java Collectors provides many interesting predefined collectors, for example you can create easily create a string directly from the row set:

Collector<Row, ?, String> collector = Collectors.mapping(  row -> row.getString("last_name"),  Collectors.joining(",", "(", ")") );  // Run the query with the collector client.query("SELECT * FROM users")  .collecting(collector)  .execute()  .onComplete(ar -> {  if (ar.succeeded()) {  SqlResult<String> result = ar.result();   // Get the string created by the collector  String list = result.value();  System.out.println("Got " + list);  } else {  System.out.println("Failure: " + ar.cause().getMessage());  }  });

Using SSL/TLS

To enable encryption in client options, use the setSsl method. By default, ssl is set to false.

oracleConnectOptions.setSsl(true);

Encryption can be customized using properties. For example, to set a trust store:

oracleConnectOptions  .setSsl(true)  .addProperty("javax.net.ssl.trustStore", pathToTrustStore)  .addProperty("javax.net.ssl.trustStoreType", "JKS")  .addProperty("javax.net.ssl.trustStorePassword", trustStorePassword);