2222import org .mockito .junit .MockitoJUnitRunner ;
2323
2424import java .util .ArrayList ;
25+ import java .util .Arrays ;
2526import java .util .Collections ;
2627import java .util .List ;
2728import java .util .concurrent .CopyOnWriteArrayList ;
3132
3233import io .objectbox .query .FakeQueryPublisher ;
3334import io .objectbox .query .MockQuery ;
34- import io .reactivex .rxjava3 .annotations .NonNull ;
3535import io .reactivex .rxjava3 .core .Flowable ;
3636import io .reactivex .rxjava3 .core .Observable ;
3737import io .reactivex .rxjava3 .core .Observer ;
4141import io .reactivex .rxjava3 .functions .Consumer ;
4242
4343
44- import static org .junit .Assert .*;
44+ import static org .junit .Assert .assertEquals ;
45+ import static org .junit .Assert .assertNull ;
46+ import static org .junit .Assert .assertTrue ;
4547
48+ /**
49+ * This test has a counterpart in internal integration tests using a real Query and BoxStore.
50+ */
4651@ RunWith (MockitoJUnitRunner .class )
47- public class QueryObserverTest implements Observer <List <String >>, SingleObserver <List <String >>, Consumer <String > {
48-
49- private List <List <String >> receivedChanges = new CopyOnWriteArrayList <>();
50- private CountDownLatch latch = new CountDownLatch (1 );
52+ public class QueryObserverTest {
5153
5254 private MockQuery <String > mockQuery = new MockQuery <>(false );
5355 private FakeQueryPublisher <String > publisher = mockQuery .getFakeQueryPublisher ();
5456 private List <String > listResult = new ArrayList <>();
55- private Throwable error ;
56-
57- private AtomicInteger completedCount = new AtomicInteger ();
5857
5958 @ Before
6059 public void prep () {
@@ -63,114 +62,155 @@ public void prep() {
6362 }
6463
6564 @ Test
66- public void testObservable () {
67- Observable observable = RxQuery .observable (mockQuery .getQuery ());
68- observable .subscribe ((Observer ) this );
69- assertLatchCountedDown (latch , 2 );
70- assertEquals (1 , receivedChanges .size ());
71- assertEquals (0 , receivedChanges .get (0 ).size ());
72- assertNull (error );
73-
74- latch = new CountDownLatch (1 );
75- receivedChanges .clear ();
65+ public void observable () {
66+ Observable <List <String >> observable = RxQuery .observable (mockQuery .getQuery ());
67+
68+ // Subscribe should emit.
69+ TestObserver testObserver = new TestObserver ();
70+ observable .subscribe (testObserver );
71+
72+ testObserver .assertLatchCountedDown (2 );
73+ assertEquals (1 , testObserver .receivedChanges .size ());
74+ assertEquals (0 , testObserver .receivedChanges .get (0 ).size ());
75+ assertNull (testObserver .error );
76+
77+ // Publish should emit.
78+ testObserver .resetLatch (1 );
79+ testObserver .receivedChanges .clear ();
80+
7681 publisher .setQueryResult (listResult );
7782 publisher .publish ();
7883
79- assertLatchCountedDown (latch , 5 );
80- assertEquals (1 , receivedChanges .size ());
81- assertEquals (2 , receivedChanges .get (0 ).size ());
84+ testObserver . assertLatchCountedDown (5 );
85+ assertEquals (1 , testObserver . receivedChanges .size ());
86+ assertEquals (2 , testObserver . receivedChanges .get (0 ).size ());
8287
83- assertEquals (0 , completedCount .get ());
84-
85- //Unsubscribe?
86- // receivedChanges.clear();
87- // latch = new CountDownLatch(1);
88- // assertLatchCountedDown(latch, 5);
89- //
90- // assertEquals(1, receivedChanges.size());
91- // assertEquals(3, receivedChanges.get(0).size());
88+ // Finally, should not be completed.
89+ assertEquals (0 , testObserver .completedCount .get ());
9290 }
9391
9492 @ Test
95- public void testFlowableOneByOne () {
93+ public void flowableOneByOne () {
9694 publisher .setQueryResult (listResult );
9795
98- latch = new CountDownLatch (2 );
99- Flowable flowable = RxQuery .flowableOneByOne (mockQuery .getQuery ());
100- flowable .subscribe (this );
101- assertLatchCountedDown (latch , 2 );
102- assertEquals (2 , receivedChanges .size ());
103- assertEquals (1 , receivedChanges .get (0 ).size ());
104- assertEquals (1 , receivedChanges .get (1 ).size ());
105- assertNull (error );
96+ Flowable <String > flowable = RxQuery .flowableOneByOne (mockQuery .getQuery ());
97+
98+ TestObserver testObserver = new TestObserver ();
99+ testObserver .resetLatch (2 );
100+ //noinspection ResultOfMethodCallIgnored
101+ flowable .subscribe (testObserver );
102+
103+ testObserver .assertLatchCountedDown (2 );
104+ assertEquals (2 , testObserver .receivedChanges .size ());
105+ assertEquals (1 , testObserver .receivedChanges .get (0 ).size ());
106+ assertEquals (1 , testObserver .receivedChanges .get (1 ).size ());
107+ assertNull (testObserver .error );
108+
109+ testObserver .receivedChanges .clear ();
106110
107- receivedChanges .clear ();
108111 publisher .publish ();
109- assertNoMoreResults ();
112+ testObserver . assertNoMoreResults ();
110113 }
111114
112115 @ Test
113- public void testSingle () {
116+ public void single () {
114117 publisher .setQueryResult (listResult );
115- Single single = RxQuery .single (mockQuery .getQuery ());
116- single .subscribe ((SingleObserver ) this );
117- assertLatchCountedDown (latch , 2 );
118- assertEquals (1 , receivedChanges .size ());
119- assertEquals (2 , receivedChanges .get (0 ).size ());
120118
121- receivedChanges .clear ();
119+ Single <List <String >> single = RxQuery .single (mockQuery .getQuery ());
120+
121+ TestObserver testObserver = new TestObserver ();
122+ single .subscribe (testObserver );
123+
124+ testObserver .assertLatchCountedDown (2 );
125+ assertEquals (1 , testObserver .receivedChanges .size ());
126+ assertEquals (2 , testObserver .receivedChanges .get (0 ).size ());
127+
128+ testObserver .receivedChanges .clear ();
129+
122130 publisher .publish ();
123- assertNoMoreResults ();
131+ testObserver . assertNoMoreResults ();
124132 }
125133
126- protected void assertNoMoreResults () {
127- assertEquals (0 , receivedChanges .size ());
128- try {
129- Thread .sleep (20 );
130- } catch (InterruptedException e ) {
131- throw new RuntimeException (e );
134+ private static class TestObserver implements Observer <List <String >>, SingleObserver <List <String >>, Consumer <String > {
135+
136+ List <List <String >> receivedChanges = new CopyOnWriteArrayList <>();
137+ CountDownLatch latch = new CountDownLatch (1 );
138+ Throwable error ;
139+ AtomicInteger completedCount = new AtomicInteger ();
140+
141+ private void log (String message ) {
142+ System .out .println ("TestObserver: " + message );
132143 }
133- assertEquals (0 , receivedChanges .size ());
134- }
135144
136- protected void assertLatchCountedDown (CountDownLatch latch , int seconds ) {
137- try {
138- assertTrue (latch .await (seconds , TimeUnit .SECONDS ));
139- } catch (InterruptedException e ) {
140- throw new RuntimeException (e );
145+ void printEvents () {
146+ int count = receivedChanges .size ();
147+ log ("Received " + count + " event(s):" );
148+ for (int i = 0 ; i < count ; i ++) {
149+ List <String > receivedChange = receivedChanges .get (i );
150+ log ((i + 1 ) + "/" + count + ": size=" + receivedChange .size ()
151+ + "; items=" + Arrays .toString (receivedChange .toArray ()));
152+ }
141153 }
142- }
143154
144- @ Override
145- public void onSubscribe (Disposable d ) {
155+ void resetLatch (int count ) {
156+ latch = new CountDownLatch (count );
157+ }
146158
147- }
159+ void assertLatchCountedDown (int seconds ) {
160+ try {
161+ assertTrue (latch .await (seconds , TimeUnit .SECONDS ));
162+ } catch (InterruptedException e ) {
163+ throw new RuntimeException (e );
164+ }
165+ printEvents ();
166+ }
148167
149- @ Override
150- public void onSuccess (List <String > queryResult ) {
151- receivedChanges .add (queryResult );
152- latch .countDown ();
153- }
168+ void assertNoMoreResults () {
169+ assertEquals (0 , receivedChanges .size ());
170+ try {
171+ Thread .sleep (20 );
172+ } catch (InterruptedException e ) {
173+ throw new RuntimeException (e );
174+ }
175+ assertEquals (0 , receivedChanges .size ());
176+ }
154177
155- @ Override
156- public void onNext (List <String > queryResult ) {
157- receivedChanges .add (queryResult );
158- latch .countDown ();
159- }
178+ @ Override
179+ public void onSubscribe (Disposable d ) {
180+ log ("onSubscribe" );
181+ }
160182
161- @ Override
162- public void onError (Throwable e ) {
163- error = e ;
164- }
183+ @ Override
184+ public void onNext (List <String > t ) {
185+ log ("onNext" );
186+ receivedChanges .add (t );
187+ latch .countDown ();
188+ }
165189
166- @ Override
167- public void onComplete () {
168- completedCount .incrementAndGet ();
169- }
190+ @ Override
191+ public void onError (Throwable e ) {
192+ log ("onError" );
193+ error = e ;
194+ }
195+
196+ @ Override
197+ public void onComplete () {
198+ log ("onComplete" );
199+ completedCount .incrementAndGet ();
200+ }
201+
202+ @ Override
203+ public void accept (String t ) {
204+ log ("accept" );
205+ receivedChanges .add (Collections .singletonList (t ));
206+ latch .countDown ();
207+ }
170208
171- @ Override
172- public void accept (@ NonNull String s ) throws Exception {
173- receivedChanges .add (Collections .singletonList (s ));
174- latch .countDown ();
209+ @ Override
210+ public void onSuccess (List <String > t ) {
211+ log ("onSuccess" );
212+ receivedChanges .add (t );
213+ latch .countDown ();
214+ }
175215 }
176216}
0 commit comments