我们写一个grpc服务的时候,grpc 服务注册流程如下
baseServer = grpc.NewServer
xxpb.RegisterxxServiceServer(baseServer, xxServer)
下面我们以健康检查为例,分析下服务注册的逻辑
func RegisterHealthServer(s grpc.ServiceRegistrar, srv HealthServer) {
s.RegisterService(&Health_ServiceDesc, srv)
}
其中Health_ServiceDesc是一个全局变量,它存储了服务的元数据:
var Health_ServiceDesc = grpc.ServiceDesc{
ServiceName: "grpc.health.v1.Health",
HandlerType: (*HealthServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Check",
Handler: _Health_Check_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Watch",
Handler: _Health_Watch_Handler,
ServerStreams: true,
},
},
Metadata: "grpc/health/v1/health.proto",
}
服务注册最终调用了生成的服务端代码的注册函数:
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
s.register(sd, ss)
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
for i := range sd.Streams {
d := &sd.Streams[i]
info.streams[d.StreamName] = d
s.services[sd.ServiceName] = info
可看到,注册的过程就是建立元数据到具体实现函数的映射。这个过程和我们写http服务的路由注册流程类似。注册完成后就开始进行服务的监听。
baseServer.Serve(grpcListener)
它的实现和golang的http包的Serve实现类似:
func (s *Server) Serve(lis net.Listener) error
s.serveWG.Add(1)
for {
rawConn, err := lis.Accept()
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
st := s.newHTTP2Transport(rawConn)
if !s.addConn(lisAddr, st) {
s.serveStreams(st)
s.removeConn(lisAddr, st)
最终调用了ServeHTTP方法
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
s.serveStreams(st)
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
google.golang.org/grpc@v1.45.0/server.go
handleStream的过程就是根据请求信息,在我们前面注册的map里面取出对应的处理方法,进行逻辑的处理。
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
srv, knownService := s.services[service]
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
sh.HandleRPC(stream.Context(), statsBegin)