温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

victoriaMetrics库布隆过滤器初始化及使用的方法

发布时间:2022-04-06 10:19:51 来源:亿速云 阅读:310 作者:iii 栏目:开发技术

本篇内容主要讲解“victoriaMetrics库布隆过滤器初始化及使用的方法”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“victoriaMetrics库布隆过滤器初始化及使用的方法”吧!

victoriaMetrics库布隆过滤器

概述

victoriaMetrics的vmstorage组件会接收上游传递过来的指标,在现实场景中,指标或瞬时指标的数量级可能会非常恐怖,如果不限制缓存的大小,有可能会由于cache miss而导致出现过高的slow insert。

为此,vmstorage提供了两个参数:maxHourlySeriesmaxDailySeries,用于限制每小时/每天添加到缓存的唯一序列。

唯一序列指表示唯一的时间序列,如metrics{label1="value1",label2="value2"}属于一个时间序列,但多条不同值的metrics{label1="value1",label2="value2"}属于同一条时间序列。victoriaMetrics使用如下方式来获取时序的唯一标识:

func getLabelsHash(labels []prompbmarshal.Label) uint64 {	bb := labelsHashBufPool.Get()	b := bb.B[:0]	for _, label := range labels {	b = append(b, label.Name...)	b = append(b, label.Value...)	}	h := xxhash.Sum64(b)	bb.B = b	labelsHashBufPool.Put(bb)	return h }

限速器的初始化

victoriaMetrics使用了一个类似限速器的概念,限制每小时/每天新增的唯一序列,但与普通的限速器不同的是,它需要在序列级别进行限制,即判断某个序列是否是新的唯一序列,如果是,则需要进一步判断一段时间内缓存中新的时序数目是否超过限制,而不是简单地在请求层面进行限制。

hourlySeriesLimiter = bloomfilter.NewLimiter(*maxHourlySeries, time.Hour) dailySeriesLimiter = bloomfilter.NewLimiter(*maxDailySeries, 24*time.Hour)

下面是新建限速器的函数,传入一个最大(序列)值,以及一个刷新时间。该函数中会:

  • 初始化一个限速器,限速器的最大元素个数为maxItems

  • 则启用了一个goroutine,当时间达到refreshInterval时会重置限速器

func NewLimiter(maxItems int, refreshInterval time.Duration) *Limiter {	l := &Limiter{	maxItems: maxItems,	stopCh:   make(chan struct{}),	}	l.v.Store(newLimiter(maxItems)) //1	l.wg.Add(1)	go func() {	defer l.wg.Done()	t := time.NewTicker(refreshInterval)	defer t.Stop()	for {	select {	case <-t.C:	l.v.Store(newLimiter(maxItems))//2	case <-l.stopCh:	return	}	}	}()	return l }

限速器只有一个核心函数Add,当vmstorage接收到一个指标之后,会(通过getLabelsHash计算该指标的唯一标识(h),然后调用下面的Add函数来判断该唯一标识是否存在于缓存中。

如果当前存储的元素个数大于等于允许的最大元素,则通过过滤器判断缓存中是否已经存在该元素;否则将该元素直接加入过滤器中,后续允许将该元素加入到缓存中。

func (l *Limiter) Add(h uint64) bool {	lm := l.v.Load().(*limiter)	return lm.Add(h) } func (l *limiter) Add(h uint64) bool {	currentItems := atomic.LoadUint64(&l.currentItems)	if currentItems >= uint64(l.f.maxItems) {	return l.f.Has(h)	}	if l.f.Add(h) {	atomic.AddUint64(&l.currentItems, 1)	}	return true }

上面的过滤器采用的是布隆过滤器,核心函数为HasAdd,分别用于判断某个元素是否存在于过滤器中,以及将元素添加到布隆过滤器中。

过滤器的初始化函数如下,bitsPerItem是个常量,值为16。bitsCount统计了过滤器中的总bit数,每个bit表示某个值的存在性。bits以64bit为单位的(后续称之为slot,目的是为了在bitsCount中快速检索目标bit)。计算bits时加上63的原因是为了四舍五入向上取值,比如当maxItems=1时至少需要1个unit64的slot。

func newFilter(maxItems int) *filter {	bitsCount := maxItems * bitsPerItem	bits := make([]uint64, (bitsCount+63)/64)	return &filter{	maxItems: maxItems,	bits:     bits,	} }

为什么bitsPerItem为16?这篇文章给出了如何计算布隆过滤器的大小。在本代码中,k为4(hashesCount),期望的漏失率为0.003(可以从官方的filter_test.go中看出),则要求总存储和总元素的比例为15,为了方便检索slot(64bit,为16的倍数),将之设置为16。

if p &gt; 0.003 {	t.Fatalf("too big false hits share for maxItems=%d: %.5f, falseHits: %d", maxItems, p, falseHits)	}

victoriaMetrics库布隆过滤器初始化及使用的方法

下面是过滤器的Add操作,目的是在过滤器中添加某个元素。Add函数中没有使用多个哈希函数来计算元素的哈希值,转而改变同一个元素的值,然后对相应的值应用相同的哈希函数,元素改变的次数受hashesCount的限制。

  • 获取过滤器的完整存储,并转换为以bit单位

  • 将元素h转换为byte数组,便于xxhash.Sum64计算

  • 后续将执行hashesCount次哈希,降低漏失率

  • 计算元素h的哈希

  • 递增元素h,为下一次哈希做准备

  • 取余法获取元素的bit范围

  • 获取元素所在的slot(即uint64大小的bit范围)

  • 获取元素所在的slot中的bit位,该位为1表示该元素存在,为0表示该元素不存在

  • 获取元素所在bit位的掩码

  • 加载元素所在的slot的数值

  • 如果w & mask结果为0,说明该元素不存在,

  • 将元素所在的slot(w)中的元素所在的bit位(mask)置为1,表示添加了该元素

  • 由于Add函数可以并发访问,因此bits[i]有可能被其他操作修改,因此需要通过重新加载(14)并通过循环来在bits[i]中设置该元素的存在性

func (f *filter) Add(h uint64) bool {	bits := f.bits	maxBits := uint64(len(bits)) * 64 //1	bp := (*[8]byte)(unsafe.Pointer(&h))//2	b := bp[:]	isNew := false	for i := 0; i < hashesCount; i++ {//3	hi := xxhash.Sum64(b)//4	h++ //5	idx := hi % maxBits //6	i := idx / 64 //7	j := idx % 64 //8	mask := uint64(1) << j //9	w := atomic.LoadUint64(&bits[i])//10	for (w & mask) == 0 {//11	wNew := w | mask //12	if atomic.CompareAndSwapUint64(&bits[i], w, wNew) {//13	isNew = true//14	break	}	w = atomic.LoadUint64(&bits[i])//14	}	}	return isNew }

看懂了Add函数,Has就相当简单了,它只是Add函数的缩减版,无需设置bits[i]

func (f *filter) Has(h uint64) bool {	bits := f.bits	maxBits := uint64(len(bits)) * 64	bp := (*[8]byte)(unsafe.Pointer(&h))	b := bp[:]	for i := 0; i < hashesCount; i++ {	hi := xxhash.Sum64(b)	h++	idx := hi % maxBits	i := idx / 64	j := idx % 64	mask := uint64(1) << j	w := atomic.LoadUint64(&bits[i])	if (w & mask) == 0 {	return false	}	}	return true }

到此,相信大家对“victoriaMetrics库布隆过滤器初始化及使用的方法”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI