add socketModule

This commit is contained in:
zhongchao
2021-01-22 10:47:20 +08:00
parent 0f7d60a77d
commit eb64ce17df
16 changed files with 460 additions and 1 deletions

View File

@@ -0,0 +1,42 @@
package com.mogo.cloud.socket;
import android.content.Context;
/**
* socket 长链
*/
public interface IMogoCloudSocketManager {
/**
* 初始化,各模块不用关心
*
* @param context 上下文
* @param appId 一般为包名,不参与通道的建立,一般用于发消息
*/
void init(Context context, String appId);
/**
* 注册消息监听
*
* @param msgType 消息类型
* @param listener 回调
*/
void registerOnMessageListener(int msgType, IMogoCloudSocketOnMessageListener listener);
/**
* 注销消息监听
*
* @param msgType 消息类型
* @param listener 回调
*/
void unregisterOnMessageListener(int msgType, IMogoCloudSocketOnMessageListener listener);
/**
* 发送消息
*
* @param body 消息体
* @param listener 回执监听
*/
void sendMsg(MsgBody body, IMogoCloudSocketMsgAckListener listener);
}

View File

@@ -0,0 +1,13 @@
package com.mogo.cloud.socket;
/**
* 消息回执监听
*/
public interface IMogoCloudSocketMsgAckListener {
/**
* 长连接消息回执
* msgId: 消息id
*/
void onAck(long msgId);
}

View File

@@ -0,0 +1,11 @@
package com.mogo.cloud.socket;
/**
* 消息回调
*/
public interface IMogoCloudSocketOnMessageListener<T> {
Class<T> target();
void onMsgReceived(T obj);
}

View File

@@ -0,0 +1,58 @@
package com.mogo.cloud.socket;
/**
* 描述
*/
public class MsgBody {
/**
* 消息类型
*/
private int mMsgType;
/**
* 是否回执
*/
private boolean mAck = false;
/**
* 消息ID
*/
private final long mMsgId = System.currentTimeMillis();
/**
* 消息内容
*/
private Object mContent;
public MsgBody msgType(int msgType) {
this.mMsgType = msgType;
return this;
}
public MsgBody ack(boolean ack) {
this.mAck = ack;
return this;
}
public MsgBody content(Object object) {
this.mContent = object;
return this;
}
public int getMsgType() {
return mMsgType;
}
public boolean isAck() {
return mAck;
}
public long getMsgId() {
return mMsgId;
}
public Object getContent() {
return mContent;
}
}

View File

@@ -0,0 +1,209 @@
package com.mogo.cloud.socket;
import android.content.Context;
import androidx.annotation.NonNull;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.mogo.cloud.passport.MoGoAiCloudClient;
import com.mogo.cloud.passport.MoGoAiCloudClientConfig;
import com.mogo.utils.logger.Logger;
import com.mogo.utils.network.utils.GsonUtil;
import com.zhidao.locupload.Platform;
import com.zhidao.ptech.connsvr.protocol.MogoConnsvr;
import com.zhidao.socket.Callback;
import com.zhidao.socket.CallbackManager;
import com.zhidao.socket.Environment;
import com.zhidao.socket.SocketClient;
import com.zhidao.socket.SocketConfig;
import com.zhidao.socket.utils.RequestUtil;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SocketManager implements IMogoCloudSocketManager, Callback {
private static final String TAG = "SocketManager";
private static volatile SocketManager mInstance;
private MoGoAiCloudClientConfig cloudClientConfig;
private String mAppId;
private SocketManager() {
CallbackManager.getInstance().register(this);
cloudClientConfig = MoGoAiCloudClient.getInstance().getAiCloudClientConfig();
}
public static SocketManager getInstance() {
if (mInstance == null) {
synchronized (SocketManager.class) {
if (mInstance == null) {
mInstance = new SocketManager();
}
}
}
return mInstance;
}
/**
* 管理消息分发
* <p>
* key - msgType
*/
private Map<Integer, List<IMogoCloudSocketOnMessageListener>> mListeners = new ConcurrentHashMap<>();
/**
* 管理消息回执
* <p>
* key - msgId
*/
private Map<Long, IMogoCloudSocketMsgAckListener> mAckListeners = new ConcurrentHashMap<>();
public static final int MAX_CAP = 64; //保证充足的容量应对非常延时的推送
private final ArrayList<Long> mReceivedMsgId = new ArrayList<>(MAX_CAP);
private int mCurrentIndex = 0;
@Override
public void init(Context context, String appId) {
mAppId = appId;
SocketConfig.instance()
.setAppContext(context.getApplicationContext())
.setEnvironment(getEnvironment())
.setClient(Platform.getClient(Platform.car))
.setChannelId(SocketServicesConstants.SOCKET_CHANNEL_ID)
.setOpenAnalytics(true)
.setSn(cloudClientConfig.getSn())
.setDebug(cloudClientConfig.isShowDebugLog());
SocketClient.getInstance().start(context);
}
@Override
public void registerOnMessageListener(int msgType, IMogoCloudSocketOnMessageListener listener) {
if (mListeners.containsKey(msgType)) {
Logger.w(TAG, "msgType %d is exist.", msgType);
}
if (!mListeners.containsKey(msgType)) {
mListeners.put(msgType, new ArrayList<>());
}
mListeners.get(msgType).add(listener);
}
@Override
public void unregisterOnMessageListener(int msgType, IMogoCloudSocketOnMessageListener listener) {
if (listener == null) {
return;
}
if (!mListeners.containsKey(msgType)) {
return;
}
List<IMogoCloudSocketOnMessageListener> listeners = mListeners.get(msgType);
if (listeners != null) {
listeners.remove(listener);
}
}
@Override
public void sendMsg(MsgBody body, IMogoCloudSocketMsgAckListener listener) {
Logger.d(TAG, "sendMsg.");
final byte[] pb = convertToPBBytes(body.getMsgType(), objectToBytes(body.getContent()));
RequestUtil.sendPayloadData(mAppId, 2, pb, 1, true, System.currentTimeMillis());
}
@Override
public void update(@NonNull CallbackManager manager, @NonNull byte[] message, String appId, long msgId) {
try {
MogoConnsvr.Payload payload = MogoConnsvr.Payload.parseFrom(message);
int msgType = payload.getMsgType();
Logger.d(TAG, "received msg type = %d", msgType);
List<IMogoCloudSocketOnMessageListener> listeners = mListeners.get(msgType);
if (listeners != null && !listeners.isEmpty()) {
Iterator<IMogoCloudSocketOnMessageListener> iterator = listeners.iterator();
if (msgId != 0) { //兼容老版本
if (mReceivedMsgId.contains(msgId)) { // 避免消息重发
return;
}
cacheLastReceivedMsgId(msgId);
}
Object object = null;
while (iterator.hasNext()) {
IMogoCloudSocketOnMessageListener listener = iterator.next();
if (object == null) {
object = GsonUtil.objectFromJson(payload.getPayload().toStringUtf8(), listener.target());
}
if (listener != null) {
Logger.d(TAG, "received msgId = %s, content = %s", msgId, payload.getPayload().toStringUtf8());
listener.onMsgReceived(object);
}
}
}
} catch (InvalidProtocolBufferException e) {
Logger.e(TAG, e, "parse msg error.");
}
}
private void cacheLastReceivedMsgId(long msgId) {
if (msgId == 0) {
return;
}
synchronized (this) {
mReceivedMsgId.add(mCurrentIndex % MAX_CAP, msgId);
mCurrentIndex++;
}
}
@Override
public void onAck(@NonNull CallbackManager manager, @NonNull byte[] headerBytes, byte[] content) {
try {
MogoConnsvr.Header header = MogoConnsvr.Header.parseFrom(headerBytes);
int msgType = header.getMsgType();
String appId = header.getAppId();
int productLine = header.getProductLine();
long msgId = header.getMsgId();
IMogoCloudSocketMsgAckListener listener = mAckListeners.remove(msgId);
if (listener != null) {
listener.onAck(msgId);
}
Logger.d(TAG, "send message success: msgType = %d, appId = %s, productLine = %d", msgType, appId, productLine);
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
private Environment getEnvironment() {
switch (cloudClientConfig.getNetMode()) {
case 1:
return Environment.dev;
case 2:
case 4:
return Environment.qa;
case 3:
default:
return Environment.release;
}
}
public byte[] objectToBytes(Object obj) {
String jsonStr = GsonUtil.jsonFromObject(obj);
return jsonStr.getBytes();
}
private byte[] convertToPBBytes(int msgType, byte[] payloadBytes) {
MogoConnsvr.Payload payloadData = MogoConnsvr.Payload.newBuilder()
.setMsgType(msgType)
.setPayload(ByteString.copyFrom(payloadBytes)).build();
return payloadData.toByteArray();
}
public synchronized void release() {
mListeners.clear();
mListeners = null;
cloudClientConfig = null;
mInstance = null;
}
}

View File

@@ -0,0 +1,12 @@
package com.mogo.cloud.socket;
import androidx.annotation.Keep;
public class SocketServicesConstants {
/**
* 建立长链的通道ID
*/
@Keep
public static final String SOCKET_CHANNEL_ID = "dataCrawler";
}