上文已经完整地分析了FeignClient被创建的过程:每个服务消费者被注入的ServiceAClient,其实都是它的动态代理实例,对ServiceAClient的任何方法调用,都会被委托给动态代理实例来完成,也就是SynchronousMethodHandler#invoke方法。本文就分析一下该方法的执行原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class SynchronousMethodHandler

/**
 * Handles a call made on the Feign proxy: builds a request template from the call
 * arguments and executes it inside a retry loop.
 *
 * @param argv arguments of the invoked client-interface method
 * @return the value returned by executeAndDecode for the built template
 * @throws Throwable the RetryableException rethrown by the retryer, or its cause when
 *         propagationPolicy is UNWRAP and a cause is present
 */
public Object invoke(Object[] argv) throws Throwable {
// Rebuild a RequestTemplate from the method arguments that were passed in
RequestTemplate template = buildTemplateFromArgs.create(argv);
Options options = findOptions(argv);
// Per the article, this Retryer is currently Retryer.NEVER_RETRY; each call works on its own clone
Retryer retryer = this.retryer.clone();
while (true) {
try {
return executeAndDecode(template, options);
} catch (RetryableException e) {
try {
// Returns normally when another attempt is allowed; throws when retrying should stop
retryer.continueOrPropagate(e);
} catch (RetryableException th) {
Throwable cause = th.getCause();
// With the UNWRAP policy the underlying cause is surfaced instead of the wrapper
if (propagationPolicy == UNWRAP && cause != null) {
throw cause;
} else {
throw th;
}
}
if (logLevel != Logger.Level.NONE) {
logger.logRetry(metadata.configKey(), logLevel);
}
// Loop again to re-execute the same template
continue;
}
}
}
  • 根据你传入的方法入参还原出一个RequestTemplate
  • 这个Retryer目前是Retryer.NEVER_RETRY
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class SynchronousMethodHandler 

/**
 * Turns the template into a concrete Request, logs it, and sends it through the client.
 * An IOException from the client is logged and rethrown via errorExecuting.
 *
 * NOTE(review): this excerpt stops after the request is sent; the response
 * decoding / return part of the method is not shown here.
 *
 * @param template request template built from the method arguments
 * @param options  per-request options passed to the client
 */
Object executeAndDecode(RequestTemplate template, Options options) throws Throwable {
// Run the chain of RequestInterceptors; any custom interceptors are executed at this point
Request request = targetRequest(template);

if (logLevel != Logger.Level.NONE) {
logger.logRequest(metadata.configKey(), logLevel, request);
}

Response response;
// Start time used to report the elapsed time if the call fails
long start = System.nanoTime();
try {
// Per the article, this client is actually LoadBalancerFeignClient
response = client.execute(request, options);
} catch (IOException e) {
if (logLevel != Logger.Level.NONE) {
logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
}
throw errorExecuting(request, e);
}
}
  • 执行一系列的RequestInterceptor

    1
    2
    3
    4
    5
    6
    /**
     * Applies every configured RequestInterceptor to the template, in iteration order,
     * then lets the target turn the template into the final Request.
     *
     * @param template request template to be processed by the interceptors
     * @return the Request produced by target.apply(template)
     */
    Request targetRequest(RequestTemplate template) {
    for (RequestInterceptor interceptor : requestInterceptors) {
    interceptor.apply(template);
    }
    return target.apply(template);
    }
  • 调用LoadBalancerFeignClient#execute()处理请求

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    class LoadBalancerFeignClient implements Client

    /**
     * Executes the request through a load balancer: the host part of the request URL is
     * used as the client name, and the request is handed to that client's load balancer.
     *
     * @param request the Feign request whose URL host names the target client
     * @param options request options used when resolving the client configuration
     * @return the response produced by the load-balanced call
     * @throws IOException the IOException found inside a ClientException, if any;
     *         otherwise the ClientException is wrapped in a RuntimeException
     */
    public Response execute(Request request, Request.Options options) throws IOException {
    try {
    // Parse the URI; its host is treated as the client name
    URI asUri = URI.create(request.url());
    String clientName = asUri.getHost();
    URI uriWithoutHost = cleanUrl(request.url(), clientName);
    FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
    this.delegate, request, uriWithoutHost);
    // Obtain the client configuration
    IClientConfig requestConfig = getClientConfig(options, clientName);

    // Per the article: CachingSpringLoadBalancerFactory#create(clientName) creates a FeignLoadBalancer; this factory caches its results
    // If nothing is cached, it delegates to SpringClientFactory#getInstance() to create a FeignLoadBalancer
    // Finally FeignLoadBalancer's executeWithLoadBalancer is invoked to execute the request
    return lbClient(clientName)
    .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
    }
    catch (ClientException e) {
    // Prefer surfacing an underlying IOException when one can be found
    IOException io = findIOException(e);
    if (io != null) {
    throw io;
    }
    throw new RuntimeException(e);
    }
    }
    • 解析URI
    • 获取客户端配置
    • 获取一个FeignLoadBalancer
    • 委托FeignLoadBalancer#executeWithLoadBalancer()执行请求(其实最后是委托给父类AbstractLoadBalancerAwareClient)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    class AbstractLoadBalancerAwareClient

    /**
     * Builds a LoadBalancerCommand for the request and submits an operation that, for the
     * server it is given, rewrites the request URI to point at that server and executes it.
     *
     * @param request       the request to execute
     * @param requestConfig client configuration passed to the command and to execute
     * @return the single item emitted by the submitted command
     * @throws ClientException declared by the signature; failures inside the operation
     *         are routed through Observable.error
     */
    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    return command.submit(
    new ServerOperation<T>() {
    @Override
    public Observable<T> call(Server server) {
    // Rebuild the URI so that it targets the server passed to this callback
    URI finalUri = reconstructURIWithServer(server, request.getUri());
    S requestForServer = (S) request.replaceUri(finalUri);
    try {
    return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
    }
    catch (Exception e) {
    // Report the failure through the observable rather than throwing from the callback
    return Observable.error(e);
    }
    }
    })
    // Convert to a blocking observable and take its single item
    .toBlocking()
    .single();
    }
    • 这又是国外喜闻乐见的响应式(Rx)编程风格,我们看看这个command.submit做了些什么吧
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
class LoadBalancerCommand

/**
 * Builds an Observable that runs the given operation against a server. The server is the
 * preset {@code server} field when it is non-null, otherwise one obtained from selectServer().
 * Retry operators are attached according to the retry handler's limits, listener callbacks
 * are fired around each attempt, and the final error is wrapped in a ClientException when
 * a retry limit was reached.
 *
 * @param operation the call to perform once a server is available
 * @return an Observable emitting the operation's result, or an error
 */
public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();

if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

// Retry-related settings; these can be skipped on a first read
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// Use the load balancer
Observable<T> o =
// selectServer(): pick a server (only when no server was preset)
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// Called for each server being selected
public Observable<T> call(Server server) {
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);

// Called for each attempt and retry
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);

if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

// Run the caller's operation and observe its events to record stats and notify listeners
return operation.call(server).doOnEach(new Observer<T>() {
// Last value seen in onNext; passed to recordStats on completion
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}

@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}

@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}

// Stops the timer and reports the outcome (result or exception) for this server
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});

// Inner retry: applied to the per-server attempt observable
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});

// Outer retry: only when the server was not preset, so that selection runs again
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));

return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
// Replace the error with a ClientException when a retry limit was reached
if (context.getAttemptCount() > 0) {
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
  • 选择一个server

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    /**
     * Creates an Observable that, on subscription, asks the load balancer context for a
     * server and emits it followed by completion; any exception is delivered via onError.
     *
     * @return an Observable emitting the chosen Server
     */
    private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
    @Override
    public void call(Subscriber<? super Server> next) {
    try {
    // Per the article, this ends up asking Ribbon's ZoneAwareLoadBalancer to pick a Server
    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
    next.onNext(server);
    next.onCompleted();
    } catch (Exception e) {
    next.onError(e);
    }
    }
    });
    }
    • 调用Ribbon的ZoneAwareLoadBalancer选择一个Server出来
  • 接着又回调到AbstractLoadBalancerAwareClient#executeWithLoadBalancer()

    这里笔者不得不吐槽下这个rx系列的编程方式,难以调试,运行流程不清晰。国内很少人用,这里咱们看个大概就好

  • AbstractLoadBalancerAwareClient.this.execute
    这个会调用FeignLoadBalancer#execute()方法

  • 再往下就到了FeignCore的HTTP客户端组件,这里是最后发起HTTP请求的地方
    可以看到,feign底层是使用JDK自带的HttpURLConnection这套HTTP API进行通信的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    class Client.Default implements Client

    /**
     * Sends the request over an HttpURLConnection and converts what comes back into a Response.
     *
     * @param request the request to send
     * @param options timeouts and redirect behaviour applied to the connection
     * @return the Response built by convertResponse
     * @throws IOException if sending or converting fails
     */
    @Override
    public Response execute(Request request, Options options) throws IOException {
    HttpURLConnection connection = convertAndSend(request, options);
    return convertResponse(connection, request);
    }

    /**
     * Opens an HttpURLConnection for the request URL, applies TLS settings, timeouts,
     * the HTTP method and the request headers, writes the request body when there is one
     * (optionally gzip- or deflate-compressed), and returns the connection.
     *
     * @param request the request to translate onto the connection
     * @param options connect/read timeouts and the follow-redirects flag
     * @return the configured connection, with the body already written if present
     * @throws IOException if opening the connection or writing the body fails
     */
    HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
    final URL url = new URL(request.url());
    final HttpURLConnection connection = this.getConnection(url);
    // For HTTPS connections, install the custom socket factory / hostname verifier when configured
    if (connection instanceof HttpsURLConnection) {
    HttpsURLConnection sslCon = (HttpsURLConnection) connection;
    if (sslContextFactory != null) {
    sslCon.setSSLSocketFactory(sslContextFactory);
    }
    if (hostnameVerifier != null) {
    sslCon.setHostnameVerifier(hostnameVerifier);
    }
    }
    // These are the connect/read timeout values you configured
    connection.setConnectTimeout(options.connectTimeoutMillis());
    connection.setReadTimeout(options.readTimeoutMillis());
    connection.setAllowUserInteraction(false);
    connection.setInstanceFollowRedirects(options.isFollowRedirects());
    connection.setRequestMethod(request.httpMethod().name());

    // Detect whether the request declares a gzip or deflate Content-Encoding
    Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
    boolean gzipEncodedRequest =
    contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
    boolean deflateEncodedRequest =
    contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);

    boolean hasAcceptHeader = false;
    Integer contentLength = null;
    // Copy every header value onto the connection
    for (String field : request.headers().keySet()) {
    if (field.equalsIgnoreCase("Accept")) {
    hasAcceptHeader = true;
    }
    for (String value : request.headers().get(field)) {
    if (field.equals(CONTENT_LENGTH)) {
    // Content-Length is dropped for compressed requests
    // (presumably because the compressed size differs from the declared one)
    if (!gzipEncodedRequest && !deflateEncodedRequest) {
    contentLength = Integer.valueOf(value);
    connection.addRequestProperty(field, value);
    }
    } else {
    connection.addRequestProperty(field, value);
    }
    }
    }
    // Some servers choke on the default accept string.
    if (!hasAcceptHeader) {
    connection.addRequestProperty("Accept", "*/*");
    }

    if (request.requestBody().asBytes() != null) {
    // Fixed-length streaming when the length is known, chunked streaming otherwise
    if (contentLength != null) {
    connection.setFixedLengthStreamingMode(contentLength);
    } else {
    connection.setChunkedStreamingMode(8196);
    }
    connection.setDoOutput(true);
    OutputStream out = connection.getOutputStream();
    // Wrap the stream so the body is compressed to match the declared encoding
    if (gzipEncodedRequest) {
    out = new GZIPOutputStream(out);
    } else if (deflateEncodedRequest) {
    out = new DeflaterOutputStream(out);
    }
    try {
    out.write(request.requestBody().asBytes());
    } finally {
    try {
    out.close();
    // A failure while closing is deliberately ignored
    } catch (IOException suppressed) { // NOPMD
    }
    }
    }
    return connection;
    }
    }

总结

Feign-Feign接收请求并执行

Comments