go run client.go from stream server answer: the 1 question is stream client rpc 0 from stream server answer: the 2 question is stream client rpc 1 from stream server answer: the 3 question is stream client rpc 2 from stream server answer: the 4 question is stream client rpc 3 from stream server answer: the 5 question is stream client rpc 4
服务端获取到来自客户端的提问
1 2 3 4 5
from stream client question: stream client rpc 0 from stream client question: stream client rpc 1 from stream client question: stream client rpc 2 from stream client question: stream client rpc 3 from stream client question: stream client rpc 4#
二、分析
1. server端启动流程
1.1 构建本地监听端口
1 2 3 4
lis, err := net.Listen("tcp", "127.0.0.1:8001") if err != nil { log.Fatalf("failed to listen: %v", err) }
// NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. // 创建一个新的server,该server还没有注册服务,并且没有接受请求 funcNewServer(opt ...ServerOption) *Server { //把默认配置放到入参中 opts := defaultServerOptions for _, o := range extraServerOptions { o.apply(&opts) } for _, o := range opt { o.apply(&opts) } // 构造Server实例 s := &Server{ lis: make(map[net.Listener]bool), opts: opts, conns: make(map[string]map[transport.ServerTransport]bool), services: make(map[string]*serviceInfo), quit: grpcsync.NewEvent(), done: grpcsync.NewEvent(), czData: new(channelzData), } //chains all unary server interceptors into one. chainUnaryServerInterceptors(s) //chains all stream server interceptors into one. chainStreamServerInterceptors(s) s.cv = sync.NewCond(&s.mu) // 判断是否开启链路追踪 if EnableTracing { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) }
if s.opts.numServerWorkers > 0 { s.initServerWorkers() }
// RegisterService registers a service and its implementation to the gRPC // server. It is called from the IDL generated code. This must be called before // invoking Serve. If ss is non-nil (for legacy code), its type is checked to // ensure it implements sd.HandlerType. func(s *Server)RegisterService(sd *ServiceDesc, ss interface{}) { if ss != nil { ht := reflect.TypeOf(sd.HandlerType).Elem() st := reflect.TypeOf(ss) if !st.Implements(ht) { logger.Fatalf("grpc: Server.RegisterService found the handler of type %v that does not satisfy %v", st, ht) } } s.register(sd, ss) }
func(s *Server)register(sd *ServiceDesc, ss interface{}) { s.mu.Lock() defer s.mu.Unlock() s.printf("RegisterService(%q)", sd.ServiceName) if s.serve { logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) } if _, ok := s.services[sd.ServiceName]; ok { logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) } info := &serviceInfo{ serviceImpl: ss, methods: make(map[string]*MethodDesc), streams: make(map[string]*StreamDesc), mdata: sd.Metadata, } for i := range sd.Methods { d := &sd.Methods[i] info.methods[d.MethodName] = d } for i := range sd.Streams { d := &sd.Streams[i] info.streams[d.StreamName] = d } s.services[sd.ServiceName] = info }
1.4 注册反射服务
1 2 3 4 5 6 7 8
// 往grpc服务端注册反射服务 reflection.Register(s)
// Register registers the server reflection service on the given gRPC server. funcRegister(s GRPCServer) { svr := NewServer(ServerOptions{Services: s}) v1alphagrpc.RegisterServerReflectionServer(s, svr) }
// 启动grpc服务 if err := s.Serve(lis); err != nil { log.Fatalf("failed to serve: %v", err) }
// Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC requests and then call the registered handlers to reply to them. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when // this method returns. // Serve will return a non-nil error unless Stop or GracefulStop is called. func(s *Server)Serve(lis net.Listener)error { s.mu.Lock() s.printf("serving") s.serve = true if s.lis == nil { // Serve called after Stop or GracefulStop. s.mu.Unlock() lis.Close() return ErrServerStopped }
s.serveWG.Add(1) deferfunc() { s.serveWG.Done() if s.quit.HasFired() { // Stop or GracefulStop called; block until done and return nil. <-s.done.Done() } }()
ls := &listenSocket{Listener: lis} s.lis[ls] = true
var tempDelay time.Duration // how long to sleep on accept failure for { rawConn, err := lis.Accept() if err != nil { if ne, ok := err.(interface { Temporary() bool }); ok && ne.Temporary() { if tempDelay == 0 { tempDelay = 5 * time.Millisecond } else { tempDelay *= 2 } if max := 1 * time.Second; tempDelay > max { tempDelay = max } s.mu.Lock() s.printf("Accept error: %v; retrying in %v", err, tempDelay) s.mu.Unlock() timer := time.NewTimer(tempDelay) select { case <-timer.C: case <-s.quit.Done(): timer.Stop() returnnil } continue } s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock()
if s.quit.HasFired() { returnnil } return err } tempDelay = 0 // Start a new goroutine to deal with rawConn so we don't stall this Accept // loop goroutine. // // Make sure we account for the goroutine so GracefulStop doesn't nil out // s.conns before this conn can be added. s.serveWG.Add(1) gofunc() { s.handleRawConn(lis.Addr().String(), rawConn) s.serveWG.Done() }() } }
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. funcnewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onPrefaceReceipt func(), onGoAwayfunc(GoAwayReason), onClosefunc()) (_ *http2Client, err error) { ... if t.keepaliveEnabled { t.kpDormancyCond = sync.NewCond(&t.mu) go t.keepalive() } ... }
func(t *http2Client)keepalive() { p := &ping{data: [8]byte{}} //ping 的内容 timer := time.NewTimer(t.kp.Time) // 启动一个定时器, 触发时间为配置的 Time 值 //for loop for { select { // 定时器触发 case <-timer.C: if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { timer.Reset(t.kp.Time) continue } // Check if keepalive should go dormant. t.mu.Lock() iflen(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { // Make awakenKeepalive writable. <-t.awakenKeepalive t.mu.Unlock() select { case <-t.awakenKeepalive: // If the control gets here a ping has been sent // need to reset the timer with keepalive.Timeout. case <-t.ctx.Done(): return } } else { t.mu.Unlock() if channelz.IsOn() { atomic.AddInt64(&t.czData.kpCount, 1) } // Send ping. t.controlBuf.put(p) }
// By the time control gets here a ping has been sent one way or the other. timer.Reset(t.kp.Timeout) select { case <-timer.C: if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { timer.Reset(t.kp.Time) continue } t.Close() return case <-t.ctx.Done(): if !timer.Stop() { <-timer.C } return } // 上层通知 context 结束 case <-t.ctx.Done(): if !timer.Stop() { // 返回 false,表示 timer 未被销毁 <-timer.C } return } }
var kaep = keepalive.EnforcementPolicy{ MinTime: 5 * time.Second, // If a client pings more than once every 5 seconds, terminate the connection PermitWithoutStream: true, // Allow pings even when there are no active streams }
var kasp = keepalive.ServerParameters{ MaxConnectionIdle: 15 * time.Second, // If a client is idle for 15 seconds, send a GOAWAY MaxConnectionAge: 30 * time.Second, // If any connection is alive for more than 30 seconds, send a GOAWAY MaxConnectionAgeGrace: 5 * time.Second, // Allow 5 seconds for pending RPCs to complete before forcibly closing connections Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead }
funcmain(){ ... s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) ... }
now := time.Now() deferfunc() { t.lastPingAt = now }() // A reset ping strikes means that we don't need to check for policy // violation for this ping and the pingStrikes counter should be set // to 0. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) { t.pingStrikes = 0 return } t.mu.Lock() ns := len(t.activeStreams) t.mu.Unlock() if ns < 1 && !t.kep.PermitWithoutStream { // Keepalive shouldn't be active thus, this new ping should // have come after at least defaultPingTimeout. if t.lastPingAt.Add(defaultPingTimeout).After(now) { t.pingStrikes++ } } else { // Check if keepalive policy is respected. if t.lastPingAt.Add(t.kep.MinTime).After(now) { t.pingStrikes++ } }
if t.pingStrikes > maxPingStrikes { // Send goaway and close the connection. if logger.V(logLevel) { logger.Errorf("transport: Got too many pings from the client, closing the connection.") } t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: true}) } }
注意,对 pingStrikes 累加的逻辑:
t.lastPingAt.Add(defaultPingTimeout).After(now):
t.lastPingAt.Add(t.kep.MinTime).After(now):
gRPC 服务端新建一个 HTTP2 server 的时候会启动一个单独的 goroutine 处理 keepalive 逻辑,newHTTP2Server 方法:
func (t *http2Server) keepalive() { p := &ping{} var pingSent bool maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) maxAge := time.NewTimer(t.kp.MaxConnectionAge) keepalive := time.NewTimer(t.kp.Time) // NOTE: All exit paths of this function should reset their // respective timers. A failure to do so will cause the // following clean-up to deadlock and eventually leak. defer func() { // 退出前,完成定时器的回收工作 if !maxIdle.Stop() { <-maxIdle.C } if !maxAge.Stop() { <-maxAge.C } if !keepalive.Stop() { <-keepalive.C } }() for { select { case <-maxIdle.C: t.mu.Lock() idle := t.idle if idle.IsZero() { // The connection is non-idle. t.mu.Unlock() maxIdle.Reset(t.kp.MaxConnectionIdle) continue } val := t.kp.MaxConnectionIdle - time.Since(idle) t.mu.Unlock() if val <= 0 { // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. t.drain(http2.ErrCodeNo, []byte{}) // Resetting the timer so that the clean-up doesn't deadlock. maxIdle.Reset(infinity) return } maxIdle.Reset(val) case <-maxAge.C: t.drain(http2.ErrCodeNo, []byte{}) maxAge.Reset(t.kp.MaxConnectionAgeGrace) select { case <-maxAge.C: // Close the connection after grace period. t.Close() // Resetting the timer so that the clean-up doesn't deadlock. maxAge.Reset(infinity) case <-t.ctx.Done(): } return case <-keepalive.C: if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { pingSent = false keepalive.Reset(t.kp.Time) continue } if pingSent { t.Close() // Resetting the timer so that the clean-up doesn't deadlock. keepalive.Reset(infinity) return } pingSent = true if channelz.IsOn() { atomic.AddInt64(&t.czData.kpCount, 1) } t.controlBuf.put(p) keepalive.Reset(t.kp.Timeout) case <-t.ctx.Done(): return } } }