1919import java .util .List ;
2020
2121import io .objectbox .query .Query ;
22- import io .objectbox .reactive .DataObserver ;
2322import io .objectbox .reactive .DataSubscription ;
24- import io .reactivex .BackpressureStrategy ;
25- import io .reactivex .Flowable ;
26- import io .reactivex .FlowableEmitter ;
27- import io .reactivex .FlowableOnSubscribe ;
28- import io .reactivex .Observable ;
29- import io .reactivex .ObservableEmitter ;
30- import io .reactivex .ObservableOnSubscribe ;
31- import io .reactivex .Single ;
32- import io .reactivex .SingleEmitter ;
33- import io .reactivex .SingleOnSubscribe ;
34- import io .reactivex .functions .Cancellable ;
23+ import io .reactivex .rxjava3 .core .BackpressureStrategy ;
24+ import io .reactivex .rxjava3 .core .Flowable ;
25+ import io .reactivex .rxjava3 .core .FlowableEmitter ;
26+ import io .reactivex .rxjava3 .core .Observable ;
27+ import io .reactivex .rxjava3 .core .Single ;
3528
3629/**
3730 * Static methods to Rx-ify ObjectBox queries.
@@ -50,37 +43,23 @@ public static <T> Flowable<T> flowableOneByOne(final Query<T> query) {
5043 * Uses given BackpressureStrategy.
5144 */
5245 public static <T > Flowable <T > flowableOneByOne (final Query <T > query , BackpressureStrategy strategy ) {
53- return Flowable .create (new FlowableOnSubscribe <T >() {
54- @ Override
55- public void subscribe (final FlowableEmitter <T > emitter ) throws Exception {
56- createListItemEmitter (query , emitter );
57- }
58-
59- }, strategy );
46+ return Flowable .create (emitter -> createListItemEmitter (query , emitter ), strategy );
6047 }
6148
6249 static <T > void createListItemEmitter (final Query <T > query , final FlowableEmitter <T > emitter ) {
63- final DataSubscription dataSubscription = query .subscribe ().observer (new DataObserver <List <T >>() {
64- @ Override
65- public void onData (List <T > data ) {
66- for (T datum : data ) {
67- if (emitter .isCancelled ()) {
68- return ;
69- } else {
70- emitter .onNext (datum );
71- }
72- }
73- if (!emitter .isCancelled ()) {
74- emitter .onComplete ();
50+ final DataSubscription dataSubscription = query .subscribe ().observer (data -> {
51+ for (T datum : data ) {
52+ if (emitter .isCancelled ()) {
53+ return ;
54+ } else {
55+ emitter .onNext (datum );
7556 }
7657 }
77- });
78- emitter .setCancellable (new Cancellable () {
79- @ Override
80- public void cancel () throws Exception {
81- dataSubscription .cancel ();
58+ if (!emitter .isCancelled ()) {
59+ emitter .onComplete ();
8260 }
8361 });
62+ emitter .setCancellable (dataSubscription ::cancel );
8463 }
8564
8665 /**
@@ -89,44 +68,27 @@ public void cancel() throws Exception {
8968 * (see {@link Query#subscribe()} for details).
9069 */
9170 public static <T > Observable <List <T >> observable (final Query <T > query ) {
92- return Observable .create (new ObservableOnSubscribe <List <T >>() {
93- @ Override
94- public void subscribe (final ObservableEmitter <List <T >> emitter ) throws Exception {
95- final DataSubscription dataSubscription = query .subscribe ().observer (new DataObserver <List <T >>() {
96- @ Override
97- public void onData (List <T > data ) {
98- if (!emitter .isDisposed ()) {
99- emitter .onNext (data );
100- }
101- }
102- });
103- emitter .setCancellable (new Cancellable () {
104- @ Override
105- public void cancel () throws Exception {
106- dataSubscription .cancel ();
107- }
108- });
109- }
71+ return Observable .create (emitter -> {
72+ final DataSubscription dataSubscription = query .subscribe ().observer (data -> {
73+ if (!emitter .isDisposed ()) {
74+ emitter .onNext (data );
75+ }
76+ });
77+ emitter .setCancellable (dataSubscription ::cancel );
11078 });
11179 }
11280
11381 /**
11482 * The returned Single emits one Query result as a List.
11583 */
11684 public static <T > Single <List <T >> single (final Query <T > query ) {
117- return Single .create (new SingleOnSubscribe <List <T >>() {
118- @ Override
119- public void subscribe (final SingleEmitter <List <T >> emitter ) throws Exception {
120- query .subscribe ().single ().observer (new DataObserver <List <T >>() {
121- @ Override
122- public void onData (List <T > data ) {
123- if (!emitter .isDisposed ()) {
124- emitter .onSuccess (data );
125- }
126- }
127- });
128- // no need to cancel, single never subscribes
129- }
85+ return Single .create (emitter -> {
86+ query .subscribe ().single ().observer (data -> {
87+ if (!emitter .isDisposed ()) {
88+ emitter .onSuccess (data );
89+ }
90+ });
91+ // no need to cancel, single never subscribes
13092 });
13193 }
13294}
0 commit comments