gPRC

寒江蓑笠翁大约 88 分钟

gPRC

远程过程调用rpc应该是微服务当中必须要学习的一个点了,在学习的过程中会遇到各式各样的rpc框架,不过在go这个领域,几乎所有的rpc框架都是基于gRPC的,并且它还成为了云原生领域的一个基础协议,为什么选择它,官方如下回答:

gRPC 是一个现代化的开源高性能远程过程调用(Remote Process Call,RPC)框架,可以在任何环境中运行。它可以通过可插拔的负载平衡、跟踪、健康检查和身份验证支持,有效地连接数据中心内和数据中心之间的服务。它还适用于连接设备、移动应用程序和浏览器到后端服务的最后一英里分布式计算。

官方网址:gRPCopen in new window

官方文档:Documentation | gRPCopen in new window

gRPC技术教程:Basics tutorial | Go | gRPCopen in new window

ProtocBuf官网:Reference Guides | Protocol Buffers Documentation (protobuf.dev)open in new window

它也是CNCF基金会下一个的开源项目,CNCF全名CLOUD NATIVE COMPUTING FOUNDATION,译名云原生计算基金会

特点

简单的服务定义

使用Protocol Buffers 定义服务,这是一个强大的二进制序列化工具集和语言。

启动和扩容都十分迅捷

只需一行代码即可安装运行时和开发环境,仅需几秒钟既可以扩张到每秒数百万个RPC

跨语言,跨平台

根据不同平台不同语言自动生成客户端和服务端的服务存根

双向流和集成授权

基于HTTP/2的双向流和可插拔的认证授权

虽然GRPC是语言无关的,但是本站的内容大部分都是go相关的,所以本文也会使用go作为主要语言进行讲解,后续用到的pb编译器和生成器如果是其他语言的使用者可以自行到Protobuf官网查找。为了方便起见,接下来会直接省略项目的创建过程。

依赖安装

先下载Protocol Buffer编译器,下载地址:Releases · protocolbuffers/protobuf (github.com)open in new window

根据自己的情况选择系统和版本即可,下载完成后需要将bin目录添加到环境变量中。

然后还要下载代码生成器,编译器是将proto文件生成对应语言的序列化代码,生成器是用于生成业务代码。

$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

创建一个空的项目,名字这里取grpc_learn,然后引入如下依赖

$ go get google.golang.org/grpc

最后看一下版本,是不是真的安装成功了

$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

项目结构

下面将以一个Hello World示例来进行演示,创建如下的项目结构。

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

定义protobuf文件

其中,在pb/hello.proto中,写入如下内容,这是一个相当简单的示例,如果不会protoc语法,请移步相关文档。

syntax = "proto3";

// .表示就直接生成在输出路径下,hello是包名
option go_package = ".;hello";

// 请求
message HelloReq {
  string name = 1;


// 响应
message HelloRep {
  string msg = 1;
}

// 定义服务
service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

生成代码

编写完成后,使用protoc编译器生成数据序列化相关的代码,使用生成器生成rpc相关代码

$ protoc -I ./pb \
		--go_out=./hello ./pb/*.proto\
		--go-grpc_out=./hello ./pb/*.proto

此时可以发现hello文件夹生成了hello.pb.gohello_grpc.pb.go文件,浏览hello.pb.go可以发现我们定义的message

type HelloReq struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

    // 定义的字段
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

   	// 定义的字段
	Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

hello_grpc.pb.go中可以发现我们定义的服务

type SayHelloServer interface {
	Hello(context.Context, *HelloReq) (*HelloRep, error)
	mustEmbedUnimplementedSayHelloServer()
}

// 后续如果我们自己实现服务接口,必须要嵌入该结构体,就不用实现mustEmbedUnimplementedSayHelloServer方法
type UnimplementedSayHelloServer struct {
}

// 默认返回nil
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// 接口约束
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
	mustEmbedUnimplementedSayHelloServer()
}

编写服务端

server/main.go中编写如下代码

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	pb "grpc_learn/server/protoc"
	"log"
	"net"
)

type GrpcServer struct {
	pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
	log.Printf("received grpc req: %+v", req.String())
	return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
	// 监听端口
	listen, err := net.Listen("tcp", ":8080")
	if err != nil {
		panic(err)
	}
	// 创建gprc服务器
	server := grpc.NewServer()
	// 注册服务
	pb.RegisterSayHelloServer(server, &GrpcServer{})
	// 运行
	err = server.Serve(listen)
	if err != nil {
		panic(err)
	}
}

编写客户端

client/main.go中写入如下代码

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "grpc_learn/server/protoc"
	"log"
)

func main() {
    // 建立连接,没有加密验证
	conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// 创建客户端
	client := pb.NewSayHelloClient(conn)
	// 远程调用
	helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
	if err != nil {
		panic(err)
	}
	log.Printf("received grpc resp: %+v", helloRep.String())
}

运行

先运行服务端,再运行客户端,服务端输出如下

2023/07/16 16:26:51 received grpc req: name:"client"

客户端输出如下

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

在本例中,客户端建立好连接后,在调用远程方法时就跟调用本地方法一样,直接访问clientHello方法并获取结果,这就是一个最简单的GRPC例子,许多开源的框架也都是对这一个流程进行了封装。

bufbuild

在上述例子中,是直接使用命令生成的代码,如果后期插件多了命令会显得相当繁琐,这时可以通过工具来进行管理protobuf文件,正好就有这么一个开源的管理工具bufbuild/buf

开源地址:bufbuild/buf: A new way of working with Protocol Buffers. (github.com)open in new window

文档地址:Buf - Install the Buf CLIopen in new window

特点

  • BSR管理
  • Linter
  • 代码生成
  • 格式化
  • 依赖管理

有了这个工具可以相当方便的管理protobuf文件。

文档中提供了相当多的安装方式,可以自己选择。如果本地安装了go环境的话,直接使用go install安装即可

$ go install github.com/bufbuild/buf/cmd/buf@latest

安装完毕后查看版本

$ buf --version
1.24.0

来到helloworld/pb文件夹,执行如下命令创建一个module来管理pb文件。

$ buf mod init
$ ls
buf.yaml  hello.proto

buf.yaml文件内容默认如下

version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

再来到helloworld/目录下,创建buf.gen.yaml,写入如下内容

version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

再执行命令生成代码

$ buf generate

完成后就可以看到生成的文件了,当然buf不止这点功能,其他的功能可以自己去文档学习。

流式RPC

grpc的调用方式有两大类,一元RPC(Unary RPC)和流式RPC(Stream RPC)。Hello World中的示例就是一个典型的一元RPC。

一元rpc(或者叫普通rpc更能理解些,实在不知道怎么翻译这个unary了)用起来就跟普通的http一样,客户端请求,服务端返回数据,一问一答的方式。而流式RPC的请求和响应都 可以是流式的,如下图

使用流式请求时,只返回一次响应,客户端可以通过流来多次发送参数给服务端,服务端可以不需要像一元RPC那样等到所有参数都接收完毕再处理,具体处理逻辑可以由服务端决定。正常情况下,只有客户端可以主动关闭流式请求,一旦流被关闭,当前RPC请求也就会结束。

使用流式响应时,只发送一次参数,服务端可以通过流多次发送数据给客户端,客户端不需要像一元RPC那样接受完所有数据再处理,具体的处理逻辑可以由客户端自己决定。正常请求下,只有服务端可以主动关闭流式响应,一旦流被关闭,当前RPC请求也就会结束。

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

也可以是只有响应是流式的(Server-Streaming RPC)

service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

或者请求和响应都是流式的(Bi-driectional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

单向流式

下面通过一个例子来演示单向流式的操作,首先创建如下的项目结构

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

message.proto内容如下

syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

通过buf生成代码

$ buf generate

这里演示是消息服务,receiveMessage接收一个指定的用户名,类型为字符串,返回消息流,sendMessage接收消息流,返回成功发送的消息数目,类型为64位整型。接下来创建server/message_service.go,自己实现默认的代码生成的服务

package main

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"grpc_learn/server_client_stream/message"
)

type MessageService struct {
	message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
	return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
	return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

可以看到接收消息和发送消息的参数里面都有一个流包装接口

type MessageService_ReceiveMessageServer interface {
    // 发送消息
	Send(*Message) error
	grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // 发送返回值并关闭连接
	SendAndClose(*wrapperspb.StringValue) error
    // 接收消息
	Recv() (*Message, error)
	grpc.ServerStream
}

它们都嵌入了gprc.ServerStream接口

type ServerStream interface {
	SetHeader(metadata.MD) error
	SendHeader(metadata.MD) error
	SetTrailer(metadata.MD)
	Context() context.Context
	SendMsg(m interface{}) error
	RecvMsg(m interface{}) error
}

可以看到,流式RPC并不像一元RPC那样入参和返回值都可以很明确的体现在函数签名上,这些方法乍一看是看不出来入参和返回值是什么类型的,需要调用传入的Stream类型完成流式传输,接下来开始编写服务端的具体逻辑。在编写服务端逻辑的时候,用了一个sync.map来模拟消息队列,当客户端发送ReceiveMessage请求时,服务端通过流式响应不断返回客户端想要的消息,直到超时过后断开请求。当客户端请求SendMessage时,通过流式请求不断发送消息过来,服务端不断的将消息放入队列中,直到客户端主动断开请求,并返回给客户端消息发送条数。

package main

import (
	"errors"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"grpc_learn/server_client_stream/message"
	"io"
	"log"
	"sync"
	"time"
)

// 一个模拟的消息队列
var messageQueue sync.Map

type MessageService struct {
	message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// 接收指定用户的消息
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
	timer := time.NewTimer(time.Second * 5)
	for {
		time.Sleep(time.Millisecond * 100)
		select {
		case <-timer.C:
			log.Printf("5秒钟内没有收到%s的消息,关闭连接", user.GetValue())
			return nil
		default:
			value, ok := messageQueue.Load(user.GetValue())
			if !ok {
				messageQueue.Store(user.GetValue(), []*message.Message{})
				continue
			}
			queue := value.([]*message.Message)
			if len(queue) < 1 {
				continue
			}

			// 拿到消息
			msg := queue[0]
			// 通过流式传输将消息发送给客户端
			err := recvServer.Send(msg)
			log.Printf("receive %+v\n", msg)
			if err != nil {
				return err
			}

			queue = queue[1:]
			messageQueue.Store(user.GetValue(), queue)
			timer.Reset(time.Second * 5)
		}
	}
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// 发送消息给指定用户
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
	count := 0
	for {
		// 从客户端接收消息
		msg, err := sendServer.Recv()
		if errors.Is(err, io.EOF) {
			return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
		}
		if err != nil {
			return err
		}
		log.Printf("send %+v\n", msg)

		value, ok := messageQueue.Load(msg.From)
		if !ok {
			messageQueue.Store(msg.From, []*message.Message{msg})
			continue
		}
		queue := value.([]*message.Message)
		queue = append(queue, msg)
		// 将消息放入消息队列中
		messageQueue.Store(msg.From, queue)
		count++
	}
}

客户端开了两个协程,一个协程用来发送消息,另一个协程用来接收消息,当然也可以一边发送一边接收,代码如下。

package main

import (
	"context"
	"errors"
	"github.com/dstgo/task"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"grpc_learn/server_client_stream/message"
	"io"
	"log"
	"time"
)

var Client message.MessageServiceClient

func main() {
	dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Panicln(err)
	}
	defer dial.Close()

	Client = message.NewMessageServiceClient(dial)

	log.SetPrefix("client\t")
	msgTask := task.NewTask(func(err error) {
		log.Panicln(err)
	})

	ctx := context.Background()

	// 接收消息请求
	msgTask.AddJobs(func() {
		receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
		if err != nil {
			log.Panicln(err)
		}
		for {
			recv, err := receiveMessageStream.Recv()
			if errors.Is(err, io.EOF) {
				log.Println("暂无消息,关闭连接")
				break
			} else if err != nil {
				break
			}
			log.Printf("receive %+v", recv)
		}
	})

	msgTask.AddJobs(func() {
		from := "jack"
		to := "mike"

		sendMessageStream, err := Client.SendMessage(ctx)
		if err != nil {
			log.Panicln(err)
		}
		msgs := []string{
			"在吗",
			"下午有没有时间一起打游戏",
			"那行吧,以后有时间一起约",
			"就这个周末应该可以吧",
			"那就这么定了",
		}
		for _, msg := range msgs {
			time.Sleep(time.Second)
			sendMessageStream.Send(&message.Message{
				From:    from,
				Content: msg,
				To:      to,
			})
		}
		// 消息发送完了,主动关闭请求并获取返回值
		recv, err := sendMessageStream.CloseAndRecv()
		if err != nil {
			log.Println(err)
		} else {
			log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
		}
	})

	msgTask.Run()
}

执行过后服务端输出如下

server  2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:33 5秒钟内没有收到jack的消息,关闭连接

客户端输出如下

client  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client  2023/07/18 16:28:28 发送完毕,总共发送了5条消息
client  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client  2023/07/18 16:28:33 暂无消息,关闭连接

通过这个例子可以发现单向流式RPC请求处理起来的话不论是客户端还是服务端都要比一元rpc复杂,不过双向流式RPC比它们还要更复杂些。

双向流式

双向流式PRC,即请求和响应都是流式的,就相当于把上例中的两个服务结合成一个。对于流式RPC而言,第一个请求肯定是由客户端发起的,随后客户端可以随时通过流来发送请求参数,服务端也可以随时通过流来返回数据,不管哪一方主动关闭流,当前请求都会结束。

提示

后续的内容除非必要,都会直接省略掉pb代码生成以及创建rpc客户端服务端这些步骤的代码描述

首先创建如下项目结构

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

message.proto内容如下

syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

服务端逻辑中,建立连接后,开启两个协程,一个协程负责接收消息,一个负责发送消息,具体的处理逻辑与上个例子类似,不过这次去掉了超时的判定逻辑。

package main

import (
	"github.com/dstgo/task"
	"google.golang.org/grpc/metadata"
	"grpc_learn/bi_stream/message"
	"log"
	"sync"
	"time"
)

// MessageQueue 模拟的消息队列
var MessageQueue sync.Map

type ChatService struct {
	message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// 聊天服务,服务端逻辑我们用多协程来进行处理
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
	md, _ := metadata.FromIncomingContext(chatServer.Context())
	from := md.Get("from")[0]
	defer log.Println(from, "end chat")

	var chatErr error
	chatCh := make(chan error)

	// 创建两个协程,一个收消息,一个发消息
	chatTask := task.NewTask(func(err error) {
		chatErr = err
	})

	// 接收消息的协程
	chatTask.AddJobs(func() {
		for {
			msg, err := chatServer.Recv()
			log.Printf("receive %+v err %+v\n", msg, err)
			if err != nil {
				chatErr = err
				chatCh <- err
				break
			}

			value, ok := MessageQueue.Load(msg.To)
			if !ok {
				MessageQueue.Store(msg.To, []*message.Message{msg})
			} else {
				queue := value.([]*message.Message)
				queue = append(queue, msg)
				MessageQueue.Store(msg.To, queue)
			}
		}
	})

	// 发送消息的协程
	chatTask.AddJobs(func() {
	Send:
		for {
			time.Sleep(time.Millisecond * 100)
			select {
			case <-chatCh:
				log.Println(from, "close send")
				break Send
			default:
				value, ok := MessageQueue.Load(from)
				if !ok {
					value = []*message.Message{}
					MessageQueue.Store(from, value)
				}

				queue := value.([]*message.Message)
				if len(queue) < 1 {
					continue Send
				}

				msg := queue[0]
				queue = queue[1:]
				MessageQueue.Store(from, queue)
				err := chatServer.Send(msg)
				log.Printf("send %+v\n", msg)
				if err != nil {
					chatErr = err
					break Send
				}
			}
		}
	})

	chatTask.Run()

	return chatErr
}

客户端逻辑中,开启了两个子协程来模拟两个人的聊天过程,两个子协程中分别又各有两个孙协程负责收发消息(客户端逻辑中并没有保证两个人聊天的消息收发顺序正确,只是一个简单的双方发送与接收的例子)

package main

import (
	"context"
	"github.com/dstgo/task"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/metadata"
	"grpc_learn/bi_stream/message"
	"log"
	"time"
)

var Client message.ChatServiceClient

func main() {
	log.SetPrefix("client ")
	dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
	defer dial.Close()

	if err != nil {
		log.Panicln(err)
	}
	Client = message.NewChatServiceClient(dial)

	chatTask := task.NewTask(func(err error) {
		log.Panicln(err)
	})

	chatTask.AddJobs(func() {
		NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
	})

	chatTask.AddJobs(func() {
		NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
	})

	chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
	ctx := context.Background()
	mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
	chat, err := Client.Chat(mdCtx)
	defer log.Println("end chat", from)

	if err != nil {
		log.Panicln(err)
	}

	chatTask := task.NewTask(func(err error) {
		log.Panicln(err)
	})

	chatTask.AddJobs(func() {
		for _, content := range contents {
			time.Sleep(time.Second)
			chat.Send(&message.Message{
				From:    from,
				Content: content,
				To:      to,
			})
		}
		// 消息发完了,就关闭连接
		time.Sleep(time.Second * 5)
		chat.CloseSend()
	})

	// 接收消息的协程
	chatTask.AddJobs(func() {
		for {
			msg, err := chat.Recv()
			log.Printf("receive %+v\n", msg)
			if err != nil {
				log.Println(err)
				break
			}
		}
	})

	chatTask.Run()
}

正常情况下,服务端输出

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

正常情况下,客户端输出(可以看到消息的顺序逻辑是乱的)

client 2023/07/19 17:26:24 receive from:"jack"  content:"你好"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"你好"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"没有"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"有没有时间一起打游戏?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"好吧"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"没时间,你找别人吧"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

通过示例可以看到的是,双向流式的处理逻辑无论是客户端还是服务端,都要比单向流式更复杂,需要结合多协程开启异步任务才能更好的处理逻辑。

metadata

metadata本质上是一个map,它的value是一个字符串切片,就类似http1的header一样,并且它在gRPC中扮演的角色也和http header类似,提供一些本次RPC调用的一些信息,同时metadata的生命周期跟随着一次rpc调用的整个过程,调用结束,它的生命周期也就结束了。

它在gRPC中主要通过context来进行传输和存储,不过gRPC提供了metadata包,里面有相当多的方便函数来简化操作,不需要我们去手动操作context 。metadata在gRPC中对应的类型为metadata.MD,如下所示。

// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

我们可以直接使用metadata.New函数来创建,不过在创建之前,有几个点需要注意

func New(m map[string]string) MD

metadata对键名有所限制,仅能是以下规则限制的字符:

  • ASCII字符
  • 数字:0-9
  • 小写字母:a-z
  • 大写字母:A-Z
  • 特殊字符:-_

提示

在metadata中,大写的字母都会被转换为小写,也就是说会占用同一个key,值也会被覆盖。

提示

grpc-开头的key是grpc保留使用的内部key,如果使用这类key的话可能会导致一些错误。

手动创建

创建metadata的方式有很多,这里介绍手动创建metadata最常用的两种方法,第一种就是使用metadata.New函数,直接传入一个map。

func New(m map[string]string) MD
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

第二种是metadata.Pairs,传入偶数长度的字符串切片,会自动的解析成键值对。

func Pairs(kv ...string) MD
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

还可以使用metadata.join来合并多个metadata

func Join(mds ...MD) MD
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)

服务端使用

获取metadata

服务端获取metadata可以使用metadata.FromIncomingContext函数来获取

func FromIncomingContext(ctx context.Context) (MD, bool)

对于一元rpc而言,service的参数里面会带一个context参数,直接从里面获取metadata即可

func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
	md, b := metadata.FromIncomingContext(ctx)
	...
}

对于流式rpc,service的参数里面会有一个流对象,通过它可以获取流的context

func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
	md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

发送metadata

发送metadata可以使用grpc.sendHeader函数

func SendHeader(ctx context.Context, md metadata.MD) error

该函数最多调用一次,在一些导致header被自动发送的事件发生后使用则不会生效。在一些情况下,如果不想直接发送header,这时可以使用grpc.SetHeader函数。

func SetHeader(ctx context.Context, md metadata.MD) error 

该函数多次调用的话,会将每次传入的metadata合并,并在以下几种情况发送给客户端

  • gprc.SendHeaderServertream.SendHeader被调用时
  • 一元rpc的handler返回时
  • 调用流式rpc中流对象的Stream.SendMsg
  • rpc请求的状态变为send out,这种情况要么是rpc请求成功了,要么就是出错了。

对于流式rpc而言,建议使用流对象的SendHeader方法和SetHeader方法。

type ServerStream interface {
	SetHeader(metadata.MD) error
	SendHeader(metadata.MD) error
	SetTrailer(metadata.MD)
	...
}

提示

在使用过程中会发现Header和Trailer两个功能差不多,不过它们的主要区别在于发送的时机,一元rpc中可能体会不到,但是这一差别在流式RPC中尤为明显,因为流式RPC中的Header可以不用等待请求结束就可以发送Header。前面提到过了Header会在特定的情况下被发送,而Trailer仅仅只会在整个RPC请求结束后才会被发送,在此之前,获取到的trailer都是空的。

客户端使用

获取metadata

客户端想要获取响应的header,可以通过grpc.Headergrpc.Trailer来实现

func Header(md *metadata.MD) CallOption
func Trailer(md *metadata.MD) CallOption

不过需要注意的是,并不能直接获取,可以看到以上两个函数返回值是CallOption,也就是说是在发起RPC请求时作为option参数传入的。

// 声明用于接收值的md
var header, trailer metadata.MD

// 调用rpc请求时传入option
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

在请求完成后,会将值写到传入的md中。对于流式rpc而言,可以通过发起请求时返回的流对象直接获取

type ClientStream interface {
	Header() (metadata.MD, error)
	Trailer() metadata.MD
    ...
}
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

发送metadata

客户端想要发送metadata很简单,之前提到过metadata的表现形式就是valueContext,将metadata结合到context中,然后在请求的时候把context传入即可,metadata包提供了两个函数来方便构造context。

func NewOutgoingContext(ctx context.Context, md MD) context.Context 
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// 一元rpc
res,err := client.SomeRPC(outgoingContext,data)
// 流式rpc
stream,err := client.StreamRPC(outgoingContext)

如果原有的ctx已经有metadata了的话,再使用NewOutgoingContext会将先前的数据直接覆盖掉,为了避免这种情况,可以使用下面这个函数,它不会覆盖,而是会将数据合并。

func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// 一元rpc
res,err := client.SomeRPC(appendContext,data)
// 流式rpc
stream,err := client.StreamRPC(appendContext)

拦截器

gRPC的拦截器就类似于gin中的Middleware一样,都是为了在请求前或者请求后做一些特殊的工作并且不影响到本身的业务逻辑。在gRPC中,拦截器有两大类,服务端拦截器和客户端拦截器,根据请求类型来分则有一元RPC拦截器,和流式RPC拦截器,下图

为了能更好的理解拦截器,下面会根据一个非常简单的示例来进行描述。

grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

person.proto内容如下

syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

服务端代码如下,逻辑全是之前的内容,比较简单不再赘述。

package main

import (
	"context"
	"errors"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"grpc_learn/interceptor/person"
	"io"
	"sync"
)

// 存放数据
var personData sync.Map

type PersonService struct {
	person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
	value, ok := personData.Load(name.Value)
	if !ok {
		return nil, person.PersonNotFoundErr
	}
	personInfo := value.(*person.PersonInfo)
	return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
	count := 0
	for {
		personInfo, err := personStream.Recv()
		if errors.Is(err, io.EOF) {
			return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
		} else if err != nil {
			return err
		}

		personData.Store(personInfo.Name, personInfo)
		count++
	}
}

服务端拦截

拦截服务端rpc请求的有UnaryServerInterceptorStreamServerInterceptor,具体类型如下所示

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

一元RPC

创建一元RPC拦截器,只需要实现UnaryserverInterceptor类型即可,下面是一个简单的一元RPC拦截器的例子,功能是输出每一次rpc的请求和响应。

// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc的请求数据
// param info *grpc.UnaryServerInfo 本次一元RPC的一些请求信息
// param unaryHandler grpc.UnaryHandler 具体的handler
// return resp interface{} rpc的响应数据
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
	log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
	resp, err = unaryHandler(ctx, req)
	log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
	return resp, err
}

对于一元RPC而言,拦截器拦截的是每一个RPC的请求和响应,即拦截的是RPC的请求阶段和响应阶段,如果拦截器返回error,那么本次请求就会结束。

流式rpc

创建流式RPC拦截器,只需要实现StreamServerInterceptor类型即可,下面是一个简单的流式RPC拦截器的例子。

// StreamPersonLogInterceptor
// param srv interface{} 对应服务端实现的server
// param stream grpc.ServerStream 流对象
// param info *grpc.StreamServerInfo 流信息
// param streamHandler grpc.StreamHandler 处理器
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
	log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
	err := streamHandler(srv, stream)
	log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
	return err
}

对于流式RPC而言,拦截器拦截的是每一个流对象的SendRecve方法被调用的时机,如果拦截器返回error,并不会导致本次RPC请求的结束,仅仅只是代表着本次send recv出现了错误。

使用拦截器

要想使创建的拦截器生效,需要在创建gRPC服务器的时候作为option传入,官方也提供了相关的函数以供使用。如下所示,有添加单个拦截器的函数,也有添加链式拦截器的函数。

func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption 

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption 

提示

重复使用UnaryInterceptor会抛出如下panic

panic: The unary server interceptor was already set and may not be reset. 

StreamInterceptor也是同理,而链式拦截器重复调用则会append到同一个链上。

使用示例如下

package main

import (
	"google.golang.org/grpc"
	"grpc_learn/interceptor/person"
	"log"
	"net"
)

func main() {
	log.SetPrefix("server ")
	listen, err := net.Listen("tcp", "9090")
	if err != nil {
		log.Panicln(err)
	}
	server := grpc.NewServer(
        // 添加链式拦截器
		grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
		grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
	)
	person.RegisterPersonServer(server, &PersonService{})
	server.Serve(listen)
}

客户端拦截

客户端拦截器跟服务端差不多,一个一元拦截器UnaryClientInterceptor,一个流式拦截器StreamClientInterceptor,具体类型如下所示。

type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

一元RPC

创建一元RPC客户端拦截器,实现UnaryClientInterceptor即可,下面就是一个简单的例子。

// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string 方法名
// param req interface{} 请求数据
// param reply interface{} 响应数据
// param cc *grpc.ClientConn 客户端连接对象
// param invoker grpc.UnaryInvoker 被拦截的具体客户端方法
// param opts ...grpc.CallOption 本次请求的配置项
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
	err := invoker(ctx, method, req, reply, cc, opts...)
	log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
	return err
}

通过客户端的一元RPC拦截器,可以获取到本地请求的请求数据和响应数据以及一些其他的请求信息。

流式RPC

创建一个流式RPC客户端拦截器,实现StreamClientInterceptor即可,下面就是一个例子。

// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流对象的描述信息
// param cc *grpc.ClientConn 连接对象
// param method string 方法名
// param streamer grpc.Streamer 用于创建流对象的对象
// param opts ...grpc.CallOption 连接配置项
// return grpc.ClientStream 创建好的客户端流对象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
	stream, err := streamer(ctx, desc, cc, method, opts...)
	log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
	return stream, err
}

通过流式RPC客户端拦截器,只能拦截到客户端与服务端建立连接的时候也就是创建流的时机,并不能拦截到客户端流对象每一次收发消息的时候,不过我们把拦截器中创建好的流对象包装一下就可以实现拦截收发消息了,就像下面这样

// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流对象的描述信息
// param cc *grpc.ClientConn 连接对象
// param method string 方法名
// param streamer grpc.Streamer 用于创建流对象的对象
// param opts ...grpc.CallOption 连接配置项
// return grpc.ClientStream 创建好的客户端流对象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
	log.Println(fmt.Sprintf("before create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
	stream, err := streamer(ctx, desc, cc, method, opts...)
	log.Println(fmt.Sprintf("after create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
	return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
	method string
	desc   *grpc.StreamDesc
	grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
	// 消息发送前
	err := c.ClientStream.SendMsg(m)
	// 消息发送后
	log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
	return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
	// 消息接收前
	err := c.ClientStream.RecvMsg(m)
	// 消息接收后
	log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
	return err
}

使用拦截器

使用时,与服务端类似也是四个工具函数通过option来添加拦截器,分为单个拦截器和链式拦截器。

func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption 

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

提示

客户端重复使用WithUnaryInterceptor不会抛出panic,但是仅最后一个会生效。

下面是一个使用案例

package main

import (
	"context"
	"fmt"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/wrapperspb"
	"grpc_learn/interceptor/person"
	"log"
)

func main() {
	log.SetPrefix("client ")
	dial, err := grpc.Dial("localhost:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
		grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
	)
	if err != nil {
		log.Panicln(err)
	}

	ctx := context.Background()
	client := person.NewPersonClient(dial)

	personStream, err := client.CreatePersonInfo(ctx)
	personStream.Send(&person.PersonInfo{
		Name:    "jack",
		Age:     18,
		Address: "usa",
	})
	personStream.Send(&person.PersonInfo{
		Name:    "mike",
		Age:     20,
		Address: "cn",
	})
	recv, err := personStream.CloseAndRecv()
	log.Println(recv, err)

	log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
	log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

到目前为止,整个案例已经编写完毕,是时候来运行一下看看结果是什么样的。服务端输出如下

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"                                                                          
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>                                           
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"                                                                         
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found   

客户端输出如下

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false 
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

可以看到两边的输出都符合预期,起到了拦截的效果,这个案例只是一个很简单的示例,利用gRPC的拦截器可以做很多事情比如授权,日志,监控等等其他功能,可以选择自己造轮子,也可以选择使用开源社区现成的轮子,gRPC Ecosystemopen in new window专门收集了一系列开源的gRPC拦截器中间件,地址:grpc-ecosystem/go-grpc-middlewareopen in new window

错误处理

在开始之前先来看一个例子,在上一个拦截器案例中,如果用户查询不到,会向客户端返回错误person not found,那么问题来了,客户端能不能根据返回的错误做特殊的处理呢?接下来试一试,在客户端代码中,尝试使用errors.Is来判断错误。

func main() {
	log.SetPrefix("client ")
	dial, err := grpc.Dial("localhost:9090",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
		grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
	)
	if err != nil {
		log.Panicln(err)
	}

	ctx := context.Background()
	client := person.NewPersonClient(dial)

	personStream, err := client.CreatePersonInfo(ctx)
	personStream.Send(&person.PersonInfo{
		Name:    "jack",
		Age:     18,
		Address: "usa",
	})
	personStream.Send(&person.PersonInfo{
		Name:    "mike",
		Age:     20,
		Address: "cn",
	})
	recv, err := personStream.CloseAndRecv()
	log.Println(recv, err)

	info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
	log.Println(info, err)
	if errors.Is(err, person.PersonNotFoundErr) {
		log.Println("person not found err")
	}
}

结果输出如下

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

可以看到客户端接收的error是这样的,会发现服务端返回的error在desc这个字段里面

rpc error: code = Unknown desc = person not found

自然errors.Is这段逻辑也就没有执行,即便换成errors.As也是一样的结果。

if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

为此,gRPC提供了status包来解决这类问题,这也是为什么客户端接收到的错误会有code和desc字段的原因,因为gRPC实际上返回给客户端的是一个Status,其具体类型如下,可以看出也是一个protobuf定义的message。

type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
message Status {
  // The status code, which should be an enum value of
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // A developer-facing error message, which should be in English. Any
  // user-facing error message should be localized and sent in the
  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
  // by the client.
  string message = 2;

  // A list of messages that carry the error details.  There is a common set of
  // message types for APIs to use.
  repeated google.protobuf.Any details = 3;
}

错误码

Status结构体中的Code,是一种类似Http Status形式的存在,用于表示当前rpc请求的状态,gRPC定义了16个code位于grpc/codes,涵盖了大部分的场景,分别如下所示

// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
	// 调用成功
	OK Code = 0

	// 请求被取消
	Canceled Code = 1

	// 未知错误
	Unknown Code = 2

	// 参数不合法
	InvalidArgument Code = 3
	
    // 请求超时
	DeadlineExceeded Code = 4

	// 资源不存在
	NotFound Code = 5

    // 已存在相同的资源(能出现这个我是没想到的)
	AlreadyExists Code = 6

	// 权限不足被拒绝访问
	PermissionDenied Code = 7

	// 资源枯竭,剩下的容量不足以使用,比如磁盘容量不够了之类的情况
	ResourceExhausted Code = 8

	// 执行条件不足,比如使用rm删除一个非空的目录,删除的条件是目录是空的,但条件不满足
	FailedPrecondition Code = 9

	// 请求被打断
	Aborted Code = 10

	// 操作访问超出限制范围
	OutOfRange Code = 11

	// 表示当前服务没有实现
	Unimplemented Code = 12

	// 系统内部错误
	Internal Code = 13

	// 服务不可用
	Unavailable Code = 14

	// 数据丢失
	DataLoss Code = 15

	// 没有通过认证
	Unauthenticated Code = 16

	_maxCode = 17
)

grpc/status包提供了相当多的函数以方status与error之间的相互转换。我们可以直接使用status.New来创建一个Status,或者Newf

func New(c codes.Code, msg string) *Status 

func Newf(c codes.Code, format string, a ...interface{}) *Status

例如下面的代码

success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

通过status的err方法可以获取到其中的error,当状态为ok的时候error为nil。

func (s *Status) Err() error {
	if s.Code() == codes.OK {
		return nil
	}
	return &Error{s: s}
}

也可以直接创建error

func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

于是我们可以将服务代码修改成如下

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
	value, ok := personData.Load(name.Value)
	if !ok {
		return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
	}
	personInfo := value.(*person.PersonInfo)
	return personInfo, status.Errorf(codes.OK, "request success")
}

在此之前,服务端返回的所有的code都是unknown,现在经过修改后有了更加明确的语义。于是在客户端就可以通过status.FromError或者使用下面的函数从error中获取status,从而根据不同的code来做出响应的处理

func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code 

示例如下

info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

不过尽管grpc的code已经尽可能的涵盖了一些通用场景,不过有时候还是无法满足开发人员的需求,这个时候就可以使用Status中的Details字段,并且它还是一个切片,可以容纳多个信息。通过Status.WithDetails来传入一些自定义的信息

func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

通过Status.Details来获取信息

func (s *Status) Details() []interface{}

需要注意的是,传入的信息最好是由protobuf定义的,这样才能方便服务端客户端两端都能解析,官方给出了几个示例

message ErrorInfo {
  // 错误的原因
  string reason = 1;

  // 定义服务的主体
  string domain = 2;

  // 其他信息
  map<string, string> metadata = 3;
}

// 重试信息
message RetryInfo {
  // 同一个请求的等待间隔时间
  google.protobuf.Duration retry_delay = 1;
}

// 调试信息
message DebugInfo {
  // 堆栈
  repeated string stack_entries = 1;

  // 一些细节信息
  string detail = 2;
}

...
...

更多的例子可以前往googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com)open in new window查看。如果需要可以通过下面的代码来引入。

import "google.golang.org/genproto/googleapis/rpc/errdetails"

使用ErrorInfo作为details

notFound := status.Newf(codes.NotFound, "person not found: %s", name)
	notFound.WithDetails(&errdetails.ErrorInfo{
		Reason:   "person not found",
		Domain:   "xxx",
		Metadata: nil,
	})

在客户端就可以拿到数据做出处理,不过上述只是gRPC推荐使用的一些例子,除此之外,同样也可以自己定义message,来更好的满足相应的业务需求,如果想做一些统一的错误处理,也可以放到拦截器里面操作。

超时控制

在大多数情况下,通常不会只有一个服务,并且可能上游有很多服务,下游也有很多服务。客户端发起一次请求,从最上游的服务到最下游,就形成了一个服务调用链,就像图中那样,或许可能比图中的还要长。

如此长的一个调用链,如果其中一个服务的逻辑处理需要花费很长时间,就会导致上游一直处于等待状态。为了减少不必要的资源浪费,因此有必要引入超时这一机制,这样一来最上游调用时传入的超时时间,便是整个调用链所允许的执行花费最大时间。而gRPC可以跨进程跨语言传递超时,它把一些需要跨进程传递的数据放在了HTTP2的HEADERS Frame帧中,如下图

gRPC请求中的超时数据对应着HEADERS Frame中的grpc-timeout字段。需要注意的是,并不是所有的gRPC库都实现了这一超时传递机制,不过gRPC-go肯定是支持的,如果使用其他语言的库,并且使用了这一特性,则需要额外留意这一点。

连接超时

gRPC客户端在向服务端建立连接时,默认是异步建立的,如果连接建立失败只会返回一个空的Client。如果想要使连接同步进行,则可以使用grpc.WithBlock()来使连接未建立成功时阻塞等待。

dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

如果想要控制一个超时时间,则只需要传入一个TimeoutContext,使用grpc.DialContext来替代gprc.Dial以传入context。

timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

如此一来,如果连接建立超时,就会返回error

context deadline exceeded

在服务端同样也可以设置连接超时,在与客户端建立新连接的时候设置一个超时时间,默认是120秒,如果在规定时间内没有成功建立连接,服务端会主动断开连接。

server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

提示

grpc.ConnectionTimeout仍处于实验阶段,未来的API可能会被修改或删除。

请求超时

gRPC客户端在发起请求的时候,第一个参数就是Context类型,同样的,要想给RPC请求加上一个超时时间,只需要传入一个TimeoutContext即可

timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // 超时逻辑处理
}

经过gRPC的处理,超时时间被传递到了服务端,在传输过程中它以在帧字段的形式存在中,在go里面它以context的形式存在,就此在整个链路中进行传递。在链路传递过程中,不建议去修改超时时间,具体在请求时设置多长的超时时间,这应该是最上游应该考虑的问题。

认证授权

在微服务领域中,每一个服务都需要对请求验证用户身份和权限,如果和单体应用一样,每个服务都要自己实现一套认证逻辑,这显然是不太现实的。所以需要一个统一的认证与授权服务,而常见的解决方案是使用OAuth2,分布式Session,和JWT,这其中,OAuth2使用最为广泛,一度已经成为了业界标准,OAuth2最常用的令牌类型就是是JWT。下面是一张OAuth2授权码模式的流程图,基本流程如图所示。

安全传输

服务注册与发现

客户端调用服务端的指定服务之前,需要知晓服务端的ip和port,在先前的案例中,服务端地址都是写死的。在实际的网络环境中不总是那么稳定,一些服务可能会因故障下线而无法访问,也有可能会因为业务发展进行机器迁移而导致地址变化,在这些情况下就不能使用静态地址访问服务了,而这些动态的问题就是服务发现与注册要解决的,服务发现负责监视服务地址的变化并更新,服务注册负责告诉外界自己的地址。gRPC中,提供了基础的服务发现功能,并且支持拓展和自定义。

不能用静态地址,可以用一些特定的名称来进行代替,比如浏览器通过DNS解析域名来获取地址,同样的,gRPC默认的服务发现就是通过DNS来进行的,修改本地的host文件,添加如下映射

127.0.0.1 example.grpc.com

然后将helloworld示例中客户端Dial的地址改为对应的域名

func main() {
	// 建立连接,没有加密验证
	conn, err := grpc.Dial("example.grpc.com:8080",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// 创建客户端
	client := hello2.NewSayHelloClient(conn)
	// 远程调用
	helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
	if err != nil {
		panic(err)
	}
	log.Printf("received grpc resp: %+v", helloRep.String())
}

同样能看到正常的输出

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

在gRPC中,这类名称必须要遵守RFC 3986中定义的URI语法,格式为

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

上例中的URI就是如下形式,由于默认支持dns所以省略掉了前缀的scheme。

dns:example.grpc.com:8080

除此之外gRPC还默认支持Unix domain sockets,而对于其他的方式,需要我们根据gRPC的拓展来进行自定义实现,为此需要实现一个自定义的解析器resolver.Resovler,resolver负责监控目标地址和服务配置的更新。

type Resolver interface {
    // gRPC将调用ResolveNow来尝试再次解析目标名称。这只是一个提示,如果不需要,解析器可以忽略它,该方法可能被并发的调用
	ResolveNow(ResolveNowOptions)
	Close()
}

gRPC要求我们传入一个Resolver构造器,也就是resolver.Builder,它负责构造Resolver实例。

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}

Builder的Scheme方法返回它负责解析的Scheme类型,例如默认的dnsBuilder它返回的就是dns,构造器在初始化时应该使用resolver.Register注册到全局Builder中,又或者作为options,使用grpc.WithResolver传入ClientConn内部,后者的优先级高于前者。

上图简单描述了一下resolver的工作流程,接下来就演示如何自定义resolver

自定义服务解析

下面编写一个自定义解析器,需要一个自定义的解析构造器来进行构造。

package myresolver

import (
	"fmt"
	"google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
	return &MyBuilder{ads: ads}
}

type MyBuilder struct {
	ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	if target.URL.Scheme != c.Scheme() {
		return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
	}
	m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // 这里必须要updatestate,否则会死锁
	m.start()
	return m, nil
}

func (c *MyBuilder) Scheme() string {
	return "hello"
}

type MyResolver struct {
	t   resolver.Target
	cc  resolver.ClientConn
	ads map[string][]string
}

func (m *MyResolver) start() {
	addres := make([]resolver.Address, 0)
	for _, ad := range m.ads[m.t.URL.Opaque] {
		addres = append(addres, resolver.Address{Addr: ad})
	}

	err := m.cc.UpdateState(resolver.State{
		Addresses: addres,
        // 配置,loadBalancingPolicy指的是负载均衡策略
		ServiceConfig: m.cc.ParseServiceConfig(
			`{"loadBalancingPolicy":"round_robin"}`),
	})

	if err != nil {
		m.cc.ReportError(err)
	}
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

自定义解析器就是把map里面的匹配的地址传入到updatestate,同时还指定了负载均衡的策略,round_robin指的是轮询的意思。

// service config 结构如下
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

客户端代码如下

package main

import (
	"context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/resolver"
	"grpc_learn/helloworld/client/myresolver"
	hello2 "grpc_learn/helloworld/hello"
	"log"
	"time"
)

func init() {
	// 注册builder
	resolver.Register(myresolver.NewBuilder(map[string][]string{
		"myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
	}))
}

func main() {

	// 建立连接,没有加密验证
	conn, err := grpc.Dial("hello:myworld",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// 创建客户端
	client := hello2.NewSayHelloClient(conn)
   	// 每秒调用一次
	for range time.Tick(time.Second) {
		// 远程调用
		helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
		if err != nil {
			panic(err)
		}
		log.Printf("received grpc resp: %+v", helloRep.String())
	}

}

正常来说,流程应该是服务端向注册中心注册自身服务,然后客户端从注册中心中获取服务列表然后进行匹配,这里传入的map就是一个模拟的注册中心,数据是静态的就省略掉了服务注册这一环节,只剩下服务发现。客户端使用的target为hello:myworld,hello是自定义的scheme,myworld就是服务名,经过自定义的解析器解析后,就得到了127.0.0.1:8080的真实地址,在实际情况中,为了做负载均衡,一个服务名可能会匹配多个真实地址,所以这就是为什么服务名对应的是一个切片,这里开两个服务端,占用不同的端口,负载均衡策略为轮询,服务端输出分别如下,通过请求时间可以看到负载均衡策略确实是在起作用的,如果不指定策略的话默认只选取第一个服务。

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

注册中心其实就是存放着的就是服务注册名与真实服务地址的映射集合,只要是能够进行数据存储的中间件都可以满足条件,甚至拿mysql来做注册中心也不是不可以(应该没有人会这么做)。一般来说注册中心都是KV存储的,redis就是一个很不错的选择,但如果使用redis来做注册中心的话,我们就需要自行实现很多逻辑,比如服务的心跳检查,服务下线等,服务调度等等,还是相当麻烦的,虽然redis在这方面有一定的应用但是较少。正所谓专业的事情交给专业的人做,这方面做的比较出名的有很多:Zookeeper,Consul,Eureka,ETCD,Nacos等。

可以前往注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul和ETCD - 掘金 (juejin.cn)open in new window来了解这几个中间件的一些区别。

结合consul

结合consul使用的案例可以前往consul

负载均衡