[Update]优化连接状态接口

This commit is contained in:
chenfufeng
2022-05-18 16:26:18 +08:00
parent 7c4a07efee
commit 5bbb623edd
6 changed files with 37 additions and 50 deletions

View File

@@ -36,26 +36,26 @@ PASSWORD=xintai2018
RELEASE=true
# AI CLOUD 云平台
# 工具类
MOGO_UTILS_VERSION=1.3.56
MOGO_UTILS_VERSION=1.3.57
# 网络请求
MOGO_NETWORK_VERSION=1.3.56
MOGO_NETWORK_VERSION=1.3.57
# 网络DNS
MOGO_HTTPDNS_VERSION=1.3.56
MOGO_HTTPDNS_VERSION=1.3.57
# 鉴权
MOGO_PASSPORT_VERSION=1.3.56
MOGO_PASSPORT_VERSION=1.3.57
# 常链接
MOGO_SOCKET_VERSION=1.3.56
MOGO_SOCKET_VERSION=1.3.57
# 数据采集
MOGO_REALTIME_VERSION=1.3.56
MOGO_REALTIME_VERSION=1.3.57
# 探路,道路事件发布,获取
MOGO_TANLU_VERSION=1.3.56
MOGO_TANLU_VERSION=1.3.57
# 直播推流
MOGO_LIVE_VERSION=1.3.56
MOGO_LIVE_VERSION=1.3.57
# 直播拉流
MOGO_TRAFFICLIVE_VERSION=1.3.56
MOGO_TRAFFICLIVE_VERSION=1.3.57
# 定位服务
MOGO_LOCATION_VERSION=1.3.56
MOGO_LOCATION_VERSION=1.3.57
# 远程通讯模块
MOGO_TELEMATIC_VERSION=1.3.56
MOGO_TELEMATIC_VERSION=1.3.57
# v2x
MOGO_V2X_VERSION=1.3.56
MOGO_V2X_VERSION=1.3.57

View File

@@ -243,7 +243,7 @@ public class NSDNettyManager {
if (!mNettyTcpClient.getConnectStatus()) {
mNettyTcpClient.connect();//连接服务器
} else {
mNettyTcpClient.disconnect();
Logger.d(TAG, "Client is connected.");
}
}

View File

@@ -146,7 +146,7 @@ public class NettyTcpClient {
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this));
ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this, mProxyListener));
if (isSendheartBeat) {
ch.pipeline().addLast("ping", new IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS));//5s未发送数据回调userEventTriggered
}
@@ -190,6 +190,10 @@ public class NettyTcpClient {
ChannelFuture channelFuture;
try {
if (bootstrap != null) {
if (group == null || group.isShutdown()) {
group = new NioEventLoopGroup();
bootstrap.group(group);
}
channelFuture = bootstrap.connect(host, tcp_port);
channel = channelFuture.channel();
channelFuture.addListener(mFutureListener);

View File

@@ -75,16 +75,6 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, mSign, ctx.channel());
}
/**
* <p>客户端下线</p>
*
* @param ctx ChannelHandlerContext
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Logger.e(TAG, "channelInactive");
}
/**
* 客户端收到消息
*
@@ -102,16 +92,4 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
}
}
}
/**
* @param ctx ChannelHandlerContext
* @param cause 异常
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Logger.e(TAG, "NettyClientHandler#exceptionCaught is:" + cause.getMessage());
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, mSign, ctx.channel());
cause.printStackTrace();
ctx.close();
}
}

View File

@@ -2,23 +2,23 @@ package com.mogo.telematic.client.handler
import com.elegant.log.simplelog.Logger
import com.mogo.telematic.client.NettyTcpClient
import io.netty.channel.ChannelInboundHandlerAdapter
import com.mogo.telematic.client.listener.NettyClientListener
import com.mogo.telematic.client.status.ConnectState
import com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_CLOSED
import io.netty.channel.ChannelHandlerContext
import kotlin.Throws
import java.lang.Exception
import java.util.concurrent.Executors
import io.netty.channel.ChannelInboundHandlerAdapter
import java.util.concurrent.TimeUnit
import kotlin.math.max
class ReconnectHandler(private val mNettyClient: NettyTcpClient) : ChannelInboundHandlerAdapter() {
class ReconnectHandler(
private val mNettyClient: NettyTcpClient,
private val listener: NettyClientListener<*>?
) : ChannelInboundHandlerAdapter() {
private val maxRetries = Int.MAX_VALUE
private var retries = 0
private val baseSleepTimeMs = 3000
private val maxSleepMs = 10 * 1000
private val scheduledThreadPool by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
Executors.newScheduledThreadPool(1)
}
companion object {
private const val TAG = "ReconnectHandler"
@@ -32,14 +32,20 @@ class ReconnectHandler(private val mNettyClient: NettyTcpClient) : ChannelInboun
@Throws(Exception::class)
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
ctx.close()
Logger.e(TAG, "ReconnectHandler#exceptionCaught cause message is:${cause.message}")
listener?.onClientStatusConnectChanged(
ConnectState.STATUS_CONNECT_ERROR,
cause.message,
ctx.channel()
)
cause.printStackTrace()
ctx.close()
}
@Throws(Exception::class)
override fun channelInactive(ctx: ChannelHandlerContext) {
Logger.d(TAG, "ReconnectHandler channelInactive ...")
// TODO:(策略暂时耦合在这里,后面使用策略模式)
listener?.onClientStatusConnectChanged(STATUS_CONNECT_CLOSED, "channelInactive", ctx.channel())
var allowRetry = false
if (retries < maxRetries) {
allowRetry = true
@@ -55,8 +61,7 @@ class ReconnectHandler(private val mNettyClient: NettyTcpClient) : ChannelInboun
sleepTimeMs = maxSleepMs.toLong()
}
Logger.d(TAG, "${sleepTimeMs}ms后执行重连操作")
scheduledThreadPool.schedule({
// 异步重连或单独线程池中同步重连不要阻塞netty的io线程同时也不要关闭EventLoopGroup
ctx.channel().eventLoop().schedule({
mNettyClient.reconnectServer()
}, sleepTimeMs, TimeUnit.MILLISECONDS)
++retries

View File

@@ -18,7 +18,7 @@ public interface NettyClientListener<T> {
/**
* 当服务状态发生变化时触发
* @param statusCode 状态变化
* @param sign tcp 客户端的标识,因为一个应用程序可能有很多个长链接
* @param content
*/
void onClientStatusConnectChanged(int statusCode, String sign, Channel channel);
void onClientStatusConnectChanged(int statusCode, String content, Channel channel);
}