[fix]解决心跳包协议不对导致的断开连接的问题

This commit is contained in:
chenfufeng
2023-03-20 19:43:19 +08:00
parent e091310d93
commit ea6d3d3479
9 changed files with 98 additions and 84 deletions

View File

@@ -28,7 +28,7 @@ dependencies {
implementation fileTree(dir: "libs", include: ["*.jar"])
api 'com.google.protobuf:protobuf-java:3.12.4'
implementation 'io.netty:netty-all:4.1.8.Final'
implementation 'io.netty:netty-all:4.1.29.Final'
api 'com.zhidao.ptech:connsvr-protoco:0.1.37'
api 'com.elegant.network:network:1.1.28'
api 'com.elegant.analytics:analytics:1.1.28'

View File

@@ -36,24 +36,24 @@ PASSWORD=xintai2018
RELEASE=true
# AI CLOUD 云平台
# 工具类
MOGO_UTILS_VERSION=1.4.5.12
MOGO_UTILS_VERSION=1.4.6.4
# 网络请求
MOGO_NETWORK_VERSION=1.4.5.12
MOGO_NETWORK_VERSION=1.4.6.4
# 鉴权
MOGO_PASSPORT_VERSION=1.4.5.12
MOGO_PASSPORT_VERSION=1.4.6.4
# 常链接
MOGO_SOCKET_VERSION=1.4.5.12
MOGO_SOCKET_VERSION=1.4.6.4
# 数据采集
MOGO_REALTIME_VERSION=1.4.5.12
MOGO_REALTIME_VERSION=1.4.6.4
# 探路,道路事件发布,获取
MOGO_TANLU_VERSION=1.4.5.12
MOGO_TANLU_VERSION=1.4.6.4
# 直播推流
MOGO_LIVE_VERSION=1.4.5.12
MOGO_LIVE_VERSION=1.4.6.4
# 直播拉流
MOGO_TRAFFICLIVE_VERSION=1.4.5.12
MOGO_TRAFFICLIVE_VERSION=1.4.6.4
# 定位服务
MOGO_LOCATION_VERSION=1.4.5.12
MOGO_LOCATION_VERSION=1.4.6.4
# 远程通讯模块
MOGO_TELEMATIC_VERSION=1.4.5.12
MOGO_TELEMATIC_VERSION=1.4.6.4
# v2x
MOGO_V2X_VERSION=1.4.5.12
MOGO_V2X_VERSION=1.4.6.4

View File

@@ -32,7 +32,7 @@ dependencies {
implementation fileTree(dir: "libs", include: ["*.jar"])
api rootProject.ext.dependencies.spi
implementation rootProject.ext.dependencies.kotlinstdlibjdk7
api 'io.netty:netty-all:4.1.8.Final'
api 'io.netty:netty-all:4.1.29.Final'
implementation rootProject.ext.dependencies.logger
if (Boolean.valueOf(RELEASE)) {

View File

@@ -1,10 +1,12 @@
package com.mogo.telematic;
import static com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_CLOSED;
import static com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_SUCCESS;
import android.content.Context;
import android.net.nsd.NsdServiceInfo;
import android.os.Looper;
import android.util.Log;
import com.elegant.log.simplelog.Logger;
import com.mogo.telematic.client.NettyTcpClient;
@@ -321,18 +323,12 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener {
}
private void realConnectServer(ArrayList<HostBean> hostList, String uuid) {
String hostAddress = "";
for (HostBean hostBean : hostList) {
if (mNettyTcpClient != null && (mNettyTcpClient.isConnecting() || mNettyTcpClient.getConnectStatus())) {
Logger.d(TAG, "当前乘客屏幕已经连接到了ip" + hostAddress + ",port为" + 1088);
return;
}
hostAddress = hostBean.ipAddress;
Logger.d(TAG, "NSD查询到指定服务器信息ip为" + hostAddress + ",port为" + 1088);
if (hostList == null || hostList.isEmpty()) return;
String hostAddress = hostList.get(0).ipAddress;
Logger.d(TAG, "查询到指定服务器信息ip为" + hostAddress + ",port为" + 1088);
//获取到指定的地址进行Netty的连接
connectNettyServer(hostAddress, 1088, uuid);
}
}
private void connectNettyServer(String serverAddress, int port, String sign) {
Logger.d(TAG, "connectNettyServer");
@@ -340,19 +336,19 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener {
Logger.e(TAG, "Netty Server的ip不能为空");
return;
}
if (mNettyTcpClient != null && (mNettyTcpClient.isConnecting() || mNettyTcpClient.isConnected())) {
return;
}
if (!isCanceled) {
TelematicHandler.removeDelay();
}
if (mNettyTcpClient == null) {
mNettyTcpClient = new NettyTcpClient.Builder()
.setHost(serverAddress) //设置服务端地址
.setTcpPort(port) //设置服务端端口号
.setMaxReconnectTimes(5) //设置最大重连次数
.setReconnectIntervalTime(5) //设置重连间隔时间。单位:秒
.setSendHeartBeat(true) //设置是否发送心跳
.setHeartBeatInterval(120) //设置心跳间隔时间。单位:秒
.setHeartBeatData(new MogoProtocolMsg(MogoProtocolMsg.HEART_DATA, 2, new byte[]{0x00, 0x00})) //设置心跳数据可以是String类型也可以是byte[],以后设置的为准
.setSign(sign) //设置客户端标识.(因为可能存在多个tcp连接)
.setSendHeartBeat(true)
.build();
mNettyTcpClient.setListener(new NettyClientListener() {
@Override
@@ -371,21 +367,15 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener {
// 开启重连倒计时
isCanceled = false;
TelematicHandler.delay(COUNTDOWN_TIME);
} else if (statusCode == STATUS_CONNECT_SUCCESS) {
TelematicHandler.removeDelay();
TelematicHandler.removeReconnect();
}
}
});
}
if (!mNettyTcpClient.getConnectStatus()) {
mNettyTcpClient.connect();//连接服务器
// 等待连接进行,当连接成功或者失败后复活继续轮训连接
synchronized (mNettyTcpClient) {
try {
mNettyTcpClient.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
Logger.d(TAG, "Client is connected.");
}
@@ -452,10 +442,19 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener {
@Override
public void delayCompleted() {
Log.e(TAG, "收到delayCompleted事件");
// 倒计时结束关闭客户端EventGroup重新扫描ip
TelematicHandler.removeDelay();
TelematicHandler.removeReconnect();
isCanceled = true;
disconnect();
searchAndConnectServer(mContext, mUuid, mIsTaxi, mClientListener);
}
@Override
public void reconnectServer() {
Log.e(TAG, "收到reconnectServer事件");
TelematicHandler.removeReconnect();
mNettyTcpClient.reconnectServer();
}
}

View File

@@ -7,6 +7,7 @@ import android.os.Message
object TelematicHandler {
private const val MSG_DELAY = 100
private const val MSG_RECONNECT_DELAY = 101
private val handlerThread by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
HandlerThread("TelematicHandler")
@@ -21,9 +22,14 @@ object TelematicHandler {
handler = object : Handler(handlerThread.looper) {
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
if (msg.what == MSG_DELAY) {
when (msg.what) {
MSG_DELAY -> {
listener?.delayCompleted()
}
MSG_RECONNECT_DELAY -> {
listener?.reconnectServer()
}
}
}
}
}
@@ -38,12 +44,25 @@ object TelematicHandler {
handler?.sendEmptyMessageDelayed(MSG_DELAY, delayMillis)
}
@JvmStatic
fun delayReConnect(delayMillis: Long) {
removeReconnect()
handler?.sendEmptyMessageDelayed(MSG_RECONNECT_DELAY, delayMillis)
}
@JvmStatic
fun removeDelay() {
handler?.removeMessages(MSG_DELAY)
}
@JvmStatic
fun removeReconnect() {
handler?.removeMessages(MSG_RECONNECT_DELAY)
}
interface ITelematicListener {
fun delayCompleted()
fun reconnectServer()
}
}

View File

@@ -62,7 +62,7 @@ public class NettyTcpClient {
private int reconnectNum = MAX_CONNECT_TIMES;
private boolean isNeedReconnect = true;
private boolean isConnecting = false;
private volatile boolean isConnecting = false;
private long reconnectIntervalTime = 5000;
@@ -154,18 +154,18 @@ public class NettyTcpClient {
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this, mProxyListener));
if (isSendheartBeat) {
ch.pipeline().addLast(new IdleStateHandler(heartBeatInterval, heartBeatInterval, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(new HeartbeatHandler(mProxyListener));
}
ch.pipeline().addLast(new ReconnectHandler(mProxyListener));
// decoder
ch.pipeline().addLast(new MogoLengthFrameDecoder(MAX_FRAME_LENGTH,
LENGTH_FIELD_OFFSET,
LENGTH_FIELD_SIZE));
// encoder
ch.pipeline().addLast(new MogoMessageEncoder());
ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign, isSendheartBeat, heartBeatData));
ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign));
}
});
} catch (Exception e) {
@@ -190,6 +190,7 @@ public class NettyTcpClient {
}
private void doConnect() {
Log.d(TAG, "开始连接Server端");
if (channel != null && channel.isOpen() && channel.isActive()) {
Log.w(TAG, "Channel is active, no need to call start.");
return;
@@ -217,7 +218,6 @@ public class NettyTcpClient {
channel = null;
Log.e(TAG, "连接已经结束!");
});
Log.d(TAG, "连接获取channel已完成");
}
} catch (Exception e) {
Log.e(TAG, "连接Server ip:" + host + ",port:" + tcp_port + ",失败信息:" + e.getMessage());
@@ -274,14 +274,12 @@ public class NettyTcpClient {
Log.w(TAG, "Connection isn't done.");
isConnected = false;
}
synchronized (NettyTcpClient.this) {
NettyTcpClient.this.notifyAll();
}
};
public void disconnect() {
Log.e(TAG, "disconnect");
Log.e(TAG, "断开连接!");
isNeedReconnect = false;
isConnecting = false;
isConnected = false;
if (group != null) {
group.shutdownGracefully();
@@ -319,6 +317,10 @@ public class NettyTcpClient {
this.isConnected = status;
}
public boolean isConnected() {
return this.isConnected;
}
public void setListener(NettyClientListener listener) {
this.listener = listener;
}

View File

@@ -1,23 +1,23 @@
package com.mogo.telematic.client.handler
import com.mogo.telematic.MogoProtocolMsg
import com.mogo.telematic.MogoProtocolMsg.HEART_DATA
import com.mogo.telematic.client.listener.NettyClientListener
import com.mogo.telematic.client.status.ConnectState
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.handler.timeout.IdleStateEvent
import io.netty.util.CharsetUtil
class HeartbeatHandler(val listener: NettyClientListener<*>): ChannelInboundHandlerAdapter() {
private val heartbeatData by lazy {
Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8))
MogoProtocolMsg(HEART_DATA, 1, byteArrayOf(0x0))
}
override fun userEventTriggered(ctx: ChannelHandlerContext?, evt: Any?) {
if (evt is IdleStateEvent) {
ctx?.writeAndFlush(heartbeatData.duplicate())?.addListener { future ->
ctx?.channel()?.writeAndFlush(heartbeatData)?.addListener { future ->
if (future is ChannelFuture) {
if (!future.isSuccess) {
listener.onClientStatusConnectChanged(

View File

@@ -16,16 +16,13 @@ import io.netty.channel.SimpleChannelInboundHandler;
public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocolMsg> {
private static final String TAG = "NettyClientHandler";
private final boolean isSendHeartBeat;
private NettyClientListener listener;
private final String mSign;
private Object heartBeatData;
public NettyClientHandler(@NotNull NettyClientListener listener, String sign, boolean isSendHeartBeat, Object heartBeatData) {
public NettyClientHandler(@NotNull NettyClientListener listener, String sign) {
this.listener = listener;
this.mSign = sign;
this.isSendHeartBeat = isSendHeartBeat;
this.heartBeatData = heartBeatData;
}
/**

View File

@@ -1,17 +1,16 @@
package com.mogo.telematic.client.handler
import android.util.Log
import com.mogo.telematic.TelematicHandler
import com.mogo.telematic.client.NettyTcpClient
import com.mogo.telematic.client.listener.NettyClientListener
import com.mogo.telematic.client.status.ConnectState
import com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_CLOSED
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter
import java.util.concurrent.TimeUnit
import kotlin.math.max
class ReconnectHandler(
private val mNettyClient: NettyTcpClient,
private val listener: NettyClientListener<*>
) : ChannelInboundHandlerAdapter() {
@@ -71,9 +70,7 @@ class ReconnectHandler(
sleepTimeMs = maxSleepMs.toLong()
}
Log.d(TAG, "${sleepTimeMs}ms后执行重连操作")
ctx.channel().eventLoop().schedule({
mNettyClient.reconnectServer()
}, sleepTimeMs, TimeUnit.MILLISECONDS)
TelematicHandler.delayReConnect(sleepTimeMs)
++retries
} else {
Log.d(TAG, "Try to reconnect to the server Over Retry count:${retries + 1}")