源码分析OKHttp

OkHttp3特点

OkHttp是一个高效的Http客户端,有如下的特点:

  • 支持HTTP2/SPDY
  • socket自动选择最好路线,并支持自动重连
  • 拥有自动维护的socket连接池,减少握手次数
  • 拥有队列线程池,轻松写并发
  • 拥有Interceptors轻松处理请求与响应(比如透明GZIP压缩,LOGGING)
  • 基于Headers的缓存策略

主要对象

  • Connections:对JDK中socket进行了引用计数封装,用来控制socket连接
  • Streams:维护HTTP的流,用来对Request/Response进行IO操作
  • Calls:HTTP请求任务封装
  • StreamAllocation:用来控制Connections/Streams的资源分配与释放

工作流程的概述

当我们OkHttpClient.newCall(request)进行execute/enenqueue时,实际是将请求Call放到了Dispatcher中,
OkHttp使用Dispatcher进行线程分发,它有两种方法,
一个是普通的同步单线程;
另一个是使用了队列进行并发任务的分发(Dispatch)与回调。

1.Dispatcher的结构

Dispatcher维护了如下变量,用于控制并发的请求:

  • maxRequests = 64:最大并发请求数为64
  • maxRequestsPerHost = 5:每个主机最大请求数为5
  • Dispatcher:分发者,也就是生产者(默认在主线程)
  • AsyncCall:队列中需要处理的Runnable(包装了异步回调接口)
  • ExecutorService: 消费者池(也就是线程池)
  • Deque: 缓存(用数组实现,可自动扩容,无大小限制)
  • Deque: 正在运行的任务,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存

根据生产者消费者模型的理论,当入队(enqueue)请求时,如果满足(runningRequests < 64 && runningRequestsPerHost < 5),
那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行。
如果消费者缓存满了,就放入readyAsyncCalls进行缓存等待。

1
2
3
4
5
6
7
8
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}

当任务执行完成后,调用finished的promoteCalls()函数,手动移动缓存区(可以看出这里是主动清理的,因此不会发生死锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}

Mark-Down

线程池基础

线程池好处都有啥

线程池的关键在于线程复用以减少非核心任务的损耗。
线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。
通过对线程进行缓存,减少了创建销毁的时间损失。
通过控制线程数量的阈值,减少了当线程过少时带来的CPU闲置与线程过多时对JVM的内存与线程切换时系统调用的压力。

OkHttp配置的线程池

在OkHttp,使用如下构造了单例线程例:

1
2
3
4
5
6
public synchronized ExecutorService executorService(){
if(executorService == null){
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60 , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.thread("OkHttp Dispatcher", false));
}
return executorService;
}}

参数说明如下:

  • int corePoolSize :最小并发线程数,这里并发同时包括空闲与活动的线程,如果是0的话,空闲一段时间后所有线程将全部被销毁。
  • int maximumPoolSize:最大线程数,当任务进来时可以扩充的线程最大数,当大于了这个值就会根据丢弃处理机制来处理。
    long keepAliveTime:当线程数大于corePoolSize时,多余的空闲线程的最大存活时间,类似于HTTP中的keep-alive
    TimeUnit unit:时间单位,一般用秒
    BlockingQueue workQueue:工作队列
    ThreadFactory threadFactory:单个线程的工厂,可以打log,设置Dameo(即当JVM退出时,线程自动结束)等。

可以看出,在OkHttp中,构建了一个阈值为[0, Integer.MAX_VALUE]的线程池,它不保留任何最小线程数,随时创建更多的线程数,当线程空闲时只能存活60秒,它使用了一个不存储元素的阻塞工作队列,一个叫做“OkHttp Disptcher”的线程工厂。
也就是说,在实际运行中,当收到10个并发请求时,线程池会创建10个线程,当工作完成后,线程池会在60s后相继关闭所有线程。

反向代理模型

在OkHttp中,使用了与Nginx类似的反向代理与分发技术,这是典型的单生产多消费者问题。
我们知道在Nginx站,用户通过HTTP(SOCKET)访问前置的服务器,服务器会添加Header并自动转发给请求给后端群,接着返回数据结果给用户。
通过将工作分配给多个后台服务器,可以提高服务的负载均衡能力,实现非阻塞、高并发连接,避免资源全部放到一台服务器而带来的负载、速度、在线率等影响。
而在OkHttp中,非常类似于上述场景,它使用Dispatcher作为任务的转发器,线程池对应多台后置服务器,用AsyncCall对应Socket请求,用Deque对应Nginx的内部缓存。

OkHttp的任务调度

当我们希望使用OkHttp的异步请求时,一般进行如下构造

1
2
3
4
5
6
7
8
9
10
11
12
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url("http://qq.com").get().build();
Client.newCall(request).enqueue(new Callback(){
@Override
public void onFailure(Call call, IOException e){
}

@Override
public void onResponse(Call call,Response response) throws IOException{
}
});

当HttpClient的请求入队是,根据代码,我们可以发现实际上是Dispatcher进行了入队操作

1
2
3
4
5
6
7
8
9
10
11
synchronized void enqueue(AsyncCall call){
if(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxReqeustsPerHost){
//添加正在运行的请求
runningAsyncCalls.add(call);
//线程池执行请求
executorService().execute(call);
}else{
//添加到缓存队列排队等待
readyAsyncCalls.add(call);
}
}

如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建、销毁、缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。
我们再分析请求元素AsyncCall(它实现了Runnable接口),它内部实现的execute方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override 
protected void execute(){
boolean signalledCallback = false;
try{
//执行耗时IO任务
Response response = getResponseWithInterceptorChain(forWebSocket);
if(canceled){
signalledCallback = true;
//回调,注意这里回调在线程池中,而不是想当然的在主线程回调
resposneCallback.onFailure(RealCall.this, new IOException("Cancelled"));
}else{
signalledCallback = true;
//回到,同上
responseCallback.onResponse(RealCall.this, response);
}
}catch(IOException e){
if(signalledCallback){
logger.log(Level.INFO, "Callback failure for " + toLoggableString(),e):
}else{
responseCallback.onFailure(RealCall.this, e);
}
}finally{
//最关键的代码
client.dispatcher().finished(this);
}
}

当任务执行完成后,无论是否有异常,finally代码段总会执行,也就调用Dispatcher的finished函数,打开源码,发现它将正在运行的任务Call从队列runningAsyncCalls中移除后,接着执行promoteCalls()函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void promoteCalls(){
//如果目前是最大负荷运转,接着等
if(runningAsyncCalls.size() >= maxRequests) return;
//如果缓存等待区是空的,接着等
if(readyAsyncCall.isEmpty()) return;
for(Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext();){
AsyncCall call = i.next();
if(runningCallsForHost(call) < maxRequestsPerHost){
//将缓存等待区最后一个移动到运行区中,并执行
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if(runningAsyncCalls.size() >= maxRequests) return;
}
}

这样,就主动的把缓存队列向前走了一步,而没有使用互斥锁等复杂编码。

总结

  • OkHttp采用了Dispatcher技术,类似于Nginx,与线程池配合实现了搞并发,低阻塞的运行
  • OkHttp采用Deque作为缓存,按照入队的顺序先进先出
  • OkHttp最出彩的地方就是try/finally中调用了finished函数,可以主动控制等待队列的移动,而不是采用锁,极大减少了编码复杂性。





    Socket管理(StreamAllocation)

    1.选择路线与自动重连(RouteSelector)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    public Route next() throws IOException{
    if(!hasNextInetSocketAddress()){
    if(!hasNextProxy()){
    if(!hasNextPostponed()){
    throw new NoSuchElementException();
    }
    return nextPostponed();
    }
    lastProxy = nextProxy();
    }
    lastInetSocketAddress = nextInetSocketAddress();

    Route route = new Route(address, lastProxy, lastInetSocketAddress);
    if(routeDatabase.shouldPostpone(route)){
    postponsedRoutes.add(route);
    return next();
    }

    return route;
    }

如果Proxy为null:

  1. 在构造函数中设置代理为Proxy.NO_PROXY
  2. 如果缓存中的lastInetSocketAddress为空,就通过DNS(默认是Dns.SYSTEM,包装了jdk自带的lookup函数)查询,并保存结果,注意结果是数组,即一个域名有多个IP,这就是自动重连的来源。
  3. 如果还没有查询到就递归调用next查询,直到查询到为止
  4. 一切next都没有枚举到,抛出NoSuchElementException,退出


    如果Proxy为HTTP:
  5. 设置socket的ip为代理地址的ip
  6. 设置socket的端口为代理地址的端口
  7. 一切next都没有枚举到,抛出NoSuchElementException,退出

2.连接socket链路(RealConnection)

当地址,端口准备好了,就可以进TCP连接了(也就是三次握手),步骤如下:

  1. 如果连接池中已经存在连接,就从中取出(get)RealConnection,如果没有命中就进入下一步
  2. 根据选择的路线(Route),调用Platform.get().connectSokcet选择当前平台Runtime下最好的socket库进行握手
  3. 强建立成功的RealConnection放入(put)连接池缓存
  4. 如果存在TLS,就根据SSL版本与证书进行安全握手
  5. 构造HttpStream并维护刚刚的socket连接,管理建立完成。

3.释放socket链路(release)

如果不再需要(比如通信完成,连接失败)此链路,释放连接(也就是TCP断开)

  1. 尝试从缓存的连接池中删除(remove)
  2. 如果没有命中缓存,就直接调用jdk的socket关闭

HTTP请求序列化/反序列化

1.获得HTTP流(httpStream)

以下为无缓存,无多次302跳转,网络良好,HTTP/1.1下的GET访问实例分析。

1
httpStream = connect();

在connect()有非常重要的一步,它通过okio库与远程socket建立了I/O连接,为了更好的理解,我们可以把它看成管道:

1
2
3
4
//source 用于获取response
source = Okio.buffer(Okio.source(rawSocket));
//sink用于write buffer到server
sink = Okio.buffer(Okio.sink(rawSocket));

OkHttp的I/O使用的是Okio库:

  • Buffer:Buffer是可变字节,类似于byte[],相当于传输介质
  • source: source是Okio库中的输入组件,类似于inputStream,经常在下载中用到。它的重要方法是read(Buffer sink, long byteCount),从流中读取数据。
  • sink:sink是okio库中的io输入组件,类似于outputStream,经常用于写到file/socket,它的最重要方法是void write(Buffer source, long byteCount),写数据到Buffer中
    如果把连接看成管道,->为管道的方向,如下
    1
    2
    Sink -> Socket/File
    Source <- Socket/File

2.拼装Raw请求与Headers(writeRequestHeaders)

我们通过Rquest.Builder构建了简陋的请求后,可能需要进行一些修饰,这时需要使用interceptors对Request进行进一步的拼装了。
拦截器是OkHttp中强大的流程装置,它可以用来监控log,修改请求,修改结果,甚至是对用户透明的GZIP压缩。
在OkHttp中,内部维护了一个Interceptors的List,通过InterceptorChain进行多次拦截修改操作。
Mark-Down

请求代码如下,是自增递归(recursive)调用Chain.process(),直到interceptors().size()中的拦截器全部调用完。

  1. 递归调用interceptors,依次入栈对response进行处理
  2. 当全部递归出栈完成后,移交给网络模块(getResponse)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    if(index < client.interceptors().size()){
    Interceptor.Chain chain = new ApplicationInterceptorChain(index + 1, request,forWebSocket);
    Interceptor interceptor = client.interceptors().get(index);
    //递归调用Chain。process()
    Response interceptedResponse = interceptor.intercept(chain);
    if(interceptedResponse == null){
    throw new NullPointerException("application interceptor" + interceptor + " returned null");
    }
    return interceptedResponse;
    }
    return getResponse(request, forWebSocket);

接下来是正式的网络请求getResponse(),此步骤通过http协议规范将对象中的数据信息序列化为Raw文本:

  1. 在OkHttp中,通过RequestLine,Request,HttpEngine,Header等参数进行序列化操作,也就是拼装参数为socketRaw数据。拼装方法也比较暴力,直接按照RFC协议要求的格式进行concat输出就实现了
    2.通过sink写入write到socket连接

1.3 获得响应(readResponseHeaders/Bod)

此步骤根据获取到的Socket纯文本,解析为Response对象,我们可以看成是一个反序列(通过http协议将Raw文本转成对象)的过程:
拦截器的设计:

  1. 自定义网络拦截器请求进行递归入栈
  2. 在自定义网络拦截器的intercept中,调用NetworkInterceptorChain的proceed(request),进行真正的网络请求(readNetWorkResponse)

  3. 接自定义请求递归出栈
    网络读取(readNetworkResponse)分析:
    1.读取Raw的第一行,并反序列化为StatusLine对象

  4. 以Transfer-Encoding:chuncked的模式传输并组装Body
    伪代码如下:
    1
    2
    3
    4
    5
    6
    (RawData <- RemoteChannel(www.xx.com, 80) //读取远程的Raw
    map(func NetworkInterceptorChains())//云处理
    //这里的source应用了HttpEngine,并重写了read方法
    .map(func getTransferStream{})
    // 根据source拼装body对象
    .map(func RealResponseBody());

接下来进行释放socket连接。
现在我们就获得response对象,进行进一步Gson操作。