4949import org .apache .storm .task .TopologyContext ;
5050import org .apache .storm .tuple .Values ;
5151import org .junit .jupiter .api .Assertions ;
52+ import org .junit .jupiter .api .BeforeEach ;
5253import org .junit .jupiter .api .Test ;
5354
5455import java .lang .reflect .Field ;
5556import java .util .Arrays ;
5657import java .util .HashMap ;
5758import java .util .Map ;
5859
59- import static org .awaitility .Awaitility .await ;
60- import static org .hamcrest .CoreMatchers .equalTo ;
6160import static org .junit .jupiter .api .Assertions .assertEquals ;
6261import static org .junit .jupiter .api .Assertions .assertFalse ;
6362import static org .junit .jupiter .api .Assertions .assertNotNull ;
8180 */
8281public class VirtualSpoutTest {
8382
83+ /**
84+ * Before each test reset the static on the MockConsumer.
85+ */
86+ @ BeforeEach
87+ void resetMockConsumerPartitions () {
88+ MockConsumer .partitions = Arrays .asList (1 );
89+ }
90+
8491 /**
8592 * Verify that constructor args get set appropriately.
8693 */
@@ -189,8 +196,6 @@ public void testSetAndGetConsumerId() {
189196 public void testAttemptToComplete () {
190197 final Map <String , Object > config = getDefaultConfig ();
191198
192- MockConsumer .partitions = Arrays .asList (1 );
193-
194199 // Our starting state
195200 final ConsumerState startingState = ConsumerState .builder ()
196201 .withPartition (new ConsumerPartition (MockConsumer .topic , 1 ), 1L )
@@ -199,7 +204,7 @@ public void testAttemptToComplete() {
199204 final ConsumerState endingState = ConsumerState .builder ()
200205 .withPartition (new ConsumerPartition (MockConsumer .topic , 1 ), 1L )
201206 .build ();
202- System . out . println ( endingState . toString ());
207+
203208 // Create spout
204209 final VirtualSpout virtualSpout = new VirtualSpout (
205210 new DefaultVirtualSpoutIdentifier ("MyConsumerId" ),
@@ -213,38 +218,12 @@ public void testAttemptToComplete() {
213218 // Open it, this will setup the consumer (which is our mock)
214219 virtualSpout .open ();
215220
216- /*
217- try {
218- Thread.sleep(1000);
219- } catch (InterruptedException ex) {
220- System.out.println("Couldn't sleep as long as I liked...");
221- }
222- */
223-
224221 // Should default to false, we haven't attempt completing yet
225222 assertFalse (virtualSpout .isCompleted (), "Should default to false" );
226223
227- // This should wait to flush state until the consumer is opened. If we attempt to flush state before
228- // the consumer is opened we will get a null pointer
229- await ()
230- .until (() -> virtualSpout .isOpened (), equalTo (true ));
231-
232-
233- System .out .println (virtualSpout .getCurrentState ());
234- System .out .println (virtualSpout .getStartingState ());
235- System .out .println (virtualSpout .getEndingState ());
236-
237- // Our spout is now complete
238- await ()
239- .until (() -> {
240-
241- // Flushing state will attempt to complete the VirtualSpout, because our starting and offstate are the same
242- // and we only have one partition this should call setCompleted() right away
243- virtualSpout .flushState ();
244- return virtualSpout .isCompleted ();
245- }, equalTo (true ));
224+ virtualSpout .flushState ();
246225
247- // assertTrue(virtualSpout.isCompleted(), "Should be true");
226+ assertTrue (virtualSpout .isCompleted (), "Should be true" );
248227 }
249228
250229 /**
0 commit comments