ADBA – Asynchronous Database Access
A new asynchronous API for connecting to a database

Douglas Surber, JDBC Architect
Kuassi Mensah, Director, Product Management
Database Server Technologies
November, 2018

Copyright © 2018, Oracle and/or its affiliates. All rights reserved.

Safe Harbor Statement
The following is intended to outline our general product direction. It is intended for information purposes only, and may not be incorporated into any contract. It is not a commitment to deliver any material, code, or functionality, and should not be relied upon in making purchasing decisions. The development, release, and timing of any features or functionality described for Oracle’s products remains at the sole discretion of Oracle.

Program Agenda
1. Introduction
2. Code
3. Wrap-up
4. Q&A

ADBA – Asynchronous Database Access
• What: Java standard database access API that never blocks user threads
• Who: Developed by the JDBC Community, JDBC Expert Group and Oracle
• When: Targeted for a near future release, Java 12 perhaps
• Why: async apps have better throughput
  – Fewer threads means less thread scheduling, less thread contention
  – Database access is slow so blocked threads leave resources idle for a long time

Goals
• No user thread blocks ever
  – Minimize the number of threads used for database access
• Alternate API for database access
  – Not an extension to the current JDBC standard
  – Not a replacement for the current JDBC standard
• Target high throughput apps
  – Not a completely general purpose database access API
  – At least version 1 will have a limited feature set
• Build on the Java SE class library
• Support 3rd party Reactive Stream Database Access APIs

Design Choices
• No reference to java.sql
• Rigorous use of types
• Builder pattern
• Fluent API
• Immutable after initialization
• One way to do something is enough
• No SQL processing by the driver
• Avoid callback hell

What About …?
There are already multiple async Java and JavaScript APIs
• Streams
  – Java streams are inherently synchronous
• ReactiveStreams
  – ADBA uses java.util.concurrent.Flow where appropriate
  – R2DBC (see next slide)
• NodeJS

R2DBC – Reactive Relational Database Connectivity
• Pure Reactive Streams Database Access API
  – Adheres to the Reactive Manifesto
  – Non-blocking backpressure (push-pull)
  – Dependency on the Project Reactor library
  – https://github.com/r2dbc
• R2DBC over ADBA “…allows usage of ADBA drivers by using the R2DBC SPI”
  – https://github.com/r2dbc/r2dbc-over-adba
  – Reminder: one of the goals of ADBA is to support Reactive Streams Database Access APIs like R2DBC

Trivial Insert

    // RowCountOperation
    public void insertItem(Session session, Item item) {
      session.rowCountOperation("insert into tab values (:id, :name, :answer)")
          .set("id", item.id(), AdbaType.NUMERIC)
          .set("name", item.name(), AdbaType.VARCHAR)
          .set("answer", item.answer(), AdbaType.NUMERIC)
          .submit();
    }

CompletionStage
java.util.concurrent.CompletionStage
• Java representation of a computational monad
• Mechanism for asynchronous programming
  – Event based: push model -> higher scalability
  – Allows composing asynchronous tasks
  – Supports lambda expressions and fluent programming
• More @ http://bit.ly/2nnLqa0

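As a rough illustration of "composing asynchronous tasks", the sketch below chains two asynchronous steps with the standard CompletionStage methods. It uses only java.util.concurrent, not ADBA; the class CompletionStageSketch and the methods findId, describe and lookup are hypothetical names invented for this example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class CompletionStageSketch {
    // Hypothetical stand-in for an asynchronous lookup; runs on another thread.
    public static CompletionStage<Integer> findId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical second step that depends on the result of the first.
    public static CompletionStage<String> describe(int id) {
        return CompletableFuture.supplyAsync(() -> "id=" + id);
    }

    // Composes the two steps; the calling thread never waits for either one.
    public static CompletionStage<String> lookup(String name) {
        return findId(name)
                .thenCompose(CompletionStageSketch::describe)
                .exceptionally(t -> "failed: " + t);
    }

    public static void main(String[] args) {
        // join() blocks, and is used here only so the demo can print a result.
        System.out.println(lookup("CLARK").toCompletableFuture().join());
    }
}
```

The push model shows up in lookup: nothing asks whether findId has finished; the next step is invoked when the previous stage completes.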
Execution Model
• Everything is an Operation
• An Operation consists of
  – SQL or other database operation
  – Parameter assignments
  – Result handling
  – Submission and CompletionStage
• User thread creates and submits Operations
  – Creating and submitting never blocks; user thread never blocks
• Implementation executes those Operations asynchronously
  – Performs round trip(s) to the database
  – Executes result handling
  – Completes CompletionStage

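The execution model can be imitated with plain java.util.concurrent, which may help make the division of labor concrete. This is only a sketch under stated assumptions: ExecutionModelSketch and its submit method are hypothetical, a single worker thread stands in for the driver implementation, and a Supplier stands in for the SQL, parameters and result handling of an Operation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ExecutionModelSketch {
    // One worker thread plays the role of the implementation that executes Operations.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Submitting only enqueues the work and returns at once with a CompletionStage.
    public <T> CompletionStage<T> submit(Supplier<T> operation) {
        return CompletableFuture.supplyAsync(operation, worker);
    }

    public void shutdown() {
        worker.shutdown();
    }

    public static void main(String[] args) {
        ExecutionModelSketch session = new ExecutionModelSketch();
        CompletionStage<Integer> count = session.submit(() -> 1);       // pretend insert
        CompletionStage<String> name = session.submit(() -> "CLARK");   // pretend select
        count.thenCombine(name, (c, n) -> n + ":" + c)
             .thenAccept(System.out::println)
             .toCompletableFuture()
             .join(); // blocking here only keeps the demo alive until the result is printed
        session.shutdown();
    }
}
```

Because the worker has one thread, the two submitted tasks run in submission order, which loosely mirrors a sequential group of Operations.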
getDataSource

    // DataSourceFactory
    public DataSource getDataSource() {
      return DataSourceFactory.newFactory("oracle.database.adba")
          .builder()
          .url("//host.oracle.com:5521/example")
          .username("scott")
          .password("tiger")
          .build();
    }

Getting a Session
Default method in DataSource:

    public default Session getSession() {
      return builder().build().attach();
    }

Default method in Session:

    public default Session attach() {
      this.submit();
      this.attachOperation()
          .submit();
      return this;
    }

Trivial Select

    // RowOperation
    public void idsForAnswer(DataSource ds, List<Integer> result, int correctAnswer) {
      try (Session session = ds.getSession()) {
        session.<List<Integer>>rowOperation("select id, name, answer from tab where answer = :target")
            .set("target", correctAnswer, AdbaType.NUMERIC)
            .collect(() -> result,
                     (list, row) -> list.add(row.at("id").get(Integer.class)))
            .submit();
      }
    }

Close

    @Override
    public default void close() {
      this.closeOperation()
          .submit();
      // submitting a close Operation must satisfy the requirements of
      // OperationGroup.close()
    }

Note: A CloseOperation is never skipped.

SQL Support
• All SQL is vendor specific
  – No escape sequences
  – No specified parameter markers
• Non-vendor-specific syntax requires processing by the driver
  – Adds overhead
  – Increases code complexity
  – Minimal benefit as most apps are tied to a specific database regardless

Note: Code examples use parameter markers from a variety of databases including DB2 (:foo), MySQL (?), Oracle Database (:foo), PostgreSQL ($1), SQL Server (@foo), and JDBC (?).

SELECT

    // RowOperation
    public CompletionStage<List<Item>> itemsForAnswer(DataSource ds, int answer) {
      try (Session session = ds.getSession()) {
        return session.<List<Item>>rowOperation("select id, name, answer from tab where answer = :target")
            .set("target", answer, AdbaType.NUMERIC) // bind the method argument, not a literal
            .collect(Collectors.mapping(
                row -> new Item(row.at("id").get(Integer.class),
                                row.at("name").get(String.class),
                                row.at("answer").get(Integer.class)),
                Collectors.toList()))
            .submit()
            .getCompletionStage();
      }
    }

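The argument to collect() in the SELECT example is an ordinary java.util.stream.Collector, so the same shape can be tried without a database. In the sketch below a Map<String, Object> stands in for a result row; that substitution, the Item class and the name itemCollector are assumptions made for illustration and are not part of ADBA.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collector;
import java.util.stream.Collectors;

public class RowCollectorSketch {
    // Hypothetical value type standing in for the Item class used on the slides.
    public static final class Item {
        public final int id;
        public final String name;
        public final int answer;

        public Item(int id, String name, int answer) {
            this.id = id;
            this.name = name;
            this.answer = answer;
        }
    }

    // Maps each "row" to an Item, then gathers the Items into a List.
    public static Collector<Map<String, Object>, ?, List<Item>> itemCollector() {
        return Collectors.mapping(
                (Map<String, Object> row) -> new Item(
                        (Integer) row.get("id"),
                        (String) row.get("name"),
                        (Integer) row.get("answer")),
                Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
                Map.<String, Object>of("id", 1, "name", "a", "answer", 42),
                Map.<String, Object>of("id", 2, "name", "b", "answer", 42));
        List<Item> items = rows.stream().collect(itemCollector());
        System.out.println(items.size() + " items, first id " + items.get(0).id);
    }
}
```

Reusing Collector means result handling is written once and works the same way whether the rows come from a stream or are pushed by a driver.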
Transaction

    // TransactionCompletion
    public void transaction(DataSource ds) {
      try (Session session = ds.getSession(t -> System.out.println("ERROR: " + t.toString()))) {
        TransactionCompletion trans = session.transactionCompletion();
        CompletionStage<Integer> idPromise = session.<Integer>rowOperation(
                "select empno, ename from emp where ename = :1 for update")
            .set("1", "CLARK", AdbaType.VARCHAR)
            .collect(Collectors.collectingAndThen(
                Collectors.mapping(r -> r.at("empno").get(Integer.class), Collectors.toList()),
                l -> l.get(0)))
            .onError(t -> trans.setRollbackOnly())
            .submit()
            .getCompletionStage();
        session.<Long>rowCountOperation("update emp set deptno = :1 where empno = :2")
            .set("1", 50, AdbaType.INTEGER)
            .set("2", idPromise, AdbaType.INTEGER)
            .apply(c -> {
              if (c.getCount() != 1L) {
                trans.setRollbackOnly();
                throw new RuntimeException("updated wrong number of rows");
              }
              return c.getCount();
            })
            .onError(t -> trans.setRollbackOnly())
            .submit();
            // .getCompletionStage()
            // .exceptionally( t -> { trans.setRollbackOnly(); return null;}) // incorrect
        session.catchErrors();
        session.commitMaybeRollback(trans);
      }
    }

OperationGroup
Operations can be grouped
• OperationGroup has its own result handling and CompletionStage
• Members are submitted to the group; the OperationGroup is submitted as a unit
• Order of execution
  – Sequential, in order submitted
  – Parallel, any order
• Conditional or unconditional
• Error response
  – Dependent: remaining group members skipped
  – Independent: remaining group members unaffected
• Session is an OperationGroup
  – Sequential, dependent, unconditional by default

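The difference between a sequential, dependent group and a parallel, independent group can be approximated with CompletableFuture. This is a loose analogy, not ADBA's implementation: GroupSketch, runDependent and runIndependent are hypothetical names, and each Supplier stands in for one member Operation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class GroupSketch {
    // Sequential and dependent: members run one after another in submission
    // order, and once one fails the remaining members are skipped.
    public static List<String> runDependent(List<Supplier<String>> members) {
        List<String> log = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Supplier<String> member : members) {
            chain = chain.thenRun(() -> log.add(member.get()));
        }
        chain.exceptionally(t -> null).join(); // swallow the failure for the demo
        return log;
    }

    // Parallel and independent: members may run in any order, and a failure
    // affects only the member that failed.
    public static List<String> runIndependent(List<Supplier<String>> members) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (Supplier<String> member : members) {
            futures.add(CompletableFuture.supplyAsync(member).exceptionally(t -> "failed"));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join()); // results are reported in submission order
        }
        return results;
    }

    public static void main(String[] args) {
        List<Supplier<String>> members = List.of(
                () -> "a",
                () -> { throw new IllegalStateException("boom"); },
                () -> "c");
        System.out.println(runDependent(members));
        System.out.println(runIndependent(members));
    }
}
```

With the three members in main, the dependent run records only the first member, while the independent run still produces a result for the third.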
Parallel UPDATE

    // Parallel, Independent OperationGroup
    public void updateListParallel(List<Item> list, DataSource ds) {
      String query = "select id from tab where answer = :answer";
      String update = "update tab set name = :name where id = :id";
      try (Session session = ds.getSession();
           OperationGroup<Object, Object> group = session.operationGroup()
               .independent()
               .parallel()) {
        group.submit();
        for (Item elem : list) {
          // Item accessors are written as methods, matching insertItem
          CompletionStage<Integer> idPromise = group.<List<Integer>>rowOperation(query)
              .set("answer", elem.answer(), AdbaType.NUMERIC)
              .collect(Collector.of(
                  () -> new ArrayList<>(),
                  (l, row) -> l.add(row.at("id").get(Integer.class)),
                  (l, r) -> l))
              .submit()
              .getCompletionStage()
              .thenApply(l -> l.get(0));
          group.rowCountOperation(update)
              .set("id", idPromise)
              .set("name", "the ultimate question")
              .submit()
              .getCompletionStage()
              .exceptionally(t -> {
                System.out.println(elem.id());
                return null;
              });
        }
      }
    }

Reactive Streams – Non-blocking Backpressure
java.util.concurrent.Flow
[Diagram labels: ADBA; User Java code]

Row Publisher

    // RowPublisherOperation
    public CompletionStage<List<String>> rowSubscriber(DataSource ds) {
      String sql = "select empno, ename from emp";
      CompletableFuture<List<String>> result = new CompletableFuture<>();
      Flow.Subscriber<Result.RowColumn> subscriber = new Flow.Subscriber<>() {
        Flow.Subscription subscription;
        List<String> names = new ArrayList<>();
        int demand = 0;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
          this.subscription = subscription;
          this.subscription.request(10);
          demand += 10;
        }

        @Override
        public void onNext(Result.RowColumn column) {
          names.add(column.at("ename").get(String.class));
          if (--demand < 1) {
            subscription.request(10);
            demand += 10;
          }
        }

        @Override
        public void onError(Throwable throwable) {
          result.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
          result.complete(names);
        }
      }; // Subscriber

      try (Session session = ds.getSession()) {
        return session.<List<String>>rowPublisherOperation(sql)
            .subscribe(subscriber, result)
            .submit() // no semicolon here; the call chain continues
            .getCompletionStage();
      }
    }

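The batch-of-ten demand pattern in the Row Publisher subscriber can be exercised without a database by feeding the subscriber from java.util.concurrent.SubmissionPublisher. In this sketch the publisher stands in for the driver and plain strings stand in for rows; BackpressureSketch, BatchSubscriber and collect are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureSketch {
    // Subscriber that asks for items ten at a time and collects them into a list.
    static final class BatchSubscriber implements Flow.Subscriber<String> {
        private static final int BATCH = 10;
        private final CompletableFuture<List<String>> result = new CompletableFuture<>();
        private final List<String> names = new ArrayList<>();
        private Flow.Subscription subscription;
        private int demand;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            demand = BATCH;
            subscription.request(BATCH); // initial demand
        }

        @Override
        public void onNext(String name) {
            names.add(name);
            if (--demand < 1) {          // batch consumed, ask for the next one
                demand = BATCH;
                subscription.request(BATCH);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            result.completeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            result.complete(names);
        }
    }

    // Publishes the given number of fake rows and returns what the subscriber collected.
    public static List<String> collect(int rows) {
        BatchSubscriber subscriber = new BatchSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < rows; i++) {
                publisher.submit("row" + i);
            }
        } // close() signals onComplete once the submitted items have been delivered
        return subscriber.result.join(); // blocking only for the demo
    }

    public static void main(String[] args) {
        System.out.println(collect(25).size());
    }
}
```

The publisher never delivers more items than the subscriber has requested, which is the non-blocking backpressure the Flow interfaces are meant to provide.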
Operation Submission Rate

    public class RecordSubscriber implements Subscriber<byte[]> {

      private final Session session;
      private OperationGroup<Long, Long> group;

      public RecordSubscriber(DataSource ds) {
        session = ds.getSession();
      }

      @Override
      public void onSubscribe(Subscription subscription) {
        group = session.<Long, Long>operationGroup()
            .independent()
            .collect(Collectors.summingLong(c -> c));
        group.submit();
        session.requestHook(subscription::request);
      }

      @Override
      public void onNext(byte[] item) {
        String insert = "insert into tab values (@record)";
        group.<Long>rowCountOperation(insert)
            .set("record", item, AdbaType.VARBINARY)
            .apply(c -> c.getCount())
            .submit();
      }

      @Override
      public void onError(Throwable t) {
        group.close();
        session.close();
      }

      @Override
      public void onComplete() {
        group.close();
        session.close();
      }
    }

Controlling Session Creation Rate

    public class ItemSubscriber implements Subscriber<Item> {

      private final DataSourceFactory factory;
      private DataSource ds;

      public ItemSubscriber(DataSourceFactory f) {
        factory = f;
      }

      @Override
      public void onSubscribe(Subscription subscription) {
        ds = factory.builder()
            .url("//host.oracle.com:5521/example")
            .username("scott")
            .password("tiger")
            .requestHook(subscription::request)
            .build();
      }

      @Override
      public void onNext(Item item) {
        try (Session s = ds.getSession()) {
          insertItem(s, item);
        }
      }

      @Override
      public void onError(Throwable t) {
        ds.close();
      }

      @Override
      public void onComplete() {
        ds.close();
      }
    }

AoJ – ADBA over JDBC
• Proof-of-concept implementation that uses any standard JDBC driver
• Very limited functionality
• Source released under the Apache license
• Oracle is updating AoJ to track changes in ADBA

Status
• Everything is subject to change
• Developed through the Java Community Process
• You can participate via jdbc-spec-discuss@openjdk.java.net
• The API is available for download from OpenJDK at http://hg.openjdk.java.net/jdk/sandbox/file/JDK-8188051-branch/src/jdk.incubator.adba/share/classes
• AoJ source available at https://github.com/oracle/oracle-db-examples/tree/master/java/AoJ
• Targeted for a near future release, Java 12 perhaps

Q & A

Reactive Java Programming: A new Asynchronous Database Access API by Kuassi Mensah
