diff --git a/app/src/main/java/com/mogo/cloud/netty/server/NettyServerActivity.java b/app/src/main/java/com/mogo/cloud/netty/server/NettyServerActivity.java index d4d7bf1..9c01f96 100644 --- a/app/src/main/java/com/mogo/cloud/netty/server/NettyServerActivity.java +++ b/app/src/main/java/com/mogo/cloud/netty/server/NettyServerActivity.java @@ -130,14 +130,16 @@ public class NettyServerActivity extends AppCompatActivity implements View.OnCli if (!NSDNettyManager.getInstance().isServerStart()) { Toast.makeText(getApplicationContext(), "未连接,请先连接", LENGTH_SHORT).show(); } else { - NSDNettyManager.getInstance().sendMogoProtocolMsgToClient(mogoProtocolMsg, channelFuture -> { - if (channelFuture.isSuccess()) { - Log.d(TAG, "Write auth successful"); - logSend(Arrays.toString(mogoProtocolMsg.getBody())); - } else { - Log.d(TAG, "Write auth error"); - } - }); +// NSDNettyManager.getInstance().sendMogoProtocolMsgToClient(mogoProtocolMsg, channelFuture -> { +// if (channelFuture.isSuccess()) { +// Log.d(TAG, "Write auth successful"); +// logSend(Arrays.toString(mogoProtocolMsg.getBody())); +// } else { +// Log.d(TAG, "Write auth error"); +// } +// }); + NSDNettyManager.getInstance().sendMsgToAllClients(mogoProtocolMsg); + logSend(Arrays.toString(mogoProtocolMsg.getBody())); mSendET.setText(Arrays.toString(mogoProtocolMsg.getBody())); } break; diff --git a/libraries/mogo-telematic/build.gradle b/libraries/mogo-telematic/build.gradle index 0d861d6..f3561d4 100644 --- a/libraries/mogo-telematic/build.gradle +++ b/libraries/mogo-telematic/build.gradle @@ -1,4 +1,6 @@ apply plugin: 'com.android.library' +apply plugin: 'kotlin-android' +apply plugin: 'kotlin-android-extensions' android { compileSdkVersion rootProject.ext.android.compileSdkVersion @@ -29,6 +31,7 @@ android { dependencies { implementation fileTree(dir: "libs", include: ["*.jar"]) api rootProject.ext.dependencies.spi + implementation rootProject.ext.dependencies.kotlinstdlibjdk7 api 'io.netty:netty-all:4.1.8.Final' if (Boolean.valueOf(RELEASE)) { @@ -36,7 +39,6 @@ dependencies { } else { } - } -apply from: new File(rootProject.rootDir, "gradle/upload.gradle").toString() +apply from: new File(rootProject.rootDir, "gradle/upload.gradle").toString() \ No newline at end of file diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java index 8be1148..b80ab7e 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NettyTcpClient.java @@ -5,7 +5,6 @@ import static com.mogo.telematic.MogoLengthFrameDecoder.LENGTH_FIELD_SIZE; import static com.mogo.telematic.MogoLengthFrameDecoder.MAX_FRAME_LENGTH; import static com.mogo.telematic.MogoProtocolMsg.IDENTITY_REGIST; -import android.os.SystemClock; import android.text.TextUtils; import android.util.Log; @@ -46,7 +45,8 @@ public class NettyTcpClient { private Channel channel; - private boolean isConnect = false; + private volatile boolean canReconnect = true; + private volatile boolean isConnected = false; /** * 最大重连次数 @@ -119,6 +119,7 @@ public class NettyTcpClient { public void connect() { if (isConnecting) { + Log.d(TAG, "正在连接中,connect return"); return; } Thread clientThread = new Thread("client-Netty") { @@ -133,62 +134,86 @@ public class NettyTcpClient { clientThread.start(); } + private void initBootstrap() { + try { + bootstrap = new Bootstrap().group(group) + .option(ChannelOption.SO_KEEPALIVE, true) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) + .channel(NioSocketChannel.class) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this)); + if (isSendheartBeat) { + ch.pipeline().addLast("ping", new IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS));//5s未发送数据,回调userEventTriggered + } + // decoder + ch.pipeline().addLast(new MogoLengthFrameDecoder(MAX_FRAME_LENGTH, + LENGTH_FIELD_OFFSET, + LENGTH_FIELD_SIZE)); + // encoder + ch.pipeline().addLast(new MogoMessageEncoder()); + ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign, isSendheartBeat, heartBeatData)); + } + }); + } catch (Exception e) { + Log.w(TAG, "初始化出错的原因为:" + e.getMessage()); + } + } private void connectServer() { + Log.d(TAG, "connectServer fun is called."); synchronized (NettyTcpClient.this) { - ChannelFuture channelFuture = null; - if (!isConnect) { + if (!isConnected) { isConnecting = true; group = new NioEventLoopGroup(); - bootstrap = new Bootstrap().group(group) - .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000) - .channel(NioSocketChannel.class) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this)); - if (isSendheartBeat) { - ch.pipeline().addLast("ping", new IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS));//5s未发送数据,回调userEventTriggered - } - // decoder - ch.pipeline().addLast(new MogoLengthFrameDecoder(MAX_FRAME_LENGTH, - LENGTH_FIELD_OFFSET, - LENGTH_FIELD_SIZE)); - // encoder - ch.pipeline().addLast(new MogoMessageEncoder()); - ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign, isSendheartBeat, heartBeatData)); - } - }); - - try { - channelFuture = bootstrap.connect(host, tcp_port).addListener(mFutureListener);//.sync() - // Wait until the connection is closed. - channelFuture.channel().closeFuture().addListener((ChannelFutureListener) future -> { - if (future.channel() != null) { - future.channel().eventLoop().shutdownGracefully(); - } - channel = null; - }); - Log.e(TAG, " 断开连接"); - } catch (Exception e) { - e.printStackTrace(); - } finally { - isConnect = false; - if (listener != null) { - listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mSign); - } -// if (null != channelFuture) { -// if (channelFuture.channel() != null && channelFuture.channel().isOpen()) { -// channelFuture.channel().close(); -// } -// } -// group.shutdownGracefully(); -// reconnect(); - } + initBootstrap(); + doConnect(); + } else { + Log.w(TAG, "socket正在连接中,无需重连"); } } } + private void doConnect() { + if (channel != null && channel.isOpen() && channel.isActive()) { + Log.w(TAG, "Channel is active, no need to call start."); + return; + } + if (!canReconnect) { + Log.d(TAG, "重连已被关闭则直接return."); + return; + } + ChannelFuture channelFuture; + try { + if (bootstrap != null) { + channelFuture = bootstrap.connect(host, tcp_port); + channel = channelFuture.channel(); + channelFuture.addListener(mFutureListener); + + channel.closeFuture().addListener((ChannelFutureListener) future -> { +// // 确保连接不再被使用时使用,否则重连时task会被拒绝 +// if (future.channel() != null) { +// future.channel().eventLoop().shutdownGracefully(); +// } + channel = null; + Log.e(TAG, "连接已经结束!"); + }); + Log.d(TAG, "连接获取channel已完成!"); + } + } catch (Exception e) { + Log.e(TAG, "连接Server ip:" + host + ",port:" + tcp_port + ",失败信息:" + e.getMessage()); + isConnected = false; + } + } + + public void reconnectServer() { + Log.d(TAG, "ReconnectServer function is called."); + if (!TextUtils.isEmpty(host)) { + doConnect(); + } + } + private NettyClientListener mProxyListener = new NettyClientListener() { @Override public void onMessageResponseClient(Object msg, String sign) { @@ -212,19 +237,25 @@ public class NettyTcpClient { } }; - private ChannelFutureListener mFutureListener = new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { + private ChannelFutureListener mFutureListener = channelFuture -> { + if (channelFuture.isDone()) { if (channelFuture.isSuccess()) { - Log.e(TAG, "连接成功"); + Log.d(TAG, "连接成功"); reconnectNum = MAX_CONNECT_TIMES; - isConnect = true; - channel = channelFuture.channel(); + isConnected = true; } else { - Log.e(TAG, "连接失败"); - isConnect = false; + if (channelFuture.isCancelled()) { + Log.w(TAG, "Connection attempt cancelled by user."); + } else { + Log.e(TAG, "连接失败原因为:" + channelFuture.cause().getMessage()); + } + isConnected = false; + channelFuture.channel().pipeline().fireChannelInactive(); } isConnecting = false; + } else { + Log.w(TAG, "Connection isn't done."); + isConnected = false; } }; @@ -234,43 +265,8 @@ public class NettyTcpClient { group.shutdownGracefully(); } - public void reconnect() { - Log.e(TAG, "reconnect"); - if (isNeedReconnect && reconnectNum > 0 && !isConnect) { - reconnectNum--; - SystemClock.sleep(reconnectIntervalTime); - if (isNeedReconnect && reconnectNum > 0 && !isConnect) { - Log.e(TAG, "重新连接"); - connectServer(); - } - } - } - - public void reconnectServer() { - Log.e(TAG, "***reconnectServer***"); - if (channel != null && channel.isOpen() && channel.isActive()) { - return; - } - if (bootstrap != null) { - ChannelFuture channelFuture = null; - try { - channelFuture = bootstrap.connect(host, tcp_port).addListener(mFutureListener); - // Wait until the connection is closed. - channelFuture.channel().closeFuture().sync(); - Log.e(TAG, " 断开连接"); - } catch (Exception e) { - e.printStackTrace(); - } finally { - isConnect = false; - if (listener != null) { - listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mSign); - } - } - } - } - public boolean sendMsgToServer(MogoProtocolMsg protocolMsg, final MessageStateListener listener) { - boolean flag = channel != null && isConnect; + boolean flag = channel != null && isConnected; if (flag) { channel.writeAndFlush(protocolMsg).addListener((ChannelFutureListener) channelFuture -> { if (listener != null) { @@ -284,10 +280,10 @@ public class NettyTcpClient { /** * 获取TCP连接状态 * - * @return 获取TCP连接状态 + * @return 获取TCP连接状态 */ public boolean getConnectStatus() { - return isConnect; + return isConnected; } public boolean isConnecting() { @@ -295,7 +291,7 @@ public class NettyTcpClient { } public void setConnectStatus(boolean status) { - this.isConnect = status; + this.isConnected = status; } public void setListener(NettyClientListener listener) { @@ -343,7 +339,8 @@ public class NettyTcpClient { */ private Object heartBeatData; - public Builder() {} + public Builder() { + } public Builder setMaxReconnectTimes(int reConnectTimes) { this.MAX_CONNECT_TIMES = reConnectTimes; diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NsdClient.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NsdClient.java index 72ff638..0c75dad 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NsdClient.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/NsdClient.java @@ -42,9 +42,6 @@ public class NsdClient { private static final int MSG_NULL = 1003; - - int count; - /** * @param context this * @param serviceName 客户端扫描 指定的地址 暂时没用到 @@ -72,25 +69,23 @@ public class NsdClient { @Override public void onStartDiscoveryFailed(String serviceType, int errorCode) { mNsdManager.stopServiceDiscovery(this); - Log.e(TAG, "onStartDiscoveryFailed():"); + Log.e(TAG, "onStartDiscoveryFailed()"); } @Override public void onStopDiscoveryFailed(String serviceType, int errorCode) { mNsdManager.stopServiceDiscovery(this); - Log.e(TAG, "onStopDiscoveryFailed():"); + Log.e(TAG, "onStopDiscoveryFailed()"); } @Override public void onDiscoveryStarted(String serviceType) { - Log.e(TAG, "onDiscoveryStarted():"); - + Log.e(TAG, "onDiscoveryStarted()"); } @Override public void onDiscoveryStopped(String serviceType) { - Log.e(TAG, "onDiscoveryStopped():"); - + Log.e(TAG, "onDiscoveryStopped()"); } /** @@ -99,15 +94,13 @@ public class NsdClient { */ @Override public void onServiceFound(NsdServiceInfo serviceInfo) { + Log.e(TAG, "onServiceFound()"); //根据咱服务器的定义名称,指定解析该 NsdServiceInfo if (serviceInfo.getServiceName().equals(mServiceName)) { mNsdManager.resolveService(serviceInfo, mResolverListener); } else { mHandler.sendEmptyMessage(MSG_NULL); } - - Log.e(TAG, "onServiceFound():"); - mNsdServiceInfoListBefore.add(serviceInfo); } @@ -115,7 +108,6 @@ public class NsdClient { @Override public void onServiceLost(NsdServiceInfo serviceInfo) { Log.e(TAG, "onServiceLost(): serviceInfo=" + serviceInfo); - } }; } @@ -126,7 +118,6 @@ public class NsdClient { super.handleMessage(msg); switch (msg.what) { case MSG_RESOLVER: - //回调到主线 进行解析結果的回調 NsdServiceInfo serviceInfo = (NsdServiceInfo) msg.obj; if (mIServerFound != null) { @@ -152,6 +143,7 @@ public class NsdClient { mResolverListener = new NsdManager.ResolveListener() { @Override public void onResolveFailed(NsdServiceInfo serviceInfo, int errorCode) { + Log.e(TAG, "onResolveFailed()"); } @Override diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java index 38ec6c0..e9eead2 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/NettyClientHandler.java @@ -1,19 +1,13 @@ package com.mogo.telematic.client.handler; -import android.text.TextUtils; import android.util.Log; import com.mogo.telematic.MogoProtocolMsg; import com.mogo.telematic.client.listener.NettyClientListener; import com.mogo.telematic.client.status.ConnectState; -import java.io.UnsupportedEncodingException; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; @@ -89,9 +83,6 @@ public class NettyClientHandler extends SimpleChannelInboundHandler MAX_RETRIES_LIMIT) { - retries = MAX_RETRIES_LIMIT; - } - sleepTimeMs = baseSleepTimeMs * Math.max(1, 1 << retries); - if (sleepTimeMs > maxSleepMs) { - sleepTimeMs = maxSleepMs; - } - - EventLoop eventLoop = ctx.channel().eventLoop(); - eventLoop.schedule(() -> { - Log.d(TAG,"Reconnecting ..."); - mNettyClient.reconnectServer(); - }, sleepTimeMs, TimeUnit.MILLISECONDS); - ++retries; - }else { - Log.d(TAG, "Try to reconnect to the server Over Retry count:" + (retries+1)); - } - } -} diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt new file mode 100644 index 0000000..dcdba38 --- /dev/null +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/handler/ReconnectHandler.kt @@ -0,0 +1,68 @@ +package com.mogo.telematic.client.handler + +import android.util.Log +import com.mogo.telematic.client.NettyTcpClient +import io.netty.channel.ChannelInboundHandlerAdapter +import io.netty.channel.ChannelHandlerContext +import kotlin.Throws +import java.lang.Exception +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import kotlin.math.max + +class ReconnectHandler(private val mNettyClient: NettyTcpClient) : ChannelInboundHandlerAdapter() { + + private val maxRetries = Int.MAX_VALUE + private var retries = 0 + private val baseSleepTimeMs = 3000 + private val maxSleepMs = 10 * 1000 + private val scheduledThreadPool by lazy(LazyThreadSafetyMode.SYNCHRONIZED) { + Executors.newScheduledThreadPool(1) + } + + companion object { + private const val TAG = "ReconnectHandler" + private const val MAX_RETRIES_LIMIT = 29 + } + + override fun channelActive(ctx: ChannelHandlerContext) { + retries = 0 + ctx.fireChannelActive() + } + + @Throws(Exception::class) + override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { + Log.e(TAG, "exceptionCaught channelId is:${ctx.channel().id()}, cause message is:${cause.message}") + ctx.close() + } + + @Throws(Exception::class) + override fun channelInactive(ctx: ChannelHandlerContext) { + Log.d(TAG, "ReconnectHandler channelInactive ...") + // TODO:(策略暂时耦合在这里,后面使用策略模式) + var allowRetry = false + if (retries < maxRetries) { + allowRetry = true + } + if (allowRetry) { + var sleepTimeMs: Long + require(retries >= 0) { "Retries count must greater than 0." } + if (retries > MAX_RETRIES_LIMIT) { + retries = MAX_RETRIES_LIMIT + } + sleepTimeMs = (baseSleepTimeMs * max(1, 1 shl retries)).toLong() + if (sleepTimeMs > maxSleepMs) { + sleepTimeMs = maxSleepMs.toLong() + } + Log.d(TAG, "${sleepTimeMs}ms后执行重连操作!") + scheduledThreadPool.schedule({ + Log.d(TAG, "Reconnecting server ...") + // 异步重连或单独线程池中同步重连,不要阻塞netty的io线程,同时也不要关闭EventLoopGroup + mNettyClient.reconnectServer() + }, sleepTimeMs, TimeUnit.MILLISECONDS) + ++retries + } else { + Log.d(TAG, "Try to reconnect to the server Over Retry count:${retries + 1}") + } + } +} \ No newline at end of file diff --git a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/status/ConnectState.java b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/status/ConnectState.java index 08747a9..5620c9f 100644 --- a/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/status/ConnectState.java +++ b/libraries/mogo-telematic/src/main/java/com/mogo/telematic/client/status/ConnectState.java @@ -9,4 +9,6 @@ public class ConnectState { public final static int STATUS_CONNECT_CLOSED = 0; public final static int STATUS_CONNECT_ERROR = -1; + + public final static int STATUS_CONNECT_LOST = -2; }