@@ -183,6 +183,7 @@ void testInvalidSchema() throws Exception {
183183 verify (destination ).start (destinationConfig , jobRoot );
184184 verify (destination ).accept (RECORD_MESSAGE1 );
185185 verify (destination ).accept (RECORD_MESSAGE2 );
186+ verify (destination ).accept (RECORD_MESSAGE3 );
186187 verify (recordSchemaValidator ).validateSchema (RECORD_MESSAGE1 .getRecord (), STREAM_NAME );
187188 verify (recordSchemaValidator ).validateSchema (RECORD_MESSAGE2 .getRecord (), STREAM_NAME );
188189 verify (recordSchemaValidator ).validateSchema (RECORD_MESSAGE3 .getRecord (), STREAM_NAME );
@@ -298,8 +299,11 @@ void testReplicationRunnableWorkerFailure() throws Exception {
298299 void testOnlyStateAndRecordMessagesDeliveredToDestination () throws Exception {
299300 final AirbyteMessage LOG_MESSAGE = AirbyteMessageUtils .createLogMessage (Level .INFO , "a log message" );
300301 final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils .createTraceMessage ("a trace message" , 123456.0 );
302+ when (mapper .mapMessage (LOG_MESSAGE )).thenReturn (LOG_MESSAGE );
303+ when (mapper .mapMessage (TRACE_MESSAGE )).thenReturn (TRACE_MESSAGE );
304+ when (source .isFinished ()).thenReturn (false , false , false , false , true );
301305 when (source .attemptRead ()).thenReturn (Optional .of (RECORD_MESSAGE1 ), Optional .of (LOG_MESSAGE ), Optional .of (TRACE_MESSAGE ),
302- Optional .of (RECORD_MESSAGE2 ), Optional . empty () );
306+ Optional .of (RECORD_MESSAGE2 ));
303307
304308 final ReplicationWorker worker = new DefaultReplicationWorker (
305309 JOB_ID ,
0 commit comments