Skip to content

Commit 46939fc

Browse files
authored
core/sampler: Support decision hook (#813)
This adds support for monitoring sampling decisions made by the Sampler core with a `func(Entry, SamplingDecision)` hook, where `SamplingDecision` is a bit field.

To allow plumbing the hook to the sampler, this additionally deprecates the `NewSampler` constructor in favor of `NewSamplerWithOptions`:

    type SamplerOption
    func SamplerHook(func(Entry, SamplingDecision))
    func NewSamplerWithOptions(/* ... */, opts ...SamplerOption)

This functionality is usable from the `zap` package via the new `Hook` field of `zap.SamplingConfig`.

Refs T5056227
1 parent b2382d7 commit 46939fc

File tree

6 files changed

+227
-18
lines changed

6 files changed

+227
-18
lines changed

benchmarks/zap_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func newZapLogger(lvl zapcore.Level) *zap.Logger {
116116
}
117117

118118
func newSampledLogger(lvl zapcore.Level) *zap.Logger {
119-
return zap.New(zapcore.NewSampler(
119+
return zap.New(zapcore.NewSamplerWithOptions(
120120
newZapLogger(zap.DebugLevel).Core(),
121121
100*time.Millisecond,
122122
10, // first

config.go

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,14 @@ import (
3232
// global CPU and I/O load that logging puts on your process while attempting
3333
// to preserve a representative subset of your logs.
3434
//
35-
// Values configured here are per-second. See zapcore.NewSampler for details.
35+
// If specified, the Sampler will invoke the Hook after each decision.
36+
//
37+
// Values configured here are per-second. See zapcore.NewSamplerWithOptions for
38+
// details.
3639
type SamplingConfig struct {
37-
Initial int `json:"initial" yaml:"initial"`
38-
Thereafter int `json:"thereafter" yaml:"thereafter"`
40+
Initial int `json:"initial" yaml:"initial"`
41+
Thereafter int `json:"thereafter" yaml:"thereafter"`
42+
Hook func(zapcore.Entry, zapcore.SamplingDecision) `json:"-" yaml:"-"`
3943
}
4044

4145
// Config offers a declarative way to construct a logger. It doesn't do
@@ -208,9 +212,19 @@ func (cfg Config) buildOptions(errSink zapcore.WriteSyncer) []Option {
208212
opts = append(opts, AddStacktrace(stackLevel))
209213
}
210214

211-
if cfg.Sampling != nil {
215+
if scfg := cfg.Sampling; scfg != nil {
212216
opts = append(opts, WrapCore(func(core zapcore.Core) zapcore.Core {
213-
return zapcore.NewSampler(core, time.Second, int(cfg.Sampling.Initial), int(cfg.Sampling.Thereafter))
217+
var samplerOpts []zapcore.SamplerOption
218+
if scfg.Hook != nil {
219+
samplerOpts = append(samplerOpts, zapcore.SamplerHook(scfg.Hook))
220+
}
221+
return zapcore.NewSamplerWithOptions(
222+
core,
223+
time.Second,
224+
cfg.Sampling.Initial,
225+
cfg.Sampling.Thereafter,
226+
samplerOpts...,
227+
)
214228
}))
215229
}
216230

config_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727

2828
"github.com/stretchr/testify/assert"
2929
"github.com/stretchr/testify/require"
30+
"go.uber.org/atomic"
3031
"go.uber.org/zap/zapcore"
3132
)
3233

@@ -144,3 +145,69 @@ func TestConfigWithMissingAttributes(t *testing.T) {
144145
})
145146
}
146147
}
148+
149+
func makeSamplerCountingHook() (h func(zapcore.Entry, zapcore.SamplingDecision),
150+
dropped, sampled *atomic.Int64) {
151+
dropped = new(atomic.Int64)
152+
sampled = new(atomic.Int64)
153+
h = func(_ zapcore.Entry, dec zapcore.SamplingDecision) {
154+
if dec&zapcore.LogDropped > 0 {
155+
dropped.Inc()
156+
} else if dec&zapcore.LogSampled > 0 {
157+
sampled.Inc()
158+
}
159+
}
160+
return h, dropped, sampled
161+
}
162+
163+
func TestConfigWithSamplingHook(t *testing.T) {
164+
shook, dcount, scount := makeSamplerCountingHook()
165+
cfg := Config{
166+
Level: NewAtomicLevelAt(InfoLevel),
167+
Development: false,
168+
Sampling: &SamplingConfig{
169+
Initial: 100,
170+
Thereafter: 100,
171+
Hook: shook,
172+
},
173+
Encoding: "json",
174+
EncoderConfig: NewProductionEncoderConfig(),
175+
OutputPaths: []string{"stderr"},
176+
ErrorOutputPaths: []string{"stderr"},
177+
}
178+
expectRe := `{"level":"info","caller":"zap/config_test.go:\d+","msg":"info","k":"v","z":"zz"}` + "\n" +
179+
`{"level":"warn","caller":"zap/config_test.go:\d+","msg":"warn","k":"v","z":"zz"}` + "\n"
180+
expectDropped := 99 // 200 - 100 initial - 1 thereafter
181+
expectSampled := 103 // 2 from initial + 100 + 1 thereafter
182+
183+
temp, err := ioutil.TempFile("", "zap-prod-config-test")
184+
require.NoError(t, err, "Failed to create temp file.")
185+
defer func() {
186+
err := os.Remove(temp.Name())
187+
if err != nil {
188+
return
189+
}
190+
}()
191+
192+
cfg.OutputPaths = []string{temp.Name()}
193+
cfg.EncoderConfig.TimeKey = "" // no timestamps in tests
194+
cfg.InitialFields = map[string]interface{}{"z": "zz", "k": "v"}
195+
196+
logger, err := cfg.Build()
197+
require.NoError(t, err, "Unexpected error constructing logger.")
198+
199+
logger.Debug("debug")
200+
logger.Info("info")
201+
logger.Warn("warn")
202+
203+
byteContents, err := ioutil.ReadAll(temp)
204+
require.NoError(t, err, "Couldn't read log contents from temp file.")
205+
logs := string(byteContents)
206+
assert.Regexp(t, expectRe, logs, "Unexpected log output.")
207+
208+
for i := 0; i < 200; i++ {
209+
logger.Info("sampling")
210+
}
211+
assert.Equal(t, int64(expectDropped), dcount.Load())
212+
assert.Equal(t, int64(expectSampled), scount.Load())
213+
}

zapcore/sampler.go

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,33 +81,104 @@ func (c *counter) IncCheckReset(t time.Time, tick time.Duration) uint64 {
8181
return 1
8282
}
8383

84-
type sampler struct {
85-
Core
84+
// SamplingDecision is a decision made by the sampler, represented as a bit field.
85+
// More decisions may be added in the future.
86+
type SamplingDecision uint32
8687

87-
counts *counters
88-
tick time.Duration
89-
first, thereafter uint64
88+
const (
89+
// LogDropped indicates that the Sampler dropped a log entry.
90+
LogDropped SamplingDecision = 1 << iota
91+
// LogSampled indicates that the Sampler sampled a log entry.
92+
LogSampled
93+
)
94+
95+
// optionFunc wraps a func so it satisfies the SamplerOption interface.
96+
type optionFunc func(*sampler)
97+
98+
func (f optionFunc) apply(s *sampler) {
99+
f(s)
100+
}
101+
102+
// SamplerOption configures a Sampler.
103+
type SamplerOption interface {
104+
apply(*sampler)
90105
}
91106

92-
// NewSampler creates a Core that samples incoming entries, which caps the CPU
93-
// and I/O load of logging while attempting to preserve a representative subset
94-
// of your logs.
107+
// nopSamplingHook is the default hook used by sampler.
108+
func nopSamplingHook(Entry, SamplingDecision) {}
109+
110+
// SamplerHook registers a function which will be called when Sampler makes a
111+
// decision.
112+
//
113+
// This hook may be used to get visibility into the performance of the sampler.
114+
// For example, use it to track metrics of dropped versus sampled logs.
115+
//
116+
// var dropped atomic.Int64
117+
// zapcore.SamplerHook(func(ent zapcore.Entry, dec zapcore.SamplingDecision) {
118+
// if dec&zapcore.LogDropped > 0 {
119+
// dropped.Inc()
120+
// }
121+
// })
122+
func SamplerHook(hook func(entry Entry, dec SamplingDecision)) SamplerOption {
123+
return optionFunc(func(s *sampler) {
124+
s.hook = hook
125+
})
126+
}
127+
128+
// NewSamplerWithOptions creates a Core that samples incoming entries, which
129+
// caps the CPU and I/O load of logging while attempting to preserve a
130+
// representative subset of your logs.
95131
//
96132
// Zap samples by logging the first N entries with a given level and message
97133
// each tick. If more Entries with the same level and message are seen during
98134
// the same interval, every Mth message is logged and the rest are dropped.
99135
//
136+
// Sampler can be configured to report sampling decisions with the SamplerHook
137+
// option.
138+
//
100139
// Keep in mind that zap's sampling implementation is optimized for speed over
101140
// absolute precision; under load, each tick may be slightly over- or
102141
// under-sampled.
103-
func NewSampler(core Core, tick time.Duration, first, thereafter int) Core {
104-
return &sampler{
142+
func NewSamplerWithOptions(core Core, tick time.Duration, first, thereafter int, opts ...SamplerOption) Core {
143+
s := &sampler{
105144
Core: core,
106145
tick: tick,
107146
counts: newCounters(),
108147
first: uint64(first),
109148
thereafter: uint64(thereafter),
149+
hook: nopSamplingHook,
110150
}
151+
for _, opt := range opts {
152+
opt.apply(s)
153+
}
154+
155+
return s
156+
}
157+
158+
type sampler struct {
159+
Core
160+
161+
counts *counters
162+
tick time.Duration
163+
first, thereafter uint64
164+
hook func(Entry, SamplingDecision)
165+
}
166+
167+
// NewSampler creates a Core that samples incoming entries, which
168+
// caps the CPU and I/O load of logging while attempting to preserve a
169+
// representative subset of your logs.
170+
//
171+
// Zap samples by logging the first N entries with a given level and message
172+
// each tick. If more Entries with the same level and message are seen during
173+
// the same interval, every Mth message is logged and the rest are dropped.
174+
//
175+
// Keep in mind that zap's sampling implementation is optimized for speed over
176+
// absolute precision; under load, each tick may be slightly over- or
177+
// under-sampled.
178+
//
179+
// Deprecated: use NewSamplerWithOptions.
180+
func NewSampler(core Core, tick time.Duration, first, thereafter int) Core {
181+
return NewSamplerWithOptions(core, tick, first, thereafter)
111182
}
112183

113184
func (s *sampler) With(fields []Field) Core {
@@ -117,6 +188,7 @@ func (s *sampler) With(fields []Field) Core {
117188
counts: s.counts,
118189
first: s.first,
119190
thereafter: s.thereafter,
191+
hook: s.hook,
120192
}
121193
}
122194

@@ -128,7 +200,9 @@ func (s *sampler) Check(ent Entry, ce *CheckedEntry) *CheckedEntry {
128200
counter := s.counts.get(ent.Level, ent.Message)
129201
n := counter.IncCheckReset(ent.Time, s.tick)
130202
if n > s.first && (n-s.first)%s.thereafter != 0 {
203+
s.hook(ent, LogDropped)
131204
return ce
132205
}
206+
s.hook(ent, LogSampled)
133207
return s.Core.Check(ent, ce)
134208
}

zapcore/sampler_bench_test.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"testing"
2626
"time"
2727

28+
"github.com/stretchr/testify/assert"
29+
"go.uber.org/atomic"
2830
"go.uber.org/zap/internal/ztest"
2931
. "go.uber.org/zap/zapcore"
3032
)
@@ -203,7 +205,7 @@ var counterTestCases = [][]string{
203205
func BenchmarkSampler_Check(b *testing.B) {
204206
for _, keys := range counterTestCases {
205207
b.Run(fmt.Sprintf("%v keys", len(keys)), func(b *testing.B) {
206-
fac := NewSampler(
208+
fac := NewSamplerWithOptions(
207209
NewCore(
208210
NewJSONEncoder(testEncoderConfig()),
209211
&ztest.Discarder{},
@@ -228,3 +230,54 @@ func BenchmarkSampler_Check(b *testing.B) {
228230
})
229231
}
230232
}
233+
234+
func makeSamplerCountingHook() (func(_ Entry, dec SamplingDecision), *atomic.Int64, *atomic.Int64) {
235+
droppedCount := new(atomic.Int64)
236+
sampledCount := new(atomic.Int64)
237+
h := func(_ Entry, dec SamplingDecision) {
238+
if dec&LogDropped > 0 {
239+
droppedCount.Inc()
240+
} else if dec&LogSampled > 0 {
241+
sampledCount.Inc()
242+
}
243+
}
244+
return h, droppedCount, sampledCount
245+
}
246+
247+
func BenchmarkSampler_CheckWithHook(b *testing.B) {
248+
hook, dropped, sampled := makeSamplerCountingHook()
249+
for _, keys := range counterTestCases {
250+
b.Run(fmt.Sprintf("%v keys", len(keys)), func(b *testing.B) {
251+
fac := NewSamplerWithOptions(
252+
NewCore(
253+
NewJSONEncoder(testEncoderConfig()),
254+
&ztest.Discarder{},
255+
DebugLevel,
256+
),
257+
time.Millisecond,
258+
1,
259+
1000,
260+
SamplerHook(hook),
261+
)
262+
b.ResetTimer()
263+
b.RunParallel(func(pb *testing.PB) {
264+
i := 0
265+
for pb.Next() {
266+
ent := Entry{
267+
Level: DebugLevel + Level(i%4),
268+
Message: keys[i],
269+
}
270+
_ = fac.Check(ent, nil)
271+
i++
272+
if n := len(keys); i >= n {
273+
i -= n
274+
}
275+
}
276+
})
277+
})
278+
}
279+
// We expect to see 1000 dropped messages for every sampled one, per the settings,
280+
// with a delta due to fewer than 1000 messages getting dropped after the initial one
281+
// is sampled.
282+
assert.Greater(b, dropped.Load()/1000, sampled.Load()-1000)
283+
}

zapcore/sampler_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
func fakeSampler(lvl LevelEnabler, tick time.Duration, first, thereafter int) (Core, *observer.ObservedLogs) {
3939
core, logs := observer.New(lvl)
40+
// Keep using deprecated constructor for cc.
4041
core = NewSampler(core, tick, first, thereafter)
4142
return core, logs
4243
}
@@ -162,7 +163,7 @@ func TestSamplerConcurrent(t *testing.T) {
162163

163164
tick := ztest.Timeout(10 * time.Millisecond)
164165
cc := &countingCore{}
165-
sampler := NewSampler(cc, tick, logsPerTick, 100000)
166+
sampler := NewSamplerWithOptions(cc, tick, logsPerTick, 100000)
166167

167168
var (
168169
done atomic.Bool

0 commit comments

Comments
 (0)