- Notifications
You must be signed in to change notification settings - Fork 541
TBS: set default sampling.tail.storage_limit to 0 but limit disk usage to 90% #15467
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0e138ff 34406b7 a614778 27d2e21 a047c91 6bc1078 b409d03 28e440f e51d329 e550b11 954eca1 38489a9 cf9e2da 7c89ced 454ab84 d6c84b8 40ed22a b602c0a fa9f12c f333764 d23c40d 06cea30 fe5fa61 5c0a7d9 5474d43 a22f2a3 6342883 2e4206e 0144ddb 6988450 af3fe53 494a540 c6403d5 e66e070 e2034b5 5bc7004 1904ae0 839efb3 682f7d3 18fe30b ca62f9f 370c7ea 0678a5f 1e1b4cf bdff329 0d74181 856a5bc 615e94d 7bcfc60 4677241 235d1ed File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -16,6 +16,7 @@ import ( | |
| "time" | ||
| | ||
| "github.com/cockroachdb/pebble/v2" | ||
| "github.com/cockroachdb/pebble/v2/vfs" | ||
| "go.opentelemetry.io/otel/metric" | ||
| "golang.org/x/sync/errgroup" | ||
| | ||
| | @@ -46,6 +47,12 @@ const ( | |
| | ||
| // diskUsageFetchInterval is how often disk usage is fetched which is equivalent to how long disk usage is cached. | ||
| diskUsageFetchInterval = 1 * time.Second | ||
| | ||
| // dbStorageLimitFallback is the default fallback storage limit in bytes | ||
| // that applies when disk usage threshold cannot be enforced due to an error. | ||
| dbStorageLimitFallback = 3 << 30 | ||
| | ||
| gb = float64(1 << 30) | ||
| ) | ||
| | ||
| type StorageManagerOptions func(*StorageManager) | ||
| | @@ -62,6 +69,27 @@ func WithMeterProvider(mp metric.MeterProvider) StorageManagerOptions { | |
| } | ||
| } | ||
| | ||
| // WithGetDBSize configures getDBSize function used by StorageManager. | ||
| // For testing only. | ||
| func WithGetDBSize(getDBSize func() uint64) StorageManagerOptions { | ||
| return func(sm *StorageManager) { | ||
| sm.getDBSize = getDBSize | ||
| } | ||
| } | ||
| | ||
| // WithGetDiskUsage configures getDiskUsage function used by StorageManager. | ||
| // For testing only. | ||
| func WithGetDiskUsage(getDiskUsage func() (DiskUsage, error)) StorageManagerOptions { | ||
| return func(sm *StorageManager) { | ||
| sm.getDiskUsage = getDiskUsage | ||
| } | ||
| } | ||
| | ||
| // DiskUsage is the struct returned by getDiskUsage. | ||
| type DiskUsage struct { | ||
| UsedBytes, TotalBytes uint64 | ||
| } | ||
| | ||
| // StorageManager encapsulates pebble.DB. | ||
| // It assumes exclusive access to pebble DB at storageDir. | ||
| type StorageManager struct { | ||
| | @@ -80,9 +108,20 @@ type StorageManager struct { | |
| // subscriberPosMu protects the subscriber file from concurrent RW. | ||
| subscriberPosMu sync.Mutex | ||
| | ||
| // getDBSize returns the total size of databases in bytes. | ||
| getDBSize func() uint64 | ||
| // cachedDBSize is a cached result of db size. | ||
| cachedDBSize atomic.Uint64 | ||
| | ||
| // getDiskUsage returns the disk / filesystem usage statistics of storageDir. | ||
| getDiskUsage func() (DiskUsage, error) | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we call it Member Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should have called it cachedDiskStat and changing diskStatFailed to getDiskUsageFailed. Keeping the call as getDiskUsage as the pebble function is called GetDiskUsage. | ||
| // getDiskUsageFailed indicates if getDiskUsage calls ever failed. | ||
| getDiskUsageFailed atomic.Bool | ||
| // cachedDiskStat is disk usage statistics about the disk only, not related to the databases. | ||
| cachedDiskStat struct { | ||
| used, total atomic.Uint64 | ||
| } | ||
| | ||
| // runCh acts as a mutex to ensure only 1 Run is actively running per StorageManager. | ||
| // as it is possible that 2 separate Run are created by 2 TBS processors during a hot reload. | ||
| runCh chan struct{} | ||
| | @@ -100,6 +139,16 @@ func NewStorageManager(storageDir string, opts ...StorageManagerOptions) (*Stora | |
| runCh: make(chan struct{}, 1), | ||
| logger: logp.NewLogger(logs.Sampling), | ||
| codec: ProtobufCodec{}, | ||
| getDiskUsage: func() (DiskUsage, error) { | ||
| usage, err := vfs.Default.GetDiskUsage(storageDir) | ||
| return DiskUsage{ | ||
| UsedBytes: usage.UsedBytes, | ||
| TotalBytes: usage.TotalBytes, | ||
| }, err | ||
| }, | ||
| } | ||
| sm.getDBSize = func() uint64 { | ||
| return sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage() | ||
| } | ||
| for _, opt := range opts { | ||
| opt(sm) | ||
| | @@ -210,7 +259,30 @@ func (sm *StorageManager) dbSize() uint64 { | |
| } | ||
| | ||
| func (sm *StorageManager) updateDiskUsage() { | ||
| sm.cachedDBSize.Store(sm.eventDB.Metrics().DiskSpaceUsage() + sm.decisionDB.Metrics().DiskSpaceUsage()) | ||
| sm.cachedDBSize.Store(sm.getDBSize()) | ||
| | ||
| if sm.getDiskUsageFailed.Load() { | ||
| // Skip GetDiskUsage under the assumption that | ||
| // it will always get the same error if GetDiskUsage ever returns one, | ||
| // such that it does not keep logging GetDiskUsage errors. | ||
| return | ||
| } | ||
| usage, err := sm.getDiskUsage() | ||
| if err != nil { | ||
| sm.logger.With(logp.Error(err)).Warn("failed to get disk usage") | ||
1pkg marked this conversation as resolved. Show resolved Hide resolved | ||
| sm.getDiskUsageFailed.Store(true) | ||
| Contributor There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: While I can't think of what could lead to transient failures, I am not sure about always failing on error bit - something to think about in a future PR. Member Author There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This gives me a headache as well. What should existing disk usage threshold checks perform when getDiskUsage has transient failures, should it use a stale number, or should it become unlimited? With this assumption of always failing, it simplifies the implementation. | ||
| sm.cachedDiskStat.used.Store(0) | ||
| sm.cachedDiskStat.total.Store(0) // setting total to 0 to disable any running disk usage threshold checks | ||
carsonip marked this conversation as resolved. Show resolved Hide resolved | ||
| return | ||
| } | ||
| sm.cachedDiskStat.used.Store(usage.UsedBytes) | ||
| sm.cachedDiskStat.total.Store(usage.TotalBytes) | ||
| } | ||
| | ||
| // diskUsed returns the actual used disk space in bytes. | ||
| // Not to be confused with dbSize which is specific to database. | ||
| func (sm *StorageManager) diskUsed() uint64 { | ||
| return sm.cachedDiskStat.used.Load() | ||
| } | ||
| | ||
| // runDiskUsageLoop runs a loop that updates cached disk usage regularly. | ||
| | @@ -338,33 +410,53 @@ func (sm *StorageManager) WriteSubscriberPosition(data []byte) error { | |
| return os.WriteFile(filepath.Join(sm.storageDir, subscriberPositionFile), data, 0644) | ||
| } | ||
| | ||
| // NewUnlimitedReadWriter returns a read writer with no storage limit. | ||
| // For testing only. | ||
| func (sm *StorageManager) NewUnlimitedReadWriter() StorageLimitReadWriter { | ||
| return sm.NewReadWriter(0) | ||
| } | ||
| | ||
| // NewReadWriter returns a read writer with storage limit. | ||
| func (sm *StorageManager) NewReadWriter(storageLimit uint64) StorageLimitReadWriter { | ||
| splitRW := SplitReadWriter{ | ||
| // NewReadWriter returns a read writer configured with storage limit and disk usage threshold. | ||
| func (sm *StorageManager) NewReadWriter(storageLimit uint64, diskUsageThreshold float64) RW { | ||
| var rw RW = SplitReadWriter{ | ||
| eventRW: sm.eventStorage.NewReadWriter(), | ||
| decisionRW: sm.decisionStorage.NewReadWriter(), | ||
| } | ||
| | ||
| dbStorageLimit := func() uint64 { | ||
| return storageLimit | ||
| } | ||
| if storageLimit == 0 { | ||
| sm.logger.Infof("setting database storage limit to unlimited") | ||
| } else { | ||
| sm.logger.Infof("setting database storage limit to %.1fgb", float64(storageLimit)) | ||
| // If db storage limit is set, only enforce db storage limit. | ||
| if storageLimit > 0 { | ||
| // dbStorageLimit returns max size of db in bytes. | ||
| // If size of db exceeds dbStorageLimit, writes should be rejected. | ||
| dbStorageLimit := func() uint64 { | ||
| return storageLimit | ||
| } | ||
| sm.logger.Infof("setting database storage limit to %0.1fgb", float64(storageLimit)/gb) | ||
| dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit) | ||
| rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw) | ||
| return rw | ||
| } | ||
| | ||
| // To limit db size to storage_limit | ||
| dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit) | ||
| dbStorageLimitRW := NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, splitRW) | ||
| // DB storage limit is unlimited, enforce disk usage threshold if possible. | ||
| // Load whether getDiskUsage failed, as it was called during StorageManager initialization. | ||
| if sm.getDiskUsageFailed.Load() { | ||
| // Limit db size to fallback storage limit as getDiskUsage returned an error | ||
| dbStorageLimit := func() uint64 { | ||
| return dbStorageLimitFallback | ||
| } | ||
| sm.logger.Warnf("overriding database storage limit to fallback default of %0.1fgb as get disk usage failed", float64(dbStorageLimitFallback)/gb) | ||
| dbStorageLimitChecker := NewStorageLimitCheckerFunc(sm.dbSize, dbStorageLimit) | ||
| rw = NewStorageLimitReadWriter("database storage limit", dbStorageLimitChecker, rw) | ||
| return rw | ||
| } | ||
| | ||
| return dbStorageLimitRW | ||
| // diskThreshold returns max used disk space in bytes, not in percentage. | ||
| // If size of used disk space exceeds diskThreshold, writes should be rejected. | ||
| diskThreshold := func() uint64 { | ||
| return uint64(float64(sm.cachedDiskStat.total.Load()) * diskUsageThreshold) | ||
| } | ||
| // the total disk space could change in runtime, but it is still useful to print it out in logs. | ||
| sm.logger.Infof("setting disk usage threshold to %.2f of total disk space of %0.1fgb", diskUsageThreshold, float64(sm.cachedDiskStat.total.Load())/gb) | ||
| diskThresholdChecker := NewStorageLimitCheckerFunc(sm.diskUsed, diskThreshold) | ||
| rw = NewStorageLimitReadWriter( | ||
| fmt.Sprintf("disk usage threshold %.2f", diskUsageThreshold), | ||
| diskThresholdChecker, | ||
| rw, | ||
| ) | ||
carsonip marked this conversation as resolved. Show resolved Hide resolved | ||
| return rw | ||
| } | ||
| | ||
| // wrapNonNilErr only wraps an error with format if the error is not nil. | ||
| | ||
Uh oh!
There was an error while loading. Please reload this page.