target();
+
+ void onMsgReceived(T obj);
+}
diff --git a/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/MsgBody.java b/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/MsgBody.java
new file mode 100644
index 0000000..d23d032
--- /dev/null
+++ b/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/MsgBody.java
@@ -0,0 +1,58 @@
+package com.mogo.cloud.socket;
+
+/**
+ * 描述
+ */
+public class MsgBody {
+
+ /**
+ * 消息类型
+ */
+ private int mMsgType;
+
+ /**
+ * 是否回执
+ */
+ private boolean mAck = false;
+
+ /**
+ * 消息ID
+ */
+ private final long mMsgId = System.currentTimeMillis();
+
+ /**
+ * 消息内容
+ */
+ private Object mContent;
+
+ public MsgBody msgType(int msgType) {
+ this.mMsgType = msgType;
+ return this;
+ }
+
+ public MsgBody ack(boolean ack) {
+ this.mAck = ack;
+ return this;
+ }
+
+ public MsgBody content(Object object) {
+ this.mContent = object;
+ return this;
+ }
+
+ public int getMsgType() {
+ return mMsgType;
+ }
+
+ public boolean isAck() {
+ return mAck;
+ }
+
+ public long getMsgId() {
+ return mMsgId;
+ }
+
+ public Object getContent() {
+ return mContent;
+ }
+}
diff --git a/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/SocketManager.java b/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/SocketManager.java
new file mode 100644
index 0000000..a854ce2
--- /dev/null
+++ b/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/SocketManager.java
@@ -0,0 +1,209 @@
+package com.mogo.cloud.socket;
+
+
+import android.content.Context;
+
+import androidx.annotation.NonNull;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.mogo.cloud.passport.MoGoAiCloudClient;
+import com.mogo.cloud.passport.MoGoAiCloudClientConfig;
+import com.mogo.utils.logger.Logger;
+import com.mogo.utils.network.utils.GsonUtil;
+import com.zhidao.locupload.Platform;
+import com.zhidao.ptech.connsvr.protocol.MogoConnsvr;
+import com.zhidao.socket.Callback;
+import com.zhidao.socket.CallbackManager;
+import com.zhidao.socket.Environment;
+import com.zhidao.socket.SocketClient;
+import com.zhidao.socket.SocketConfig;
+import com.zhidao.socket.utils.RequestUtil;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SocketManager implements IMogoCloudSocketManager, Callback {
+
+ private static final String TAG = "SocketManager";
+ private static volatile SocketManager mInstance;
+ private MoGoAiCloudClientConfig cloudClientConfig;
+ private String mAppId;
+
+ private SocketManager() {
+ CallbackManager.getInstance().register(this);
+ cloudClientConfig = MoGoAiCloudClient.getInstance().getAiCloudClientConfig();
+ }
+
+ public static SocketManager getInstance() {
+ if (mInstance == null) {
+ synchronized (SocketManager.class) {
+ if (mInstance == null) {
+ mInstance = new SocketManager();
+ }
+ }
+ }
+ return mInstance;
+ }
+
+ /**
+ * 管理消息分发
+ *
+ * key - msgType
+ */
+ private Map> mListeners = new ConcurrentHashMap<>();
+
+ /**
+ * 管理消息回执
+ *
+ * key - msgId
+ */
+ private Map mAckListeners = new ConcurrentHashMap<>();
+
+
+ public static final int MAX_CAP = 64; //保证充足的容量应对非常延时的推送
+ private final ArrayList mReceivedMsgId = new ArrayList<>(MAX_CAP);
+ private int mCurrentIndex = 0;
+
+ @Override
+ public void init(Context context, String appId) {
+ mAppId = appId;
+ SocketConfig.instance()
+ .setAppContext(context.getApplicationContext())
+ .setEnvironment(getEnvironment())
+ .setClient(Platform.getClient(Platform.car))
+ .setChannelId(SocketServicesConstants.SOCKET_CHANNEL_ID)
+ .setOpenAnalytics(true)
+ .setSn(cloudClientConfig.getSn())
+ .setDebug(cloudClientConfig.isShowDebugLog());
+ SocketClient.getInstance().start(context);
+ }
+
+ @Override
+ public void registerOnMessageListener(int msgType, IMogoCloudSocketOnMessageListener listener) {
+ if (mListeners.containsKey(msgType)) {
+ Logger.w(TAG, "msgType %d is exist.", msgType);
+ }
+ if (!mListeners.containsKey(msgType)) {
+ mListeners.put(msgType, new ArrayList<>());
+ }
+ mListeners.get(msgType).add(listener);
+ }
+
+ @Override
+ public void unregisterOnMessageListener(int msgType, IMogoCloudSocketOnMessageListener listener) {
+ if (listener == null) {
+ return;
+ }
+ if (!mListeners.containsKey(msgType)) {
+ return;
+ }
+ List listeners = mListeners.get(msgType);
+ if (listeners != null) {
+ listeners.remove(listener);
+ }
+ }
+
+ @Override
+ public void sendMsg(MsgBody body, IMogoCloudSocketMsgAckListener listener) {
+ Logger.d(TAG, "sendMsg.");
+ final byte[] pb = convertToPBBytes(body.getMsgType(), objectToBytes(body.getContent()));
+ RequestUtil.sendPayloadData(mAppId, 2, pb, 1, true, System.currentTimeMillis());
+ }
+
+ @Override
+ public void update(@NonNull CallbackManager manager, @NonNull byte[] message, String appId, long msgId) {
+ try {
+ MogoConnsvr.Payload payload = MogoConnsvr.Payload.parseFrom(message);
+ int msgType = payload.getMsgType();
+ Logger.d(TAG, "received msg type = %d", msgType);
+ List listeners = mListeners.get(msgType);
+ if (listeners != null && !listeners.isEmpty()) {
+ Iterator iterator = listeners.iterator();
+ if (msgId != 0) { //兼容老版本
+ if (mReceivedMsgId.contains(msgId)) { // 避免消息重发
+ return;
+ }
+ cacheLastReceivedMsgId(msgId);
+ }
+ Object object = null;
+ while (iterator.hasNext()) {
+ IMogoCloudSocketOnMessageListener listener = iterator.next();
+ if (object == null) {
+ object = GsonUtil.objectFromJson(payload.getPayload().toStringUtf8(), listener.target());
+ }
+ if (listener != null) {
+ Logger.d(TAG, "received msgId = %s, content = %s", msgId, payload.getPayload().toStringUtf8());
+ listener.onMsgReceived(object);
+ }
+ }
+ }
+ } catch (InvalidProtocolBufferException e) {
+ Logger.e(TAG, e, "parse msg error.");
+ }
+ }
+
+ private void cacheLastReceivedMsgId(long msgId) {
+ if (msgId == 0) {
+ return;
+ }
+ synchronized (this) {
+ mReceivedMsgId.add(mCurrentIndex % MAX_CAP, msgId);
+ mCurrentIndex++;
+ }
+ }
+
+ @Override
+ public void onAck(@NonNull CallbackManager manager, @NonNull byte[] headerBytes, byte[] content) {
+ try {
+ MogoConnsvr.Header header = MogoConnsvr.Header.parseFrom(headerBytes);
+ int msgType = header.getMsgType();
+ String appId = header.getAppId();
+ int productLine = header.getProductLine();
+ long msgId = header.getMsgId();
+ IMogoCloudSocketMsgAckListener listener = mAckListeners.remove(msgId);
+ if (listener != null) {
+ listener.onAck(msgId);
+ }
+ Logger.d(TAG, "send message success: msgType = %d, appId = %s, productLine = %d", msgType, appId, productLine);
+ } catch (InvalidProtocolBufferException e) {
+ e.printStackTrace();
+ }
+ }
+
+ private Environment getEnvironment() {
+ switch (cloudClientConfig.getNetMode()) {
+ case 1:
+ return Environment.dev;
+ case 2:
+ case 4:
+ return Environment.qa;
+ case 3:
+ default:
+ return Environment.release;
+ }
+ }
+
+ public byte[] objectToBytes(Object obj) {
+ String jsonStr = GsonUtil.jsonFromObject(obj);
+ return jsonStr.getBytes();
+ }
+
+ private byte[] convertToPBBytes(int msgType, byte[] payloadBytes) {
+ MogoConnsvr.Payload payloadData = MogoConnsvr.Payload.newBuilder()
+ .setMsgType(msgType)
+ .setPayload(ByteString.copyFrom(payloadBytes)).build();
+ return payloadData.toByteArray();
+ }
+
+ public synchronized void release() {
+ mListeners.clear();
+ mListeners = null;
+ cloudClientConfig = null;
+ mInstance = null;
+ }
+
+}
diff --git a/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/SocketServicesConstants.java b/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/SocketServicesConstants.java
new file mode 100644
index 0000000..cfb4a91
--- /dev/null
+++ b/foudations/mogo-socket/src/main/java/com/mogo/cloud/socket/SocketServicesConstants.java
@@ -0,0 +1,12 @@
+package com.mogo.cloud.socket;
+
+import androidx.annotation.Keep;
+
+public class SocketServicesConstants {
+
+ /**
+ * 建立长链的通道ID
+ */
+ @Keep
+ public static final String SOCKET_CHANNEL_ID = "dataCrawler";
+}
diff --git a/foudations/mogo-socket/src/test/java/com/mogo/cloud/socket/ExampleUnitTest.java b/foudations/mogo-socket/src/test/java/com/mogo/cloud/socket/ExampleUnitTest.java
new file mode 100644
index 0000000..e889224
--- /dev/null
+++ b/foudations/mogo-socket/src/test/java/com/mogo/cloud/socket/ExampleUnitTest.java
@@ -0,0 +1,17 @@
+package com.mogo.cloud.socket;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Example local unit test, which will execute on the development machine (host).
+ *
+ * @see Testing documentation
+ */
+public class ExampleUnitTest {
+ @Test
+ public void addition_isCorrect() {
+ assertEquals(4, 2 + 2);
+ }
+}
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index d4cc957..5156a21 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,4 +1,4 @@
-include ':modules:mogo-realtime'
+include ':foudations:mogo-socket'
include ':modules:mogo-realtime'
include ':modules:realtime'
include ':modules:mogo-tanlu'