Skip to content
This repository was archived by the owner on Feb 12, 2022. It is now read-only.

Commit 7a1ccca

Browse files
authored
Metrics Refactoring (#118)
* Break out key generation so that it can be reused across MetricsRecorder instances * Break out time recording so that it can be reused across MetricRecorder implementations
1 parent b7a71a3 commit 7a1ccca

File tree

7 files changed

+536
-129
lines changed

7 files changed

+536
-129
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright (c) 2017, 2018, Salesforce.com, Inc.
3+
* All rights reserved.
4+
*
5+
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
6+
* following conditions are met:
7+
*
8+
* * Redistributions of source code must retain the above copyright notice, this list of conditions and the following
9+
* disclaimer.
10+
*
11+
* * Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
12+
* disclaimer in the documentation and/or other materials provided with the distribution.
13+
*
14+
* * Neither the name of Salesforce.com nor the names of its contributors may be used to endorse or promote products
15+
* derived from this software without specific prior written permission.
16+
*
17+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
18+
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20+
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
21+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
22+
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
23+
* USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
24+
*/
25+
26+
package com.salesforce.storm.spout.dynamic.metrics;
27+
28+
import org.slf4j.helpers.MessageFormatter;
29+
30+
/**
31+
* Takes a {@link MetricDefinition} and generates a String that can be used as a metric key.
32+
*/
33+
class KeyBuilder {
34+
35+
private final String keyPrefix;
36+
37+
/**
38+
* Create a new {@link KeyBuilder} with the specified prefix.
39+
*
40+
* Prefix is optional, if you don't need it simply pass in null to the builder.
41+
*
42+
* @param keyPrefix prefix to use when building keys from {@link MetricDefinition} instances
43+
*/
44+
KeyBuilder(final String keyPrefix) {
45+
this.keyPrefix = keyPrefix;
46+
}
47+
48+
/**
49+
* Build metric keys from {@link MetricDefinition} instances with the optionally supplied parameters.
50+
*
51+
* Strings are formatted using {@link MessageFormatter}, so if you have parameters "foo" and "bar" for a metric with the name of "test"
52+
* than you could use a {@link CustomMetric} with "test.{}.{}" in the constructor and pass parameters "foo" and "bar" to make the full
53+
* metric key of "test.foo.bar".
54+
*
55+
* @return format string like: "prefix.metric_name.param1.param2"
56+
*/
57+
String build(final MetricDefinition metric, final Object[] parameters) {
58+
final StringBuilder keyBuilder = new StringBuilder();
59+
60+
// Conditionally add key prefix.
61+
if (getKeyPrefix() != null && !getKeyPrefix().isEmpty()) {
62+
keyBuilder
63+
.append(getKeyPrefix())
64+
.append(".");
65+
}
66+
67+
// Our default implementation should include the simple class name in the key
68+
keyBuilder.append(
69+
MessageFormatter.arrayFormat(metric.getKey(), parameters).getMessage()
70+
);
71+
return keyBuilder.toString();
72+
}
73+
74+
/**
75+
* Get key prefix.
76+
* @return key prefix
77+
*/
78+
private String getKeyPrefix() {
79+
return keyPrefix;
80+
}
81+
}

src/main/java/com/salesforce/storm/spout/dynamic/metrics/LogRecorder.java

Lines changed: 13 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,7 @@
2828
import org.apache.storm.task.TopologyContext;
2929
import org.slf4j.Logger;
3030
import org.slf4j.LoggerFactory;
31-
import org.slf4j.helpers.MessageFormatter;
3231

33-
import java.time.Clock;
3432
import java.util.Map;
3533
import java.util.concurrent.ConcurrentHashMap;
3634

@@ -42,35 +40,17 @@
4240
public class LogRecorder implements MetricsRecorder {
4341

4442
private static final Logger logger = LoggerFactory.getLogger(LogRecorder.class);
45-
private final Map<String, Long> counters = new ConcurrentHashMap<>();
46-
private final Map<String, Object> assignedValues = new ConcurrentHashMap<>();
4743

48-
// For storing timer start values
49-
private final Map<String, Long> timerStartValues = new ConcurrentHashMap<>();
44+
private final KeyBuilder keyBuilder = new KeyBuilder(null);
5045

51-
@Override
52-
public void open(Map spoutConfig, TopologyContext topologyContext) {
53-
}
54-
55-
@Override
56-
public void close() {
57-
// Noop
58-
}
46+
private final Map<String, Long> counters = new ConcurrentHashMap<>();
47+
private final Map<String, Object> assignedValues = new ConcurrentHashMap<>();
5948

60-
@Override
61-
public void count(final MetricDefinition metric) {
62-
countBy(metric, 1L, new Object[0]);
63-
}
49+
private TimerManager timerManager;
6450

6551
@Override
66-
public void count(final MetricDefinition metric, final Object... metricParameters) {
67-
countBy(metric, 1L, metricParameters);
68-
69-
}
70-
71-
@Override
72-
public void countBy(final MetricDefinition metric, final long incrementBy) {
73-
countBy(metric, incrementBy, new Object[0]);
52+
public void open(final Map<String, Object> config, final TopologyContext topologyContext) {
53+
timerManager = new TimerManager();
7454
}
7555

7656
@Override
@@ -90,41 +70,22 @@ public void assignValue(final MetricDefinition metric, final Object value, final
9070
logger.debug("[ASSIGNED] {} => {}", key, value);
9171
}
9272

93-
@Override
94-
public void assignValue(final MetricDefinition metric, final Object value) {
95-
assignValue(metric, value, new Object[0]);
96-
}
97-
9873
@Override
9974
public void startTimer(final MetricDefinition metric, final Object... metricParameters) {
10075
final String key = generateKey(metric, metricParameters);
101-
timerStartValues.put(key, Clock.systemUTC().millis());
102-
}
103-
104-
@Override
105-
public void startTimer(final MetricDefinition metric) {
106-
startTimer(metric, new Object[0]);
76+
timerManager.start(key);
10777
}
10878

10979
@Override
11080
public long stopTimer(final MetricDefinition metric, final Object... metricParameters) {
111-
final long stopTime = Clock.systemUTC().millis();
112-
81+
// Build key from the metric
11382
final String key = generateKey(metric, metricParameters);
114-
final Long startTime = timerStartValues.get(key);
11583

116-
if (startTime == null) {
117-
logger.warn("Could not find timer key {}", key);
118-
return -1;
119-
}
120-
final long elapsedTimeMs = stopTime - startTime;
121-
logger.debug("[TIMER] {} + {}ms", key, elapsedTimeMs);
122-
return elapsedTimeMs;
123-
}
84+
final long elapsedMs = timerManager.stop(key);
12485

125-
@Override
126-
public long stopTimer(final MetricDefinition metric) {
127-
return stopTimer(metric, new Object[0]);
86+
logger.debug("[TIMER] {} + {}ms", key, elapsedMs);
87+
88+
return elapsedMs;
12889
}
12990

13091
@Override
@@ -134,6 +95,6 @@ public void recordTimer(final MetricDefinition metric, final long elapsedTimeMs,
13495
}
13596

13697
private String generateKey(final MetricDefinition metric, final Object[] parameters) {
137-
return MessageFormatter.format(metric.getKey(), parameters).getMessage();
98+
return keyBuilder.build(metric, parameters);
13899
}
139100
}

src/main/java/com/salesforce/storm/spout/dynamic/metrics/MetricsRecorder.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,33 +42,41 @@ public interface MetricsRecorder {
4242
* @param spoutConfig spout configuration.
4343
* @param topologyContext topology context.
4444
*/
45-
void open(final Map<String, Object> spoutConfig, final TopologyContext topologyContext);
45+
default void open(final Map<String, Object> spoutConfig, final TopologyContext topologyContext) {
46+
}
4647

4748
/**
4849
* Perform any cleanup.
4950
*/
50-
void close();
51+
default void close() {
52+
}
5153

5254
/**
5355
* Count a metric, given a name, increments it by 1.
5456
* @param metric metric definition.
5557
*/
56-
void count(final MetricDefinition metric);
58+
default void count(final MetricDefinition metric) {
59+
countBy(metric, 1L, new Object[0]);
60+
}
5761

5862
/**
5963
* Count a metric, given a name, increments it by 1.
6064
* @param metric metric definition.
6165
* @param metricParameters when a {@link MetricDefinition} supports interpolation on it's key, for example "foo.{}.bar" the {}
6266
* can be replace with the supplied parameters.
6367
*/
64-
void count(final MetricDefinition metric, final Object... metricParameters);
68+
default void count(final MetricDefinition metric, final Object... metricParameters) {
69+
countBy(metric, 1L, metricParameters);
70+
}
6571

6672
/**
6773
* Count a metric, given a name, increments it by value.
6874
* @param metric metric definition.
6975
* @param incrementBy amount to increment the metric by.
7076
*/
71-
void countBy(final MetricDefinition metric, final long incrementBy);
77+
default void countBy(final MetricDefinition metric, final long incrementBy) {
78+
countBy(metric, incrementBy, new Object[0]);
79+
}
7280

7381
/**
7482
* Count a metric, given a name and increments by a specific amount.
@@ -93,7 +101,9 @@ public interface MetricsRecorder {
93101
* @param metric metric definition.
94102
* @param value value to be assigned.
95103
*/
96-
void assignValue(final MetricDefinition metric, final Object value);
104+
default void assignValue(final MetricDefinition metric, final Object value) {
105+
assignValue(metric, value, new Object[0]);
106+
}
97107

98108
/**
99109
* Starts a timer for the given sourceClass and metricName.
@@ -107,7 +117,9 @@ public interface MetricsRecorder {
107117
* Starts a timer for the given sourceClass and metricName.
108118
* @param metric metric definition.
109119
*/
110-
void startTimer(final MetricDefinition metric);
120+
default void startTimer(final MetricDefinition metric) {
121+
startTimer(metric, new Object[0]);
122+
}
111123

112124
/**
113125
* Stops and records a timer for the given sourceClass and metricName.
@@ -123,7 +135,9 @@ public interface MetricsRecorder {
123135
* @param metric metric definition.
124136
* @return How long elapsed in the timer, in milliseconds.
125137
*/
126-
long stopTimer(final MetricDefinition metric);
138+
default long stopTimer(final MetricDefinition metric) {
139+
return stopTimer(metric, new Object[0]);
140+
}
127141

128142
/**
129143
* Record a timer passing in the elapsed time.

0 commit comments

Comments
 (0)