@@ -29,7 +29,7 @@ module DataStores
2929
3030 class DirectFileStore
3131 class InvalidStoreSettingsError < StandardError ; end
32- AGGREGATION_MODES = [ MAX = :max , MIN = :min , SUM = :sum , ALL = :all ]
32+ AGGREGATION_MODES = [ MAX = :max , MIN = :min , SUM = :sum , ALL = :all , MOST_RECENT = :most_recent ]
3333 DEFAULT_METRIC_SETTINGS = { aggregation : SUM }
3434 DEFAULT_GAUGE_SETTINGS = { aggregation : ALL }
3535
@@ -45,7 +45,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
4545 end
4646
4747 settings = default_settings . merge ( metric_settings )
48- validate_metric_settings ( settings )
48+ validate_metric_settings ( metric_type , settings )
4949
5050 MetricStore . new ( metric_name : metric_name ,
5151 store_settings : @store_settings ,
@@ -54,7 +54,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
5454
5555 private
5656
57- def validate_metric_settings ( metric_settings )
57+ def validate_metric_settings ( metric_type , metric_settings )
5858 unless metric_settings . has_key? ( :aggregation ) &&
5959 AGGREGATION_MODES . include? ( metric_settings [ :aggregation ] )
6060 raise InvalidStoreSettingsError ,
@@ -65,6 +65,11 @@ def validate_metric_settings(metric_settings)
6565 raise InvalidStoreSettingsError ,
6666 "Only :aggregation setting can be specified"
6767 end
68+
69+ if metric_settings [ :aggregation ] == MOST_RECENT && metric_type != :gauge
70+ raise InvalidStoreSettingsError ,
71+ "Only :gauge metrics support :most_recent aggregation"
72+ end
6873 end
6974
7075 class MetricStore
@@ -100,6 +105,12 @@ def set(labels:, val:)
100105 end
101106
102107 def increment ( labels :, by : 1 )
108+ if @values_aggregation_mode == DirectFileStore ::MOST_RECENT
109+ raise InvalidStoreSettingsError ,
110+ "The :most_recent aggregation does not support the use of increment" \
111+ "/decrement"
112+ end
113+
103114 key = store_key ( labels )
104115 in_process_sync do
105116 value = internal_store . read_value ( key )
@@ -121,15 +132,15 @@ def all_values
121132 stores_for_metric . each do |file_path |
122133 begin
123134 store = FileMappedDict . new ( file_path , true )
124- store . all_values . each do |( labelset_qs , v ) |
135+ store . all_values . each do |( labelset_qs , v , ts ) |
125136 # Labels come as a query string, and CGI::parse returns arrays for each key
126137 # "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
127138 # Turn the keys back into symbols, and remove the arrays
128139 label_set = CGI ::parse ( labelset_qs ) . map do |k , vs |
129140 [ k . to_sym , vs . first ]
130141 end . to_h
131142
132- stores_data [ label_set ] << v
143+ stores_data [ label_set ] << [ v , ts ]
133144 end
134145 ensure
135146 store . close if store
@@ -181,30 +192,41 @@ def process_id
181192 end
182193
183194 def aggregate_values ( values )
184- if @values_aggregation_mode == SUM
185- values . inject { |sum , element | sum + element }
186- elsif @values_aggregation_mode == MAX
187- values . max
188- elsif @values_aggregation_mode == MIN
189- values . min
190- elsif @values_aggregation_mode == ALL
191- values . first
195+ # Each entry in the `values` array is a tuple of `value` and `timestamp`,
196+ # so for all aggregations except `MOST_RECENT`, we need to only take the
197+ # first value in each entry and ignore the second.
198+ if @values_aggregation_mode == MOST_RECENT
199+ latest_tuple = values . max { |a , b | a [ 1 ] <=> b [ 1 ] }
200+ latest_tuple . first # return the value without the timestamp
192201 else
193- raise InvalidStoreSettingsError ,
194- "Invalid Aggregation Mode: #{ @values_aggregation_mode } "
202+ values = values . map ( &:first ) # Discard timestamps
203+
204+ if @values_aggregation_mode == SUM
205+ values . inject { |sum , element | sum + element }
206+ elsif @values_aggregation_mode == MAX
207+ values . max
208+ elsif @values_aggregation_mode == MIN
209+ values . min
210+ elsif @values_aggregation_mode == ALL
211+ values . first
212+ else
213+ raise InvalidStoreSettingsError ,
214+ "Invalid Aggregation Mode: #{ @values_aggregation_mode } "
215+ end
195216 end
196217 end
197218 end
198219
199220 private_constant :MetricStore
200221
201- # A dict of doubles, backed by an file we access directly a a byte array.
222+ # A dict of doubles, backed by an file we access directly as a byte array.
202223 #
203224 # The file starts with a 4 byte int, indicating how much of it is used.
204225 # Then 4 bytes of padding.
205226 # There's then a number of entries, consisting of a 4 byte int which is the
206227 # size of the next field, a utf-8 encoded string key, padding to an 8 byte
207- # alignment, and then a 8 byte float which is the value.
228+ # alignment, and then a 8 byte float which is the value, and then a 8 byte
229+ # float which is the unix timestamp when the value was set.
208230 class FileMappedDict
209231 INITIAL_FILE_SIZE = 1024 *1024
210232
@@ -235,8 +257,8 @@ def all_values
235257 with_file_lock do
236258 @positions . map do |key , pos |
237259 @f . seek ( pos )
238- value = @f . read ( 8 ) . unpack ( 'd' ) [ 0 ]
239- [ key , value ]
260+ value , timestamp = @f . read ( 16 ) . unpack ( 'dd' )
261+ [ key , value , timestamp ]
240262 end
241263 end
242264 end
@@ -256,9 +278,10 @@ def write_value(key, value)
256278 init_value ( key )
257279 end
258280
281+ now = Process . clock_gettime ( Process ::CLOCK_MONOTONIC )
259282 pos = @positions [ key ]
260283 @f . seek ( pos )
261- @f . write ( [ value ] . pack ( 'd ' ) )
284+ @f . write ( [ value , now ] . pack ( 'dd ' ) )
262285 @f . flush
263286 end
264287
@@ -299,7 +322,7 @@ def resize_file(new_capacity)
299322 def init_value ( key )
300323 # Pad to be 8-byte aligned.
301324 padded = key + ( ' ' * ( 8 - ( key . length + 4 ) % 8 ) )
302- value = [ padded . length , padded , 0.0 ] . pack ( "lA#{ padded . length } d " )
325+ value = [ padded . length , padded , 0.0 , 0.0 ] . pack ( "lA#{ padded . length } dd " )
303326 while @used + value . length > @capacity
304327 @capacity *= 2
305328 resize_file ( @capacity )
@@ -310,7 +333,7 @@ def init_value(key)
310333 @f . seek ( 0 )
311334 @f . write ( [ @used ] . pack ( 'l' ) )
312335 @f . flush
313- @positions [ key ] = @used - 8
336+ @positions [ key ] = @used - 16
314337 end
315338
316339 # Read position of all keys. No locking is performed.
@@ -320,7 +343,7 @@ def populate_positions
320343 padded_len = @f . read ( 4 ) . unpack ( 'l' ) [ 0 ]
321344 key = @f . read ( padded_len ) . unpack ( "A#{ padded_len } " ) [ 0 ] . strip
322345 @positions [ key ] = @f . pos
323- @f . seek ( 8 , :CUR )
346+ @f . seek ( 16 , :CUR )
324347 end
325348 end
326349 end
0 commit comments