[Update]Netty支持断线重连机制

This commit is contained in:
chenfufeng
2022-02-21 19:43:14 +08:00
parent 6e850b0cab
commit d3a2374270
8 changed files with 184 additions and 203 deletions

View File

@@ -130,14 +130,16 @@ public class NettyServerActivity extends AppCompatActivity implements View.OnCli
if (!NSDNettyManager.getInstance().isServerStart()) {
Toast.makeText(getApplicationContext(), "未连接,请先连接", LENGTH_SHORT).show();
} else {
NSDNettyManager.getInstance().sendMogoProtocolMsgToClient(mogoProtocolMsg, channelFuture -> {
if (channelFuture.isSuccess()) {
Log.d(TAG, "Write auth successful");
// NSDNettyManager.getInstance().sendMogoProtocolMsgToClient(mogoProtocolMsg, channelFuture -> {
// if (channelFuture.isSuccess()) {
// Log.d(TAG, "Write auth successful");
// logSend(Arrays.toString(mogoProtocolMsg.getBody()));
// } else {
// Log.d(TAG, "Write auth error");
// }
// });
NSDNettyManager.getInstance().sendMsgToAllClients(mogoProtocolMsg);
logSend(Arrays.toString(mogoProtocolMsg.getBody()));
} else {
Log.d(TAG, "Write auth error");
}
});
mSendET.setText(Arrays.toString(mogoProtocolMsg.getBody()));
}
break;

View File

@@ -1,4 +1,6 @@
apply plugin: 'com.android.library'
apply plugin: 'kotlin-android'
apply plugin: 'kotlin-android-extensions'
android {
compileSdkVersion rootProject.ext.android.compileSdkVersion
@@ -29,6 +31,7 @@ android {
dependencies {
implementation fileTree(dir: "libs", include: ["*.jar"])
api rootProject.ext.dependencies.spi
implementation rootProject.ext.dependencies.kotlinstdlibjdk7
api 'io.netty:netty-all:4.1.8.Final'
if (Boolean.valueOf(RELEASE)) {
@@ -36,7 +39,6 @@ dependencies {
} else {
}
}
apply from: new File(rootProject.rootDir, "gradle/upload.gradle").toString()

View File

@@ -5,7 +5,6 @@ import static com.mogo.telematic.MogoLengthFrameDecoder.LENGTH_FIELD_SIZE;
import static com.mogo.telematic.MogoLengthFrameDecoder.MAX_FRAME_LENGTH;
import static com.mogo.telematic.MogoProtocolMsg.IDENTITY_REGIST;
import android.os.SystemClock;
import android.text.TextUtils;
import android.util.Log;
@@ -46,7 +45,8 @@ public class NettyTcpClient {
private Channel channel;
private boolean isConnect = false;
private volatile boolean canReconnect = true;
private volatile boolean isConnected = false;
/**
* 最大重连次数
@@ -119,6 +119,7 @@ public class NettyTcpClient {
public void connect() {
if (isConnecting) {
Log.d(TAG, "正在连接中connect return");
return;
}
Thread clientThread = new Thread("client-Netty") {
@@ -133,14 +134,10 @@ public class NettyTcpClient {
clientThread.start();
}
private void connectServer() {
synchronized (NettyTcpClient.this) {
ChannelFuture channelFuture = null;
if (!isConnect) {
isConnecting = true;
group = new NioEventLoopGroup();
private void initBootstrap() {
try {
bootstrap = new Bootstrap().group(group)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@@ -159,33 +156,61 @@ public class NettyTcpClient {
ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign, isSendheartBeat, heartBeatData));
}
});
try {
channelFuture = bootstrap.connect(host, tcp_port).addListener(mFutureListener);//.sync()
// Wait until the connection is closed.
channelFuture.channel().closeFuture().addListener((ChannelFutureListener) future -> {
if (future.channel() != null) {
future.channel().eventLoop().shutdownGracefully();
}
channel = null;
});
Log.e(TAG, " 断开连接");
} catch (Exception e) {
e.printStackTrace();
} finally {
isConnect = false;
if (listener != null) {
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mSign);
Log.w(TAG, "初始化出错的原因为:" + e.getMessage());
}
// if (null != channelFuture) {
// if (channelFuture.channel() != null && channelFuture.channel().isOpen()) {
// channelFuture.channel().close();
}
private void connectServer() {
Log.d(TAG, "connectServer fun is called.");
synchronized (NettyTcpClient.this) {
if (!isConnected) {
isConnecting = true;
group = new NioEventLoopGroup();
initBootstrap();
doConnect();
} else {
Log.w(TAG, "socket正在连接中无需重连");
}
}
}
private void doConnect() {
if (channel != null && channel.isOpen() && channel.isActive()) {
Log.w(TAG, "Channel is active, no need to call start.");
return;
}
if (!canReconnect) {
Log.d(TAG, "重连已被关闭则直接return.");
return;
}
ChannelFuture channelFuture;
try {
if (bootstrap != null) {
channelFuture = bootstrap.connect(host, tcp_port);
channel = channelFuture.channel();
channelFuture.addListener(mFutureListener);
channel.closeFuture().addListener((ChannelFutureListener) future -> {
// // 确保连接不再被使用时使用否则重连时task会被拒绝
// if (future.channel() != null) {
// future.channel().eventLoop().shutdownGracefully();
// }
// }
// group.shutdownGracefully();
// reconnect();
channel = null;
Log.e(TAG, "连接已经结束!");
});
Log.d(TAG, "连接获取channel已完成");
}
} catch (Exception e) {
Log.e(TAG, "连接Server ip:" + host + ",port:" + tcp_port + ",失败信息:" + e.getMessage());
isConnected = false;
}
}
public void reconnectServer() {
Log.d(TAG, "ReconnectServer function is called.");
if (!TextUtils.isEmpty(host)) {
doConnect();
}
}
@@ -212,19 +237,25 @@ public class NettyTcpClient {
}
};
private ChannelFutureListener mFutureListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
private ChannelFutureListener mFutureListener = channelFuture -> {
if (channelFuture.isDone()) {
if (channelFuture.isSuccess()) {
Log.e(TAG, "连接成功");
Log.d(TAG, "连接成功");
reconnectNum = MAX_CONNECT_TIMES;
isConnect = true;
channel = channelFuture.channel();
isConnected = true;
} else {
Log.e(TAG, "连接失败");
isConnect = false;
if (channelFuture.isCancelled()) {
Log.w(TAG, "Connection attempt cancelled by user.");
} else {
Log.e(TAG, "连接失败原因为:" + channelFuture.cause().getMessage());
}
isConnected = false;
channelFuture.channel().pipeline().fireChannelInactive();
}
isConnecting = false;
} else {
Log.w(TAG, "Connection isn't done.");
isConnected = false;
}
};
@@ -234,43 +265,8 @@ public class NettyTcpClient {
group.shutdownGracefully();
}
public void reconnect() {
Log.e(TAG, "reconnect");
if (isNeedReconnect && reconnectNum > 0 && !isConnect) {
reconnectNum--;
SystemClock.sleep(reconnectIntervalTime);
if (isNeedReconnect && reconnectNum > 0 && !isConnect) {
Log.e(TAG, "重新连接");
connectServer();
}
}
}
public void reconnectServer() {
Log.e(TAG, "***reconnectServer***");
if (channel != null && channel.isOpen() && channel.isActive()) {
return;
}
if (bootstrap != null) {
ChannelFuture channelFuture = null;
try {
channelFuture = bootstrap.connect(host, tcp_port).addListener(mFutureListener);
// Wait until the connection is closed.
channelFuture.channel().closeFuture().sync();
Log.e(TAG, " 断开连接");
} catch (Exception e) {
e.printStackTrace();
} finally {
isConnect = false;
if (listener != null) {
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mSign);
}
}
}
}
public boolean sendMsgToServer(MogoProtocolMsg protocolMsg, final MessageStateListener listener) {
boolean flag = channel != null && isConnect;
boolean flag = channel != null && isConnected;
if (flag) {
channel.writeAndFlush(protocolMsg).addListener((ChannelFutureListener) channelFuture -> {
if (listener != null) {
@@ -287,7 +283,7 @@ public class NettyTcpClient {
* @return 获取TCP连接状态
*/
public boolean getConnectStatus() {
return isConnect;
return isConnected;
}
public boolean isConnecting() {
@@ -295,7 +291,7 @@ public class NettyTcpClient {
}
public void setConnectStatus(boolean status) {
this.isConnect = status;
this.isConnected = status;
}
public void setListener(NettyClientListener listener) {
@@ -343,7 +339,8 @@ public class NettyTcpClient {
*/
private Object heartBeatData;
public Builder() {}
public Builder() {
}
public Builder setMaxReconnectTimes(int reConnectTimes) {
this.MAX_CONNECT_TIMES = reConnectTimes;

View File

@@ -42,9 +42,6 @@ public class NsdClient {
private static final int MSG_NULL = 1003;
int count;
/**
* @param context this
* @param serviceName 客户端扫描 指定的地址 暂时没用到
@@ -72,25 +69,23 @@ public class NsdClient {
@Override
public void onStartDiscoveryFailed(String serviceType, int errorCode) {
mNsdManager.stopServiceDiscovery(this);
Log.e(TAG, "onStartDiscoveryFailed():");
Log.e(TAG, "onStartDiscoveryFailed()");
}
@Override
public void onStopDiscoveryFailed(String serviceType, int errorCode) {
mNsdManager.stopServiceDiscovery(this);
Log.e(TAG, "onStopDiscoveryFailed():");
Log.e(TAG, "onStopDiscoveryFailed()");
}
@Override
public void onDiscoveryStarted(String serviceType) {
Log.e(TAG, "onDiscoveryStarted():");
Log.e(TAG, "onDiscoveryStarted()");
}
@Override
public void onDiscoveryStopped(String serviceType) {
Log.e(TAG, "onDiscoveryStopped():");
Log.e(TAG, "onDiscoveryStopped()");
}
/**
@@ -99,15 +94,13 @@ public class NsdClient {
*/
@Override
public void onServiceFound(NsdServiceInfo serviceInfo) {
Log.e(TAG, "onServiceFound()");
//根据咱服务器的定义名称,指定解析该 NsdServiceInfo
if (serviceInfo.getServiceName().equals(mServiceName)) {
mNsdManager.resolveService(serviceInfo, mResolverListener);
} else {
mHandler.sendEmptyMessage(MSG_NULL);
}
Log.e(TAG, "onServiceFound():");
mNsdServiceInfoListBefore.add(serviceInfo);
}
@@ -115,7 +108,6 @@ public class NsdClient {
@Override
public void onServiceLost(NsdServiceInfo serviceInfo) {
Log.e(TAG, "onServiceLost(): serviceInfo=" + serviceInfo);
}
};
}
@@ -126,7 +118,6 @@ public class NsdClient {
super.handleMessage(msg);
switch (msg.what) {
case MSG_RESOLVER:
//回调到主线 进行解析結果的回調
NsdServiceInfo serviceInfo = (NsdServiceInfo) msg.obj;
if (mIServerFound != null) {
@@ -152,6 +143,7 @@ public class NsdClient {
mResolverListener = new NsdManager.ResolveListener() {
@Override
public void onResolveFailed(NsdServiceInfo serviceInfo, int errorCode) {
Log.e(TAG, "onResolveFailed()");
}
@Override

View File

@@ -1,19 +1,13 @@
package com.mogo.telematic.client.handler;
import android.text.TextUtils;
import android.util.Log;
import com.mogo.telematic.MogoProtocolMsg;
import com.mogo.telematic.client.listener.NettyClientListener;
import com.mogo.telematic.client.status.ConnectState;
import java.io.UnsupportedEncodingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
@@ -89,9 +83,6 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
@Override
public void channelInactive(ChannelHandlerContext ctx) {
Log.e(TAG, "channelInactive");
// NettyTcpClient.getInstance().setConnectStatus(false);
// listener.onServiceStatusConnectChanged(NettyClientListener.STATUS_CONNECT_CLOSED);
// NettyTcpClient.getInstance().reconnect();
}
/**
@@ -112,8 +103,6 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
// NettyTcpClient.getInstance().setConnectStatus(false);
Log.e(TAG, "exceptionCaught");
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, mSign);
cause.printStackTrace();

View File

@@ -1,71 +0,0 @@
package com.mogo.telematic.client.handler;
import android.util.Log;
import com.mogo.telematic.client.NettyTcpClient;
import java.util.concurrent.TimeUnit;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
public class ReconnectHandler extends ChannelInboundHandlerAdapter {
private static final String TAG = "ReconnectHandler";
private static final int MAX_RETRIES_LIMIT = 29;
private int maxRetries = Integer.MAX_VALUE;
private int retries = 0;
private int baseSleepTimeMs = 1000;
private int maxSleepMs = 60 * 1000;
private NettyTcpClient mNettyClient;
public ReconnectHandler(NettyTcpClient nettyClient) {
mNettyClient = nettyClient;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
retries = 0;
ctx.fireChannelActive();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// TODO:(策略暂时耦合在这里,后面使用策略模式)
boolean allowRetry = false;
if (retries < maxRetries) {
allowRetry = true;
}
if (allowRetry) {
long sleepTimeMs;
if (retries < 0) {
throw new IllegalArgumentException("retries count must greater than 0.");
}
if (retries > MAX_RETRIES_LIMIT) {
retries = MAX_RETRIES_LIMIT;
}
sleepTimeMs = baseSleepTimeMs * Math.max(1, 1 << retries);
if (sleepTimeMs > maxSleepMs) {
sleepTimeMs = maxSleepMs;
}
EventLoop eventLoop = ctx.channel().eventLoop();
eventLoop.schedule(() -> {
Log.d(TAG,"Reconnecting ...");
mNettyClient.reconnectServer();
}, sleepTimeMs, TimeUnit.MILLISECONDS);
++retries;
}else {
Log.d(TAG, "Try to reconnect to the server Over Retry count:" + (retries+1));
}
}
}

View File

@@ -0,0 +1,68 @@
package com.mogo.telematic.client.handler
import android.util.Log
import com.mogo.telematic.client.NettyTcpClient
import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelHandlerContext
import kotlin.Throws
import java.lang.Exception
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.math.max
class ReconnectHandler(private val mNettyClient: NettyTcpClient) : ChannelInboundHandlerAdapter() {
private val maxRetries = Int.MAX_VALUE
private var retries = 0
private val baseSleepTimeMs = 3000
private val maxSleepMs = 10 * 1000
private val scheduledThreadPool by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {
Executors.newScheduledThreadPool(1)
}
companion object {
private const val TAG = "ReconnectHandler"
private const val MAX_RETRIES_LIMIT = 29
}
override fun channelActive(ctx: ChannelHandlerContext) {
retries = 0
ctx.fireChannelActive()
}
@Throws(Exception::class)
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
Log.e(TAG, "exceptionCaught channelId is:${ctx.channel().id()}, cause message is:${cause.message}")
ctx.close()
}
@Throws(Exception::class)
override fun channelInactive(ctx: ChannelHandlerContext) {
Log.d(TAG, "ReconnectHandler channelInactive ...")
// TODO:(策略暂时耦合在这里,后面使用策略模式)
var allowRetry = false
if (retries < maxRetries) {
allowRetry = true
}
if (allowRetry) {
var sleepTimeMs: Long
require(retries >= 0) { "Retries count must greater than 0." }
if (retries > MAX_RETRIES_LIMIT) {
retries = MAX_RETRIES_LIMIT
}
sleepTimeMs = (baseSleepTimeMs * max(1, 1 shl retries)).toLong()
if (sleepTimeMs > maxSleepMs) {
sleepTimeMs = maxSleepMs.toLong()
}
Log.d(TAG, "${sleepTimeMs}ms后执行重连操作")
scheduledThreadPool.schedule({
Log.d(TAG, "Reconnecting server ...")
// 异步重连或单独线程池中同步重连不要阻塞netty的io线程同时也不要关闭EventLoopGroup
mNettyClient.reconnectServer()
}, sleepTimeMs, TimeUnit.MILLISECONDS)
++retries
} else {
Log.d(TAG, "Try to reconnect to the server Over Retry count:${retries + 1}")
}
}
}

View File

@@ -9,4 +9,6 @@ public class ConnectState {
public final static int STATUS_CONNECT_CLOSED = 0;
public final static int STATUS_CONNECT_ERROR = -1;
public final static int STATUS_CONNECT_LOST = -2;
}