Skip to content
This repository was archived by the owner on Feb 12, 2022. It is now read-only.

Commit 94b5733

Browse files
authored
Fix sideline zk configuration, docs, zulu support & flapper (#122)
* Fix root config key, it shouldn't be plural * Fix doc blocks for config * Drop using javax because it's not in zulu by default * Fix flapping test due to statics in MockConsumer * Add other jdks to travis build matrix
1 parent ff3c785 commit 94b5733

File tree

7 files changed

+51
-14
lines changed

7 files changed

+51
-14
lines changed

.travis.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
language: java
22
jdk:
33
- oraclejdk8
4+
- oraclejdk11
5+
- openjdk11
46
sudo: false
57
cache:
68
directories:

src/main/java/com/salesforce/storm/spout/dynamic/VirtualSpout.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -567,19 +567,35 @@ private void attemptToComplete() {
567567
// Get current state and compare it against our ending state
568568
final ConsumerState currentState = consumer.getCurrentState();
569569

570+
if (currentState == null) {
571+
logger.error("Attempting to flush state when the current state is null.");
572+
return;
573+
}
574+
570575
// Compare it against our ending state
571576
for (final ConsumerPartition consumerPartition : currentState.getConsumerPartitions()) {
572577
// currentOffset contains the last "committed" offset our consumer has fully processed
573-
final long currentOffset = currentState.getOffsetForNamespaceAndPartition(consumerPartition);
578+
final Long currentOffset = currentState.getOffsetForNamespaceAndPartition(consumerPartition);
574579

575580
// endingOffset contains the last offset we want to process.
576-
final long endingOffset = endingState.getOffsetForNamespaceAndPartition(consumerPartition);
581+
final Long endingOffset = endingState.getOffsetForNamespaceAndPartition(consumerPartition);
582+
583+
if (currentOffset == null) {
584+
logger.error("Current offset is null");
585+
return;
586+
}
587+
588+
if (endingOffset == null) {
589+
logger.error("Ending offset is null");
590+
return;
591+
}
577592

578593
// If the current offset is < ending offset
579594
if (currentOffset < endingOffset) {
580595
// Then we cannot end
581596
return;
582597
}
598+
583599
// Log that this partition is finished, and make sure we unsubscribe from it.
584600
if (consumer.unsubscribeConsumerPartition(consumerPartition)) {
585601
logger.debug(

src/main/java/com/salesforce/storm/spout/dynamic/kafka/Consumer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,8 +455,12 @@ public ConsumerState flushConsumerState() {
455455
// Get the current state
456456
final ConsumerState consumerState = partitionOffsetsManager.getCurrentState();
457457

458-
// Persist each partition offset
458+
if (consumerState == null) {
459+
logger.error("Attempting to flush state when the current state is null.");
460+
return null;
461+
}
459462

463+
// Persist each partition offset
460464
for (Map.Entry<ConsumerPartition, Long> entry : consumerState.entrySet()) {
461465
final ConsumerPartition consumerPartition = entry.getKey();
462466
final long lastFinishedOffset = entry.getValue();

src/main/java/com/salesforce/storm/spout/sideline/config/SidelineConfig.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,13 @@
3939
public class SidelineConfig {
4040

4141
/**
42-
* (List|String) Defines one or more sideline trigger(s) (if any) to use.
42+
* (String) Defines a sideline trigger (if any) to use.
4343
* Should be a fully qualified class path that implements thee SidelineTrigger interface.
4444
*/
4545
@ConfigDocumentation(
4646
category = ConfigDocumentation.Category.SIDELINE,
47-
description = "Defines one or more sideline trigger(s) (if any) to use. "
48-
+ "Should be a fully qualified class path that implements thee SidelineTrigger interface.",
47+
description = "Defines a sideline trigger (if any) to use. "
48+
+ "Should be a fully qualified class path that implements the SidelineTrigger interface.",
4949
type = String.class
5050
)
5151
public static final String TRIGGER_CLASS = "sideline.trigger_class";

src/main/java/com/salesforce/storm/spout/sideline/recipes/trigger/zookeeper/Config.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class Config {
7171
+ "Example: \"/sideline-trigger/my-topology/my-topic\"",
7272
type = String.class
7373
)
74-
public static final String ZK_ROOT = PREFIX + "roots";
74+
public static final String ZK_ROOT = PREFIX + "root";
7575

7676
/**
7777
* (Integer) Zookeeper session timeout.

src/main/java/com/salesforce/storm/spout/sideline/recipes/trigger/zookeeper/ZookeeperWatchTrigger.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@
4747
import org.slf4j.Logger;
4848
import org.slf4j.LoggerFactory;
4949

50-
import javax.xml.bind.DatatypeConverter;
5150
import java.io.IOException;
52-
import java.io.UnsupportedEncodingException;
51+
import java.math.BigInteger;
5352
import java.nio.charset.Charset;
53+
import java.nio.charset.StandardCharsets;
5454
import java.security.MessageDigest;
5555
import java.security.NoSuchAlgorithmException;
5656
import java.time.LocalDateTime;
@@ -293,7 +293,7 @@ private SidelineRequest buildSidelineRequest(final TriggerEvent triggerEvent) {
293293
logger.info("Creating a sideline request with id {} and step {}", sidelineRequest.id, sidelineRequest.step);
294294

295295
return sidelineRequest;
296-
} catch (NoSuchAlgorithmException | UnsupportedEncodingException ex) {
296+
} catch (NoSuchAlgorithmException ex) {
297297
logger.error("Unable to generate an identifier for this request, cowardly refusing to proceed! {}", ex);
298298
return null;
299299
}
@@ -309,12 +309,10 @@ private SidelineRequest buildSidelineRequest(final TriggerEvent triggerEvent) {
309309
private SidelineRequestIdentifier generateSidelineRequestIdentifier(
310310
final TriggerEvent triggerEvent,
311311
final FilterChainStep step
312-
) throws NoSuchAlgorithmException, UnsupportedEncodingException {
312+
) throws NoSuchAlgorithmException {
313313
final String json = gson.toJson(step);
314314

315-
final StringBuilder identifier = new StringBuilder(
316-
DatatypeConverter.printHexBinary(MessageDigest.getInstance("MD5").digest(json.getBytes("UTF-8")))
317-
);
315+
final StringBuilder identifier = new StringBuilder(generateIdFromJson(json));
318316

319317
// If we were provided a date time in the event, append the time stamp of that event to the identifier
320318
if (triggerEvent.getCreatedAt() != null) {
@@ -327,6 +325,14 @@ private SidelineRequestIdentifier generateSidelineRequestIdentifier(
327325
return new SidelineRequestIdentifier(identifier.toString());
328326
}
329327

328+
private String generateIdFromJson(final String dataJson) throws NoSuchAlgorithmException {
329+
// Use the data map, which should be things unique to define this criteria to generate our id
330+
final MessageDigest md5 = MessageDigest.getInstance("MD5");
331+
md5.update(StandardCharsets.UTF_8.encode(dataJson));
332+
final String id = String.format("%032x", new BigInteger(1, md5.digest()));
333+
return id;
334+
}
335+
330336
/**
331337
* Create a trigger event from the provided data.
332338
* @return Trigger event

src/test/java/com/salesforce/storm/spout/dynamic/VirtualSpoutTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import org.apache.storm.task.TopologyContext;
5050
import org.apache.storm.tuple.Values;
5151
import org.junit.jupiter.api.Assertions;
52+
import org.junit.jupiter.api.BeforeEach;
5253
import org.junit.jupiter.api.Test;
5354

5455
import java.lang.reflect.Field;
@@ -79,6 +80,14 @@
7980
*/
8081
public class VirtualSpoutTest {
8182

83+
/**
84+
* Before each test reset the static on the MockConsumer.
85+
*/
86+
@BeforeEach
87+
void resetMockConsumerPartitions() {
88+
MockConsumer.partitions = Arrays.asList(1);
89+
}
90+
8291
/**
8392
* Verify that constructor args get set appropriately.
8493
*/

0 commit comments

Comments
 (0)