GRPC metadata的使用

metadata 使用

1.修改 helloworld.proto

syntax = "proto3";

option go_package = "./helloworld";

package helloworld;

service Greeter {
  // 普通调用
  rpc UnaryEcho (HelloRequest) returns (HelloReply) {}
  // 服务流调用
  rpc ServerStreamingEcho(HelloRequest) returns (stream HelloReply) {}
  // 客户端流调用
  rpc ClientStreamingEcho(stream HelloRequest) returns (HelloReply) {}
  // 双向流调用
  rpc BidirectionalStreamingEcho(stream HelloRequest) returns (stream HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

2.普通调用metadata数据使用

2.1服务端代码

func (s *server) UnaryEcho(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Println("---UnaryEcho---")

    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        grpc.SetTrailer(ctx, trailer)
    }()

    md, ok := metadata.FromIncomingContext(ctx)

    if !ok {
        return nil, status.Errorf(codes.DataLoss, "无法获取元数据")
    }

    if t, ok := md["timestamp"]; ok {
        fmt.Println("timestamp from metadata:")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    }

    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    grpc.SendHeader(ctx, header)

    fmt.Printf("已接受到的请求:%v,发送响应\n", in)

    return &pb.HelloReply{Message: "Hello again " + in.GetName()}, nil
}
  • 1.在defer中调用SetTrailer;会在grpc关闭时在发送metadata数据,可以调用多次,多次调用会合并metadata数据

  • 2.调用FromIncomingContext;从context中解析出client请求时携带的metadata数据

  • 3.SendHeader发送metadata到客户端,只能调用一次

  • 4.返回数据到客户端

2.2客户端代码

func unaryCallWithMetadata(c pb.GreeterClient) {
    fmt.Println("--- unaryCall ---")

    // 创建metadata到context中.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // 使用metadata的上下文创建RPC
    var header, trailer metadata.MD
    r, err := c.UnaryEcho(ctx, &pb.HelloRequest{Name: "unaryCall"}, grpc.Header(&header), grpc.Trailer(&trailer))
    if err != nil {
        log.Fatalf("调用UnaryEcho失败:%v", err)
    }

    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要location,但是header中不存在location")
    }
    fmt.Println("response:")
    fmt.Printf(" - %s\n", r.Message)

    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }
}
  • 1.NewOutgoingContext 将创建的metadata数据放入context中,在rpc调用时通过context将携带的metadata发送给服务端

  • 2.创建headertrailer用于rpc调用完成后,得到服务端返回的metadata数据,header是服务端刚开始调用时填充的数据,trailer是服务端调用完成后填充的数据

整个流程

    1. 客户端调用metadata.NewOutgoingContextmetadata填充到context中;然后调用服务端方法c.UnaryEcho,传入context
    1. 服务端方法被调用时,调用metadata.FromIncomingContextcontext中拿到metadata数据
    1. 服务端调用grpc.SendHeader,发送的Header携带metadata数据
    1. 服务端处理完毕返回时,执行defer,调用grpc.SetTrailer,设置最后的metadata数据
    1. 客户端收到返回的数据时,会同时拿到HeaderTrailer,以及内部携带的metadata

3.服务端Streammetadata数据使用

3.1 服务端代码

func (s *server) ServerStreamingEcho(in *pb.HelloRequest, stream pb.Greeter_ServerStreamingEchoServer) error {
    fmt.Printf("--- ServerStreamingEcho ---\n")

    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        stream.SetTrailer(trailer)
    }()

    md, ok := metadata.FromIncomingContext(stream.Context())
    if !ok {
        return status.Errorf(codes.DataLoss, "ServerStreamingEcho: 无法获取metadata")
    }
    if t, ok := md["timestamp"]; ok {
        fmt.Printf("timestamp from metadata:\n")
        for i, e := range t {
            fmt.Printf("%d.%s\n", i, e)
        }
    }

    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    stream.SendHeader(header)

    fmt.Printf("收到的请求: %v\n", in)

    for i := 0; i < 10; i++ {
        fmt.Printf("echo message %v\n", in.Name)

        err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()})
        if err != nil {
            return err
        }
    }

    return nil
}
  • 与普通函数调用流程一致,只是将grpc.全部换成stream.

3.2 客户端代码

    fmt.Printf("--- server streaming ---\n")

    // 创建metadata到context中.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    stream, err := c.ServerStreamingEcho(ctx, &pb.HelloRequest{Name: "serverStreamingWithMetadata"})
    if err != nil {
        log.Fatalf("调用ServerStreamingEcho失败: %v", err)
    }

    header, err := stream.Header()
    if err != nil {
        log.Fatalf("无法从stream中获取header: %v", err)
    }

    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要location,但是header中不存在location")
    }

    // 读取所有的responses
    fmt.Printf("response:\n")
    var rpcStatus error
    for {
        r, err := stream.Recv()
        if err != nil {
            rpcStatus = err
            break
        }

        fmt.Printf(" - %s\n", r.Message)
    }

    if rpcStatus != io.EOF {
        log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    }

    trailer := stream.Trailer()

    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("需要timestamp,但header中不存在timestamp")
    }

整个流程

    1. 客户端客户端调用metadata.NewOutgoingContextmetadata放入context中;调用服务端方法c.ServerStreamingEcho,传入context
    1. 服务端方法被调用,使用metadata.FromIncomingContext,取出context中的metadata
    1. 服务端调用stream.SendHeader,发送Header中携带metadata,这个方法只能调用一次
    1. 客户端调用stream.Header(),然后从header中拿到metadata
    1. 服务端所有stream处理完毕,执行defer调用stream.SetTrailer,设置Trailer中的metadata
    1. 客户端接受完服务端所有的stream后,调用stream.Trailer(),从中拿到最后的metadata

4.客户端Streammetadata数据使用

4.1 服务端代码

func (s *server) ClientStreamingEcho(stream pb.Greeter_ClientStreamingEchoServer) error {
    fmt.Printf("--- ClientStreamingEcho ---\n")

    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        stream.SetTrailer(trailer)
    }()

    md, ok := metadata.FromIncomingContext(stream.Context())
    if !ok {
        return status.Errorf(codes.DataLoss, "ClientStreamingEcho: failed to get metadata")
    }
    if t, ok := md["timestamp"]; ok {
        fmt.Printf("timestamp from metadata:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    }

    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    stream.SendHeader(header)

    // Read requests and send responses.
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            fmt.Printf("echo last received message\n")
            return stream.SendAndClose(&pb.HelloReply{Message: "Hello again " + in.GetName()})
        }

        fmt.Printf("request received: %v, building echo\n", in)
        if err != nil {
            return err
        }
    }
}

4.2 客户端代码

func clientStreamWithMetadata(c pb.GreeterClient) {
    fmt.Printf("--- client streaming ---\n")
    // Create metadata and context.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // Make RPC using the context with the metadata.
    stream, err := c.ClientStreamingEcho(ctx)
    if err != nil {
        log.Fatalf("ClientStreamingEcho 调用失败: %v\n", err)
    }

    // Read the header when the header arrives.
    header, err := stream.Header()
    if err != nil {
        log.Fatalf("failed to get header from stream: %v", err)
    }
    // Read metadata from server's header.
    if t, ok := header["timestamp"]; ok {
        fmt.Printf("timestamp from header:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("timestamp expected but doesn't exist in header")
    }

    if l, ok := header["location"]; ok {
        fmt.Printf("location from header:\n")
        for i, e := range l {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("location expected but doesn't exist in header")
    }

    // Send all requests to the server.
    for i := 0; i < 10; i++ {
        if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
            log.Fatalf("failed to send streaming: %v\n", err)
        }
    }

    // Read the response.
    r, err := stream.CloseAndRecv()
    if err != nil {
        log.Fatalf("failed to CloseAndRecv: %v\n", err)
    }
    fmt.Printf("response:\n")
    fmt.Printf(" - %s\n\n", r.Message)

    // Read the trailer after the RPC is finished.
    trailer := stream.Trailer()
    // Read metadata from server's trailer.
    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("timestamp expected but doesn't exist in trailer")
    }
}

整个流程

    1. 客户端客户端调用metadata.NewOutgoingContextmetadata放入context中;接着调用服务端方法c.ClientStreamingEcho(ctx),并且传入context
    1. 服务端方法被调用,使用metadata.FromIncomingContext(stream.Context()),从context取到metadata
    1. 服务端调用stream.SendHeader,发送带有metadataHeader给客户端
    1. 客户端调用stream.Header(),解析Header中的metadata
    1. 服务端开始接受处理客户端发送的stream信息,处理完成后,运行defer调用stream.SetTrailer(trailer),设置Trailermetadata
    1. 客户端收到服务端处理完stream的返回后,调用stream.Trailer(),拿到最后放在Trailermetadata

5.双向Streammetadata数据使用

5.1 服务端代码

func (s *server) BidirectionalStreamingEcho(stream pb.Greeter_BidirectionalStreamingEchoServer) error {
    fmt.Printf("--- BidirectionalStreamingEcho ---\n")

    defer func() {
        trailer := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
        stream.SetTrailer(trailer)
    }()

    // Read metadata from client.
    md, ok := metadata.FromIncomingContext(stream.Context())
    if !ok {
        return status.Errorf(codes.DataLoss, "BidirectionalStreamingEcho: failed to get metadata")
    }

    if t, ok := md["timestamp"]; ok {
        fmt.Printf("timestamp from metadata:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    }

    // Create and send header.
    header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(time.StampNano)})
    stream.SendHeader(header)

    // Read requests and send responses.
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        fmt.Printf("request received %v, sending echo\n", in)
        if err := stream.Send(&pb.HelloReply{Message: "Hello again " + in.GetName()}); err != nil {
            return err
        }
    }
}

5.2 客户端代码

func bidirectionalWithMetadata(c pb.GreeterClient) {
    fmt.Printf("--- bidirectional ---\n")
    // Create metadata and context.
    md := metadata.Pairs("timestamp", time.Now().Format(time.StampNano))
    ctx := metadata.NewOutgoingContext(context.Background(), md)

    // Make RPC using the context with the metadata.
    stream, err := c.BidirectionalStreamingEcho(ctx)
    if err != nil {
        log.Fatalf("failed to call BidirectionalStreamingEcho: %v\n", err)
    }

    go func() {
        // Read the header when the header arrives.
        header, err := stream.Header()
        if err != nil {
            log.Fatalf("failed to get header from stream: %v", err)
        }
        // Read metadata from server's header.
        if t, ok := header["timestamp"]; ok {
            fmt.Printf("timestamp from header:\n")
            for i, e := range t {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("timestamp expected but doesn't exist in header")
        }
        if l, ok := header["location"]; ok {
            fmt.Printf("location from header:\n")
            for i, e := range l {
                fmt.Printf(" %d. %s\n", i, e)
            }
        } else {
            log.Fatal("location expected but doesn't exist in header")
        }

        // Send all requests to the server.
        for i := 0; i < 10; i++ {
            if err := stream.Send(&pb.HelloRequest{Name: "clientStreamWithMetadata"}); err != nil {
                log.Fatalf("failed to send streaming: %v\n", err)
            }
        }
        stream.CloseSend()
    }()

    // Read all the responses.
    var rpcStatus error
    fmt.Printf("response:\n")
    for {
        r, err := stream.Recv()
        if err != nil {
            rpcStatus = err
            break
        }
        fmt.Printf(" - %s\n", r.Message)
    }
    if rpcStatus != io.EOF {
        log.Fatalf("failed to finish server streaming: %v", rpcStatus)
    }

    // Read the trailer after the RPC is finished.
    trailer := stream.Trailer()
    // Read metadata from server's trailer.
    if t, ok := trailer["timestamp"]; ok {
        fmt.Printf("timestamp from trailer:\n")
        for i, e := range t {
            fmt.Printf(" %d. %s\n", i, e)
        }
    } else {
        log.Fatal("timestamp expected but doesn't exist in trailer")
    }

}

整个流程

    1. 客户端客户端调用metadata.NewOutgoingContextmetadata放入context中;接着调用服务端方法c.BidirectionalStreamingEcho(ctx),并且传入context
    1. 服务端方法被调用,使用metadata.FromIncomingContext(stream.Context()),从context取到metadata
    1. 服务端调用stream.SendHeader,发送带有metadataHeader给客户端
    1. 客户端开启一个协程,调用stream.Header(),解析Header中的metadata,并且开始发送stream到服务端
    1. 服务端接收到客户端发送的stream数据,并通过stream响应数据到客户端,接收到客户端的stream.CloseSend()后,运行defer调用stream.SetTrailer(trailer),设置Trailermetadata
    1. 客户端处理服务端发送的stream的返回后,调用stream.Trailer(),拿到最后放在Trailermetadata

源码

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,402评论 6 499
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,377评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,483评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,165评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,176评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,146评论 1 297
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,032评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,896评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,311评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,536评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,696评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,413评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,008评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,659评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,815评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,698评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,592评论 2 353

推荐阅读更多精彩内容