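Calling DubboProtocol.export invokes the createServer method, which sets up the Dubbo service listener thread. The work is ultimately done by HeaderExchanger.bind: Transporters.bind() creates the Dubbo service listener thread, and HeaderExchangeServer creates the heartbeat thread.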
    // HeaderExchanger
    public ExchangeServer bind(URL url, ExchangeHandler handler) {
        return new HeaderExchangeServer(
                Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }
HeaderExchangeServer triggers heartbeat events through a single-threaded ScheduledExecutorService.
    public class HeaderExchangeServer {
        // A single daemon thread fires the heartbeat task.
        private final ScheduledExecutorService scheduled =
                Executors.newScheduledThreadPool(1,
                        new NamedThreadFactory("dubbo-remoting-server-heartbeat", true));

        // Identifier spelling (startHeatbeatTimer, heatbeatTimer) is kept as in the excerpt.
        void startHeatbeatTimer() {
            stopHeartbeatTimer(); // cancel any previously scheduled task
            if (heartbeat > 0) {
                heatbeatTimer = scheduled.scheduleWithFixedDelay(
                        new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
                            public Collection<Channel> getChannels() {
                                return Collections.unmodifiableCollection(
                                        HeaderExchangeServer.this.getChannels());
                            }
                        }, heartbeat, heartbeatTimeout),
                        heartbeat, heartbeat, TimeUnit.MILLISECONDS);
            }
        }
    }
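The timer pattern above can be tried outside Dubbo with the plain JDK. The following is a minimal sketch, not Dubbo code: the class HeartbeatTimerSketch, its methods, and the thread name are made up for illustration, and daemonFactory is only an assumed stand-in for what NamedThreadFactory(name, true) appears to do (a named daemon thread).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of Dubbo.
class HeartbeatTimerSketch {
    // Assumed stand-in for NamedThreadFactory(name, true): named daemon threads.
    static ThreadFactory daemonFactory(final String name) {
        return r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        };
    }

    private final ScheduledExecutorService scheduled =
            Executors.newScheduledThreadPool(1, daemonFactory("sketch-heartbeat"));
    private ScheduledFuture<?> timer;

    // Cancel any previous timer, then schedule the task only if the interval is positive.
    synchronized void start(Runnable task, long heartbeatMillis) {
        stop();
        if (heartbeatMillis > 0) {
            timer = scheduled.scheduleWithFixedDelay(
                    task, heartbeatMillis, heartbeatMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (timer != null && !timer.isCancelled()) {
            timer.cancel(true);
        }
        timer = null;
    }

    synchronized boolean isRunning() {
        return timer != null && !timer.isCancelled();
    }

    void shutdown() {
        stop();
        scheduled.shutdownNow();
    }

    // Helper for experiments: does the task fire at least `ticks` times within waitMillis?
    static boolean firesAtLeast(int ticks, long heartbeatMillis, long waitMillis) {
        final CountDownLatch latch = new CountDownLatch(ticks);
        HeartbeatTimerSketch sketch = new HeartbeatTimerSketch();
        sketch.start(latch::countDown, heartbeatMillis);
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            sketch.shutdown();
        }
    }
}
```

With scheduleWithFixedDelay the delay is measured from the end of one run to the start of the next, so a slow heartbeat pass postpones the next one instead of letting runs pile up on the single thread.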
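The task scheduled on the ScheduledExecutorService checks, for each channel, how long ago the last communication with the peer happened. If either the last read or the last write is older than the heartbeat interval, it sends a HEARTBEAT_EVENT request.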
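    // HeartBeatTask.run(), abridged. The excerpt omitted the definitions of `now` and
    // `channel`; the loop below and the field name channelProvider are assumed.
    public void run() {
        long now = System.currentTimeMillis();
        for (Channel channel : channelProvider.getChannels()) {
            Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_READ_TIMESTAMP);
            Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY_WRITE_TIMESTAMP);
            // Either direction idle for longer than the interval triggers a heartbeat.
            if ((lastRead != null && now - lastRead > heartbeat)
                    || (lastWrite != null && now - lastWrite > heartbeat)) {
                Request req = new Request();
                req.setVersion("2.0.0");
                req.setTwoWay(true); // ask the peer to answer
                req.setEvent(Request.HEARTBEAT_EVENT);
                channel.send(req);
            }
        }
    }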
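The idle check is easy to misread, so here it is isolated as a pure function. This is only a sketch: HeartbeatCheckSketch and heartbeatDue are hypothetical names, and the timestamps are passed in directly instead of being read from channel attributes.

```java
// Hypothetical demo class; not part of Dubbo.
class HeartbeatCheckSketch {
    // Same condition as in HeartBeatTask.run(): a heartbeat is due when EITHER
    // known timestamp is strictly older than the heartbeat interval.
    // A null timestamp means nothing has been read (or written) yet and is ignored.
    static boolean heartbeatDue(Long lastRead, Long lastWrite, long now, long heartbeat) {
        return (lastRead != null && now - lastRead > heartbeat)
                || (lastWrite != null && now - lastWrite > heartbeat);
    }
}
```

Because the two checks are joined with a logical OR, a channel that keeps receiving data but has not written anything for longer than the interval still gets a heartbeat. The comparison is strict, so an idle time exactly equal to the interval does not trigger one.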
HeartbeatHandler handles incoming heartbeat events. The logic for heartbeat request messages and heartbeat response messages is as follows.
    // HeartbeatHandler
    public void received(Channel channel, Object message) throws RemotingException {
        setReadTimestamp(channel); // every inbound message refreshes the read timestamp
        if (isHeartbeatRequest(message)) {
            // Heartbeat request received: reply if the sender asked for an acknowledgement.
            Request req = (Request) message;
            if (req.isTwoWay()) {
                Response res = new Response(req.getId(), req.getVersion());
                res.setEvent(Response.HEARTBEAT_EVENT);
                channel.send(res);
            }
            return;
        }
        if (isHeartbeatResponse(message)) {
            // Heartbeat response received: log it and discard it.
            return;
        }
        // Anything else is passed on to the wrapped handler.
        handler.received(channel, message);
    }
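The three branches of received can be exercised with a small stand-alone model. This is a sketch under simplifying assumptions: Msg is a made-up message type (not Dubbo's Request or Response), and the lists sent and forwarded stand in for channel.send and for the wrapped handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of Dubbo.
class HeartbeatDispatchSketch {
    // Made-up minimal message: request or response, heartbeat or business payload.
    static final class Msg {
        final long id;
        final boolean request;
        final boolean heartbeat;
        final boolean twoWay;

        Msg(long id, boolean request, boolean heartbeat, boolean twoWay) {
            this.id = id;
            this.request = request;
            this.heartbeat = heartbeat;
            this.twoWay = twoWay;
        }
    }

    final List<Msg> sent = new ArrayList<>();      // stands in for channel.send(...)
    final List<Msg> forwarded = new ArrayList<>(); // stands in for handler.received(...)
    long lastReadMillis = -1;

    void received(Msg m, long now) {
        lastReadMillis = now; // any inbound message counts as a read
        if (m.heartbeat && m.request) {
            if (m.twoWay) {
                // Answer with a heartbeat response carrying the request id.
                sent.add(new Msg(m.id, false, true, false));
            }
            return; // heartbeat requests never reach the business handler
        }
        if (m.heartbeat) {
            return; // heartbeat response: nothing more to do
        }
        forwarded.add(m); // business traffic goes to the wrapped handler
    }
}
```

Note that the read timestamp is refreshed before any branching, so heartbeat traffic itself keeps the channel from looking idle on the read side.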