微服务,链路追踪,opentracing+jaeger(六)

    技术2022-07-15  79

    参考文档

    opentracing详解:https://pjw.io/articles/2018/05/08/opentracing-explanations/ opentracing中文文档:https://wu-sheng.gitbooks.io/opentracing-io/content/ jaeger教程:https://pjw.io/articles/2018/05/18/jaeger-tutorial/ jaeger go语言文档:https://godoc.org/github.com/uber/jaeger-client-go/config#SamplerConfig

    目录

    参考文档 前言opentracingjaeger jaeger使用安装下载go包 本机栗子http栗子客户端服务端 grpc栗子客户端服务器

    前言

    在微服务开发中,链路追踪的重要性就不多bb了。

    opentracing

    OpenTracing 是一套标准,它通过提供平台无关、厂商无关的API,使得开发人员能够方便的添加(或更换)追踪系统的实现(我们在测试使用中是基本上通过两行代码的更改就可以在Zipkin和Jaeger之间切换)。OpenTracing提供了用于运营支撑系统的和针对特定平台的辅助程序库。程序库的具体信息请参考详细的规范。OpenTracing 已进入 CNCF (云原生计算基金会,著名的Kubernetes、gRPC和Prometheus等均孵化于此),正在为全球的分布式追踪,提供统一的概念和数据标准。

    OpenTracing中的Trace(调用链)通过归属于此调用链的Span来隐性的定义。一条Trace(调用链)可以被认为是一个由多个Span组成的有向无环图(DAG图)Span与Span的关系被命名为References。

    jaeger

    jaeger是优步(uber)开源的分布式链路追踪系统,随着jaeger加入CNCF,jaeger越发辉煌。所有本文采用jaeger。因为opentracing的特性,即便更换成zipkin系统,对于已经采用jaeger的程序只需要改变几行代码就能轻松实现转变。所以无需忧虑。jaeger 的存储是可插拔组件,目前支持 Cassandra、ElasticSearch 和 Kafka。分布式追踪系统发展很快,种类繁多,不管是jaeger合适zipkin还是其他的链路追踪系统,核心步骤一般有三个:代码埋点,数据存储、查询展示

    jaeger使用

    安装

    这里docker安装,更多细节参考官网

    docker run -d --name jaeger \ -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \ -p 5775:5775/udp \ -p 6831:6831/udp \ -p 6832:6832/udp \ -p 5778:5778 \ -p 16686:16686 \ -p 14268:14268 \ -p 9411:9411 \ jaegertracing/all-in-one:1.9

    访问安装安装主机ip加16686端口,如下图。

    下载go包

    "github.com/opentracing/opentracing-go" "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/config"

    本机栗子

    初始化jaeger连接

    func InitJaeger(service ,idaddr string) (opentracing.Tracer, io.Closer) { //声明一个jaeger配置文件,可以从yaml文件中加载 cfg := &config.Configuration{ ServiceName:service, Sampler: &config.SamplerConfig{ Type: "const", Param: 1, }, Reporter: &config.ReporterConfig{ LogSpans: true, LocalAgentHostPort:idaddr, }, } //连接远程jaeger服务 tracer, closer, err := cfg.NewTracer(config.Logger(jaeger.StdLogger)) if err != nil { panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err)) } //将这个tracer设置为全局tracer opentracing.SetGlobalTracer(tracer) return tracer, closer } func main(){ //初始化tracer连接 tracer,close := util.InitJaeger("wdjaeger","192.168.254.172:6831") defer close.Close() //用tracer生成一个span parentspan := tracer.StartSpan("span_test") //用这个span生成一个ctx上下文 pctx := opentracing.ContextWithSpan(context.Background(),parentspan) //程序运行结束时提交 defer parentspan.Finish() //第一个函数,延时一秒钟 func(ctx context.Context){ //从上下文中获取span childspan,_ := opentracing.StartSpanFromContext(ctx,"func001") defer func(){ //设置tag childspan.SetTag("funcname","test001") //函数运行结束时提交 childspan.Finish() }() time.Sleep(time.Second) }(pctx) //同上一个函数 func(ctx context.Context){ childspan,_ := opentracing.StartSpanFromContext(ctx,"func002") defer func(){ childspan.SetTag("funcname","test002") childspan.Finish() }() time.Sleep(time.Second) }(pctx) }

    通过浏览器查看 先搜索,然后点击右边对应链路查看 查看结果如图

    http栗子

    在分布式系统中,ctx无法直接传递,需要将ctx注入到请求中,但本质不变

    客户端将ctx注入到http的header,用tracer的Inject函数服务端将ctx从请求的header中取出,用tracer的Extract函数

    客户端

    这里使用fasthttp发送http请求,在高并发的http请求中,fasthttp包绝对是首选,不会使用的可以参考前面的关于fasthttp介绍的文章

    func main(){ tracer,close := util.InitJaeger("wdjaeger","192.168.254.172:6831") defer close.Close() span := tracer.StartSpan("root") defer span.Finish() //获取请求响应对象 //fasthttp在Acquire得到对象,使用完毕后,应该Release requ := fasthttp.AcquireRequest() resp := fasthttp.AcquireResponse() requ.SetRequestURI("http://192.168.10.106:666/tractest") requ.Header.SetMethod("GET") head := HTTPHeadersCarrier{RequestHeader:&requ.Header} //将span上下文,注入到header中 //源码:carrier.(opentracing.TextMapWriter), //head会被断言成TextMapWriter接口,最终调用set设置到http请求头中 err := tracer.Inject(span.Context(),opentracing.HTTPHeaders,head) if err!=nil{ panic(err) } time.Sleep(time.Millisecond * 100) //向服务器发送请求 err = fasthttp.Do(requ,resp) if err!=nil{ panic(err) } time.Sleep(time.Millisecond * 100) } //重写HTTPHeadersCarrier,的set方法 type HTTPHeadersCarrier struct { *fasthttp.RequestHeader } // 重写注入赋值的方法 // tracer实际是在头部添加一个 uber-trace-id 字段, //内容是 623a22e85d81e55b:623a22e85d81e55b:0000000000000000:1 func (c HTTPHeadersCarrier) Set(key, val string) { h := c.RequestHeader fmt.Println(key,val) h.Add(key, val) }

    服务端

    服务采用gin框架将tracer功能封装在一个中间件里 func HttpJaegerTest(){ _,close := util.InitJaeger("wdjaeger","192.168.254.172:6831") defer close.Close() app := gin.Default() app.Use(TracerMiddle) app.GET("/tractest",func(c * gin.Context){ fmt.Println(c.Request.Header) time.Sleep(time.Millisecond * 500) }) app.Run("192.168.10.106:666") } //请求追踪中间件 func TracerMiddle(c * gin.Context){ tracer := opentracing.GlobalTracer() head := HTTPHeadersCarrier{Header:c.Request.Header} //从head中获取ctx上下文 ctx,err:= tracer.Extract(opentracing.HTTPHeaders,head) if err !=nil { fmt.Println("Error: ",err) } span := opentracing.StartSpan(c.Request.URL.String(),opentracing.ChildOf(ctx)) defer span.Finish() c.Next() } type HTTPHeadersCarrier struct { http.Header } // 为HTTPHeadersCarrier实现ForeachKey方法 func (c HTTPHeadersCarrier) ForeachKey(handler func(key, val string) error) error { for k, vals := range c.Header { for _, v := range vals { if err := handler(k, v); err != nil { return err } } } return nil }

    测试截图

    grpc栗子

    grpc传输ctx的方式稍微复杂,需要用过grpc的元数据和拦截器

    grpc的使用方法可以参考前面的文章:微服务,gRPC 一文全解(五)客户端在拦截器中,创建一个metadata,将ctx注入到metadata中,grpc可以传输metadata服务端在拦截器中,从ctx中获取metadata,然后将metadata解析为ctx使用

    客户端

    客户端实现的拦截器如下

    func WithTracerInterceptorClient(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) (err error) { var parentContext opentracing.SpanContext //先从context中获取原始的span parentSpan := opentracing.SpanFromContext(ctx) if parentSpan != nil { parentContext = parentSpan.Context() } tracer := opentracing.GlobalTracer() span := tracer.StartSpan(method, opentracing.ChildOf(parentContext)) defer span.Finish() //从context中获取metadata。md.(type) == map[string][]string md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.New(nil) } else { //如果对metadata进行修改,那么需要用拷贝的副本进行修改。(FromIncomingContext的注释) md = md.Copy() } //定义一个carrier,下面的Inject注入数据需要用到。carrier.(type) == map[string]string carrier := opentracing.TextMapCarrier{} //carrier := TextMapWriter{md} //将span的context信息注入到carrier中 e := tracer.Inject(span.Context(), opentracing.TextMap, carrier) if e != nil { fmt.Println("tracer Inject err,", e) } md = metadata.Join(md,metadata.New(carrier)) //创建一个新的context,把metadata附带上 ctx = metadata.NewOutgoingContext(ctx, md) return invoker(ctx, method, req, reply, cc, opts...) }

    在声明grpc的Dail时,添加拦截器

    conn,err := grpc.Dial(":666",grpc.WithInsecure(),grpc.WithUnaryInterceptor(control.WithTracerInterceptorClient))

    服务器

    声明拦截器如下

    //metadata.MD不存在ForeachKey成员方法,这里需要重新声明实现 type TextMapReader struct { metadata.MD } //读取metadata中的span信息 func (t TextMapReader) ForeachKey(handler func(key, val string) error) error { //不能是指针 for key, val := range t.MD { for _, v := range val { if err := handler(key, v); err != nil { return err } } } return nil } func WithTracerInterceptorServer(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { //从context中获取metadata。md.(type) == map[string][]string md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.New(nil) } else { //如果对metadata进行修改,那么需要用拷贝的副本进行修改。(FromIncomingContext的注释) md = md.Copy() } carrier := TextMapReader{md} tracer := opentracing.GlobalTracer() spanContext, e := tracer.Extract(opentracing.TextMap, carrier) if e != nil { fmt.Println("Extract err:", e) } span := tracer.StartSpan(info.FullMethod, opentracing.ChildOf(spanContext)) defer span.Finish() ctx = opentracing.ContextWithSpan(ctx, span) return handler(ctx, req) }

    在grpc创建server处添加拦截器

    gs := grpc.NewServer(grpc.UnaryInterceptor(WithTracerInterceptorServer))

    测试截图 未完待续。。。

    Processed: 0.014, SQL: 9