文章目录
- 结论
- Client端KeepAlive
- 使用入口
- 简要时序列表
- Server端KeepAlive
- 使用入口
- 简要时序列表
- KeepAliveEnforcer
基于java gRPC 1.24.2 分析
结论
- gRPC keepAlive是grpc框架在应用层面连接保活的一种措施。即当grpc连接上没有业务数据时,是否发送pingpong,以保持连接活跃性,不因长时间空闲而被Server或操作系统关闭
- gRPC keepAlive在client与server都有,client端默认关闭(keepAliveTime为Long.MAX_VALUE), server端默认打开,keepAliveTime为2小时,即每2小时向client发送一次ping
// io.grpc.internal.GrpcUtil
public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
- KeepAlive的管理使用类
io.grpc.internal.KeepAliveManager
, 用于管理KeepAlive状态,ping任务调度与执行.
Client端KeepAlive
使用入口
- 我们在使用io.grpc框架创建grpc连接的时候,可以设置keeplive, 例如下面:
NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider)) //.usePlaintext() //.defaultLoadBalancingPolicy(props.getBalancePolicy()) //.maxInboundMessageSize(props.getMaxInboundMessageSize()) //.keepAliveTime(1,TimeUnit.MINUTES).keepAliveWithoutCalls(true).keepAliveTimeout(10,TimeUnit.SECONDS).intercept(channelManager.getInterceptors()); //
- 其中与keepAlive相关的参数有三个,
keepAliveTime
,keepAliveTimeout
,keepAliveWithoutCalls
。这三个变量有什么作用呢?
- keepAliveTime: 表示当grpc连接没有数据传递时,多久之后开始向server发送ping packet
- keepAliveTimeout: 表示当发送完ping packet后多久没收到server回应算超时
- keepAliveTimeoutCalls: 表示如果grpc连接没有数据传递时,是否keepAlive,默认为false
简要时序列表
Create & Start
NettyChannelBuilder-----> NettyTransportFactory---------> NettyClientTransport-------------> KeepAliveManager & NettyClientHandler
响应各种事件
当Active、Idle、DataReceived、Started、Termination事件发生时,更改KeepAlive状态,调度发送ping任务。
Server端KeepAlive
使用入口
// 只截取关键代码,详细代码请看`NettyServerBuilder`
ServerImpl server = new ServerImpl(this,buildTransportServers(getTracerFactories()),Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {notifyTarget.notifyOnBuild(server);
}
return server;// 在buildTransportServers方法中创建NettyServer
List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
for (SocketAddress listenAddress : listenAddresses) {NettyServer transportServer = new NettyServer(listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,workerEventLoopGroupPool, negotiator, streamTracerFactories,getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());transportServers.add(transportServer);
}
简要时序列表
Create & Start
NettyServerBuilder---> NettyServer---------> NettyServerTransport-------------> NettyServerHandler-----------------> KeepAliveEnforcer
连接准备就绪
调用 io.netty.channel.ChannelHandler的handlerAdded
方法,关于此方法的描述:
Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
NettyServerHandler(handlerAdded)---> 创建KeepAliveManager对象
响应各种事件
同Client
KeepAliveEnforcer
在上面Server端的简要时序图中,可以看见,server端有一个特有的io.grpc.netty.KeepAliveEnforcer
类
此类的作用是监控clinet ping的频率,以确保其在一个合理范围内。
package io.grpc.netty;import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;/** Monitors the client's PING usage to make sure the rate is permitted. */
class KeepAliveEnforcer {@VisibleForTestingstatic final int MAX_PING_STRIKES = 2;@VisibleForTestingstatic final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);private final boolean permitWithoutCalls;private final long minTimeNanos;private final Ticker ticker;private final long epoch;private long lastValidPingTime;private boolean hasOutstandingCalls;private int pingStrikes;public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);}@VisibleForTestingKeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");this.permitWithoutCalls = permitWithoutCalls;this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);this.ticker = ticker;this.epoch = ticker.nanoTime();lastValidPingTime = epoch;}/** Returns {@code false} when client is misbehaving and should be disconnected. */@CheckReturnValuepublic boolean pingAcceptable() {long now = ticker.nanoTime();boolean valid;if (!hasOutstandingCalls && !permitWithoutCalls) {valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;} else {valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;}if (!valid) {pingStrikes++;return !(pingStrikes > MAX_PING_STRIKES);} else {lastValidPingTime = now;return true;}}/*** Reset any counters because PINGs are allowed in response to something sent. Typically called* when sending HEADERS and DATA frames.*/public void resetCounters() {lastValidPingTime = epoch;pingStrikes = 0;}/** There are outstanding RPCs on the transport. */public void onTransportActive() {hasOutstandingCalls = true;}/** There are no outstanding RPCs on the transport. */public void onTransportIdle() {hasOutstandingCalls = false;}/*** Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important* to use something like this instead of directly comparing nano times. See {@link* System#nanoTime}.*/private static long compareNanos(long time1, long time2) {// Possibility of overflow/underflow is on purpose and necessary for correctnessreturn time1 - time2;}@VisibleForTestinginterface Ticker {long nanoTime();}@VisibleForTestingstatic class SystemTicker implements Ticker {public static final SystemTicker INSTANCE = new SystemTicker();@Overridepublic long nanoTime() {return System.nanoTime();}}
}
- 先来看
pingAcceptable
方法,此方法是判断是否接受client ping。
lastValidPingTime
是上次client valid ping的时间, 连接建立时此时间等于KeepAliveEnforcer对象创建的时间。当client ping有效时,其等于当时ping的时间hasOutstandingCalls
其初始值为false,当连接activie时,其值为true,当连接idle时,其值为false。如果grpc调用为阻塞时调用,则调用时连接变为active,调用完成,连接变为idle.permitWithoutCalls
其值是创建NettyServer时传入,默认为false.IMPLICIT_PERMIT_TIME_NANOS
其值为常量,2hminTimeNanos
其值是创建NettyServer时传入,默认为5min.MAX_PING_STRIKES
其值为常量2
resetCounters
方法是当grpc当中有数据时会被调用,即有grpc调用时lastValidPingTime和pingStrikes会被重置。- 如果client要想使用keepAlive,
permitWithoutCalls
值需要设置为true,而且client keepAliveTime需要>=minTimeNanos