[Feat]新增QUIC协议网络请求时延回调

This commit is contained in:
chenfufeng
2023-12-07 19:57:48 +08:00
parent 2c11961194
commit 202d849ab4
5 changed files with 349 additions and 242 deletions

View File

@@ -0,0 +1,9 @@
package com.mogo.cloud.network.cronet;
public interface NetworkQualityListener {
void onLatencyObservation(int rttMs, String url);
void onFailed(String error, String url);
void onCanceled(String url);
void onRttObservation(int rttMs, long whenMs, int source);
void onThroughputObservation(int throughputKbps, long whenMs, int source);
}

View File

@@ -20,10 +20,13 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import android.util.Log;
import androidx.annotation.Nullable;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.net.ProtocolException;
import java.nio.ByteBuffer;
@@ -33,9 +36,11 @@ import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import okio.Buffer;
import okio.Source;
import okio.Timeout;
import org.chromium.net.CronetException;
import org.chromium.net.UrlRequest;
import org.chromium.net.UrlResponseInfo;
@@ -54,260 +59,311 @@ import org.chromium.net.UrlResponseInfo;
* scheduling, especially when handling cancellations.
*/
class OkHttpBridgeRequestCallback extends UrlRequest.Callback {
/**
* The byte buffer capacity for reading Cronet response bodies. Each response callback will
* allocate its own buffer of this size once the response starts being processed.
*/
private static final int CRONET_BYTE_BUFFER_CAPACITY = 32 * 1024;
/**
* The byte buffer capacity for reading Cronet response bodies. Each response callback will
* allocate its own buffer of this size once the response starts being processed.
*/
private static final int CRONET_BYTE_BUFFER_CAPACITY = 32 * 1024;
/**
* A bridge between Cronet's asynchronous callbacks and OkHttp's blocking stream-like reads.
*/
private final SettableFuture<Source> bodySourceFuture = SettableFuture.create();
/** A bridge between Cronet's asynchronous callbacks and OkHttp's blocking stream-like reads. */
private final SettableFuture<Source> bodySourceFuture = SettableFuture.create();
/**
* Signal whether the request is finished and the response has been fully read.
*/
private final AtomicBoolean finished = new AtomicBoolean(false);
/** Signal whether the request is finished and the response has been fully read. */
private final AtomicBoolean finished = new AtomicBoolean(false);
/**
* Signal whether the request was canceled.
*/
private final AtomicBoolean canceled = new AtomicBoolean(false);
/** Signal whether the request was canceled. */
private final AtomicBoolean canceled = new AtomicBoolean(false);
/**
* An internal, blocking, thread safe way of passing data between the callback methods and {@link
* #bodySourceFuture}.
*
* <p>Has a capacity of 2 - at most one slot for a read result and at most 1 slot for cancellation
* signal, this guarantees that all inserts are non blocking.
*/
private final BlockingQueue<CallbackResult> callbackResults = new ArrayBlockingQueue<>(2);
/**
* An internal, blocking, thread safe way of passing data between the callback methods and {@link
* #bodySourceFuture}.
*
* <p>Has a capacity of 2 - at most one slot for a read result and at most 1 slot for cancellation
* signal, this guarantees that all inserts are non blocking.
*/
private final BlockingQueue<CallbackResult> callbackResults = new ArrayBlockingQueue<>(2);
/**
* The response headers.
*/
private final SettableFuture<UrlResponseInfo> headersFuture = SettableFuture.create();
/** The response headers. */
private final SettableFuture<UrlResponseInfo> headersFuture = SettableFuture.create();
/**
* The read timeout as specified by OkHttp. *
*/
private final long readTimeoutMillis;
/** The read timeout as specified by OkHttp. * */
private final long readTimeoutMillis;
/**
* The previous responses as reported to {@link #onRedirectReceived}, from oldest to newest. *
*/
private final List<UrlResponseInfo> urlResponseInfoChain = new ArrayList<>();
/** The previous responses as reported to {@link #onRedirectReceived}, from oldest to newest. * */
private final List<UrlResponseInfo> urlResponseInfoChain = new ArrayList<>();
private final RedirectStrategy redirectStrategy;
private final RedirectStrategy redirectStrategy;
/**
* The request being processed. Set when the request is first seen by the callback.
*/
private volatile UrlRequest request;
/** The request being processed. Set when the request is first seen by the callback. */
private volatile UrlRequest request;
public long start;
OkHttpBridgeRequestCallback(long readTimeoutMillis, RedirectStrategy redirectStrategy) {
checkArgument(readTimeoutMillis >= 0);
private long stop;
// So that we don't have to special case infinity. Int.MAX_VALUE is ~infinity for all practical
// use cases.
if (readTimeoutMillis == 0) {
this.readTimeoutMillis = Integer.MAX_VALUE;
} else {
this.readTimeoutMillis = readTimeoutMillis;
}
this.redirectStrategy = redirectStrategy;
}
private NetworkQualityListener listener;
/** Returns the {@link UrlResponseInfo} for the request associated with this callback. */
ListenableFuture<UrlResponseInfo> getUrlResponseInfo() {
return headersFuture;
}
OkHttpBridgeRequestCallback(long readTimeoutMillis, RedirectStrategy redirectStrategy) {
checkArgument(readTimeoutMillis >= 0);
/**
* Returns the OkHttp {@link Source} for the request associated with this callback.
*
* <p>Note that retrieving data from the {@code Source} instance might block further as the
* response body is streamed.
*/
ListenableFuture<Source> getBodySource() {
return bodySourceFuture;
}
List<UrlResponseInfo> getUrlResponseInfoChain() {
return Collections.unmodifiableList(urlResponseInfoChain);
}
@Override
public void onRedirectReceived(
UrlRequest urlRequest, UrlResponseInfo urlResponseInfo, String nextUrl) {
// We shouldn't follow redirects - pass the given UrlResponseInfo as the ultimate result
if (!redirectStrategy.followRedirects()) {
checkState(headersFuture.set(urlResponseInfo));
// Note: This might not match the content length headers but we have no way of accessing
// the actual body with current Cronet's APIs (see RedirectStrategy).
checkState(bodySourceFuture.set(new Buffer()));
urlRequest.cancel();
return;
// So that we don't have to special case infinity. Int.MAX_VALUE is ~infinity for all practical
// use cases.
if (readTimeoutMillis == 0) {
this.readTimeoutMillis = Integer.MAX_VALUE;
} else {
this.readTimeoutMillis = readTimeoutMillis;
}
this.redirectStrategy = redirectStrategy;
}
// We should follow redirects and we haven't hit the cap yet
urlResponseInfoChain.add(urlResponseInfo);
if (urlResponseInfo.getUrlChain().size() <= redirectStrategy.numberOfRedirectsToFollow()) {
urlRequest.followRedirect();
return;
void setNetworkListener(NetworkQualityListener listener) {
this.listener = listener;
}
// Cap reached - cancel the request and fail. Exception crafted to match OkHttp.
urlRequest.cancel();
IOException e =
new ProtocolException(
"Too many follow-up requests: " + (redirectStrategy.numberOfRedirectsToFollow() + 1));
headersFuture.setException(e);
bodySourceFuture.setException(e);
}
@Override
public void onResponseStarted(UrlRequest urlRequest, UrlResponseInfo urlResponseInfo) {
request = urlRequest;
checkState(headersFuture.set(urlResponseInfo));
checkState(bodySourceFuture.set(new CronetBodySource()));
}
@Override
public void onReadCompleted(
UrlRequest urlRequest, UrlResponseInfo urlResponseInfo, ByteBuffer byteBuffer) {
callbackResults.add(new CallbackResult(CallbackStep.ON_READ_COMPLETED, byteBuffer, null));
}
@Override
public void onSucceeded(UrlRequest urlRequest, UrlResponseInfo urlResponseInfo) {
callbackResults.add(new CallbackResult(CallbackStep.ON_SUCCESS, null, null));
}
@Override
public void onFailed(UrlRequest urlRequest, UrlResponseInfo urlResponseInfo, CronetException e) {
// If this was called before we start reading the body, the exception will
// propagate in the future providing headers and the body wrapper.
if (headersFuture.setException(e) && bodySourceFuture.setException(e)) {
return;
/**
* Returns the {@link UrlResponseInfo} for the request associated with this callback.
*/
ListenableFuture<UrlResponseInfo> getUrlResponseInfo() {
return headersFuture;
}
// If this was called as a reaction to a read() call, the read result will propagate
// the exception.
callbackResults.add(new CallbackResult(CallbackStep.ON_FAILED, null, e));
}
/**
* Returns the OkHttp {@link Source} for the request associated with this callback.
*
* <p>Note that retrieving data from the {@code Source} instance might block further as the
* response body is streamed.
*/
ListenableFuture<Source> getBodySource() {
return bodySourceFuture;
}
@Override
public void onCanceled(UrlRequest urlRequest, UrlResponseInfo responseInfo) {
canceled.set(true);
callbackResults.add(new CallbackResult(CallbackStep.ON_CANCELED, null, null));
// If there's nobody listening it's possible that the cancellation happened before we even
// received anything from the server. In that case inform the thread that's awaiting server
// response about the cancellation as well. This becomes a no-op if the futures
// were already set.
IOException e = new IOException("The request was canceled!");
headersFuture.setException(e);
bodySourceFuture.setException(e);
}
private class CronetBodySource implements Source {
private ByteBuffer buffer = ByteBuffer.allocateDirect(CRONET_BYTE_BUFFER_CAPACITY);
/** Whether the close() method has been called. */
private volatile boolean closed = false;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
if (canceled.get()) {
throw new IOException("The request was canceled!");
}
// Using IAE instead of NPE (checkNotNull) for okio.RealBufferedSource consistency
checkArgument(sink != null, "sink == null");
checkArgument(byteCount >= 0, "byteCount < 0: %s", byteCount);
checkState(!closed, "closed");
if (finished.get()) {
return -1;
}
if (byteCount < buffer.limit()) {
buffer.limit((int) byteCount);
}
request.read(buffer);
CallbackResult result;
try {
result = callbackResults.poll(readTimeoutMillis, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result = null;
}
if (result == null) {
// Either readResult.poll() was interrupted or it timed out.
request.cancel();
throw new CronetTimeoutException();
}
switch (result.callbackStep) {
// We null the buffer in final statuses to allow fast GC of the buffer even if the callback
// is still in use.
case ON_FAILED:
finished.set(true);
buffer = null;
throw new IOException(result.exception);
case ON_SUCCESS:
finished.set(true);
buffer = null;
return -1;
case ON_CANCELED:
// The canceled flag is already set by the onCanceled method
// so not setting it here.
buffer = null;
throw new IOException("The request was canceled!");
case ON_READ_COMPLETED:
result.buffer.flip();
int bytesWritten = sink.write(result.buffer);
result.buffer.clear();
return bytesWritten;
}
throw new AssertionError("The switch block above is exhaustive!");
List<UrlResponseInfo> getUrlResponseInfoChain() {
return Collections.unmodifiableList(urlResponseInfoChain);
}
@Override
public Timeout timeout() {
// TODO(danstahr): This should likely respect the OkHttp timeout somehow
return Timeout.NONE;
public void onRedirectReceived(
UrlRequest urlRequest, UrlResponseInfo urlResponseInfo, String nextUrl) {
// We shouldn't follow redirects - pass the given UrlResponseInfo as the ultimate result
if (!redirectStrategy.followRedirects()) {
checkState(headersFuture.set(urlResponseInfo));
// Note: This might not match the content length headers but we have no way of accessing
// the actual body with current Cronet's APIs (see RedirectStrategy).
checkState(bodySourceFuture.set(new Buffer()));
urlRequest.cancel();
return;
}
// We should follow redirects and we haven't hit the cap yet
urlResponseInfoChain.add(urlResponseInfo);
if (urlResponseInfo.getUrlChain().size() <= redirectStrategy.numberOfRedirectsToFollow()) {
urlRequest.followRedirect();
return;
}
// Cap reached - cancel the request and fail. Exception crafted to match OkHttp.
urlRequest.cancel();
IOException e =
new ProtocolException(
"Too many follow-up requests: " + (redirectStrategy.numberOfRedirectsToFollow() + 1));
headersFuture.setException(e);
bodySourceFuture.setException(e);
}
@Override
public void close() {
if (closed) {
return;
}
closed = true;
if (!finished.get()) {
request.cancel();
}
public void onResponseStarted(UrlRequest urlRequest, UrlResponseInfo urlResponseInfo) {
request = urlRequest;
checkState(headersFuture.set(urlResponseInfo));
checkState(bodySourceFuture.set(new CronetBodySource()));
}
}
private static class CallbackResult {
private final CallbackStep callbackStep;
@Nullable
private final ByteBuffer buffer;
@Nullable private final CronetException exception;
private CallbackResult(
CallbackStep callbackStep,
@Nullable ByteBuffer buffer,
@Nullable CronetException exception) {
this.callbackStep = callbackStep;
this.buffer = buffer;
this.exception = exception;
@Override
public void onReadCompleted(
UrlRequest urlRequest, UrlResponseInfo urlResponseInfo, ByteBuffer byteBuffer) {
callbackResults.add(new CallbackResult(CallbackStep.ON_READ_COMPLETED, byteBuffer, null));
}
}
private enum CallbackStep {
ON_READ_COMPLETED,
ON_SUCCESS,
ON_FAILED,
ON_CANCELED
}
@Override
public void onSucceeded(UrlRequest urlRequest, UrlResponseInfo urlResponseInfo) {
stop = System.nanoTime();
if (QuicConfig.INSTANCE.getDebug()) {
Log.i("CronetNetwork", "****** Cronet Request Completed, the latency is " + ((stop - start)/1000000) + "ms"
+ ";status code is " + urlResponseInfo.getHttpStatusCode() + ";total received bytes is "
+ urlResponseInfo.getReceivedByteCount() + ";url is " + urlResponseInfo.getUrl());
}
if (listener != null) {
listener.onLatencyObservation((int) ((stop - start)/1000000), urlResponseInfo.getUrl());
}
callbackResults.add(new CallbackResult(CallbackStep.ON_SUCCESS, null, null));
}
@Override
public void onFailed(UrlRequest urlRequest, UrlResponseInfo urlResponseInfo, CronetException e) {
// If this was called before we start reading the body, the exception will
// propagate in the future providing headers and the body wrapper.
if (QuicConfig.INSTANCE.getDebug()) {
Log.i("CronetNetwork", "****** Cronet Request onFailed, error is " + e.getMessage()
+ ";url is " + urlResponseInfo.getUrl());
}
if (listener != null) {
listener.onFailed(e.getMessage(), urlResponseInfo.getUrl());
}
if (headersFuture.setException(e) && bodySourceFuture.setException(e)) {
return;
}
// If this was called as a reaction to a read() call, the read result will propagate
// the exception.
callbackResults.add(new CallbackResult(CallbackStep.ON_FAILED, null, e));
}
@Override
public void onCanceled(UrlRequest urlRequest, UrlResponseInfo responseInfo) {
if (QuicConfig.INSTANCE.getDebug()) {
Log.i("CronetNetwork", "****** Cronet Request onCanceled" + ";url is " + responseInfo.getUrl());
}
if (listener != null) {
listener.onCanceled(responseInfo.getUrl());
}
canceled.set(true);
callbackResults.add(new CallbackResult(CallbackStep.ON_CANCELED, null, null));
// If there's nobody listening it's possible that the cancellation happened before we even
// received anything from the server. In that case inform the thread that's awaiting server
// response about the cancellation as well. This becomes a no-op if the futures
// were already set.
IOException e = new IOException("The request was canceled!");
headersFuture.setException(e);
bodySourceFuture.setException(e);
}
private class CronetBodySource implements Source {
private ByteBuffer buffer = ByteBuffer.allocateDirect(CRONET_BYTE_BUFFER_CAPACITY);
/**
* Whether the close() method has been called.
*/
private volatile boolean closed = false;
@Override
public long read(Buffer sink, long byteCount) throws IOException {
if (canceled.get()) {
throw new IOException("The request was canceled!");
}
// Using IAE instead of NPE (checkNotNull) for okio.RealBufferedSource consistency
checkArgument(sink != null, "sink == null");
checkArgument(byteCount >= 0, "byteCount < 0: %s", byteCount);
checkState(!closed, "closed");
if (finished.get()) {
return -1;
}
if (byteCount < buffer.limit()) {
buffer.limit((int) byteCount);
}
request.read(buffer);
CallbackResult result;
try {
result = callbackResults.poll(readTimeoutMillis, MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
result = null;
}
if (result == null) {
// Either readResult.poll() was interrupted or it timed out.
request.cancel();
throw new CronetTimeoutException();
}
switch (result.callbackStep) {
// We null the buffer in final statuses to allow fast GC of the buffer even if the callback
// is still in use.
case ON_FAILED:
finished.set(true);
buffer = null;
throw new IOException(result.exception);
case ON_SUCCESS:
finished.set(true);
buffer = null;
return -1;
case ON_CANCELED:
// The canceled flag is already set by the onCanceled method
// so not setting it here.
buffer = null;
throw new IOException("The request was canceled!");
case ON_READ_COMPLETED:
result.buffer.flip();
int bytesWritten = sink.write(result.buffer);
result.buffer.clear();
return bytesWritten;
}
throw new AssertionError("The switch block above is exhaustive!");
}
@Override
public Timeout timeout() {
// TODO(danstahr): This should likely respect the OkHttp timeout somehow
return Timeout.NONE;
}
@Override
public void close() {
if (closed) {
return;
}
closed = true;
if (!finished.get()) {
request.cancel();
}
}
}
private static class CallbackResult {
private final CallbackStep callbackStep;
@Nullable
private final ByteBuffer buffer;
@Nullable
private final CronetException exception;
private CallbackResult(
CallbackStep callbackStep,
@Nullable ByteBuffer buffer,
@Nullable CronetException exception) {
this.callbackStep = callbackStep;
this.buffer = buffer;
this.exception = exception;
}
}
private enum CallbackStep {
ON_READ_COMPLETED,
ON_SUCCESS,
ON_FAILED,
ON_CANCELED
}
}

View File

@@ -2,24 +2,53 @@ package com.mogo.cloud.network.cronet
import android.annotation.SuppressLint
import android.content.Context
import android.util.Log
import org.chromium.net.CronetEngine
import org.chromium.net.ExperimentalCronetEngine
import org.chromium.net.NetworkQualityRttListener
import org.chromium.net.NetworkQualityThroughputListener
import java.util.concurrent.Executors
@SuppressLint("StaticFieldLeak")
object QuicConfig {
private var engine: CronetEngine? = null
private var engine: ExperimentalCronetEngine? = null
private var converter: RequestResponseConverter? = null
private var listener: NetworkQualityListener? = null
private val executor by lazy {
Executors.newFixedThreadPool(2)
}
@Volatile
private var enable: Boolean = false
private var isDebug: Boolean = false
fun init(context: Context) {
fun init(context: Context, isDebug: Boolean = false) {
this.isDebug = isDebug
if (engine == null) {
engine = CronetEngine.Builder(context.applicationContext)
engine = ExperimentalCronetEngine.Builder(context.applicationContext)
.enableQuic(true)
.enableHttp2(true)
.enableNetworkQualityEstimator(true)
.setUserAgent("OkHttp With Cronet")
.build()
engine?.addRttListener(object :
NetworkQualityRttListener(executor) {
// source请看注解NetworkQualityObservationSource,HTTP为0 TCP为1 QUIC为2
override fun onRttObservation(rttMs: Int, whenMs: Long, source: Int) {
if (getDebug()) {
Log.d("CronetNetwork", "rttMs is:$rttMs ms,source is:$source")
}
getNetworkListener()?.onRttObservation(rttMs, whenMs, source)
}
})
engine?.addThroughputListener(object : NetworkQualityThroughputListener(executor) {
override fun onThroughputObservation(throughputKbps: Int, whenMs: Long, source: Int) {
if (getDebug()) {
Log.d("CronetNetwork", "downstream throughput is:$throughputKbps,source is:$source")
}
getNetworkListener()?.onThroughputObservation(throughputKbps, whenMs, source)
}
})
}
if (converter == null) {
converter = RequestResponseConverter(
@@ -37,10 +66,10 @@ object QuicConfig {
return enable
}
fun setEnable(context: Context, enable: Boolean) {
fun setEnable(context: Context, enable: Boolean, isDebug: Boolean = false) {
when {
enable -> {
init(context)
init(context, isDebug)
this.enable = true
}
else -> {
@@ -56,4 +85,16 @@ object QuicConfig {
fun getConverter(): RequestResponseConverter? {
return converter
}
fun getDebug(): Boolean {
return isDebug
}
fun setNetworkListener(listener: NetworkQualityListener) {
this.listener = listener
}
fun getNetworkListener(): NetworkQualityListener? {
return listener
}
}

View File

@@ -75,6 +75,7 @@ public final class RequestResponseConverter {
OkHttpBridgeRequestCallback callback =
new OkHttpBridgeRequestCallback(readTimeoutMillis, redirectStrategy);
callback.setNetworkListener(QuicConfig.INSTANCE.getNetworkListener());
// The OkHttp request callback methods are lightweight, the heavy lifting is done by OkHttp /
// app owned threads. Use a direct executor to avoid extra thread hops.
@@ -111,7 +112,7 @@ public final class RequestResponseConverter {
uploadDataProviderExecutor);
}
}
callback.start = System.nanoTime();
return new CronetRequestAndOkHttpResponse(
builder.build(), createResponseSupplier(okHttpRequest, callback));
}

View File

@@ -36,24 +36,24 @@ PASSWORD=xintai2018
RELEASE=true
# AI CLOUD 云平台
# 工具类
MOGO_UTILS_VERSION=1.4.7.19
MOGO_UTILS_VERSION=1.4.7.20
# 网络请求
MOGO_NETWORK_VERSION=1.4.7.19
MOGO_NETWORK_VERSION=1.4.7.20
# 鉴权
MOGO_PASSPORT_VERSION=1.4.7.19
MOGO_PASSPORT_VERSION=1.4.7.20
# 常链接
MOGO_SOCKET_VERSION=1.4.7.19
MOGO_SOCKET_VERSION=1.4.7.20
# 数据采集
MOGO_REALTIME_VERSION=1.4.7.19
MOGO_REALTIME_VERSION=1.4.7.20
# 探路,道路事件发布,获取
MOGO_TANLU_VERSION=1.4.7.19
MOGO_TANLU_VERSION=1.4.7.20
# 直播推流
MOGO_LIVE_VERSION=1.4.7.19
MOGO_LIVE_VERSION=1.4.7.20
# 直播拉流
MOGO_TRAFFICLIVE_VERSION=1.4.7.19
MOGO_TRAFFICLIVE_VERSION=1.4.7.20
# 定位服务
MOGO_LOCATION_VERSION=1.4.7.19
MOGO_LOCATION_VERSION=1.4.7.20
# 远程通讯模块
MOGO_TELEMATIC_VERSION=1.4.7.19
MOGO_TELEMATIC_VERSION=1.4.7.20
# v2x
MOGO_V2X_VERSION=1.4.7.19
MOGO_V2X_VERSION=1.4.7.20