您的位置:  首页 > 技术 > 数据库 > 正文

浪潮云溪分布式数据库 Tracing(二)—— 源码解析

2022-07-07 12:00 https://my.oschina.net/u/5148943/blog/5550323 浪潮云溪数据库 次阅读 条评论

按照【云溪数据库Tracing(一)】介绍的使用opentracing要求,本文着重介绍云溪数据库Tracing模块中是如何实现Span,SpanContexts和Tracer的。

Part 1 - Tracing 模块调用关系

1.1  Traincg模块包含的文件列表

Tracer.go :定义了opentracing 中的trace相关接口的实现。Tracer_span.go :定义了opentracing中的span 相关操作的实现。Tags.go :定义了 opentracing中关于tags的相关接口。Shadow.go :不是opentracing中的概念,这里主要实现与zipkin的通信,用于tracing 信息推送到外部的zipkin中。

1.2   各个文件之间的调用关系

在cluster_settings.go中会创建tracer,供全局使用,其他模块中使用这个Tracer实现span的创建和其他操作,例如设定span名称、设定tag 、增加log等操作。

 

Part 2 - Opentracing 

在云溪数据库中的实现

以下是只是列出了部分接口实现,并非全部。

2.1   Span 接口实现:

GetContext实现:API用于获取Span中的SpanContext,主要功能是先创建一个map[string]string类型的baggageCopy,span中的mu.Baggage 读出写入baggageCopy,创建新的spanContext,并且返回。

func (s *span) Context() opentracing.SpanContext {   s.mu.Lock()   defer s.mu.Unlock()   baggageCopy := make(map[string]string, len(s.mu.Baggage))   for k, v := range s.mu.Baggage {      baggageCopy[k] = v   }   sc := &spanContext{      spanMeta: s.spanMeta,      Baggage:  baggageCopy,   }   if s.shadowTr != nil {      sc.shadowTr = s.shadowTr      sc.shadowCtx = s.shadowSpan.Context()   }   if s.isRecording() {      sc.recordingGroup = s.mu.recordingGroup      sc.recordingType = s.mu.recordingType   }   return sc}

Finished实现:API用于结束一个Span的记录和追踪。

func (s *span) Finish() {   s.FinishWithOptions(opentracing.FinishOptions{})}

SetTag实现:用于向指定的Span添加Tag信息。

func (s *span) SetTag(key string, value interface{}) opentracing.Span {   return s.setTagInner(key, value, false /* locked */)}

Log实现:用于向指定的Span添加Log信息。

func (s *span) LogKV(alternatingKeyValues ...interface{}) {   fields, err := otlog.InterleavedKVToFields(alternatingKeyValues...)   if err != nil {      s.LogFields(otlog.Error(err), otlog.String("function", "LogKV"))      return   }   s.LogFields(fields...)}

SetBaggageItem实现:用于向指定的Span增加Baggage信息,主要是用于跨进程追踪使用。

func (s *span) SetBaggageItem(restrictedKey, value string) opentracing.Span {   s.mu.Lock()   defer s.mu.Unlock()   return s.setBaggageItemLocked(restrictedKey, value)}

BaggageItem实现:用于获取指定的Baggage信息。

func (s *span) BaggageItem(restrictedKey string) string {   s.mu.Lock()   defer s.mu.Unlock()   return s.mu.Baggage[restrictedKey]}

SetOperationName实现:用于设定Span 的名称。

func (s *span) SetOperationName(operationName string) opentracing.Span {
 if s.shadowTr != nil {
  s.shadowSpan.SetOperationName(operationName)
 }
 s.operation = operationName
 return s
}

Tracer实现:用于获取Span属于哪个Tracer。

// Tracer is part of the opentracing.Span interface.func (s *span) Tracer() opentracing.Tracer { return s.tracer}

2.2   SpanContext 接口实现:

ForeachBaggageItem实现:用于遍历spanContext中的baggage信息。

func (sc *spanContext) ForeachBaggageItem(handler func(k, v string) bool) {   for k, v := range sc.Baggage {      if !handler(k, v) {         break      }   }}

2.3   Tracer接口实现:

Inject实现:用于向carrier中注入SpanContext信息

// Inject is part of the opentracing.Tracer interface.func (t *Tracer) Inject(   osc opentracing.SpanContext, format interface{}, carrier interface{},) error { ……   // We onlysupport the HTTPHeaders/TextMap format.   if format != opentracing.HTTPHeaders && format != opentracing.TextMap {      return opentracing.ErrUnsupportedFormat   }   mapWriter, ok := carrier.(opentracing.TextMapWriter)   if !ok {      return opentracing.ErrInvalidCarrier   }   sc, ok := osc.(*spanContext)   if !ok {      return opentracing.ErrInvalidSpanContext   }   mapWriter.Set(fieldNameTraceID, strconv.FormatUint(sc.TraceID, 16))   mapWriter.Set(fieldNameSpanID, strconv.FormatUint(sc.SpanID, 16))   for k, v := range sc.Baggage {      mapWriter.Set(prefixBaggage+k, v)   }  ……   return nil}

Extract实现:用于从carrier中抽取出SpanContext信息。

func (t *Tracer) Extract(format interface{}, carrier interface{}) (opentracing.SpanContext, error) {   // We onlysupport the HTTPHeaders/TextMap format.   if format != opentracing.HTTPHeaders && format != opentracing.TextMap {      return noopSpanContext{}, opentracing.ErrUnsupportedFormat   }   mapReader, ok := carrier.(opentracing.TextMapReader)   if !ok {      return noopSpanContext{}, opentracing.ErrInvalidCarrier   }   var sc spanContext ……   err :=mapReader.ForeachKey(func(k, v string) error {      switch k = strings.ToLower(k); k {      case fieldNameTraceID:         var err error         sc.TraceID, err = strconv.ParseUint(v, 16, 64)         if err != nil {            return opentracing.ErrSpanContextCorrupted         }      case fieldNameSpanID:         var err error         sc.SpanID, err = strconv.ParseUint(v, 16, 64)         if err != nil {            return opentracing.ErrSpanContextCorrupted         }      case fieldNameShadowType:         shadowType = v      default:         if strings.HasPrefix(k, prefixBaggage) {            if sc.Baggage == nil {               sc.Baggage = make(map[string]string)            }            sc.Baggage[strings.TrimPrefix(k, prefixBaggage)] = v         } else if strings.HasPrefix(k, prefixShadow) {            if shadowCarrier == nil {               shadowCarrier = make(opentracing.TextMapCarrier)            }            // We build ashadow textmap with the original shadow keys.            shadowCarrier.Set(strings.TrimPrefix(k, prefixShadow), v)         }      }      return nil   })   if err != nil {      return noopSpanContext{}, err   }   if sc.TraceID == 0 &&sc.SpanID == 0 {      return noopSpanContext{}, nil   }   ……   return &sc, nil}

StartSpan接口实现:用于创建一个新的Span,可根据传入不同opts来实现不同Span的初始化。

func (t *Tracer) StartSpan(   operationName string, opts ...opentracing.StartSpanOption,) opentracing.Span {   // Fast paths toavoid the allocation of StartSpanOptions below when tracing   // is disabled: if we have no optionsor a single SpanReference (the common   // case) with a noop context, return anoop span now.   if len(opts) == 1 {      if o, ok := opts[0].(opentracing.SpanReference); ok {         if IsNoopContext(o.ReferencedContext) {            return &t.noopSpan         }      }   }   shadowTr := t.getShadowTracer()   ……   return s}

2.4   noop span 实现:

noop span实现:使监控代码不依赖Tracer和Span的返回值,防止程序异常退出。

type noopSpan struct {   tracer *Tracer}var _ opentracing.Span = &noopSpan{}func (n *noopSpan) Context() opentracing.SpanContext                       { return noopSpanContext{} }func (n *noopSpan) BaggageItem(key string) string                          { return "" }func (n *noopSpan) SetTag(key string, value interface{}) opentracing.Span  { return n }func (n *noopSpan) Finish()                                               {}func (n *noopSpan) FinishWithOptions(opts opentracing.FinishOptions)      {}func (n *noopSpan) SetOperationName(operationName string) opentracing.Span { return n }func (n *noopSpan) Tracer() opentracing.Tracer                             { return n.tracer }func (n *noopSpan) LogFields(fields ...otlog.Field)                        {}func (n *noopSpan) LogKV(keyVals ...interface{})                           {}func (n *noopSpan) LogEvent(event string)                                 {}func (n *noopSpan) LogEventWithPayload(event string, payload interface{})  {}func (n *noopSpan) Log(data opentracing.LogData)                           {}func (n *noopSpan) SetBaggageItem(key, val string) opentracing.Span {   if key == Snowball {      panic("attempting to set Snowball on a noop span; use the Recordable optionto StartSpan")   }   return n}

 

Part3 - 云溪数据库中

Opentracing 简单使用示例

3.1     开启Tracer Recording测试

云溪数据库中 开始创建的span均是no operator span,需要手动调用StartRecording,将span转换为可record状态,才能正常对span进行操作。

func TestTracerRecording(t *testing.T) { tr := NewTracer() noop1 := tr.StartSpan("noop") if _, noop := noop1.(*noopSpan); !noop {  t.Error("expected noop span") } noop1.LogKV("hello", "void") noop2 := tr.StartSpan("noop2", opentracing.ChildOf(noop1.Context())) if _, noop := noop2.(*noopSpan); !noop {  t.Error("expected noop child span") } noop2.Finish() noop1.Finish() s1 := tr.StartSpan("a", Recordable) if _, noop := s1.(*noopSpan); noop {  t.Error("Recordable (but not recording) span should not be noop") } if !IsBlackHoleSpan(s1) {  t.Error("Recordable span should be black hole") } // Unless recording is actually started, child spans are still noop. noop3 := tr.StartSpan("noop3", opentracing.ChildOf(s1.Context())) if _, noop := noop3.(*noopSpan); !noop {  t.Error("expected noop child span") } noop3.Finish() s1.LogKV("x", 1) StartRecording(s1, SingleNodeRecording) s1.LogKV("x", 2) s2 := tr.StartSpan("b", opentracing.ChildOf(s1.Context())) if IsBlackHoleSpan(s2) {  t.Error("recording span should not be black hole") } s2.LogKV("x", 3) if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: unfinished=   x: 2  span b:   tags: unfinished=   x: 3 `); err != nil {  t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(s2), `  span b:   tags: unfinished=   x: 3 `); err != nil {  t.Fatal(err) } s3 := tr.StartSpan("c", opentracing.FollowsFrom(s2.Context())) s3.LogKV("x", 4) s3.SetTag("tag", "val") s2.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: unfinished=   x: 2  span b:   x: 3  span c:   tags: tag=val unfinished=   x: 4 `); err != nil {  t.Fatal(err) } s3.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:      tags: unfinished=   x: 2  span b:   x: 3  span c:   tags: tag=val   x: 4 `); err != nil {  t.Fatal(err) } StopRecording(s1) s1.LogKV("x", 100) if err := TestingCheckRecordedSpans(GetRecording(s1), ``); err != nil {  t.Fatal(err) } // The child span is still recording. s3.LogKV("x", 5) if err := TestingCheckRecordedSpans(GetRecording(s3), `  span c:   tags: tag=val   x: 4   x: 5 `); err != nil {  t.Fatal(err) } s1.Finish()}

3.2     创建childSpan 测试

测试StartChildSpan,根据已有span创建出一个新的span,为已有span的子span。

func TestStartChildSpan(t *testing.T) { tr := NewTracer() sp1 := tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 := StartChildSpan("child", sp1, nil /* logTags */, false /*separateRecording*/) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), `  span parent:   span child: `); err != nil {  t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 = StartChildSpan("child", sp1, nil /* logTags */, true /*separateRecording*/) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), `  span parent: `); err != nil {  t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(sp2), `  span child: `); err != nil {  t.Fatal(err) } sp1 = tr.StartSpan("parent", Recordable) StartRecording(sp1, SingleNodeRecording) sp2 = StartChildSpan(  "child", sp1, logtags.SingleTagBuffer("key", "val"), false, /*separateRecording*/ ) sp2.Finish() sp1.Finish() if err := TestingCheckRecordedSpans(GetRecording(sp1), `  span parent:   span child:    tags: key=val `); err != nil {  t.Fatal(err) }}

3.3     跨进程追踪测试

测试跨进程追踪功能,主要是测试inject接口和 extract 接口,Inject用于向carrier中注入SpanContext信息,Extract用于从carrier中抽取出SpanContext 信息。

 func TestTracerInjectExtract(t *testing.T) { tr := NewTracer() tr2 := NewTracer() // Verify that noop spans become noop spans on the remote side. noop1 := tr.StartSpan("noop") if _, noop := noop1.(*noopSpan); !noop {  t.Fatalf("expected noop span: %+v", noop1) } carrier := make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(noop1.Context(), opentracing.HTTPHeaders, carrier); err != nil {  t.Fatal(err) } if len(carrier) != 0 {  t.Errorf("noop span has carrier: %+v", carrier) } wireContext, err := tr2.Extract(opentracing.HTTPHeaders, carrier) if err != nil {  t.Fatal(err) } if _, noopCtx := wireContext.(noopSpanContext); !noopCtx {  t.Errorf("expected noop context: %v", wireContext) } noop2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) if _, noop := noop2.(*noopSpan); !noop {  t.Fatalf("expected noop span: %+v", noop2) } noop1.Finish() noop2.Finish() // Verify that snowball tracing is propagated and triggers recording on the // remote side. s1 := tr.StartSpan("a", Recordable) StartRecording(s1, SnowballRecording) carrier = make(opentracing.HTTPHeadersCarrier) if err := tr.Inject(s1.Context(), opentracing.HTTPHeaders, carrier); err != nil {  t.Fatal(err) } wireContext, err = tr2.Extract(opentracing.HTTPHeaders, carrier) if err != nil {  t.Fatal(err) } s2 := tr2.StartSpan("remote op", opentracing.FollowsFrom(wireContext)) // Compare TraceIDs trace1 := s1.Context().(*spanContext).TraceID trace2 := s2.Context().(*spanContext).TraceID if trace1 != trace2 {  t.Errorf("TraceID doesn't match: parent %d child %d", trace1, trace2) } s2.LogKV("x", 1) s2.Finish() // Verify that recording was started automatically. rec := GetRecording(s2) if err := TestingCheckRecordedSpans(rec, `  span remote op:   tags: sb=1   x: 1 `); err != nil {  t.Fatal(err) } if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: sb=1 unfinished= `); err != nil {  t.Fatal(err) } if err := ImportRemoteSpans(s1, rec); err != nil {  t.Fatal(err) } s1.Finish() if err := TestingCheckRecordedSpans(GetRecording(s1), `  span a:   tags: sb=1  span remote op:   tags: sb=1   x: 1 `); err != nil {  t.Fatal(err) }}

                                                                                                               

展开阅读全文                                    
  • 0
    感动
  • 0
    路过
  • 0
    高兴
  • 0
    难过
  • 0
    搞笑
  • 0
    无聊
  • 0
    愤怒
  • 0
    同情
热度排行
友情链接