1.前言
在上一章节中,了解了android平台下的网络库,也简单分析了HttpURLConnection底层原理,这一章将继续讲解网络库Volley
2.目录
3.Volley
Volley是Google 2013 I/O大会发布的官方网络库,其实质是对HttpClient和HttpURLConnection的封装,适合于高并发数据量较小的场景,不适合大文件下载和上传
3.1Volley的简单使用
//1.创建请求队列
RequestQueue requestQueue = Volley.newRequestQueue(this);
//2.创建具体的请求
StringRequest stringRequest = new StringRequest("https://www.baidu.com", new Response.Listener<String>() {
@Override
public void onResponse(String response) {
Log.d(TAG, "response = " + response);
// main
Log.d(TAG, "currentThread = " + Thread.currentThread().getName());
}
}, new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
Log.e(TAG, error.getMessage(), error);
Log.d(TAG, "currentThread = " + Thread.currentThread().getName());
}
});
//3.将具体的请求添加到队列
requestQueue.add(stringRequest);
3.2.Volley的源码分析
首先查看请求队列RequestQueue的创建
public static RequestQueue newRequestQueue(Context context) {
return newRequestQueue(context, (BaseHttpStack) null);
}
public static RequestQueue newRequestQueue(Context context, BaseHttpStack stack) {
BasicNetwork network;
//默认情况下stack为null
if (stack == null) {
//android2.3及以后都使用HttpURLConnection, HurlStack里的操作都是使用HttpURLConnectionImpl实现的
if (Build.VERSION.SDK_INT >= 9) {
network = new BasicNetwork(new HurlStack());
} else {
.....
//android2.3之前使用HttpClient,AndroidHttpClient是HttpClient的实现类
network = new BasicNetwork(
new HttpClientStack(AndroidHttpClient.newInstance(userAgent)));
}
} else {
//否则使用自定义的stack
network = new BasicNetwork(stack);
}
return newRequestQueue(context, network);
}
private static RequestQueue newRequestQueue(Context context, Network network) {
......
//DiskBasedCache硬盘缓存
RequestQueue queue = new RequestQueue(new DiskBasedCache(cacheSupplier), network);
queue.start();
return queue;
}
public RequestQueue(Cache cache, Network network) {
//DEFAULT_NETWORK_THREAD_POOL_SIZE默认为4
this(cache, network, DEFAULT_NETWORK_THREAD_POOL_SIZE);
}
public RequestQueue(Cache cache, Network network, int threadPoolSize) {
this(
cache,
network,
threadPoolSize,
//线程执行器,传入Handler回调到主线程
new ExecutorDelivery(new Handler(Looper.getMainLooper()))
);
}
public RequestQueue(
Cache cache, Network network, int threadPoolSize, ResponseDelivery delivery) {
mCache = cache;
mNetwork = network;
//用于执行网络请求调度线程数组
mDispatchers = new NetworkDispatcher[threadPoolSize];
mDelivery = delivery;
}
上面代码主要是确认底层网络框架, RequestQueue队列及相关参数的初始化
接着查看RequestQueue
的start()
方法
public void start() {
//确保所有的调度都已经停止
stop();
// 创建缓存调度线程并启动
mCacheDispatcher = new CacheDispatcher(mCacheQueue, mNetworkQueue, mCache, mDelivery);
mCacheDispatcher.start();
// 创建网络调度线程并启动 默认为4个
for (int i = 0; i < mDispatchers.length; i++) {
NetworkDispatcher networkDispatcher =
new NetworkDispatcher(mNetworkQueue, mNetwork, mCache, mDelivery);
mDispatchers[i] = networkDispatcher;
networkDispatcher.start();
}
}
然后再查看步骤2,请求Request
的创建
public StringRequest(
//请求方式
int method,
//请求地址
String url,
//成功回调
Listener<String> listener,
//失败回调
@Nullable ErrorListener errorListener) {
super(method, url, errorListener);
mListener = listener;
}
//添加请求体参数
@Nullable
protected Map<String, String> getParams() throws AuthFailureError {
return null;
}
//添加请求头参数
public Map<String, String> getHeaders() throws AuthFailureError {
return Collections.emptyMap();
}
接着查看步骤3,将请求Request
添加到RequestQueue
public <T> Request<T> add(Request<T> request) {
//标记request属于这个queue
request.setRequestQueue(this);
synchronized (mCurrentRequests) {
//添加到set集合中
mCurrentRequests.add(request);
}
//设置序列号,按顺序处理请求(原子类(AtomicInteger)防止并发问题)
request.setSequence(getSequenceNumber());
//标记已被加入队列中
request.addMarker("add-to-queue");
sendRequestEvent(request, RequestEvent.REQUEST_QUEUED);
//判断是否能被缓存,否则直接加入网络队列
if (!request.shouldCache()) {
mNetworkQueue.add(request);
return request;
}
//能则加入缓存队列
mCacheQueue.add(request);
return request;
}
然后再查看CacheDispatcher
的实现
public CacheDispatcher(
//缓存请求队列
BlockingQueue<Request<?>> cacheQueue,
//网络请求队列
BlockingQueue<Request<?>> networkQueue,
//缓存处理
Cache cache,
//相应处理(默认回调到主线程)
ResponseDelivery delivery) {
mCacheQueue = cacheQueue;
mNetworkQueue = networkQueue;
mCache = cache;
mDelivery = delivery;
//等待状态请求管理器
mWaitingRequestManager = new WaitingRequestManager(this);
}
由于CacheDispatcher
实质是一个线程,再看看run()
方法
@Override
public void run() {
//设置线程为后台处理线程
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
//初始化缓存目录
mCache.initialize();
while (true) {
try {
processRequest();
} catch (InterruptedException e) {
// We may have been interrupted because it was time to quit.
if (mQuit) {
Thread.currentThread().interrupt();
return;
}
}
}
}
private void processRequest() throws InterruptedException {
//从阻塞队列取请求,未取到时会阻塞
final Request<?> request = mCacheQueue.take();
processRequest(request);
}
@VisibleForTesting
void processRequest(final Request<?> request) throws InterruptedException {
request.addMarker("cache-queue-take");
request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_STARTED);
try {
//如果请求被取消,不再分发
if (request.isCanceled()) {
request.finish("cache-discard-canceled");
return;
}
// 通过url获取缓存key
Cache.Entry entry = mCache.get(request.getCacheKey());
if (entry == null) {
request.addMarker("cache-miss");
// 如果waitingRequest中没有这个request,将它加入网络队列
if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
mNetworkQueue.put(request);
}
return;
}
// 如果缓存过期了
if (entry.isExpired()) {
request.addMarker("cache-hit-expired");
//给request设置缓存
request.setCacheEntry(entry);
//缓存过期,也需判断waitingRequest中没有这个request,无则将它加入网络队列
if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
mNetworkQueue.put(request);
}
return;
}
request.addMarker("cache-hit");
// 缓存命中,解析响应数据
Response<?> response =
request.parseNetworkResponse(
new NetworkResponse(entry.data, entry.responseHeaders));
request.addMarker("cache-hit-parsed");
//如果响应不成功
if (!response.isSuccess()) {
request.addMarker("cache-parsing-failed");
//设置对应缓存key为无效的缓存
mCache.invalidate(request.getCacheKey(), true);
request.setCacheEntry(null);
//响应不成功,也需判断waitingRequest中没有这个request,无则将它加入网络队列
if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
mNetworkQueue.put(request);
}
return;
}
//判断缓存是否需要刷新
if (!entry.refreshNeeded()) {
//不需要缓存,直接分发响应
mDelivery.postResponse(request, response);
} else {
//设置request缓存
request.addMarker("cache-hit-refresh-needed");
request.setCacheEntry(entry);
// 标记为一个暂时的响应
response.intermediate = true;
//需刷新缓存,也要判断waitingRequest中没有这个request,无则将它加入网络队列
if (!mWaitingRequestManager.maybeAddToWaitingRequests(request)) {
//将暂时的响应返回给用户,并将请求放入网络请求队列
mDelivery.postResponse(
request,
response,
new Runnable() {
@Override
public void run() {
try {
mNetworkQueue.put(request);
} catch (InterruptedException e) {
// Restore the interrupted status
Thread.currentThread().interrupt();
}
}
});
} else {
//直接将响应返回给用户
mDelivery.postResponse(request, response);
}
}
} finally {
//缓存请求查找结束
request.sendEvent(RequestQueue.RequestEvent.REQUEST_CACHE_LOOKUP_FINISHED);
}
}
在CacheDispatcher
中,会循环从BlockingQueue
中取request
,取不到会阻塞,然后根据url从缓存中取response
,判断缓存及response状态,是否将请求加入网络请求队列,还是直接返回response
接着再查看NetworkDispatcher
的实现
public NetworkDispatcher(
BlockingQueue<Request<?>> queue,
Network network,
Cache cache,
ResponseDelivery delivery) {
//网络请求的阻塞队列
mQueue = queue;
//底层网络请求的实现
mNetwork = network;
//缓存
mCache = cache;
//响应分发器
mDelivery = delivery;
}
然后再看看NetworkDispatcher
的run()
方法
@Override
public void run() {
//同CacheDispatcher
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND);
while (true) {
try {
processRequest();
} catch (InterruptedException e) {
// We may have been interrupted because it was time to quit.
if (mQuit) {
Thread.currentThread().interrupt();
return;
}
}
}
}
private void processRequest() throws InterruptedException {
// 从阻塞队列去请求
Request<?> request = mQueue.take();
processRequest(request);
}
@VisibleForTesting
void processRequest(Request<?> request) {
//获取从开机到现在的时间
long startTimeMs = SystemClock.elapsedRealtime();
request.sendEvent(RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_STARTED);
try {
request.addMarker("network-queue-take");
// 请求被取消则结束请求
if (request.isCanceled()) {
request.finish("network-discard-cancelled");
//通知listener没有可用的响应
request.notifyListenerResponseNotUsable();
return;
}
addTrafficStatsTag(request);
// 执行网络请求
NetworkResponse networkResponse = mNetwork.performRequest(request);
request.addMarker("network-http-complete");
// 响应状态码304并且request已经被分发过,则停止分发
//304 有缓存再次请求时请求会包含if Modifiled Since,服务端根据修改时间对比是返回200还是304
if (networkResponse.notModified && request.hasHadResponseDelivered()) {
request.finish("not-modified");
request.notifyListenerResponseNotUsable();
return;
}
// 解析response
Response<?> response = request.parseNetworkResponse(networkResponse);
request.addMarker("network-parse-complete");
// 如果request是可缓存的,并且response的缓存不为null
if (request.shouldCache() && response.cacheEntry != null) {
mCache.put(request.getCacheKey(), response.cacheEntry);
request.addMarker("network-cache-written");
}
// 标记request分发过响应
request.markDelivered();
//分发数据
mDelivery.postResponse(request, response);
//通知listener已经有一个合法的响应被接收
request.notifyListenerResponseReceived(response);
} catch (VolleyError volleyError) {
volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs);
parseAndDeliverNetworkError(request, volleyError);
request.notifyListenerResponseNotUsable();
} catch (Exception e) {
VolleyLog.e(e, "Unhandled exception %s", e.toString());
VolleyError volleyError = new VolleyError(e);
volleyError.setNetworkTimeMs(SystemClock.elapsedRealtime() - startTimeMs);
mDelivery.postError(request, volleyError);
request.notifyListenerResponseNotUsable();
} finally {
//网络请求查找结束
request.sendEvent(RequestQueue.RequestEvent.REQUEST_NETWORK_DISPATCH_FINISHED);
}
}
在NetworkDispatcher
中,会循环从BlockingQueue
中取request
,取不到会阻塞,然后获取networkResponse
,304则结束请求,否则判断request
是否允许缓存来缓存数据,最后分发response
数据
接着查看mNetwork.performRequest(request)
网络请求方法
@Override
public NetworkResponse performRequest(Request<?> request) throws VolleyError {
//记录开机到现在的时间 方便计算request所用时间
long requestStart = SystemClock.elapsedRealtime();
//循环执行 连接异常时会有重试机制 默认最大重试次数为1
while (true) {
HttpResponse httpResponse = null;
byte[] responseContents = null;
List<Header> responseHeaders = Collections.emptyList();
try {
// Gather headers.
Map<String, String> additionalRequestHeaders =
getCacheHeaders(request.getCacheEntry());
//获取服务器返回的httpResponse
httpResponse = mBaseHttpStack.executeRequest(request, additionalRequestHeaders);
int statusCode = httpResponse.getStatusCode();
//将头部解析成键值对的形式
responseHeaders = httpResponse.getHeaders();
// Handle cache validation.
//响应码为304
if (statusCode == HttpURLConnection.HTTP_NOT_MODIFIED) {
Entry entry = request.getCacheEntry();
if (entry == null) {
//缓存为空构造NetworkResponse返回
return new NetworkResponse(
HttpURLConnection.HTTP_NOT_MODIFIED,
/* data= */ null,
/* notModified= */ true,
SystemClock.elapsedRealtime() - requestStart,
responseHeaders);
}
// Combine cached and response headers so the response will be complete.
//用缓存的响应头和响应体构造NetworkResponse返回
List<Header> combinedHeaders = combineHeaders(responseHeaders, entry);
return new NetworkResponse(
HttpURLConnection.HTTP_NOT_MODIFIED,
entry.data,
/* notModified= */ true,
SystemClock.elapsedRealtime() - requestStart,
combinedHeaders);
}
// Some responses such as 204s do not have content. We must check.
InputStream inputStream = httpResponse.getContent();
//判断204(无内容),服务器成功,但没有任何返回
if (inputStream != null) {
responseContents =
inputStreamToBytes(inputStream, httpResponse.getContentLength());
} else {
// Add 0 byte response as a way of honestly representing a
// no-content request.
responseContents = new byte[0];
}
// if the request is slow, log it.
long requestLifetime = SystemClock.elapsedRealtime() - requestStart;
logSlowRequests(requestLifetime, request, responseContents, statusCode);
// 响应不成功抛出IOException
if (statusCode < 200 || statusCode > 299) {
throw new IOException();
}
//构造一个成功的NetworkResponse
return new NetworkResponse(
statusCode,
responseContents,
/* notModified= */ false,
SystemClock.elapsedRealtime() - requestStart,
responseHeaders);
} catch (SocketTimeoutException e) {
//连接超时异常 会继续重试执行
attemptRetryOnException("socket", request, new TimeoutError());
} catch (MalformedURLException e) {
//url地址错误
throw new RuntimeException("Bad URL " + request.getUrl(), e);
} catch (IOException e) {
int statusCode;
if (httpResponse != null) {
statusCode = httpResponse.getStatusCode();
} else {
//该request是否允许重试
if (request.shouldRetryConnectionErrors()) {
//重试 超出最大次数则抛出NoConnectionError
attemptRetryOnException("connection", request, new NoConnectionError(e));
continue;
} else {
//直接抛出异常
throw new NoConnectionError(e);
}
}
VolleyLog.e("Unexpected response code %d for %s", statusCode, request.getUrl());
NetworkResponse networkResponse;
//响应内容是否为空
if (responseContents != null) {
networkResponse =
new NetworkResponse(
statusCode,
responseContents,
/* notModified= */ false,
SystemClock.elapsedRealtime() - requestStart,
responseHeaders);
//响应状态码401或403
if (statusCode == HttpURLConnection.HTTP_UNAUTHORIZED
|| statusCode == HttpURLConnection.HTTP_FORBIDDEN) {
//重试 否则 抛出印证失败异常
attemptRetryOnException(
"auth", request, new AuthFailureError(networkResponse));
} else if (statusCode >= 400 && statusCode <= 499) {
// Don't retry other client errors.
//客户端异常
throw new ClientError(networkResponse);
} else if (statusCode >= 500 && statusCode <= 599) {
if (request.shouldRetryServerErrors()) {
//重试 否则 服务端异常
attemptRetryOnException(
"server", request, new ServerError(networkResponse));
} else {
//服务端异常
throw new ServerError(networkResponse);
}
} else {
// 3xx? No reason to retry.
// 服务端异常
throw new ServerError(networkResponse);
}
} else {
//重试 网络异常
attemptRetryOnException("network", request, new NetworkError());
}
}
}
}
上述代码就是执行网络请求,根据响应及响应码判断是否重试,或抛出异常,或构造NetworkResponse
返回
获取到响应后,然后解析request.parseNetworkResponse(networkResponse)
,接着调用mDelivery.postResponse(request, response)
分发响应
@Override
public void postResponse(Request<?> request, Response<?> response) {
postResponse(request, response, null);
}
@Override
public void postResponse(Request<?> request, Response<?> response, Runnable runnable) {
request.markDelivered();
request.addMarker("post-response");
mResponsePoster.execute(new ResponseDeliveryRunnable(request, response, runnable));
}
接着查看ResponseDeliveryRunnable
的实现
public ResponseDeliveryRunnable(Request request, Response response, Runnable runnable) {
//请求
mRequest = request;
//响应
mResponse = response;
//是否需要执行runnable
mRunnable = runnable;
}
然后再查看其run()
方法的实现
@Override
public void run() {
//如果取消 直接结束分发
if (mRequest.isCanceled()) {
mRequest.finish("canceled-at-delivery");
return;
}
// 根据响应状态回调对应结果
if (mResponse.isSuccess()) {
mRequest.deliverResponse(mResponse.result);
} else {
mRequest.deliverError(mResponse.error);
}
// response为intermediate 对应缓存调度器中需要刷新缓存
if (mResponse.intermediate) {
mRequest.addMarker("intermediate-response");
} else {
mRequest.finish("done");
}
//对应缓存调度器 刷新缓存加入网络请求队列
if (mRunnable != null) {
mRunnable.run();
}
}
上面就是一个响应结果的分发实现,取消则不再分发,否则回调成功或失败;但需要注意的是request有相应缓存时,正常情况下会先将缓存作为response,如果需要刷新缓存,当服务器数据更新时,会有两次response
3.3.Volley为什么不适合大文件的下载和上传
首先结合源码看一下下载,前面已分析,所有的网络请求都是基于BasicNetwork
封装的,看看他的``````
@Override
public NetworkResponse performRequest(Request<?> request) throws VolleyError {
while (true) {
...
try {
// Gather headers.
Map<String, String> additionalRequestHeaders =
getCacheHeaders(request.getCacheEntry());
//底层网络框架的请求
httpResponse = mBaseHttpStack.executeRequest(request, additionalRequestHeaders);
int statusCode = httpResponse.getStatusCode();
...
InputStream inputStream = httpResponse.getContent();
if (inputStream != null) {
//输入流的读取
responseContents =
inputStreamToBytes(inputStream, httpResponse.getContentLength());
} else {
// Add 0 byte response as a way of honestly representing a
// no-content request.
responseContents = new byte[0];
}
...
}
}
private byte[] inputStreamToBytes(InputStream in, int contentLength)
throws IOException, ServerError {
PoolingByteArrayOutputStream bytes = new PoolingByteArrayOutputStream(mPool, contentLength);
byte[] buffer = null;
try {
if (in == null) {
throw new ServerError();
}
buffer = mPool.getBuf(1024);
int count;
while ((count = in.read(buffer)) != -1) {
bytes.write(buffer, 0, count);
}
return bytes.toByteArray();
} finally {
...
//全部读取输入流 且缓存在mPool中
mPool.returnBuf(buffer);
bytes.close();
}
}
@Deprecated
public BasicNetwork(HttpStack httpStack) {
//第二个参数即mPool
this(httpStack, new ByteArrayPool(DEFAULT_POOL_SIZE));
}
//默认值4K
private static final int DEFAULT_POOL_SIZE = 4096;
public synchronized void returnBuf(byte[] buf) {
//如果数据大于缓存 则直接不缓存
if (buf == null || buf.length > mSizeLimit) {
return;
}
...
}
从上可以看出,进行响应读取时,会全部读取输入流数据,且超过ByteArrayPool(缓存池,减少内存分配的次数)
大小时,缓存失效,对大I/O
操作没有进行处理,所有Volley
不适合大文件的下载
上面Volley
不适合大文件下载搞清楚了,下面的大文件上传其实是类似的,首先是执行请求mBaseHttpStack.executeRequest(request, additionalRequestHeaders)
的方法,然后由底层网络库具体实现,最后都会读取request
的body
private static void setEntityIfNonEmptyBody(
HttpEntityEnclosingRequestBase httpRequest, Request<?> request)
throws AuthFailureError {
//这里也是将请求体的内容一次性读取 并发时容易造成OOM
byte[] body = request.getBody();
if (body != null) {
HttpEntity entity = new ByteArrayEntity(body);
httpRequest.setEntity(entity);
}
}
android中一般处理大文件的网络操作时,都会采用到多线程分块(RandomAccessFile)
的方式,为了防止中断也会加上本地保存进度,进行断点处理(下载是head中加入range,上传是与服务端协商参数分块总数,当前块,请求输入流的分块setChunkedStreamingMode,防止OOM,服务端需回传每块文件的下载位置)
4.总结
上述就是整个volley
的网络请求过程,流程还是比较清晰的,首先用户可以自定义BaseHttpStack
,OkHttp的作者Jake Wharton就提供OkHttpStack
底层通过okhttp访问网络,默认情况下,android 2.3以前采用的是HttpClient,2.3及以后采用的是HttpURLConnection访问网络.
Volley
默认开启一个缓存请求线程,4个网络请求线程,通过判断请求是否可以缓存来加入网络队列还是缓存队列,这两个队列都是阻塞队列,循环从队列里去请求,取不到则阻塞,第一次都会从网路获取数据,然后判断是否缓存,缓存队列判断缓存信息,304则需要重新从网络队列获取
参考:
https://blog.csdn.net/guolin_blog/article/details/17656437