声明式接口调用:Feign源码剖析(3)Feign是如何处理请求的
上文已经完整的分析了FeignClient被创建的过程,每个服务消费者被注入的ServiceAClient,其实都是他的动态代理实例,对ServiceAClient的任何方法调用,都会被委托给动态代理实例来完成,也就是
SynchronousMethodHandler#invoke
方法。本文就分析一下该方法的执行原理
1 | class SynchronousMethodHandler |
- 根据你传入的方法入参还原出一个RequestTemplate
- 这个Retryer目前是Retryer.NERVER_RETRY
1 | class SynchronousMethodHandler |
执行一系列的RequestTemplate
1
2
3
4
5
6Request targetRequest(RequestTemplate template) {
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template);
}
return target.apply(template);
}调用
LoadBalancerFeignClient#execute()
处理请求1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27class LoadBalancerFeignClient implements Client
public Response execute(Request request, Request.Options options) throws IOException {
try {
// 解析URI
URI asUri = URI.create(request.url());
String clientName = asUri.getHost();
URI uriWithoutHost = cleanUrl(request.url(), clientName);
FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
this.delegate, request, uriWithoutHost);
// 获取客户端配置
IClientConfig requestConfig = getClientConfig(options, clientName);
// CachingSpringLoadBalancerFactory#create(clientName)创建一个FeignLoadBalancer,这个工厂是带缓存的
// 如果缓存没有,会委托SpringClientFactory#getInstance()创建一个 FeignLoadBalancer
// 最后执行FeignLoadBalancer的executeWithLoadBalancer执行请求
return lbClient(clientName)
.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
}
catch (ClientException e) {
IOException io = findIOException(e);
if (io != null) {
throw io;
}
throw new RuntimeException(e);
}
}- 解析URI
- 获取客户端配置
- 获取一个
FeignLoadBalancer
- 委托
FeignLoadBalancer
的executeWithLoadBalancer
执行请求(其实最后是委托给父类AbstractLoadBalancerAwareClient
)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22class AbstractLoadBalancerAwareClient
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
return command.submit(
new ServerOperation<T>() {
public Observable<T> call(Server server) {
URI finalUri = reconstructURIWithServer(server, request.getUri());
S requestForServer = (S) request.replaceUri(finalUri);
try {
return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
}
catch (Exception e) {
return Observable.error(e);
}
}
})
.toBlocking()
.single();
}- 这feel又是国外喜闻乐见的reactor编程,我们看看这个command.submit做了些什么吧
1 | class LoadBalancerCommand |
选择一个server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
public void call(Subscriber<? super Server> next) {
try {
// 这里其实就是调用Ribbon的ZoneAwareLoadBalancer选择一个Server出来
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}- 调用Ribbon的
ZoneAwareLoadBalancer
选择一个Server出来
- 调用Ribbon的
接着又回调到
AbstractLoadBalancerAwareClient#executeWithLoadBalancer()
这里笔者不得不吐槽下这个rx系列的编程方式,难以调试,运行流程不清晰。国内很少人用,这里咱们看个大概就好
reconstructURIWithServer
和request.replaceUri(finalUri)
根据选择好的server,把服务名替换成对应的主机名+端口号(比如http://serviceA/greeting/sam -> http://127.0.0.1:8081/greeting/sam)
AbstractLoadBalancerAwareClient.this.execute
这个会调用FeignLoadBalancer#execute()
方法再往下就到了FeignCore的HTTP客户端组件,这里是最后发起HTTP请求的地方
可以看到feign底层使用JDK自带的HttpURLConnection
这套HTTP API进行通信的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80interface Client
@Override
public Response execute(Request request, Options options) throws IOException {
HttpURLConnection connection = convertAndSend(request, options);
return convertResponse(connection, request);
}
HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
final URL url = new URL(request.url());
final HttpURLConnection connection = this.getConnection(url);
if (connection instanceof HttpsURLConnection) {
HttpsURLConnection sslCon = (HttpsURLConnection) connection;
if (sslContextFactory != null) {
sslCon.setSSLSocketFactory(sslContextFactory);
}
if (hostnameVerifier != null) {
sslCon.setHostnameVerifier(hostnameVerifier);
}
}
// 这里其实就是你配置的一系列的连接超时参数
connection.setConnectTimeout(options.connectTimeoutMillis());
connection.setReadTimeout(options.readTimeoutMillis());
connection.setAllowUserInteraction(false);
connection.setInstanceFollowRedirects(options.isFollowRedirects());
connection.setRequestMethod(request.httpMethod().name());
Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
boolean gzipEncodedRequest =
contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
boolean deflateEncodedRequest =
contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);
boolean hasAcceptHeader = false;
Integer contentLength = null;
for (String field : request.headers().keySet()) {
if (field.equalsIgnoreCase("Accept")) {
hasAcceptHeader = true;
}
for (String value : request.headers().get(field)) {
if (field.equals(CONTENT_LENGTH)) {
if (!gzipEncodedRequest && !deflateEncodedRequest) {
contentLength = Integer.valueOf(value);
connection.addRequestProperty(field, value);
}
} else {
connection.addRequestProperty(field, value);
}
}
}
// Some servers choke on the default accept string.
if (!hasAcceptHeader) {
connection.addRequestProperty("Accept", "*/*");
}
if (request.requestBody().asBytes() != null) {
if (contentLength != null) {
connection.setFixedLengthStreamingMode(contentLength);
} else {
connection.setChunkedStreamingMode(8196);
}
connection.setDoOutput(true);
OutputStream out = connection.getOutputStream();
if (gzipEncodedRequest) {
out = new GZIPOutputStream(out);
} else if (deflateEncodedRequest) {
out = new DeflaterOutputStream(out);
}
try {
out.write(request.requestBody().asBytes());
} finally {
try {
out.close();
} catch (IOException suppressed) { // NOPMD
}
}
}
return connection;
}
}