您的位置:  首页 > 技术杂谈 > 正文

Prometheus源码 - memSeries

2021-08-30 11:00 https://my.oschina.net/u/5084709/blog/5209796 AlanWang0202 次阅读 条评论

Prometheus源码 - memSeries

Prolegomena

Prometheus无疑是一款优秀的开源系统监控报警框架,在云原生的时代发挥着重要作用。它提供近实时的、基于动态云环境和容器微服务、服务以及应用程序的内省监控。同时也用于监控传统架构的资源。Fortunately,笔者每天的工作都会与Prometheus打交道,在使用过程中它体现的高效无不让人着迷。同时,笔者对于这款CNCF设计思想产生了浓厚的兴趣,这个框架是如何做到单一节点可以处理数以百万的监控指标,每秒处理数十万的数据点? 带着这种疑惑与兴趣,开始了prometheus设计思想和工程实践的探索。它的高效必然离不开一款高效的时序数据库的支持TSDB。本文会对TSDB的其中一个memSeries模块进行源码层面的剖析。借此机会,笔者也分享下看源码的心得,对于这种体量级别的应用,看其源码无异于自己置身扁舟漂泊在太平洋,无头绪,无方向,充满挑战。Efficiently,我们可以找一篇写的很有总结性质的好文章(Prometheus时序数据库-内存中的存储结构),然后阅读下该模块所涉及到的技术论文(Gorilla: A Fast, Scalable, In-Memory Time Series Database),这样再去撸源码就事半功倍了。

Introduction

笔者主要是围绕memSeries源码(prometheus/prometheus-main/tsdb/head.go)进行,顺序性的解读。因为还没有系统性的将整个工程源码都读完,而且这类resource很少,无法站在更高的角度将memSeries的方法具体作用和Prometheus实际使用完美的串联起来。Hence,本文主要讲memSeries源码实现过程和方法的效果。后续笔者会不断的解读Prometheus其他模块,这样一步步的将其各个模块串联起来,读者可以关注后续的update。

MemSeries Attributes

type memSeries struct {
   sync.RWMutex

   ref           uint64  
   lset          labels.Labels
   mmappedChunks []*mmappedChunk 
   headChunk     *memChunk
   chunkRange    int64
   firstChunkID  int

   nextAt        int64 // Timestamp at which to cu? the next chunk.
   sampleBuf     [4]sample
   pendingCommit bool // Whether there are samples waiting to be commi?ted to~ this series.

   app chunkenc.Appender // Current appender for the chunk.

   memChunkPool *sync.Pool

   txs *txRing
}

  1. ref :每接受一个新的时间序列(e.g. ht?p_request??_total{path="/", method="GET"},NTC. 时间序列=指标(e.g. h?tp_request_total) + 标签(k,v))

  2. lset:这个是识别这个series的标签集合。

    // 源码
    lset labels.Labels
    	| -> type Labels []Label
    		| -> type Label struct { Name, Value string }
    
  3. mmappedChunks

    type mmappedChunk struct {
       ref              uint64
       numSamples       uint16
       minTime, maxTime int64
    }
    

sample(t,v)是这个时间序列某个时间的采集数据经过gorilla压缩后的数据。sample是被存储到memchunk中,其中达到120个(默认15s采集一次数据,生成一个sample。Hence,120*15s=30min Full chunk),或者

chunkRange的两个小时(memSeries -> chunkRange 参数决定) 都可以叫做这个chunk是full(ref. Once the chunk fills till 120 samples (or) spans upto chunk/block range (let's call it chunkRange), which is 2h by default, a new chunk is cut and the old chunk is said to be "full". Link: https://ganeshvernekar.com/blog/prometheus-tsdb-the-head-block/)

这个也可以在源码中考究:

// tsdb/head.go
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {

->   const samplesPerChunk = 120

当 active chunk 写满sample后,就会使用 chunks.ChunkDiskMapper -> chunkDiskMapper.WriteChunk() 写入到磁盘中,同时生成 chunkRef,这个值 represent :该时间序列磁盘中的chunk 在内存中的映射mmappedChunks []*mmappedChunk 维护着该时间序列的所有的chunk。

源码探索:

func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper) {

    ...
   // 将full chunk 写入到磁盘
   chunkRef, err := chunkDiskMapper.WriteChunk(s.ref, s.headChunk.minTime, s.headChunk.maxTime, s.headChunk.chunk)

   // 建立内存映射 
   s.mmappedChunks = append(s.mmappedChunks, &mmappedChunk{
      ...
   })
   ...
}
  1. headChunk:

    type memChunk struct {
       chunk            chunkenc.Chunk
       minTime, maxTime int64
    }
    

headChunk 一般都是active chunk 一直有samples写入。

Relevant Methods:

// 将目前的headchunk 写入磁盘同时在内存中建立映射保存元数据,memSeries的指针在指向新的headChunk
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk{ ... }

// 将headChunk 写入磁盘 建立M-map映射
func (s *memSeries) mmapCurrentHeadChunk(chunkDiskMapper *chunks.ChunkDiskMapper){ ... }

那我们会疑惑一个问题:headChunk 满足什么条件才会触发memSeries指针指向新的headChunk,以及落盘和创建内存映射呢?这里,笔者画了一个flow chart. 关注两个指标即可

memSeries.chunkRange and const samplesPerChunk = 120

  1. chunkRange

    type memSeries struct {
     	...
       chunkRange    int64
       ...
    }
    

    它的作用可以看下该源码:

    func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
    
       | -> s.nextAt = rangeForTimestamp(mint, s.chunkRange)
       		| -> func rangeForTimestamp(t int64, width int64) (maxt int64) {
    				return (t/width)*width + width}
    }
    

​ 这个chunkRange的大小决定着 nextAt的值(default. 2H),这个值决定了新的headChunk什么时候被创建,也就是curChunk的什么时候full。

  1. firstChunkID : 因为每个memSeries中会有属于这个时间序列chunk的映射表mmappedChunks []*mmappedChunk,这个主要是是为了找到对应chunk(一般是on-disk chunk)的metadata

    type memSeries struct {
        ...
       mmappedChunks []*mmappedChunk
          |-> type mmappedChunk struct {
                    ...
    					}
    	...
       firstChunkID  int
        ...
    }
    

    实际应用:ix 表示chunk在mmappedChunks的索引,chunkID 是从1开始逐渐增长的。Hence,(id- s.firstChunkID)就是这个chunk在 mmappendChunks的索引。

    func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
    
       ix := id - s.firstChunkID  // chunk在mmappendChunks的索引
       ...
    }
    
  2. nextAt :下一个headChunk的开始时间。 可以看下面这段源码, 设置一个最低的限制,下一个chunk必须创建的时间,其实是受 const samplesPerChunk = 120(当sample 达到120个的时候回cutHeadChunk)chunkRange(default:2h) 影响的。

func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
	...
   s.nextAt = rangeForTimestamp(mint, s.chunkRange)
    		| -> func rangeForTimestamp(t int64, width int64) (maxt int64) {
					return (t/width)*width + width
				}
    ...
}
  1. sampleBuf

    type memSeries struct {
    	...
    	sampleBuf [4]sample
    	...
    }
    

通过源码查看 其实就是headChunk 保存最新的4个sample,对于有什么用处,目前看memSeries源码还没发现

func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
	...
   s.sampleBuf[0] = s.sampleBuf[1]
   s.sampleBuf[1] = s.sampleBuf[2]
   s.sampleBuf[2] = s.sampleBuf[3]
   s.sampleBuf[3] = sample{t: t, v: v}
   	...

}
  1. pendingCommit : OFF: Whether there are samples waiting to be committed to this series.

    这个字段在 func (a *headAppender) Commit() 这个方法中会涉及到,目前不展开讲,等下一个Head系列会提到。

  2. app : 其实是一个空接口,但是当初始化memSeries的时候,会实例化它。这个设计模式值得学习(其实一个工厂模型,但是在源码中比较难找。。。),

    // Appender adds sample pairs to a chunk.
    type Appender interface {
       Append(int64, float64)
    }
    

而它真正的实现是XOR算法模块,这个算法可以参考笔者另一篇文章:Gorilla Encoding. 在以后的Prometheus系列中,笔者会出一期关于XOR算法实现的分析锻炼下工程化落地能力。Anyway,memSeries如何使用这个接口的呢? 可以参考下面的源码片段

// XORChunk 结构体的这个方法返回一个xorAppender结构体
func (c *XORChunk) Appender() (Appender, error) {
   ...
   
   a := &xorAppender{
	...
   }
   ... 
  return a, err
}
// xorAppender 这个结构体 有一个method Append 这样就具体实现了 Appender interface{} 这个接口
func (a *xorAppender) Append(t int64, v float64) {
  	...
}

// 初始化chunk的时候 chunk 为 XORChunk,并使用XORChunk中的Appender()方法 返回一个xorAppender 结
// 构体,这个结构体具体实现了Append方法,并赋值给 s.app。
func (s *memSeries) cutNewHeadChunk(mint int64, chunkDiskMapper *chunks.ChunkDiskMapper) *memChunk {
  ...
   	s.headChunk = &memChunk{
      chunk:   chunkenc.NewXORChunk(), // 实例化 chunk为 XORChunk()
	  ...
   	}
  	...
  	app, err := s.headChunk.chunk.Appender()  // 实例化
	if err != nil {
		panic(err)
	}
	s.app = app  // 实例化 s.app
	return s.headChunk
  
}

所以它才会有如下这个操作:

// s.app.Append(t,v) 
func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
  ...
   s.app.Append(t, v)
  ...
}
  1. memChunkPool: 由于golang内建的GC机制会影响应用的性能,为了减少GC,golang提供了对象重用的机制,也就是sync.Pool对象池。 sync.Pool是可伸缩的,并发安全的。其大小仅受限于内存的大小,可以被看作是一个存放可重用对象的值的容器。 设计的目的是存放已经分配的但是暂时不用的对象,在需要用到的时候直接从pool中取。

    
    type memSeries struct {
    	...
    
    	memChunkPool *sync.Pool
    
    	...
    }
    
  2. txs: 事务ID记录的一个结构体. 这个又是另一个模块(isolation)的知识体系,本文不细说。

    type memSeries struct {
    	...
    
    	txs *txRing
         | —> type txRing struct {
    			txIDs     []uint64
    			txIDFirst int // Position of the first id in the ring.
    			txIDCount int // How many ids in the ring.
    		}
    
    	...
    }
    

MemSeries methods

当介绍memSeries结构体元素的时候,其实都穿插着讲了它的方法,所以这个版本会简单的介绍下几个方法的实现和作用。

  1. func (s *memSeries) cutNewHeadChunk(...){...} :当前的headChunk达到full chunk条件的时候,会使用该方法重新初始化一个新的headChunk,并将memSeries的headChunk指向该chunk。

  2. func (s *memSeries) mmapCurrentHeadChunk(...){...} : 将目前的headChunk 写入到磁盘中,同时会建立内存映射。内存映射主要是 mmappedChunks []*mmappedChunk 进行内存中存储。

  3. func (s *memSeries) chunk(id int, ...){...} : 根据id,找到想查找的chunk在内存映射中的索引,从而找到该chunk。可以看下源码,这里面还是有一些trick

    func (s *memSeries) chunk(id int, chunkDiskMapper *chunks.ChunkDiskMapper) (chunk *memChunk, garbageCollect bool, err error) {
    
       ix := id - s.firstChunkID  // ix是 该chunk在m-map中的索引. chunk的id 是从1开始逐渐增加的1,2,3,...
       if ix < 0 || ix > len(s.mmappedChunks) {
          return nil, false, storage.ErrNotFound
       }
       if ix == len(s.mmappedChunks) { // 查找的是active chunk 正在执行写入的headChunk
          if s.headChunk == nil {
             return nil, false, errors.New("invalid head chunk")
          }
          return s.headChunk, false, nil  // 由于是active chunk 所以不能被GC 回收。
       }
       chk, err := chunkDiskMapper.Chunk(s.mmappedChunks[ix].ref) // 查找的chunk已经落盘,根据m-map的ref 在磁盘中查找对应的chunk 这个落盘的chunk是没有mint,maxt 
       if err != nil {
          if _, ok := err.(*chunks.CorruptionErr); ok {
             panic(err)
          }
          return nil, false, err
       }
       mc := s.memChunkPool.Get().(*memChunk) // 这里是一个trick,从池子里拿一个memChunk内存空间 然后进行初始化,这个是并发安全的。
       mc.chunk = chk
       mc.minTime = s.mmappedChunks[ix].minTime
       mc.maxTime = s.mmappedChunks[ix].maxTime
        return mc, true, nil  // true: 表示该chunk使用后,可以被回收。
    }
    
  4. func (s *memSeries) truncateChunksBefore(mint int64) (removed int): 这个函数的作用是给一个时间,把这个时间点之前的chunk都从series中剔除掉。

    func (s *memSeries) truncateChunksBefore(mint int64) (removed int) {
       	// headchunk的最大时间小于给的这个时间段 说明 把这个series中所有的chunk都要清零。
        if s.headChunk != nil && s.headChunk.maxTime < mint { 
          removed = 1 + len(s.mmappedChunks)
          s.firstChunkID += removed
          s.headChunk = nil  // 清空
          s.mmappedChunks = nil // 清空
          return removed
       }
        // 判断每一个落盘的chunk中的时间 与 截断时间对比
       if len(s.mmappedChunks) > 0 {
          for i, c := range s.mmappedChunks { // 这个c其实不是chunk 只是磁盘某个chunk中在内存中的元数据
             if c.maxTime >= mint {
                break
             }
             removed = i + 1
          }
          s.mmappedChunks = append(s.mmappedChunks[:0], s.mmappedChunks[removed:]...)
          s.firstChunkID += removed
       }
       return removed
    }
    
  5. func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (...) :

    func (s *memSeries) append(t int64, v float64, appendID uint64, chunkDiskMapper *chunks.ChunkDiskMapper) (sampleInOrder, chunkCreated bool) {
    
       // 这个就是定义了 一个full chunk的一个条件,当有120个samples 就证明full chunk,如果设置的采集时间是每15s采集一次,那么一个full chunk(每一个时间序列的)需要15s * 120 = 30min
       const samplesPerChunk = 120
    
       c := s.head()
    
       if c == nil {
          if len(s.mmappedChunks) > 0 && s.mmappedChunks[len(s.mmappedChunks)-1].maxTime >= t {  // 该headChunk的最大时间都大于插入的时间t了,所以这个sample无法被append
    
             return false, false
          }
    
          // len(s.mmappedChunks) = 0 and headChunk == nil 那么就创建一个新的HeadChunk 起始时       //间是t
          //
          c = s.cutNewHeadChunk(t, chunkDiskMapper) 
          chunkCreated = true
       }
       numSamples := c.chunk.NumSamples()
    
    
       if c.maxTime >= t {
          return false, chunkCreated
       }
    
       if numSamples == samplesPerChunk/4 {
          s.nextAt = computeChunkEndTime(c.minTime, c.maxTime, s.nextAt) // 推断并设置下一个headChunk创建时间。这个函数有一个分母有一个+1操作,是为了防止分母为0。
       }
       if t >= s.nextAt {
          c = s.cutNewHeadChunk(t, chunkDiskMapper)
          chunkCreated = true
       }
       s.app.Append(t, v)
    
       c.maxTime = t
    
       s.sampleBuf[0] = s.sampleBuf[1]
       s.sampleBuf[1] = s.sampleBuf[2]
       s.sampleBuf[2] = s.sampleBuf[3]
       s.sampleBuf[3] = sample{t: t, v: v}
    
       if appendID > 0 { // 是否需要被隔离? 如果是appendID == 0 则不需要。 主要是在查询和插入的时候。
          s.txs.add(appendID)
       }
    
       return true, chunkCreated
    }
    
  6. func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator{...}

    func (s *memSeries) iterator(id int, isoState *isolationState, chunkDiskMapper *chunks.ChunkDiskMapper, it chunkenc.Iterator) chunkenc.Iterator {
       c, garbageCollect, err := s.chunk(id, chunkDiskMapper)  // 根据id 从 memseries中找到对应的c -> memchunk
       // TODO(fabxc): Work around! An error will be returns when a querier have retrieved a pointer to a
       // series's chunk, which got then garbage collected before it got
       // accessed.  We must ensure to not garbage collect as long as any
       // readers still hold a reference.
       if err != nil {
          return chunkenc.NewNopIterator()
       }
       defer func() {
          if garbageCollect {
             // Set this to nil so that Go GC can collect it after it has been used.
             // This should be done always at the end.
             c.chunk = nil
             s.memChunkPool.Put(c)
          }
       }()
    
       ix := id - s.firstChunkID  // ix: c 在 memseries中的索引值
    
       numSamples := c.chunk.NumSamples()  // c 有多少个numSamples
       stopAfter := numSamples
    
       if isoState != nil {
          totalSamples := 0    // Total samples in this series.
          previousSamples := 0 // Samples before this chunk.
    
          for j, d := range s.mmappedChunks {
             totalSamples += int(d.numSamples)
             if j < ix {
                previousSamples += int(d.numSamples)
             }
          }
    
          if s.headChunk != nil {
             totalSamples += s.headChunk.chunk.NumSamples()
          }
    
          // Removing the extra transactionIDs that are relevant for samples that
          // come after this chunk, from the total transactionIDs.
          appendIDsToConsider := s.txs.txIDCount - (totalSamples - (previousSamples + numSamples))  // 这个chunk 和它之前的所有 samples数量
    
          // Iterate over the appendIDs, find the first one that the isolation state says not
          // to return.
          it := s.txs.iterator()
          for index := 0; index < appendIDsToConsider; index++ {
             appendID := it.At()  // 没有初始化txs 所以就是postion=0  第一个位置的appendID
             if appendID <= isoState.maxAppendID { // Easy check first.
                if _, ok := isoState.incompleteAppends[appendID]; !ok {  // 没有检测到这个某个sample的完成操作
                   it.Next()
                   continue
                }
             }
             // Eq. index - previousSamples, 当小于0说明 index
             // 还没到目前chunk的上一个chunk或者到了上一个chunk但是没遍历完呢。 大于0 说明 index目前
             //curChunk上
             stopAfter = numSamples - (appendIDsToConsider - index)
    
             if stopAfter < 0 { // index还没遍历到 curChunk上
                stopAfter = 0 // Stopped in a previous chunk.
             }
             break
          }
       }
    
       if stopAfter == 0 {
          return chunkenc.NewNopIterator()  //NewNopIterator returns a new chunk iterator that does not hold any data.  目前index指针到了previous chunk末尾 或者curChunk开头,也就是stopAfter的pos所在,start = end ? 这样肯定返回一个没有数据的迭代器
       }
    
    
        // 以下代码是根据 传入的参数 it chunkenc.Iterator  来选择返回哪种类型的Iterator
       if id-s.firstChunkID < len(s.mmappedChunks) {
          if stopAfter == numSamples {
             return c.chunk.Iterator(it)  // index 到了curChunk的末尾,其中it 传入的object可以将Iterator实例化,
          }
    
           // 不同种类的Iterator
          if msIter, ok := it.(*stopIterator); ok {
             msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
             msIter.i = -1
             msIter.stopAfter = stopAfter
             return msIter
          }
          return &stopIterator{
             Iterator:  c.chunk.Iterator(it),
             i:         -1,
             stopAfter: stopAfter,
          }
       }
       // Serve the last 4 samples for the last chunk from the sample buffer
       // as their compressed bytes may be mutated by added samples.
       if msIter, ok := it.(*memSafeIterator); ok {
          msIter.Iterator = c.chunk.Iterator(msIter.Iterator)
          msIter.i = -1
          msIter.total = numSamples
          msIter.stopAfter = stopAfter
          msIter.buf = s.sampleBuf
          return msIter
       }
       return &memSafeIterator{
          stopIterator: stopIterator{
             Iterator:  c.chunk.Iterator(it),
             i:         -1,
             stopAfter: stopAfter,
          },
          total: numSamples,
          buf:   s.sampleBuf,
       }
    }
    

上述Iterator的关系

// Iterator interface
type Chunk interface {
   ...
   Iterator(Iterator) Iterator
    | ->  type Iterator interface {Next() bool,Seek(t int64) bool,
    	At() (int64, float64), Err() error}
}
type stopIterator struct {
	chunkenc.Iterator  
	i, stopAfter int
}
type memSafeIterator struct {
	stopIterator 
	total int 
	buf [4]sample}
}

Conclusion

通过阅读源码,笔者不仅学习到了这些大神的代码风格,体会了prometheus TSDB的设计思想(这种设计风格建议总结下来,以后可以尝试造轮子),其中笔者更是对memSeries模块在脑海里有自己的认识和理解。后续笔者将会不断解剖Prometheus源码,大家可以关注下。

  • 0
    感动
  • 0
    路过
  • 0
    高兴
  • 0
    难过
  • 0
    搞笑
  • 0
    无聊
  • 0
    愤怒
  • 0
    同情
热度排行
友情链接