[Update]Netty支持多channel通信和业务身份绑定
This commit is contained in:
@@ -5,7 +5,10 @@ import java.util.Arrays;
|
||||
public class MogoProtocolMsg {
|
||||
// 心跳数据包类型
|
||||
public static final int HEART_DATA = 0;
|
||||
// 普通数据
|
||||
public static final int NORMAL_DATA = 1;
|
||||
// 业务相关的身份注册
|
||||
public static final int IDENTITY_REGIST = 2;
|
||||
|
||||
private int protocolType;
|
||||
private int bodyLength;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package com.mogo.telematic;
|
||||
|
||||
import android.content.Context;
|
||||
import android.content.Intent;
|
||||
import android.net.nsd.NsdServiceInfo;
|
||||
import android.util.Log;
|
||||
|
||||
@@ -10,8 +9,6 @@ import com.mogo.telematic.client.NsdClient;
|
||||
import com.mogo.telematic.client.listener.MessageStateListener;
|
||||
import com.mogo.telematic.client.listener.NettyClientListener;
|
||||
import com.mogo.telematic.server.NSDServer;
|
||||
import com.mogo.telematic.server.NettyServerService;
|
||||
import com.mogo.telematic.server.bean.ClientChanel;
|
||||
import com.mogo.telematic.server.netty.NettyServerListener;
|
||||
import com.mogo.telematic.server.netty.NettyTcpServer;
|
||||
|
||||
@@ -46,11 +43,12 @@ public class NSDNettyManager {
|
||||
return sInstance;
|
||||
}
|
||||
|
||||
private NettyServerListener mDefaultListener = new NettyServerListener() {
|
||||
private NettyServerListener mDefaultServerListener = new NettyServerListener() {
|
||||
|
||||
@Override
|
||||
public void onMessageResponseServer(Object msg, String ChannelId) {
|
||||
if (mListener != null) {
|
||||
mListener.onMessageResponseServer(msg, ChannelId);
|
||||
public void onMessageResponseServer(Object msg, Channel channel) {
|
||||
if (msg instanceof MogoProtocolMsg && mListener != null) {
|
||||
mListener.onMessageResponseServer(msg, channel);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,6 +72,7 @@ public class NSDNettyManager {
|
||||
|
||||
@Override
|
||||
public void onChannelConnect(Channel channel) {
|
||||
NettyTcpServer.getInstance().addChannel(channel);
|
||||
if (mListener != null) {
|
||||
mListener.onChannelConnect(channel);
|
||||
}
|
||||
@@ -81,6 +80,7 @@ public class NSDNettyManager {
|
||||
|
||||
@Override
|
||||
public void onChannelDisConnect(Channel channel) {
|
||||
NettyTcpServer.getInstance().removeChannel(channel);
|
||||
if (mListener != null) {
|
||||
mListener.onChannelDisConnect(channel);
|
||||
}
|
||||
@@ -89,6 +89,7 @@ public class NSDNettyManager {
|
||||
|
||||
/**
|
||||
* 启动Netty和NSD服务的Server端
|
||||
*
|
||||
* @param context
|
||||
* @param listener
|
||||
*/
|
||||
@@ -101,7 +102,7 @@ public class NSDNettyManager {
|
||||
mListener = listener;
|
||||
NettyTcpServer nettyTcpServer = NettyTcpServer.getInstance();
|
||||
if (!nettyTcpServer.isServerStart()) {
|
||||
nettyTcpServer.setListener(mDefaultListener);
|
||||
nettyTcpServer.setListener(mDefaultServerListener);
|
||||
nettyTcpServer.start();
|
||||
} else {
|
||||
NettyTcpServer.getInstance().disconnect();
|
||||
@@ -109,17 +110,17 @@ public class NSDNettyManager {
|
||||
}
|
||||
|
||||
public boolean isServerStart() {
|
||||
return NettyTcpServer.getInstance().isServerStart();
|
||||
}
|
||||
|
||||
public void sendByteArrayToClient(byte[] byteArray, ChannelFutureListener listener) {
|
||||
NettyTcpServer.getInstance().sendMsgToClient(byteArray, listener);
|
||||
return NettyTcpServer.getInstance().isServerStart();
|
||||
}
|
||||
|
||||
public void sendMogoProtocolMsgToClient(MogoProtocolMsg mogoProtocolMsg, ChannelFutureListener listener) {
|
||||
NettyTcpServer.getInstance().sendMsgToClient(mogoProtocolMsg, listener);
|
||||
}
|
||||
|
||||
public void sendMsgToAllClients(MogoProtocolMsg mogoProtocolMsg) {
|
||||
NettyTcpServer.getInstance().sendMsgToAllClients(mogoProtocolMsg);
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务器端注册一个可供NSD探测到的网络 Ip 地址,便于给展示叫号机连接此socket
|
||||
*/
|
||||
@@ -138,14 +139,17 @@ public class NSDNettyManager {
|
||||
//已经注册可停止该服务
|
||||
// nsdServer.stopNSDServer();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onServiceUnregistered(NsdServiceInfo serviceInfo) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onUnRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
|
||||
|
||||
@@ -160,6 +164,7 @@ public class NSDNettyManager {
|
||||
|
||||
/**
|
||||
* 选择连接的Client端的channel
|
||||
*
|
||||
* @param channel
|
||||
*/
|
||||
public void selectChannel(Channel channel) {
|
||||
@@ -168,9 +173,10 @@ public class NSDNettyManager {
|
||||
|
||||
/**
|
||||
* 搜索并连接Netty服务端
|
||||
*
|
||||
* @param context
|
||||
*/
|
||||
public void searchAndConnectServer(Context context, NettyClientListener listener) {
|
||||
public void searchAndConnectServer(Context context, String uuid, NettyClientListener listener) {
|
||||
if (mNsdClient == null) {
|
||||
mNsdClient = new NsdClient(context, SERVER_NAME, new NsdClient.IServerFound() {
|
||||
@Override
|
||||
@@ -179,7 +185,7 @@ public class NSDNettyManager {
|
||||
String hostAddress = info.getHost().getHostAddress();
|
||||
Log.d(TAG, "NSD查询到指定服务器信息ip为:" + hostAddress + ",port为:" + port);
|
||||
//获取到指定的地址,进行Netty的连接
|
||||
connectNettyServer(hostAddress, port, listener);
|
||||
connectNettyServer(hostAddress, port, uuid, listener);
|
||||
|
||||
if (info.getServiceName().equals(SERVER_NAME)) {
|
||||
//扫描到以后停止
|
||||
@@ -197,7 +203,7 @@ public class NSDNettyManager {
|
||||
mNsdClient.startNSDClient();
|
||||
}
|
||||
|
||||
private void connectNettyServer(String serverAddress, int port, NettyClientListener listener) {
|
||||
private void connectNettyServer(String serverAddress, int port, String sign, NettyClientListener listener) {
|
||||
Log.d(TAG, "connectNettyServer");
|
||||
if (serverAddress == null || serverAddress.length() == 0) {
|
||||
Log.e(TAG, "Netty Server的ip不能为空!");
|
||||
@@ -212,7 +218,7 @@ public class NSDNettyManager {
|
||||
.setSendHeartBeat(true) //设置是否发送心跳
|
||||
.setHeartBeatInterval(120) //设置心跳间隔时间。单位:秒
|
||||
.setHeartBeatData(new MogoProtocolMsg(MogoProtocolMsg.HEART_DATA, 2, new byte[]{0x00, 0x00})) //设置心跳数据,可以是String类型,也可以是byte[],以后设置的为准
|
||||
.setIndex(0) //设置客户端标识.(因为可能存在多个tcp连接)
|
||||
.setSign(sign) //设置客户端标识.(因为可能存在多个tcp连接)
|
||||
.build();
|
||||
if (listener != null) {
|
||||
mNettyTcpClient.setListener(listener); //设置TCP监听
|
||||
@@ -227,7 +233,15 @@ public class NSDNettyManager {
|
||||
}
|
||||
|
||||
public boolean getConnectStatus() {
|
||||
return mNettyTcpClient.getConnectStatus();
|
||||
return mNettyTcpClient != null && mNettyTcpClient.getConnectStatus();
|
||||
}
|
||||
|
||||
public String getConnServerIp() {
|
||||
if (mNettyTcpClient == null) {
|
||||
return "";
|
||||
} else {
|
||||
return mNettyTcpClient.getHost();
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMogoProtocolMsgToServer(MogoProtocolMsg mogoProtocolMsg, final MessageStateListener listener) {
|
||||
@@ -236,17 +250,6 @@ public class NSDNettyManager {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发送byte[]到服务端
|
||||
* @param byteArray
|
||||
* @param listener
|
||||
*/
|
||||
public void sendByteArrayToServer(byte[] byteArray, final MessageStateListener listener) {
|
||||
if (mNettyTcpClient != null) {
|
||||
mNettyTcpClient.sendMsgToServer(byteArray, listener);
|
||||
}
|
||||
}
|
||||
|
||||
public static String bytesToHexFun(byte[] bytes, int length) {
|
||||
StringBuilder buf = new StringBuilder(length * 2);
|
||||
for (int i = 0; i < length; i++) {// 使用String的format方法进行转换
|
||||
|
||||
@@ -3,6 +3,7 @@ package com.mogo.telematic.client;
|
||||
import static com.mogo.telematic.MogoLengthFrameDecoder.LENGTH_FIELD_OFFSET;
|
||||
import static com.mogo.telematic.MogoLengthFrameDecoder.LENGTH_FIELD_SIZE;
|
||||
import static com.mogo.telematic.MogoLengthFrameDecoder.MAX_FRAME_LENGTH;
|
||||
import static com.mogo.telematic.MogoProtocolMsg.IDENTITY_REGIST;
|
||||
|
||||
import android.os.SystemClock;
|
||||
import android.text.TextUtils;
|
||||
@@ -12,6 +13,7 @@ import com.mogo.telematic.MogoLengthFrameDecoder;
|
||||
import com.mogo.telematic.MogoMessageEncoder;
|
||||
import com.mogo.telematic.MogoProtocolMsg;
|
||||
import com.mogo.telematic.client.handler.NettyClientHandler;
|
||||
import com.mogo.telematic.client.handler.ReconnectHandler;
|
||||
import com.mogo.telematic.client.listener.MessageStateListener;
|
||||
import com.mogo.telematic.client.listener.NettyClientListener;
|
||||
import com.mogo.telematic.client.status.ConnectState;
|
||||
@@ -19,8 +21,6 @@ import com.mogo.telematic.client.status.ConnectState;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.Channel;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
@@ -30,12 +30,7 @@ import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
|
||||
import io.netty.handler.codec.LineBasedFrameDecoder;
|
||||
import io.netty.handler.codec.string.StringDecoder;
|
||||
import io.netty.handler.codec.string.StringEncoder;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import io.netty.util.CharsetUtil;
|
||||
|
||||
/**
|
||||
* TCP 客户端
|
||||
@@ -43,6 +38,8 @@ import io.netty.util.CharsetUtil;
|
||||
public class NettyTcpClient {
|
||||
private static final String TAG = "NettyTcpClient";
|
||||
|
||||
private Bootstrap bootstrap;
|
||||
|
||||
private EventLoopGroup group;
|
||||
|
||||
private NettyClientListener listener;
|
||||
@@ -66,7 +63,7 @@ public class NettyTcpClient {
|
||||
|
||||
private String host;
|
||||
private int tcp_port;
|
||||
private int mIndex;
|
||||
private String mSign;
|
||||
/**
|
||||
* 心跳间隔时间
|
||||
*/
|
||||
@@ -82,21 +79,10 @@ public class NettyTcpClient {
|
||||
*/
|
||||
private Object heartBeatData;
|
||||
|
||||
private String packetSeparator;
|
||||
private int maxPacketLong = 1024;
|
||||
|
||||
private void setPacketSeparator(String separator) {
|
||||
this.packetSeparator = separator;
|
||||
}
|
||||
|
||||
private void setMaxPacketLong(int maxPacketLong) {
|
||||
this.maxPacketLong = maxPacketLong;
|
||||
}
|
||||
|
||||
private NettyTcpClient(String host, int tcp_port, int index) {
|
||||
private NettyTcpClient(String host, int tcp_port, String sign) {
|
||||
this.host = host;
|
||||
this.tcp_port = tcp_port;
|
||||
this.mIndex = index;
|
||||
this.mSign = sign;
|
||||
}
|
||||
|
||||
public int getMaxConnectTimes() {
|
||||
@@ -115,8 +101,12 @@ public class NettyTcpClient {
|
||||
return tcp_port;
|
||||
}
|
||||
|
||||
public int getIndex() {
|
||||
return mIndex;
|
||||
public String getSign() {
|
||||
return mSign;
|
||||
}
|
||||
|
||||
public void setSign(String sign) {
|
||||
this.mSign = sign;
|
||||
}
|
||||
|
||||
public long getHeartBeatInterval() {
|
||||
@@ -150,13 +140,13 @@ public class NettyTcpClient {
|
||||
if (!isConnect) {
|
||||
isConnecting = true;
|
||||
group = new NioEventLoopGroup();
|
||||
Bootstrap bootstrap = new Bootstrap().group(group)
|
||||
.option(ChannelOption.TCP_NODELAY, true)//屏蔽Nagle算法试图
|
||||
bootstrap = new Bootstrap().group(group)
|
||||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
|
||||
.channel(NioSocketChannel.class)
|
||||
.handler(new ChannelInitializer<SocketChannel>() {
|
||||
@Override
|
||||
public void initChannel(SocketChannel ch) throws Exception {
|
||||
ch.pipeline().addLast(new ReconnectHandler(NettyTcpClient.this));
|
||||
if (isSendheartBeat) {
|
||||
ch.pipeline().addLast("ping", new IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS));//5s未发送数据,回调userEventTriggered
|
||||
}
|
||||
@@ -166,47 +156,77 @@ public class NettyTcpClient {
|
||||
LENGTH_FIELD_SIZE));
|
||||
// encoder
|
||||
ch.pipeline().addLast(new MogoMessageEncoder());
|
||||
ch.pipeline().addLast(new NettyClientHandler(listener, mIndex, isSendheartBeat, heartBeatData, packetSeparator));
|
||||
ch.pipeline().addLast(new NettyClientHandler(mProxyListener, mSign, isSendheartBeat, heartBeatData));
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
channelFuture = bootstrap.connect(host, tcp_port).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
if (channelFuture.isSuccess()) {
|
||||
Log.e(TAG, "连接成功");
|
||||
reconnectNum = MAX_CONNECT_TIMES;
|
||||
isConnect = true;
|
||||
channel = channelFuture.channel();
|
||||
} else {
|
||||
Log.e(TAG, "连接失败");
|
||||
isConnect = false;
|
||||
}
|
||||
isConnecting = false;
|
||||
}
|
||||
}).sync();
|
||||
|
||||
channelFuture = bootstrap.connect(host, tcp_port).addListener(mFutureListener);//.sync()
|
||||
// Wait until the connection is closed.
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
channelFuture.channel().closeFuture().addListener((ChannelFutureListener) future -> {
|
||||
if (future.channel() != null) {
|
||||
future.channel().eventLoop().shutdownGracefully();
|
||||
}
|
||||
channel = null;
|
||||
});
|
||||
Log.e(TAG, " 断开连接");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
isConnect = false;
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mIndex);
|
||||
if (null != channelFuture) {
|
||||
if (channelFuture.channel() != null && channelFuture.channel().isOpen()) {
|
||||
channelFuture.channel().close();
|
||||
}
|
||||
if (listener != null) {
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mSign);
|
||||
}
|
||||
group.shutdownGracefully();
|
||||
reconnect();
|
||||
// if (null != channelFuture) {
|
||||
// if (channelFuture.channel() != null && channelFuture.channel().isOpen()) {
|
||||
// channelFuture.channel().close();
|
||||
// }
|
||||
// }
|
||||
// group.shutdownGracefully();
|
||||
// reconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private NettyClientListener mProxyListener = new NettyClientListener() {
|
||||
@Override
|
||||
public void onMessageResponseClient(Object msg, String sign) {
|
||||
if (listener != null) {
|
||||
listener.onMessageResponseClient(msg, sign);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClientStatusConnectChanged(int statusCode, String sign) {
|
||||
if (statusCode == ConnectState.STATUS_CONNECT_SUCCESS && !TextUtils.isEmpty(mSign)) {
|
||||
byte[] signByteArr = mSign.getBytes();
|
||||
MogoProtocolMsg msg = new MogoProtocolMsg(IDENTITY_REGIST, signByteArr.length, signByteArr);
|
||||
sendMsgToServer(msg, isSuccess -> {
|
||||
Log.d(TAG, "身份注册成功:" + isSuccess);
|
||||
});
|
||||
}
|
||||
if (listener != null) {
|
||||
listener.onClientStatusConnectChanged(statusCode, sign);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
private ChannelFutureListener mFutureListener = new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
if (channelFuture.isSuccess()) {
|
||||
Log.e(TAG, "连接成功");
|
||||
reconnectNum = MAX_CONNECT_TIMES;
|
||||
isConnect = true;
|
||||
channel = channelFuture.channel();
|
||||
} else {
|
||||
Log.e(TAG, "连接失败");
|
||||
isConnect = false;
|
||||
}
|
||||
isConnecting = false;
|
||||
}
|
||||
};
|
||||
|
||||
public void disconnect() {
|
||||
Log.e(TAG, "disconnect");
|
||||
@@ -226,41 +246,27 @@ public class NettyTcpClient {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步发送
|
||||
*
|
||||
* @param data 要发送的数据
|
||||
* @param listener 发送结果回调
|
||||
* @return 方法执行结果
|
||||
*/
|
||||
public boolean sendMsgToServer(String data, final MessageStateListener listener) {
|
||||
boolean flag = channel != null && isConnect;
|
||||
if (flag) {
|
||||
String separator = TextUtils.isEmpty(packetSeparator) ? System.getProperty("line.separator") : packetSeparator;
|
||||
ChannelFuture channelFuture = channel.writeAndFlush(data + separator).addListener(new ChannelFutureListener() {
|
||||
@Override
|
||||
public void operationComplete(ChannelFuture channelFuture) throws Exception {
|
||||
listener.isSendSuccess(channelFuture.isSuccess());
|
||||
public void reconnectServer() {
|
||||
Log.e(TAG, "***reconnectServer***");
|
||||
if (channel != null && channel.isOpen() && channel.isActive()) {
|
||||
return;
|
||||
}
|
||||
if (bootstrap != null) {
|
||||
ChannelFuture channelFuture = null;
|
||||
try {
|
||||
channelFuture = bootstrap.connect(host, tcp_port).addListener(mFutureListener);
|
||||
// Wait until the connection is closed.
|
||||
channelFuture.channel().closeFuture().sync();
|
||||
Log.e(TAG, " 断开连接");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
} finally {
|
||||
isConnect = false;
|
||||
if (listener != null) {
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_CLOSED, mSign);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步发送
|
||||
*
|
||||
* @param data 要发送的数据
|
||||
* @return 方法执行结果
|
||||
*/
|
||||
public boolean sendMsgToServer(String data) {
|
||||
boolean flag = channel != null && isConnect;
|
||||
if (flag) {
|
||||
String separator = TextUtils.isEmpty(packetSeparator) ? System.getProperty("line.separator") : packetSeparator;
|
||||
ChannelFuture channelFuture = channel.writeAndFlush(data + separator).awaitUninterruptibly();
|
||||
return channelFuture.isSuccess();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean sendMsgToServer(MogoProtocolMsg protocolMsg, final MessageStateListener listener) {
|
||||
@@ -275,19 +281,6 @@ public class NettyTcpClient {
|
||||
return flag;
|
||||
}
|
||||
|
||||
public boolean sendMsgToServer(byte[] data, final MessageStateListener listener) {
|
||||
boolean flag = channel != null && isConnect;
|
||||
if (flag) {
|
||||
ByteBuf buf = Unpooled.copiedBuffer(data);
|
||||
channel.writeAndFlush(buf).addListener((ChannelFutureListener) channelFuture -> {
|
||||
if (listener != null) {
|
||||
listener.isSendSuccess(channelFuture.isSuccess());
|
||||
}
|
||||
});
|
||||
}
|
||||
return flag;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取TCP连接状态
|
||||
*
|
||||
@@ -309,15 +302,6 @@ public class NettyTcpClient {
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public byte[] strToByteArray(String str) {
|
||||
if (str == null) {
|
||||
return null;
|
||||
}
|
||||
byte[] byteArray = str.getBytes();
|
||||
return byteArray;
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建者,创建NettyTcpClient
|
||||
*/
|
||||
@@ -343,7 +327,7 @@ public class NettyTcpClient {
|
||||
/**
|
||||
* 客户端标识,(因为可能存在多个连接)
|
||||
*/
|
||||
private int mIndex;
|
||||
private String mSign;
|
||||
|
||||
/**
|
||||
* 是否发送心跳
|
||||
@@ -359,23 +343,7 @@ public class NettyTcpClient {
|
||||
*/
|
||||
private Object heartBeatData;
|
||||
|
||||
private String packetSeparator;
|
||||
private int maxPacketLong = 1024;
|
||||
|
||||
public Builder() {
|
||||
this.maxPacketLong = 1024;
|
||||
}
|
||||
|
||||
|
||||
public Builder setPacketSeparator(String packetSeparator) {
|
||||
this.packetSeparator = packetSeparator;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setMaxPacketLong(int maxPacketLong) {
|
||||
this.maxPacketLong = maxPacketLong;
|
||||
return this;
|
||||
}
|
||||
public Builder() {}
|
||||
|
||||
public Builder setMaxReconnectTimes(int reConnectTimes) {
|
||||
this.MAX_CONNECT_TIMES = reConnectTimes;
|
||||
@@ -399,8 +367,8 @@ public class NettyTcpClient {
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder setIndex(int mIndex) {
|
||||
this.mIndex = mIndex;
|
||||
public Builder setSign(String sign) {
|
||||
this.mSign = sign;
|
||||
return this;
|
||||
}
|
||||
|
||||
@@ -420,14 +388,12 @@ public class NettyTcpClient {
|
||||
}
|
||||
|
||||
public NettyTcpClient build() {
|
||||
NettyTcpClient nettyTcpClient = new NettyTcpClient(host, tcp_port, mIndex);
|
||||
NettyTcpClient nettyTcpClient = new NettyTcpClient(host, tcp_port, mSign);
|
||||
nettyTcpClient.MAX_CONNECT_TIMES = this.MAX_CONNECT_TIMES;
|
||||
nettyTcpClient.reconnectIntervalTime = this.reconnectIntervalTime;
|
||||
nettyTcpClient.heartBeatInterval = this.heartBeatInterval;
|
||||
nettyTcpClient.isSendheartBeat = this.isSendheartBeat;
|
||||
nettyTcpClient.heartBeatData = this.heartBeatData;
|
||||
nettyTcpClient.packetSeparator = this.packetSeparator;
|
||||
nettyTcpClient.maxPacketLong = this.maxPacketLong;
|
||||
return nettyTcpClient;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,20 +22,14 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
|
||||
private static final String TAG = "NettyClientHandler";
|
||||
private final boolean isSendheartBeat;
|
||||
private NettyClientListener listener;
|
||||
private int index;
|
||||
private String mSign;
|
||||
private Object heartBeatData;
|
||||
private String packetSeparator;
|
||||
|
||||
public NettyClientHandler(NettyClientListener listener, int index, boolean isSendheartBeat, Object heartBeatData) {
|
||||
this(listener, index, isSendheartBeat, heartBeatData, null);
|
||||
}
|
||||
|
||||
public NettyClientHandler(NettyClientListener listener, int index, boolean isSendheartBeat, Object heartBeatData, String separator) {
|
||||
public NettyClientHandler(NettyClientListener listener, String sign, boolean isSendheartBeat, Object heartBeatData) {
|
||||
this.listener = listener;
|
||||
this.index = index;
|
||||
this.mSign = sign;
|
||||
this.isSendheartBeat = isSendheartBeat;
|
||||
this.heartBeatData = heartBeatData;
|
||||
this.packetSeparator = TextUtils.isEmpty(separator) ? System.getProperty("line.separator") : separator;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -84,8 +78,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
Log.e(TAG, "channelActive");
|
||||
// NettyTcpClient.getInstance().setConnectStatus(true);
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, index);
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_SUCCESS, mSign);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -110,7 +103,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext ctx, MogoProtocolMsg msg) throws Exception {
|
||||
Log.d(TAG, "Client channelRead0:" + msg.toString());
|
||||
listener.onMessageResponseClient(msg, index);
|
||||
listener.onMessageResponseClient(msg, mSign);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -122,7 +115,7 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<MogoProtocol
|
||||
// Close the connection when an exception is raised.
|
||||
// NettyTcpClient.getInstance().setConnectStatus(false);
|
||||
Log.e(TAG, "exceptionCaught");
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, index);
|
||||
listener.onClientStatusConnectChanged(ConnectState.STATUS_CONNECT_ERROR, mSign);
|
||||
cause.printStackTrace();
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
@@ -0,0 +1,71 @@
|
||||
package com.mogo.telematic.client.handler;
|
||||
|
||||
import android.util.Log;
|
||||
|
||||
import com.mogo.telematic.client.NettyTcpClient;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.channel.EventLoop;
|
||||
|
||||
public class ReconnectHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private static final String TAG = "ReconnectHandler";
|
||||
private static final int MAX_RETRIES_LIMIT = 29;
|
||||
|
||||
private int maxRetries = Integer.MAX_VALUE;
|
||||
private int retries = 0;
|
||||
private int baseSleepTimeMs = 1000;
|
||||
private int maxSleepMs = 60 * 1000;
|
||||
|
||||
private NettyTcpClient mNettyClient;
|
||||
|
||||
public ReconnectHandler(NettyTcpClient nettyClient) {
|
||||
mNettyClient = nettyClient;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
retries = 0;
|
||||
ctx.fireChannelActive();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
// TODO:(策略暂时耦合在这里,后面使用策略模式)
|
||||
boolean allowRetry = false;
|
||||
if (retries < maxRetries) {
|
||||
allowRetry = true;
|
||||
}
|
||||
if (allowRetry) {
|
||||
long sleepTimeMs;
|
||||
|
||||
if (retries < 0) {
|
||||
throw new IllegalArgumentException("retries count must greater than 0.");
|
||||
}
|
||||
if (retries > MAX_RETRIES_LIMIT) {
|
||||
retries = MAX_RETRIES_LIMIT;
|
||||
}
|
||||
sleepTimeMs = baseSleepTimeMs * Math.max(1, 1 << retries);
|
||||
if (sleepTimeMs > maxSleepMs) {
|
||||
sleepTimeMs = maxSleepMs;
|
||||
}
|
||||
|
||||
EventLoop eventLoop = ctx.channel().eventLoop();
|
||||
eventLoop.schedule(() -> {
|
||||
Log.d(TAG,"Reconnecting ...");
|
||||
mNettyClient.reconnectServer();
|
||||
}, sleepTimeMs, TimeUnit.MILLISECONDS);
|
||||
++retries;
|
||||
}else {
|
||||
Log.d(TAG, "Try to reconnect to the server Over Retry count:" + (retries+1));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -9,14 +9,14 @@ public interface NettyClientListener<T> {
|
||||
/**
|
||||
* 当接收到系统消息
|
||||
* @param msg 消息
|
||||
* @param index tcp 客户端的标识,因为一个应用程序可能有很多个长链接
|
||||
* @param sign tcp 客户端的标识,因为一个应用程序可能有很多个长链接
|
||||
*/
|
||||
void onMessageResponseClient(T msg, int index);
|
||||
void onMessageResponseClient(T msg, String sign);
|
||||
|
||||
/**
|
||||
* 当服务状态发生变化时触发
|
||||
* @param statusCode 状态变化
|
||||
* @param index tcp 客户端的标识,因为一个应用程序可能有很多个长链接
|
||||
* @param sign tcp 客户端的标识,因为一个应用程序可能有很多个长链接
|
||||
*/
|
||||
public void onClientStatusConnectChanged(int statusCode, int index);
|
||||
void onClientStatusConnectChanged(int statusCode, String sign);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user