diff --git a/foudations/mogo-socket/build.gradle b/foudations/mogo-socket/build.gradle index 8000fbe..37f7a13 100644 --- a/foudations/mogo-socket/build.gradle +++ b/foudations/mogo-socket/build.gradle @@ -28,7 +28,7 @@ dependencies { implementation fileTree(dir: "libs", include: ["*.jar"]) api 'com.google.protobuf:protobuf-java:3.12.4' - implementation 'io.netty:netty-all:4.1.8.Final' + implementation 'io.netty:netty-all:4.1.29.Final' api 'com.zhidao.ptech:connsvr-protoco:0.1.37' api 'com.elegant.network:network:1.1.28' api 'com.elegant.analytics:analytics:1.1.28' diff --git a/gradle.properties b/gradle.properties index cd7d3b5..99a947b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -36,24 +36,24 @@ PASSWORD=xintai2018 RELEASE=true # AI CLOUD 云平台 # 工具类 -MOGO_UTILS_VERSION=1.4.5.12 +MOGO_UTILS_VERSION=1.4.6.4 # 网络请求 -MOGO_NETWORK_VERSION=1.4.5.12 +MOGO_NETWORK_VERSION=1.4.6.4 # 鉴权 -MOGO_PASSPORT_VERSION=1.4.5.12 +MOGO_PASSPORT_VERSION=1.4.6.4 # 常链接 -MOGO_SOCKET_VERSION=1.4.5.12 +MOGO_SOCKET_VERSION=1.4.6.4 # 数据采集 -MOGO_REALTIME_VERSION=1.4.5.12 +MOGO_REALTIME_VERSION=1.4.6.4 # 探路,道路事件发布,获取 -MOGO_TANLU_VERSION=1.4.5.12 +MOGO_TANLU_VERSION=1.4.6.4 # 直播推流 -MOGO_LIVE_VERSION=1.4.5.12 +MOGO_LIVE_VERSION=1.4.6.4 # 直播拉流 -MOGO_TRAFFICLIVE_VERSION=1.4.5.12 +MOGO_TRAFFICLIVE_VERSION=1.4.6.4 # 定位服务 -MOGO_LOCATION_VERSION=1.4.5.12 +MOGO_LOCATION_VERSION=1.4.6.4 # 远程通讯模块 -MOGO_TELEMATIC_VERSION=1.4.5.12 +MOGO_TELEMATIC_VERSION=1.4.6.4 # v2x -MOGO_V2X_VERSION=1.4.5.12 +MOGO_V2X_VERSION=1.4.6.4 diff --git a/libraries/mogo-telematic/build.gradle b/libraries/mogo-telematic/build.gradle index 9cd7ec4..41f958c 100644 --- a/libraries/mogo-telematic/build.gradle +++ b/libraries/mogo-telematic/build.gradle @@ -32,7 +32,7 @@ dependencies { implementation fileTree(dir: "libs", include: ["*.jar"]) api rootProject.ext.dependencies.spi implementation rootProject.ext.dependencies.kotlinstdlibjdk7 - api 'io.netty:netty-all:4.1.8.Final' + api 'io.netty:netty-all:4.1.29.Final' implementation rootProject.ext.dependencies.logger if (Boolean.valueOf(RELEASE)) { diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/NSDNettyManager.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/NSDNettyManager.java index fadd509..a2bfbcd 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/NSDNettyManager.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/NSDNettyManager.java @@ -1,10 +1,12 @@ package com.mogo.telematic; import static com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_CLOSED; +import static com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_SUCCESS; import android.content.Context; import android.net.nsd.NsdServiceInfo; import android.os.Looper; +import android.util.Log; import com.elegant.log.simplelog.Logger; import com.mogo.telematic.client.NettyTcpClient; @@ -321,17 +323,11 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener { } private void realConnectServer(ArrayList hostList, String uuid) { - String hostAddress = ""; - for (HostBean hostBean : hostList) { - if (mNettyTcpClient != null && (mNettyTcpClient.isConnecting() || mNettyTcpClient.getConnectStatus())) { - Logger.d(TAG, "当前乘客屏幕已经连接到了ip:" + hostAddress + ",port为:" + 1088); - return; - } - hostAddress = hostBean.ipAddress; - Logger.d(TAG, "NSD查询到指定服务器信息ip为:" + hostAddress + ",port为:" + 1088); - //获取到指定的地址,进行Netty的连接 - connectNettyServer(hostAddress, 1088, uuid); - } + if (hostList == null || hostList.isEmpty()) return; + String hostAddress = hostList.get(0).ipAddress; + Logger.d(TAG, "查询到指定服务器信息ip为:" + hostAddress + ",port为:" + 1088); + //获取到指定的地址,进行Netty的连接 + connectNettyServer(hostAddress, 1088, uuid); } private void connectNettyServer(String serverAddress, int port, String sign) { @@ -340,52 +336,46 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener { Logger.e(TAG, "Netty Server的ip不能为空!"); return; } + if (mNettyTcpClient != null && (mNettyTcpClient.isConnecting() || mNettyTcpClient.isConnected())) { + return; + } if (!isCanceled) { TelematicHandler.removeDelay(); } - if (mNettyTcpClient == null) { - mNettyTcpClient = new NettyTcpClient.Builder() - .setHost(serverAddress) //设置服务端地址 - .setTcpPort(port) //设置服务端端口号 - .setMaxReconnectTimes(5) //设置最大重连次数 - .setReconnectIntervalTime(5) //设置重连间隔时间。单位:秒 - .setSendHeartBeat(true) //设置是否发送心跳 - .setHeartBeatInterval(120) //设置心跳间隔时间。单位:秒 - .setHeartBeatData(new MogoProtocolMsg(MogoProtocolMsg.HEART_DATA, 2, new byte[]{0x00, 0x00})) //设置心跳数据,可以是String类型,也可以是byte[],以后设置的为准 - .setSign(sign) //设置客户端标识.(因为可能存在多个tcp连接) - .build(); - mNettyTcpClient.setListener(new NettyClientListener() { - @Override - public void onMessageResponseClient(Object msg, String sign, Channel channel) { - if (mClientListener != null) { - mClientListener.onMessageResponseClient(msg, sign, channel); - } + mNettyTcpClient = new NettyTcpClient.Builder() + .setHost(serverAddress) //设置服务端地址 + .setTcpPort(port) //设置服务端端口号 + .setMaxReconnectTimes(5) //设置最大重连次数 + .setReconnectIntervalTime(5) //设置重连间隔时间。单位:秒 + .setSign(sign) //设置客户端标识.(因为可能存在多个tcp连接) + .setSendHeartBeat(true) + .build(); + mNettyTcpClient.setListener(new NettyClientListener() { + @Override + public void onMessageResponseClient(Object msg, String sign, Channel channel) { + if (mClientListener != null) { + mClientListener.onMessageResponseClient(msg, sign, channel); } + } - @Override - public void onClientStatusConnectChanged(int statusCode, String content, Channel channel) { - if (mClientListener != null) { - mClientListener.onClientStatusConnectChanged(statusCode, content, channel); - } - if (statusCode == STATUS_CONNECT_CLOSED) { - // 开启重连倒计时 - isCanceled = false; - TelematicHandler.delay(COUNTDOWN_TIME); - } + @Override + public void onClientStatusConnectChanged(int statusCode, String content, Channel channel) { + if (mClientListener != null) { + mClientListener.onClientStatusConnectChanged(statusCode, content, channel); } - }); - } + if (statusCode == STATUS_CONNECT_CLOSED) { + // 开启重连倒计时 + isCanceled = false; + TelematicHandler.delay(COUNTDOWN_TIME); + } else if (statusCode == STATUS_CONNECT_SUCCESS) { + TelematicHandler.removeDelay(); + TelematicHandler.removeReconnect(); + } + } + }); if (!mNettyTcpClient.getConnectStatus()) { mNettyTcpClient.connect();//连接服务器 - // 等待连接进行,当连接成功或者失败后复活继续轮训连接 - synchronized (mNettyTcpClient) { - try { - mNettyTcpClient.wait(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } else { Logger.d(TAG, "Client is connected."); } @@ -452,10 +442,19 @@ public class NSDNettyManager implements TelematicHandler.ITelematicListener { @Override public void delayCompleted() { + Log.e(TAG, "收到delayCompleted事件"); // 倒计时结束关闭客户端EventGroup,重新扫描ip TelematicHandler.removeDelay(); + TelematicHandler.removeReconnect(); isCanceled = true; disconnect(); searchAndConnectServer(mContext, mUuid, mIsTaxi, mClientListener); } + + @Override + public void reconnectServer() { + Log.e(TAG, "收到reconnectServer事件"); + TelematicHandler.removeReconnect(); + mNettyTcpClient.reconnectServer(); + } } diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/TelematicHandler.kt b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/TelematicHandler.kt index bcbba79..0a9e233 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/TelematicHandler.kt +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/TelematicHandler.kt @@ -7,6 +7,7 @@ import android.os.Message object TelematicHandler { private const val MSG_DELAY = 100 + private const val MSG_RECONNECT_DELAY = 101 private val handlerThread by lazy(LazyThreadSafetyMode.SYNCHRONIZED) { HandlerThread("TelematicHandler") @@ -21,8 +22,13 @@ object TelematicHandler { handler = object : Handler(handlerThread.looper) { override fun handleMessage(msg: Message) { super.handleMessage(msg) - if (msg.what == MSG_DELAY) { - listener?.delayCompleted() + when (msg.what) { + MSG_DELAY -> { + listener?.delayCompleted() + } + MSG_RECONNECT_DELAY -> { + listener?.reconnectServer() + } } } } @@ -38,12 +44,25 @@ object TelematicHandler { handler?.sendEmptyMessageDelayed(MSG_DELAY, delayMillis) } + @JvmStatic + fun delayReConnect(delayMillis: Long) { + removeReconnect() + handler?.sendEmptyMessageDelayed(MSG_RECONNECT_DELAY, delayMillis) + } + @JvmStatic fun removeDelay() { handler?.removeMessages(MSG_DELAY) } + @JvmStatic + fun removeReconnect() { + handler?.removeMessages(MSG_RECONNECT_DELAY) + } + interface ITelematicListener { fun delayCompleted() + + fun reconnectServer() } } \ No newline at end of file diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java index d54eb3c..8a8efe0 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java @@ -62,7 +62,7 @@ public class NettyTcpClient { private int reconnectNum = MAX_CONNECT_TIMES; private boolean isNeedReconnect = true; - private boolean isConnecting = false; + private volatile boolean isConnecting = false; private long reconnectIntervalTime = 5000; @@ -154,18 +154,18 @@ public class NettyTcpClient { .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this, mProxyListener)); if (isSendheartBeat) { ch.pipeline().addLast(new IdleStateHandler(heartBeatInterval, heartBeatInterval, 0, TimeUnit.SECONDS)); ch.pipeline().addLast(new HeartbeatHandler(mProxyListener)); } + ch.pipeline().addLast(new ReconnectHandler(mProxyListener)); // decoder ch.pipeline().addLast(new MogoLengthFrameDecoder(MAX_FRAME_LENGTH, LENGTH_FIELD_OFFSET, LENGTH_FIELD_SIZE)); // encoder ch.pipeline().addLast(new MogoMessageEncoder()); - ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign, isSendheartBeat, heartBeatData)); + ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign)); } }); } catch (Exception e) { @@ -190,6 +190,7 @@ public class NettyTcpClient { } private void doConnect() { + Log.d(TAG, "开始连接Server端"); if (channel != null && channel.isOpen() && channel.isActive()) { Log.w(TAG, "Channel is active, no need to call start."); return; @@ -217,7 +218,6 @@ public class NettyTcpClient { channel = null; Log.e(TAG, "连接已经结束!"); }); - Log.d(TAG, "连接获取channel已完成!"); } } catch (Exception e) { Log.e(TAG, "连接Server ip:" + host + ",port:" + tcp_port + ",失败信息:" + e.getMessage()); @@ -274,14 +274,12 @@ public class NettyTcpClient { Log.w(TAG, "Connection isn't done."); isConnected = false; } - synchronized (NettyTcpClient.this) { - NettyTcpClient.this.notifyAll(); - } }; public void disconnect() { - Log.e(TAG, "disconnect"); + Log.e(TAG, "断开连接!"); isNeedReconnect = false; + isConnecting = false; isConnected = false; if (group != null) { group.shutdownGracefully(); @@ -319,6 +317,10 @@ public class NettyTcpClient { this.isConnected = status; } + public boolean isConnected() { + return this.isConnected; + } + public void setListener(NettyClientListener listener) { this.listener = listener; } diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/HeartbeatHandler.kt b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/HeartbeatHandler.kt index 2d65344..27d29d8 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/HeartbeatHandler.kt +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/HeartbeatHandler.kt @@ -1,23 +1,23 @@ package com.mogo.telematic.client.handler +import com.mogo.telematic.MogoProtocolMsg +import com.mogo.telematic.MogoProtocolMsg.HEART_DATA import com.mogo.telematic.client.listener.NettyClientListener import com.mogo.telematic.client.status.ConnectState -import io.netty.buffer.Unpooled import io.netty.channel.ChannelFuture import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.handler.timeout.IdleStateEvent -import io.netty.util.CharsetUtil class HeartbeatHandler(val listener: NettyClientListener<*>): ChannelInboundHandlerAdapter() { private val heartbeatData by lazy { - Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8)) + MogoProtocolMsg(HEART_DATA, 1, byteArrayOf(0x0)) } override fun userEventTriggered(ctx: ChannelHandlerContext?, evt: Any?) { if (evt is IdleStateEvent) { - ctx?.writeAndFlush(heartbeatData.duplicate())?.addListener { future -> + ctx?.channel()?.writeAndFlush(heartbeatData)?.addListener { future -> if (future is ChannelFuture) { if (!future.isSuccess) { listener.onClientStatusConnectChanged( diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java index 1d23e34..5c26842 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java @@ -16,16 +16,13 @@ import io.netty.channel.SimpleChannelInboundHandler; public class NettyClientHandler extends SimpleChannelInboundHandler { private static final String TAG = "NettyClientHandler"; - private final boolean isSendHeartBeat; private NettyClientListener listener; private final String mSign; private Object heartBeatData; - public NettyClientHandler(@NotNull NettyClientListener listener, String sign, boolean isSendHeartBeat, Object heartBeatData) { + public NettyClientHandler(@NotNull NettyClientListener listener, String sign) { this.listener = listener; this.mSign = sign; - this.isSendHeartBeat = isSendHeartBeat; - this.heartBeatData = heartBeatData; } /** diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt index 0771d33..cabe05f 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt @@ -1,17 +1,16 @@ package com.mogo.telematic.client.handler import android.util.Log +import com.mogo.telematic.TelematicHandler import com.mogo.telematic.client.NettyTcpClient import com.mogo.telematic.client.listener.NettyClientListener import com.mogo.telematic.client.status.ConnectState import com.mogo.telematic.client.status.ConnectState.STATUS_CONNECT_CLOSED import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter -import java.util.concurrent.TimeUnit import kotlin.math.max class ReconnectHandler( - private val mNettyClient: NettyTcpClient, private val listener: NettyClientListener<*> ) : ChannelInboundHandlerAdapter() { @@ -71,9 +70,7 @@ class ReconnectHandler( sleepTimeMs = maxSleepMs.toLong() } Log.d(TAG, "${sleepTimeMs}ms后执行重连操作!") - ctx.channel().eventLoop().schedule({ - mNettyClient.reconnectServer() - }, sleepTimeMs, TimeUnit.MILLISECONDS) + TelematicHandler.delayReConnect(sleepTimeMs) ++retries } else { Log.d(TAG, "Try to reconnect to the server Over Retry count:${retries + 1}")