[Update]Netty支持传输byte[]并封装使用

This commit is contained in:
chenfufeng
2022-02-10 14:32:52 +08:00
parent 2f5c5f9b28
commit 4e2c6ffd7a
9 changed files with 426 additions and 298 deletions

View File

@@ -0,0 +1,256 @@
package com.mogo.telematic;
import android.content.Context;
import android.content.Intent;
import android.net.nsd.NsdServiceInfo;
import android.util.Log;
import com.mogo.telematic.client.NettyTcpClient;
import com.mogo.telematic.client.NsdClient;
import com.mogo.telematic.client.listener.MessageStateListener;
import com.mogo.telematic.client.listener.NettyClientListener;
import com.mogo.telematic.server.NSDServer;
import com.mogo.telematic.server.NettyServerService;
import com.mogo.telematic.server.bean.ClientChanel;
import com.mogo.telematic.server.netty.NettyServerListener;
import com.mogo.telematic.server.netty.NettyTcpServer;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
public class NSDNettyManager {
private static final String TAG = "NSDNettyManager";
private volatile static NSDNettyManager sInstance;
// client端用来过滤的
public static final String SERVER_NAME = "NSD_SERVER";
private NSDServer mNsdServer;
private NettyTcpClient mNettyTcpClient;
private NsdClient mNsdClient;
private Context mContext;
private NettyServerListener mListener;
private NSDNettyManager() {
}
public static NSDNettyManager getInstance() {
if (sInstance == null) {
synchronized (NSDNettyManager.class) {
if (sInstance == null) {
sInstance = new NSDNettyManager();
}
}
}
return sInstance;
}
private NettyServerListener mDefaultListener = new NettyServerListener() {
@Override
public void onMessageResponseServer(Object msg, String ChannelId) {
}
@Override
public void onStartServer() {
registerNsdServer();
if (mListener != null) {
mListener.onStartServer();
}
}
@Override
public void onStopServer() {
if (mNsdServer != null) {
mNsdServer.stopNSDServer();
}
if (mListener != null) {
mListener.onStopServer();
}
}
@Override
public void onChannelConnect(Channel channel) {
if (mListener != null) {
mListener.onChannelConnect(channel);
}
}
@Override
public void onChannelDisConnect(Channel channel) {
if (mListener != null) {
mListener.onChannelDisConnect(channel);
}
}
};
/**
* 启动Netty和NSD服务的Server端
* @param context
* @param listener
*/
public void startNSDNettyServer(Context context, NettyServerListener listener) {
if (context == null) {
throw new RuntimeException("Context must not be null!");
} else {
mContext = context;
}
mListener = listener;
NettyTcpServer nettyTcpServer = NettyTcpServer.getInstance();
if (!nettyTcpServer.isServerStart()) {
nettyTcpServer.setListener(mDefaultListener);
nettyTcpServer.start();
} else {
NettyTcpServer.getInstance().disconnect();
}
}
public boolean isServerStart() {
return NettyTcpServer.getInstance().isServerStart();
}
public void sendByteArrayToClient(byte[] byteArray, ChannelFutureListener listener) {
NettyTcpServer.getInstance().sendMsgToClient(byteArray, listener);
}
/**
* 服务器端注册一个可供NSD探测到的网络 Ip 地址便于给展示叫号机连接此socket
*/
private Runnable mNsdServerRunnable = new Runnable() {
@Override
public void run() {
if (mNsdServer == null) {
mNsdServer = new NSDServer();
}
mNsdServer.startNSDServer(mContext, NettyTcpServer.SERVER_NAME, NettyTcpServer.SERVER_PORT);
mNsdServer.setRegisterState(new NSDServer.IRegisterState() {
@Override
public void onServiceRegistered(NsdServiceInfo serviceInfo) {
Log.i(TAG, "已注册服务onServiceRegistered: " + serviceInfo.toString());
//已经注册可停止该服务
// nsdServer.stopNSDServer();
}
@Override
public void onRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
}
@Override
public void onServiceUnregistered(NsdServiceInfo serviceInfo) {
}
@Override
public void onUnRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
}
});
}
};
private void registerNsdServer() {
new Thread(mNsdServerRunnable).start();
}
/**
* 选择连接的Client端的channel
* @param channel
*/
public void selectChannel(Channel channel) {
NettyTcpServer.getInstance().selectorChannel(channel);
}
/**
* 搜索并连接Netty服务端
* @param context
*/
public void searchAndConnectServer(Context context, NettyClientListener listener) {
if (mNsdClient == null) {
mNsdClient = new NsdClient(context, SERVER_NAME, new NsdClient.IServerFound() {
@Override
public void onServerFound(NsdServiceInfo info, int port) {
if (info != null) {
String hostAddress = info.getHost().getHostAddress();
Log.d(TAG, "NSD查询到指定服务器信息ip为" + hostAddress + ",port为" + port);
//获取到指定的地址进行Netty的连接
connectNettyServer(hostAddress, port, listener);
if (info.getServiceName().equals(SERVER_NAME)) {
//扫描到以后停止
mNsdClient.stopServiceDiscovery();
}
}
}
@Override
public void onServerFail() {
}
});
}
mNsdClient.startNSDClient();
}
private void connectNettyServer(String serverAddress, int port, NettyClientListener listener) {
Log.d(TAG, "connectNettyServer");
if (serverAddress == null || serverAddress.length() == 0) {
Log.e(TAG, "Netty Server的ip不能为空");
return;
}
if (mNettyTcpClient == null) {
mNettyTcpClient = new NettyTcpClient.Builder()
.setHost(serverAddress) //设置服务端地址
.setTcpPort(port) //设置服务端端口号
.setMaxReconnectTimes(5) //设置最大重连次数
.setReconnectIntervalTime(5) //设置重连间隔时间。单位:秒
.setSendHeartBeat(true) //设置是否发送心跳
.setHeartBeatInterval(5) //设置心跳间隔时间。单位:秒
.setHeartBeatData(new byte[]{0x00, 0x00, 0x00, 0x00}) //设置心跳数据可以是String类型也可以是byte[],以后设置的为准
.setIndex(0) //设置客户端标识.(因为可能存在多个tcp连接)
// .setPacketSeparator("#")//用特殊字符,作为分隔符,解决粘包问题,默认是用换行符作为分隔符
// .setMaxPacketLong(1024)//设置一次发送数据的最大长度默认是1024最大值为Integer.MAX
.build();
if (listener != null) {
mNettyTcpClient.setListener(listener); //设置TCP监听
}
}
if (!mNettyTcpClient.getConnectStatus()) {
mNettyTcpClient.connect();//连接服务器
} else {
mNettyTcpClient.disconnect();
}
}
public boolean getConnectStatus() {
return mNettyTcpClient.getConnectStatus();
}
/**
* 发送byte[]到服务端
* @param byteArray
* @param listener
*/
public void sendByteArrayToServer(byte[] byteArray, final MessageStateListener listener) {
if (mNettyTcpClient != null) {
mNettyTcpClient.sendMsgToServer(byteArray, listener);
}
}
public static String bytesToHexFun(byte[] bytes, int length) {
StringBuilder buf = new StringBuilder(length * 2);
for (int i = 0; i < length; i++) {// 使用String的format方法进行转换
buf.append(String.format("%02x", new Integer(bytes[i] & 0xFF)));
}
return buf.toString();
}
/**
* 断开Netty连接
*/
public void disconnect() {
if (mNettyTcpClient != null) {
mNettyTcpClient.disconnect();
}
}
}

View File

@@ -153,20 +153,7 @@ public class NettyTcpClient {
if (isSendheartBeat) {
ch.pipeline().addLast("ping", new IdleStateHandler(0, heartBeatInterval, 0, TimeUnit.SECONDS));//5s未发送数据回调userEventTriggered
}
//黏包处理,需要客户端、服务端配合
if (!TextUtils.isEmpty(packetSeparator)) {
ByteBuf delimiter= Unpooled.buffer();
delimiter.writeBytes(packetSeparator.getBytes());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(maxPacketLong,delimiter));
} else {
ch.pipeline().addLast(new LineBasedFrameDecoder(maxPacketLong));
}
ch.pipeline().addLast(new StringEncoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new StringDecoder(CharsetUtil.UTF_8));
ch.pipeline().addLast(new NettyClientHandler(listener, mIndex, isSendheartBeat, heartBeatData,packetSeparator));
ch.pipeline().addLast(new NettyClientHandler(listener, mIndex, isSendheartBeat, heartBeatData, packetSeparator));
}
});
@@ -272,14 +259,12 @@ public class NettyTcpClient {
return false;
}
public boolean sendMsgToServer(byte[] data, final MessageStateListener listener) {
boolean flag = channel != null && isConnect;
if (flag) {
ByteBuf buf = Unpooled.copiedBuffer(data);
channel.writeAndFlush(buf).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
channel.writeAndFlush(buf).addListener((ChannelFutureListener) channelFuture -> {
if (listener != null) {
listener.isSendSuccss(channelFuture.isSuccess());
}
});
@@ -408,7 +393,7 @@ public class NettyTcpClient {
return this;
}
public Builder setSendheartBeat(boolean isSendheartBeat) {
public Builder setSendHeartBeat(boolean isSendheartBeat) {
this.isSendheartBeat = isSendheartBeat;
return this;
}

View File

@@ -168,13 +168,13 @@ public class NsdClient {
Message msg = Message.obtain();
msg.what = MSG_RESOLVER;
msg.obj = serviceInfo;
mHandler.sendMessageDelayed(msg, 500);
mHandler.sendMessageDelayed(msg, 200);
}
};
}
public void stopNSDServer() {
public void stopServiceDiscovery() {
mNsdManager.stopServiceDiscovery(mDiscoveryListener);
}

View File

@@ -6,6 +6,8 @@ import android.util.Log;
import com.mogo.telematic.client.listener.NettyClientListener;
import com.mogo.telematic.client.status.ConnectState;
import java.io.UnsupportedEncodingException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
@@ -14,7 +16,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
public class NettyClientHandler extends SimpleChannelInboundHandler<Object> {
private static final String TAG = "NettyClientHandler";
private final boolean isSendheartBeat;
@@ -29,10 +31,10 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
public NettyClientHandler(NettyClientListener listener, int index, boolean isSendheartBeat, Object heartBeatData) {
this(listener,index,isSendheartBeat,heartBeatData,null);
this(listener, index, isSendheartBeat, heartBeatData, null);
}
public NettyClientHandler(NettyClientListener listener, int index, boolean isSendheartBeat, Object heartBeatData,String separator) {
public NettyClientHandler(NettyClientListener listener, int index, boolean isSendheartBeat, Object heartBeatData, String separator) {
this.listener = listener;
this.index = index;
this.isSendheartBeat = isSendheartBeat;
@@ -55,18 +57,13 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
// ctx.channel().writeAndFlush("Heartbeat" + System.getProperty("line.separator"));
if (isSendheartBeat) {
if (heartBeatData == null) {
ctx.channel().writeAndFlush("Heartbeat" + packetSeparator);
ByteBuf buf = Unpooled.copiedBuffer(requestBody);
ctx.channel().writeAndFlush(buf);
} else if (heartBeatData instanceof byte[]) {
ByteBuf buf = Unpooled.copiedBuffer((byte[]) heartBeatData);
ctx.channel().writeAndFlush(buf);
} else {
if (heartBeatData instanceof String) {
// Log.d(TAG, "userEventTriggered: String");
ctx.channel().writeAndFlush(heartBeatData + packetSeparator);
} else if (heartBeatData instanceof byte[]) {
// Log.d(TAG, "userEventTriggered: byte");
ByteBuf buf = Unpooled.copiedBuffer((byte[]) heartBeatData);
ctx.channel().writeAndFlush(buf);
} else {
Log.e(TAG, "userEventTriggered: heartBeatData type error");
}
Log.e(TAG, "userEventTriggered: heartBeatData type error");
}
} else {
Log.e(TAG, "不发送心跳");
@@ -107,9 +104,18 @@ public class NettyClientHandler extends SimpleChannelInboundHandler<String> {
* @param msg 消息
*/
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) {
Log.e(TAG, "channelRead0:"+msg);
listener.onMessageResponseClient(msg, index);
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object msg) {
Log.e(TAG, "channelRead0:" + msg);
try {
ByteBuf buf = (ByteBuf) msg;
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
String result = new String(bytes, "utf-8");
Log.e(TAG, "Server: " + result);
listener.onMessageResponseClient(bytes, index);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
/**

View File

@@ -10,9 +10,6 @@ public class NSDServer {
private NsdManager mNsdManager;
private NsdManager.RegistrationListener mRegistrationListener;
private String mServerName;
private Context mContext;
private int mPort;
private String mServiceName;
private final String mServerType = "_http._tcp."; // 服务器type要客户端扫描服务器的一致
public NSDServer() {
@@ -38,7 +35,7 @@ public class NSDServer {
public void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode) {
Log.i(TAG, "onUnregistrationFailed serviceInfo: " + serviceInfo + " ,errorCode:" + errorCode);
if (registerState != null) {
registerState.onUnregistrationFailed(serviceInfo, errorCode);
registerState.onUnRegistrationFailed(serviceInfo, errorCode);
}
}
@@ -84,8 +81,7 @@ public class NSDServer {
void onServiceUnregistered(NsdServiceInfo serviceInfo); //取消NSD注册成功
void onUnregistrationFailed(NsdServiceInfo serviceInfo, int errorCode); //取消NSD注册失败
void onUnRegistrationFailed(NsdServiceInfo serviceInfo, int errorCode); //取消NSD注册失败
}
//NSD服务接口对象

View File

@@ -0,0 +1,65 @@
package com.mogo.telematic.server;
import android.app.Service;
import android.content.Intent;
import android.os.IBinder;
import com.mogo.telematic.server.netty.NettyServerListener;
import com.mogo.telematic.server.netty.NettyTcpServer;
import io.netty.channel.Channel;
public class NettyServerService extends Service {
@Override
public void onCreate() {
super.onCreate();
}
@Override
public int onStartCommand(Intent intent, int flags, int startId) {
// NettyTcpServer nettyTcpServer = NettyTcpServer.getInstance();
// if (!nettyTcpServer.isServerStart()) {
// nettyTcpServer.setListener(new NettyServerListener() {
// @Override
// public void onMessageResponseServer(Object msg, String ChannelId) {
//
// }
//
// @Override
// public void onStartServer() {
//
// }
//
// @Override
// public void onStopServer() {
//
// }
//
// @Override
// public void onChannelConnect(Channel channel) {
//
// }
//
// @Override
// public void onChannelDisConnect(Channel channel) {
//
// }
// });
// nettyTcpServer.start();
// } else {
// NettyTcpServer.getInstance().disconnect();
// }
return super.onStartCommand(intent, flags, startId);
}
@Override
public void onDestroy() {
super.onDestroy();
}
@Override
public IBinder onBind(Intent intent) {
return null;
}
}