原文在这里

本教程为Go程序员提供了使用gRPC的基本介绍。

通过跟随本示例,你将学会如何:

  • 在.proto文件中定义一个服务。
  • 使用协议缓冲编译器生成服务器和客户端代码。
  • 使用Go gRPC API编写一个简单的服务端和客户端。

本教程假设你已经阅读了gRPC入门并熟悉协议缓冲(Protocol Buffers)。请注意,本教程中的示例使用了proto3版本的协议缓冲语言。你可以在proto3语言指南Go生成的代码指南中了解更多信息。

为什么使用gRPC?

本示例是一个简单的路线映射应用程序,允许客户端获取有关其路线上的特点信息,创建其路线的摘要,并与服务器和其他客户端交换路线信息,如交通更新。

通过gRPC,我们可以在.proto文件中定义我们的服务,并在gRPC支持的任何语言中生成客户端和服务器。这些代码可以运行在从大型数据中心内的服务器到你自己的平板电脑等各种环境中,gRPC会为你处理不同语言和环境之间的通信复杂性。我们还可以获得与协议缓冲一起工作的所有优势,包括高效的序列化、简单的IDL和易于更新的接口。

设置

在开始之前,你应该已经安装了生成客户端和服务器接口代码所需的工具。如果还没有安装,请参考快速入门指南的先决条件部分进行安装设置。

获取示例代码

示例代码位于grpc-go仓库中。

你可以下载该仓库的zip文件并解压,或者通过克隆仓库来获取示例代码:

$ git clone -b v1.56.2 --depth 1 https://github.com/grpc/grpc-go

然后进入示例代码的目录:

$ cd grpc-go/examples/route_guide

定义服务

作为第一步,我们需要使用protocol buffers来定义gRPC服务以及方法请求响应类型。完整的.proto文件可以在routeguide/route_guide.proto中找到。

在.proto文件中,要定义一个服务,你需要在其中指定一个命名的服务:

service RouteGuide {
   ...
}

然后在服务定义内部定义rpc方法,并指定它们的请求和响应类型。gRPC允许你定义四种类型的服务方法,其中在RouteGuide服务中都会使用到:

  • 一个简单的RPC,客户端使用存根(stub)向服务器发送请求,并等待响应返回,就像普通的函数调用一样。
// Obtains the feature at a given position.
rpc GetFeature(Point) returns (Feature) {}
  • 一个服务端流式RPC,在这种RPC中,客户端发送请求给服务器,并获得一个流以读取一系列的响应消息。客户端从返回的流中读取,直到没有更多的消息为止。在我们的例子中,你可以通过在响应类型之前使用stream关键字来指定一个服务端流式方法。
// Obtains the Features available within the given Rectangle.  Results are
// streamed rather than returned at once (e.g. in a response message with a
// repeated field), as the rectangle may cover a large area and contain a
// huge number of features.
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • 客户端流式RPC,客户端编写一系列消息并通过提供的流发送到服务器。一旦客户端完成写入消息,它会等待服务器读取所有消息并返回响应。你可以通过在请求类型之前放置stream关键字来指定客户端流式方法。
// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 双向流式RPC,双方使用读写流发送一系列消息。两个流操作独立,因此客户端和服务器可以按任意顺序读取和写入:例如,服务器可以在写入其响应之前等待接收所有客户端消息,或者可以交替读取消息然后写入消息,或者进行一些其他读取和写入的组合。每个流中消息的顺序保持不变。你可以通过在请求类型和响应类型之前都放置stream关键字来指定这种类型的方法。
// Accepts a stream of RouteNotes sent while a route is being traversed,
// while receiving other RouteNotes (e.g. from other users).
rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}

我们的.proto文件还包含了用于所有服务方法中的请求和响应类型的协议缓冲区消息类型定义 - 例如,这里是Point消息类型的定义:

// Points are represented as latitude-longitude pairs in the E7 representation
// (degrees multiplied by 10**7 and rounded to the nearest integer).
// Latitudes should be in the range +/- 90 degrees and longitude should be in
// the range +/- 180 degrees (inclusive).
message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

在上面的代码中,我们定义了一个名为Point的消息类型,它包含两个字段:latitude和longitude,分别对应整数类型的字段标识为1和2。这个消息类型可以用来表示地理位置的纬度和经度信息。

生成客户端和服务器代码

接下来,我们需要从.proto服务定义生成gRPC客户端和服务器接口。我们使用protoc以及gRPC Go插件来完成这个任务。

examples/route_guide目录中,运行以下命令:

$ protoc --go_out=. --go_opt=paths=source_relative \
    --go-grpc_out=. --go-grpc_opt=paths=source_relative \
    routeguide/route_guide.proto

运行这个命令会在routeguide目录下生成以下文件:

  • route_guide.pb.go:包含所有协议缓冲区代码,用于填充、序列化和检索请求和响应消息类型。
  • route_guide_grpc.pb.go:包含以下内容:
    • 一个接口类型(或存根),供客户端调用,其中定义了RouteGuide服务中的方法。
    • 一个接口类型,供服务器实现,也包含RouteGuide服务中定义的方法。

创建服务

首先,让我们看一下如何创建一个RouteGuide服务器。如果你只关心创建gRPC客户端,可以跳过本节,直接查看创建客户端部分(不过你可能还是会对此感兴趣!)。

使我们的RouteGuide服务发挥作用有两个部分:

  1. 实现从服务定义生成的服务接口:执行我们服务的实际”工作”。
  2. 运行gRPC服务器以侦听来自客户端的请求,并将它们分派到正确的服务实现。

你可以在server/server.go文件中找到我们的示例RouteGuide服务器。让我们来仔细看看它是如何工作的。

实现RouteGuide

正如你所见,我们的服务器有一个routeGuideServer结构类型,它实现了生成的RouteGuideServer接口:

type routeGuideServer struct {
    ...
}
...

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    ...
}
...

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    ...
}
...

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
    ...
}
...

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
    ...
}
...

简单RPC

routeGuideServer实现了我们的所有服务方法。首先,让我们看一下最简单的方法GetFeature。它只从客户端获取一个Point,然后返回其数据库中相应特征的信息。

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
    for _, feature := range s.savedFeatures {
        if proto.Equal(feature.Location, point) {
            return feature, nil
        }
    }
    // 如果未找到特征,则返回一个未命名特征
    return &pb.Feature{Location: point}, nil
}

该方法接收一个RPC的上下文对象和客户端的Point协议缓冲区请求。它返回一个包含响应信息的Feature协议缓冲区对象和一个错误。在方法中,我们将Feature填充为适当的信息,然后将其与空错误一起返回,以告诉gRPC我们已经完成了对RPC的处理,并且Feature可以返回给客户端。

服务器端流式RPC

现在,让我们来看一个流式RPC的例子。ListFeatures是一个服务器端流式RPC,因此我们需要向客户端发送多个Feature

func (s *routeGuideServer) ListFeatures(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
    for _, feature := range s.savedFeatures {
        if inRange(feature.Location, rect) {
            if err := stream.Send(feature); err != nil {
                return err
            }
        }
    }
    return nil
}

如你所见,与在方法参数中获取简单的请求和响应对象不同,这次我们获取了一个请求对象(客户端要查找的Rectangle中的Feature)和一个特殊的RouteGuide_ListFeaturesServer对象,用于编写我们的响应。

在该方法中,我们填充了需要返回的多个Feature对象,并使用RouteGuide_ListFeaturesServerSend()方法将它们写入其中。最后,就像在我们的简单RPC中一样,我们返回一个空错误,以告诉gRPC我们已经完成了写入响应。如果在此调用中发生任何错误,则我们返回一个非空错误;gRPC层将将其转换为适当的RPC状态发送到网络。

客户端端流式RPC

接下来,让我们看一些更复杂的东西:客户端端流式方法RecordRoute。在这里,我们从客户端获取一个Point流,并返回有关他们的行程的单个RouteSummary信息。如你所见,这次该方法根本没有请求参数。相反,它获取了一个RouteGuide_RecordRouteServer流,服务器可以使用该流来读取和写入消息。

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
  var pointCount, featureCount, distance int32
  var lastPoint *pb.Point
  startTime := time.Now()
  for {
    point, err := stream.Recv()
    if err == io.EOF {
      endTime := time.Now()
      return stream.SendAndClose(&pb.RouteSummary{
        PointCount:   pointCount,
        FeatureCount: featureCount,
        Distance:     distance,
        ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
      })
    }
    if err != nil {
      return err
    }
    pointCount++
    for _, feature := range s.savedFeatures {
      if proto.Equal(feature.Location, point) {
        featureCount++
      }
    }
    if lastPoint != nil {
      distance += calcDistance(lastPoint, point)
    }
    lastPoint = point
  }
}

在方法体中,我们使用RouteGuide_RecordRouteServerRecv()方法重复读取客户端的请求到一个请求对象(在本例中是Point),直到没有更多的消息为止:服务器需要在每次调用后检查Recv()返回的错误。如果返回的错误为nil,则表示流仍然有效,可以继续读取;如果为io.EOF,则表示消息流已结束,服务器可以返回其RouteSummary。如果返回的错误是其他值,则我们将其“原样”返回,以便由gRPC层将其转换为RPC状态。

双向流式RPC

最后,让我们看一下我们的双向流式传输RPC RouteChat()。

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      return nil
    }
    if err != nil {
      return err
    }
    key := serialize(in.Location)
    ... // 寻找要发送给客户端的注释
    for _, note := range s.routeNotes[key] {
      if err := stream.Send(note); err != nil {
        return err
      }
    }
  }
}

这次我们得到了一个 RouteGuide_RouteChatServer 流,就像我们的客户端流式传输示例中一样,可以用来读写消息。然而,这次我们通过方法的流式传输返回值,而客户端仍然在向其消息流写入消息。

在这里,读写的语法与客户端流式传输方法非常相似,不同之处在于服务器使用流的 Send() 方法而不是 SendAndClose() 方法,因为它正在写入多个响应。尽管每一方始终按照它们被写入的顺序获得另一方的消息,但客户端和服务器都可以按任意顺序读写 - 这些流完全独立运行。

启动服务器

一旦我们实现了所有的方法,我们还需要启动一个gRPC服务器,这样客户端才能真正使用我们的服务。以下代码片段显示了我们如何为我们的RouteGuide服务执行此操作:

flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
if err != nil {
  log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
...
grpcServer := grpc.NewServer(opts...)
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)

要构建和启动服务器,我们需要:

  • 使用net.Listen(...)指定要用于监听客户端请求的端口。
  • 使用grpc.NewServer(...)创建一个gRPC服务器实例。
  • 使用pb.RegisterRouteGuideServer(grpcServer, newServer())将我们的服务实现注册到gRPC服务器。
  • 调用Serve()在服务器上使用我们的端口详情进行阻塞等待,直到进程被终止或调用Stop()

创建客户端

在本节中,我们将看一下如何为我们的RouteGuide服务创建一个Go客户端。你可以在grpc-go/examples/route_guide/client/client.go中看到我们完整的示例客户端代码。

创建存根

要调用服务方法,我们首先需要创建一个gRPC通道以与服务器通信。我们通过将服务器地址和端口号传递给grpc.Dial()来创建这个通道:

var opts []grpc.DialOption
...
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
  ...
}
defer conn.Close()

可以在grpc.Dial中使用DialOptions来设置认证凭据(例如TLS、GCE凭据或JWT凭据),当服务需要时。RouteGuide服务不需要任何凭证。

一旦设置了gRPC通道,我们需要一个客户端存根来执行RPC。我们通过pb包生成的NewRouteGuideClient方法获得它。

client := pb.NewRouteGuideClient(conn)

调用服务方法

现在让我们看一下如何调用我们的服务方法。请注意,在gRPC-Go中,RPC以阻塞/同步模式运行,这意味着RPC调用会等待服务器响应,并且会返回响应或错误。

简单RPC

调用简单的RPC GetFeature几乎与调用本地方法一样简单。

feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
  ...
}

正如你所看到的,我们在之前获得的存根上调用方法。在方法参数中,我们创建并填充一个请求协议缓冲区对象(在我们的例子中为Point)。我们还传递了一个context.Context对象,它允许我们在必要时更改我们的RPC的行为,例如超时/取消正在进行的RPC。如果调用没有返回错误,则可以从第一个返回值中读取服务器的响应信息。

log.Println(feature)

服务器端流式传输RPC

这是我们调用服务器端流式方法ListFeatures的地方,它返回地理Feature的流。如果你已经阅读了创建服务器的部分,这可能看起来很熟悉 - 流式RPC在双方都实现了类似的方式。

rect := &pb.Rectangle{ ... }  // 初始化pb.Rectangle
stream, err := client.ListFeatures(context.Background(), rect)
if err != nil {
  ...
}
for {
    feature, err := stream.Recv()
    if err == io.EOF {
        break
    }
    if err != nil {
        log.Fatalf("%v.ListFeatures(_) = _, %v", client, err)
    }
    log.Println(feature)
}

与简单的RPC一样,我们向方法传递一个上下文和一个请求。然而,不同于获取响应对象,这次我们得到了一个RouteGuide_ListFeaturesClient的实例。客户端可以使用RouteGuide_ListFeaturesClient流来读取服务器的响应。

我们使用RouteGuide_ListFeaturesClientRecv()方法来重复地将服务器的响应读入到响应协议缓冲区对象(在这种情况下为Feature)中,直到没有更多的消息为止:客户端在每次调用后都需要检查从Recv()返回的错误err。如果为nil,则流仍然有效,可以继续读取; 如果是io.EOF,则消息

流已结束; 否则必须有一个RPC错误,该错误通过err传递。

客户端流式传输RPC

客户端流式传输方法RecordRoute与服务器端方法类似,除了我们只传递上下文给方法,并返回一个RouteGuide_RecordRouteClient流,我们可以使用它来同时写入和读取消息。

// 创建随机数量的随机点
r := rand.New(rand.NewSource(time.Now().UnixNano()))
pointCount := int(r.Int31n(100)) + 2 // 遍历至少两个点
var points []*pb.Point
for i := 0; i < pointCount; i++ {
  points = append(points, randomPoint(r))
}
log.Printf("Traversing %d points.", len(points))
stream, err := client.RecordRoute(context.Background())
if err != nil {
  log.Fatalf("%v.RecordRoute(_) = _, %v", client, err)
}
for _, point := range points {
  if err := stream.Send(point); err != nil {
    log.Fatalf("%v.Send(%v) = %v", stream, point, err)
  }
}
reply, err := stream.CloseAndRecv()
if err != nil {
  log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
}
log.Printf("Route summary: %v", reply)

RouteGuide_RecordRouteClient具有一个Send()方法,我们可以使用它来向服务器发送请求。一旦我们使用Send()将客户端的请求写入流中,我们需要在流上调用CloseAndRecv()来让gRPC知道我们已经完成了写入,并且正在等待接收一个响应。我们从从CloseAndRecv()返回的err中获得我们的RPC状态。如果状态是nil,则CloseAndRecv()的第一个返回值将是一个有效的服务器响应。

双向流式传输RPC

最后,让我们来看一下我们的双向流式传输RPC RouteChat()。与RecordRoute的情况类似,我们只传递一个上下文对象给方法,并返回一个流,我们可以使用它来同时写入和读取消息。但是,这次我们通过方法的流式传输返回值,而服务器在向其消息流写入消息时。

stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
  for {
    in, err := stream.Recv()
    if err == io.EOF {
      // read done.
      close(waitc)
      return
    }
    if err != nil {
      log.Fatalf("Failed to receive a note : %v", err)
    }
    log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
  }
}()
for _, note := range notes {
  if err := stream.Send(note); err != nil {
    log.Fatalf("Failed to send a note: %v", err)
  }
}
stream.CloseSend()
<-waitc

在这里,读写的语法与客户端流式传输方法非常相似,不同之处在于我们在完成调用后使用流的CloseSend()方法。尽管每一方始终按照它们被写入的顺序获得另一方的消息,但客户端和服务器都可以按任意顺序读写 - 这些流完全独立运行。

尝试一下!

从examples/route_guide目录中执行以下命令:

运行服务器:

$ go run server/server.go

从另一个终端运行客户端:

$ go run client/client.go

你将看到类似于以下内容的输出:

Getting feature for point (409146138, -746188906)
name:"Berkshire Valley Management Area Trail, Jefferson, NJ, USA" location:<latitude:409146138 longitude:-746188906 >
Getting feature for point (0, 0)
location:<>
Looking for features within lo:<latitude:400000000 longitude:-750000000 > hi:<latitude:420000000 longitude:-730000000 >
name:"Patriots Path, Mendham, NJ 07945, USA" location:<latitude:407838351 longitude:-746143763 >
...
name:"3 Hasta Way, Newton, NJ 07860, USA" location:<latitude:410248224 longitude:-747127767 >
Traversing 56 points.
Route summary: point_count:56 distance:497013163
Got message First message at point(0, 1)
Got message Second message at point(0, 2)
Got message Third message at point(0, 3)
Got message First message at point(0, 1)
Got message Fourth message at point(0, 1)
Got message Second message at point(0, 2)
Got message Fifth message at point(0, 2)
Got message Third message at point(0, 3)
Got message Sixth message at point(0, 3)

孟斯特

声明:本作品采用署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)进行许可,使用时请注明出处。
Author: mengbin
blog: mengbin
Github: mengbin92
cnblogs: 恋水无意