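How dubbo's @Reference annotation works

dubbo is a popular RPC framework. It provides the @Reference annotation, which declares a dependency on a remote interface and has a proxy for that interface injected automatically. This article looks at how the @Reference annotation is implemented.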

Using the Reference annotation

The following example calls an RPC interface through dubbo:

@SpringBootApplication
public class DubboDemoClientApplication {

    // dubbo performs the dependency injection automatically
    @Reference
    private HelloService demoService;

    public static void main(String[] args) {
        SpringApplication.run(DubboDemoClientApplication.class, args);
    }

    @PostConstruct
    public void init() {
        // Just like calling a local method
        String sayHello = demoService.sayHello("world");
        System.out.println(sayHello);
    }
}

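The interface to call here is HelloService. Adding @Reference to the private HelloService demoService field is all that is needed (the project still requires some common configuration, of course): dubbo generates the corresponding proxy class and injects it, so using it feels just like calling a local interface method.

How the Reference annotation is implemented

To inject the proxy class, dubbo has to provide two capabilities:

  1. Collect the relevant information and generate a proxy class for the interface. The proxy intercepts method calls, converts the request into dubbo protocol data and sends it to the service provider, then receives the provider's response and converts it into the method's return value. In practice it also handles load balancing, routing, authentication and so on.
  2. Inject the proxy class at the right moment.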
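
The two capabilities above can be sketched with nothing but the JDK. This is only an illustration under stated assumptions, not dubbo's code: the `@Ref` annotation, the `Greeter` interface and the `InjectionSketch` class are hypothetical names, and the proxy answers locally instead of sending a dubbo request to a provider.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class InjectionSketch {

    // Hypothetical stand-in for dubbo's @Reference annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Ref {
    }

    // Hypothetical remote interface.
    interface Greeter {
        String sayHello(String name);
    }

    static class Client {
        @Ref
        Greeter greeter;
    }

    // Capability 1: generate a proxy for the interface. A real RPC proxy would
    // encode the call and send it to a provider; this stub answers locally and
    // only supports single-argument methods that return String.
    static <T> T createProxy(Class<T> type) {
        InvocationHandler handler = (proxy, method, args) ->
                "Hello, " + args[0] + " (via proxy)";
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, handler));
    }

    // Capability 2: inject a proxy into every field annotated with @Ref.
    // The field type must be an interface, otherwise Proxy rejects it.
    static void inject(Object bean) {
        for (Field field : bean.getClass().getDeclaredFields()) {
            if (field.isAnnotationPresent(Ref.class)) {
                try {
                    field.setAccessible(true);
                    field.set(bean, createProxy(field.getType()));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("Injection failed for " + field, e);
                }
            }
        }
    }

    public static void main(String[] args) {
        Client client = new Client();
        inject(client);
        System.out.println(client.greeter.sayHello("world"));
    }
}
```

In dubbo the injection step is not called by hand like this; it is driven by the Spring bean lifecycle, as described next.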

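The implementation of dubbo's @Reference annotation follows that of spring's @Autowired annotation, which is handled by the AutowiredAnnotationBeanPostProcessor post-processor. Its main flow is:

  1. After the bean instance is created, postProcessMergedBeanDefinition reads the annotation information and builds the injection metadata.
  2. During the populateBean phase, postProcessProperties fetches that injection metadata and calls its inject method to inject the field or method dependencies.

For a more detailed walkthrough, see the article on how @Autowired dependency injection works (Autowired 注解依赖注入原理).

Similarly, dubbo implements this through the ReferenceAnnotationBeanPostProcessor post-processor. Let us first look at its class hierarchy (it actually implements more interfaces than these, but only the ones in the diagram matter here):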

[Figure: class hierarchy of ReferenceAnnotationBeanPostProcessor (image-20200415004940541)]

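As the diagram shows, there are two interface branches. One is MergedBeanDefinitionPostProcessor, which processes the MergedBeanDefinition after the bean instance has been created. The other is InstantiationAwareBeanPostProcessor, which processes the properties of the bean instance during the populateBean phase.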

image-20200402015737100

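Building the injection metadata

Building the injection metadata is triggered by the MergedBeanDefinitionPostProcessor#postProcessMergedBeanDefinition callback.

The relevant code lives in AnnotationInjectedBeanPostProcessor and is essentially the same as in AutowiredAnnotationBeanPostProcessor: the metadata is built on first access and then stored in a cache.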

private InjectionMetadata findInjectionMetadata(String beanName, Class<?> clazz, PropertyValues pvs) {
    // Fall back to class name as cache key, for backwards compatibility with custom callers.
    String cacheKey = (StringUtils.hasLength(beanName) ? beanName : clazz.getName());
    // Quick check on the concurrent map first, with minimal locking.
    AnnotationInjectedBeanPostProcessor.AnnotatedInjectionMetadata metadata = this.injectionMetadataCache.get(cacheKey);
    if (InjectionMetadata.needsRefresh(metadata, clazz)) {
        synchronized (this.injectionMetadataCache) {
            metadata = this.injectionMetadataCache.get(cacheKey);
            if (InjectionMetadata.needsRefresh(metadata, clazz)) {
                if (metadata != null) {
                    metadata.clear(pvs);
                }
                try {
                    // Build the injection metadata
                    metadata = buildAnnotatedMetadata(clazz);
                    this.injectionMetadataCache.put(cacheKey, metadata);
                } catch (NoClassDefFoundError err) {
                    throw new IllegalStateException("Failed to introspect object class [" + clazz.getName() +
                            "] for annotation metadata: could not find class that it depends on", err);
                }
            }
        }
    }
    return metadata;
}
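
The caching pattern used here (a lock-free read first, then a second check under a lock before building) can be shown in isolation. The following is a simplified sketch, not the Spring or dubbo code: `MetadataCacheSketch`, `find` and `build` are hypothetical names, a plain string stands in for the metadata object, and a null check replaces `InjectionMetadata.needsRefresh`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class MetadataCacheSketch {

    private final Map<String, String> cache = new ConcurrentHashMap<>();

    // Counts how often the expensive build step actually runs.
    final AtomicInteger builds = new AtomicInteger();

    // Hypothetical expensive step, standing in for buildAnnotatedMetadata.
    private String build(Class<?> clazz) {
        builds.incrementAndGet();
        return "metadata for " + clazz.getName();
    }

    String find(String beanName, Class<?> clazz) {
        // Fall back to the class name when no bean name is given.
        String key = (beanName != null && !beanName.isEmpty()) ? beanName : clazz.getName();
        // Fast path: read from the concurrent map without locking.
        String metadata = cache.get(key);
        if (metadata == null) {
            synchronized (cache) {
                // Check again under the lock: another thread may have built it meanwhile.
                metadata = cache.get(key);
                if (metadata == null) {
                    metadata = build(clazz);
                    cache.put(key, metadata);
                }
            }
        }
        return metadata;
    }

    public static void main(String[] args) {
        MetadataCacheSketch sketch = new MetadataCacheSketch();
        sketch.find("demo", String.class);
        sketch.find("demo", String.class);
        // The second lookup is served from the cache, so build ran only once.
        System.out.println("builds = " + sketch.builds.get());
    }
}
```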

The injection metadata is built separately for fields and methods:

private AnnotationInjectedBeanPostProcessor.AnnotatedInjectionMetadata buildAnnotatedMetadata(final Class<?> beanClass) {
    // field
    Collection<AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement> fieldElements = findFieldAnnotationMetadata(beanClass);
    // method
    Collection<AnnotationInjectedBeanPostProcessor.AnnotatedMethodElement> methodElements = findAnnotatedMethodMetadata(beanClass);
    // Build the AnnotatedInjectionMetadata
    return new AnnotationInjectedBeanPostProcessor.AnnotatedInjectionMetadata(beanClass, fieldElements, methodElements);
}

Constructing the AnnotatedInjectionMetadata object:

image-20200415012759565
public AnnotatedInjectionMetadata(Class<?> targetClass,
                                  Collection<AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement> fieldElements,
                                  Collection<AnnotationInjectedBeanPostProcessor.AnnotatedMethodElement> methodElements) {
    // The elements are stored in the inherited InjectionMetadata state
    super(targetClass, combine(fieldElements, methodElements));
    // (remaining code omitted)
}

private static <T> Collection<T> combine(Collection<? extends T>... elements) {
    List<T> allElements = new ArrayList<>();
    for (Collection<? extends T> e : elements) {
        allElements.addAll(e);
    }
    return allElements;
}

findFieldAnnotationMetadata

Finds the @Reference annotations on fields and builds an AnnotatedFieldElement object for each:

private List<AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement> findFieldAnnotationMetadata(final Class<?> beanClass) {
    final List<AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement> elements = new LinkedList<AnnotationInjectedBeanPostProcessor.AnnotatedFieldElement>();
    // Iterate over the fields of the class
    ReflectionUtils.doWithFields(beanClass, field -> {
        // Handle the Reference annotation
        for (Class<? extends Annotation> annotationType : getAnnotationTypes()) {

            AnnotationAttributes attributes = getMergedAttributes(field, annotationType, getEnvironment(), true);

            if (attributes != null) {
                if (Modifier.isStatic(field.getModifiers())) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("@" + annotationType.getName() + " is not supported on static fields: " + field);
                    }
                    return;
                }
                // Build the AnnotatedFieldElement object
                elements.add(new AnnotatedFieldElement(field, attributes));
            }
        }
    });
    return elements;
}
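
The scanning logic can be reproduced with plain reflection. The sketch below is an approximation under stated assumptions: `FieldScanSketch`, `@Ref`, `Base` and `Bean` are hypothetical names, the loop over superclasses stands in for what ReflectionUtils.doWithFields does, and static fields are silently skipped instead of logged.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

class FieldScanSketch {

    // Hypothetical stand-in for @Reference.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Ref {
    }

    static class Base {
        @Ref
        Runnable inherited;
    }

    static class Bean extends Base {
        @Ref
        Runnable task;

        @Ref
        static Runnable shared; // static: not a valid injection target

        String plain;           // not annotated
    }

    // Collects the names of all non-static @Ref fields, including inherited ones.
    static List<String> findAnnotatedFields(Class<?> beanClass) {
        List<String> names = new ArrayList<>();
        for (Class<?> c = beanClass; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field field : c.getDeclaredFields()) {
                if (!field.isAnnotationPresent(Ref.class)) {
                    continue;
                }
                if (Modifier.isStatic(field.getModifiers())) {
                    continue; // the original logs a warning here
                }
                names.add(field.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Contains "task" and "inherited"; the order of declared fields is not guaranteed.
        System.out.println(findAnnotatedFields(Bean.class));
    }
}
```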

findAnnotatedMethodMetadata

Finds the @Reference annotations on methods and builds an AnnotatedMethodElement object for each:

private List<AnnotationInjectedBeanPostProcessor.AnnotatedMethodElement> findAnnotatedMethodMetadata(final Class<?> beanClass) {

    final List<AnnotationInjectedBeanPostProcessor.AnnotatedMethodElement> elements = new LinkedList<AnnotationInjectedBeanPostProcessor.AnnotatedMethodElement>();
    // Iterate over the methods of the class
    ReflectionUtils.doWithMethods(beanClass, method -> {

        Method bridgedMethod = findBridgedMethod(method);
        if (!isVisibilityBridgeMethodPair(method, bridgedMethod)) {
            return;
        }
        // Handle the Reference annotation
        for (Class<? extends Annotation> annotationType : getAnnotationTypes()) {

            AnnotationAttributes attributes = getMergedAttributes(bridgedMethod, annotationType, getEnvironment(), true);

            if (attributes != null && method.equals(ClassUtils.getMostSpecificMethod(method, beanClass))) {
                if (Modifier.isStatic(method.getModifiers())) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("@" + annotationType.getName() + " annotation is not supported on static methods: " + method);
                    }
                    return;
                }
                if (method.getParameterTypes().length == 0) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("@" + annotationType.getName() + " annotation should only be used on methods with parameters: " +
                                method);
                    }
                }
                PropertyDescriptor pd = BeanUtils.findPropertyForMethod(bridgedMethod, beanClass);
                // Build the AnnotatedMethodElement object
                elements.add(new AnnotatedMethodElement(method, pd, attributes));
            }
        }
    });

    return elements;
}

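Performing the injection

Dependency injection is triggered by the property post-processing callback of InstantiationAwareBeanPostProcessor (postProcessProperties; the code below implements the older postProcessPropertyValues variant of that callback).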

public PropertyValues postProcessPropertyValues(
        PropertyValues pvs, PropertyDescriptor[] pds, Object bean, String beanName) throws BeanCreationException {
    // Fetch the injection metadata
    InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
    try {
        // Call its inject method to perform the injection
        metadata.inject(bean, beanName, pvs);
    } catch (BeanCreationException ex) {
        throw ex;
    } catch (Throwable ex) {
        throw new BeanCreationException(beanName, "Injection of @" + getAnnotationType().getSimpleName()
                + " dependencies is failed", ex);
    }
    return pvs;
}

The inject method of InjectionMetadata is implemented as follows:

public void inject(Object target, @Nullable String beanName, @Nullable PropertyValues pvs) throws Throwable {
    Collection<InjectedElement> checkedElements = this.checkedElements;
    Collection<InjectedElement> elementsToIterate =
            (checkedElements != null ? checkedElements : this.injectedElements);
    if (!elementsToIterate.isEmpty()) {
        for (InjectedElement element : elementsToIterate) {
            if (logger.isTraceEnabled()) {
                logger.trace("Processing injected element of bean '" + beanName + "': " + element);
            }
            element.inject(target, beanName, pvs);
        }
    }
}

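It simply calls the inject method of each InjectedElement, which here means the inject method of AnnotatedFieldElement or AnnotatedMethodElement.

The inject method of AnnotatedFieldElement

Its main logic: obtain the object to inject and set it on the corresponding field through reflection, which completes the injection.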

protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
    Class<?> injectedType = field.getType();
    // Obtain the object to inject
    Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);

    // Perform the injection
    ReflectionUtils.makeAccessible(field);
    field.set(bean, injectedObject);
}

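The inject method of AnnotatedMethodElement

Its main logic is similar to that of AnnotatedFieldElement: obtain the object to inject first, then invoke the corresponding method through reflection to set the dependency, which completes the injection.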

protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
    Class<?> injectedType = pd.getPropertyType();
    // Obtain the object to inject
    Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);

    // Perform the injection
    ReflectionUtils.makeAccessible(method);
    method.invoke(bean, injectedObject);
}
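
Method injection boils down to making the annotated method accessible and invoking it with the dependency as its argument. Here is a minimal JDK-only sketch of that step; `MethodInjectionSketch`, `@Ref`, `Bean` and `injectByMethod` are hypothetical names, and the sketch takes the injected type from the method parameter rather than from a PropertyDescriptor as the original does.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class MethodInjectionSketch {

    // Hypothetical stand-in for @Reference.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Ref {
    }

    static class Bean {
        private Runnable task;

        @Ref
        void setTask(Runnable task) {
            this.task = task;
        }

        Runnable getTask() {
            return task;
        }
    }

    // Passes the dependency to every non-static, single-argument @Ref method
    // whose parameter type accepts it; returns how many methods were invoked.
    static int injectByMethod(Object bean, Object dependency) {
        int injected = 0;
        for (Method method : bean.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Ref.class)
                    || Modifier.isStatic(method.getModifiers())
                    || method.getParameterCount() != 1
                    || !method.getParameterTypes()[0].isInstance(dependency)) {
                continue;
            }
            try {
                method.setAccessible(true);
                method.invoke(bean, dependency);
                injected++;
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Injection failed for " + method, e);
            }
        }
        return injected;
    }

    public static void main(String[] args) {
        Bean bean = new Bean();
        Runnable dependency = () -> System.out.println("injected task ran");
        injectByMethod(bean, dependency);
        bean.getTask().run();
    }
}
```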

getInjectedObject

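The getInjectedObject method obtains the object to inject, that is, the proxy of the RPC interface. For performance, the object is added to a cache after it is first created.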

protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception {
    // Build the cache key
    String cacheKey = buildInjectedObjectCacheKey(attributes, bean, beanName, injectedType, injectedElement);
    Object injectedObject = injectedObjectsCache.get(cacheKey);
    if (injectedObject == null) {
        // Create the proxy instance
        injectedObject = doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
        // Customized inject-object if necessary
        injectedObjectsCache.putIfAbsent(cacheKey, injectedObject);
    }

    return injectedObject;
}
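
The get, create, putIfAbsent sequence can be tried out on its own. The sketch below uses hypothetical names (`InjectedObjectCacheSketch`, `getOrCreate`) and a `Supplier` in place of doGetInjectedBean. It also deviates from the listing in one labeled way: if another thread stored an instance first, the sketch returns that instance, whereas the listing above returns the object it just created.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

class InjectedObjectCacheSketch {

    private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

    // Looks up the key, creates the object on a miss, then stores it with putIfAbsent.
    Object getOrCreate(String cacheKey, Supplier<Object> factory) {
        Object injected = cache.get(cacheKey);
        if (injected == null) {
            injected = factory.get();
            Object previous = cache.putIfAbsent(cacheKey, injected);
            if (previous != null) {
                // Deviation from the original listing: prefer the instance that won the race.
                injected = previous;
            }
        }
        return injected;
    }

    public static void main(String[] args) {
        InjectedObjectCacheSketch sketch = new InjectedObjectCacheSketch();
        Object first = sketch.getOrCreate("demo.HelloService", Object::new);
        Object second = sketch.getOrCreate("demo.HelloService", Object::new);
        // Both lookups yield the same cached instance.
        System.out.println(first == second);
    }
}
```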

doGetInjectedBean

The actual creation of the proxy instance is implemented by ReferenceAnnotationBeanPostProcessor#doGetInjectedBean.

protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType, InjectionMetadata.InjectedElement injectedElement) throws Exception {

    // Generate the bean name (the interface class name combined with optional group, version, etc.)
    String referencedBeanName = buildReferencedBeanName(attributes, injectedType);

    // Build the ReferenceBean; this is where the real proxy actually lives
    ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referencedBeanName, attributes, injectedType);

    // Register the referenceBean with the beanFactory
    registerReferenceBean(referencedBeanName, referenceBean, attributes, injectedType);

    // Cache it
    cacheInjectedReferenceBean(referenceBean, injectedElement);

    // Create the proxy
    return buildProxy(referencedBeanName, referenceBean, injectedType);
}

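At first glance it looks as if the interface proxy is created in buildProxy. It does create a proxy, but the real logic is not there. Its InvocationHandler is implemented as follows: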

private static class ReferenceBeanInvocationHandler implements InvocationHandler {
    // A FactoryBean that carries a lot of configuration and wraps the proxy
    private final ReferenceBean referenceBean;
    // The instance created by referenceBean: the real proxy
    private Object bean;

    private ReferenceBeanInvocationHandler(ReferenceBean referenceBean) {
        this.referenceBean = referenceBean;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Object result;
        try {
            if (bean == null) { // If the bean is not initialized, invoke init()
                // issue: https://github.com/apache/dubbo/issues/3429
                init();
            }
            // Note that the first argument is bean, not referenceBean
            result = method.invoke(bean, args);
        } catch (InvocationTargetException e) {
            // re-throws the actual Exception.
            throw e.getTargetException();
        }
        return result;
    }

    private void init() {
        // Obtain the real proxy instance
        this.bean = referenceBean.get();
    }
}

So the proxy created by buildProxy is only a shell; the real proxy is created inside ReferenceBean.
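
The shell pattern can be demonstrated with the JDK alone: the outer proxy holds only a factory and creates the real target on the first call. In this sketch `LazyShellProxySketch`, `Greeter`, `LazyHandler` and `lazyProxy` are hypothetical names, a `Supplier` plays the role of referenceBean.get(), and, like the handler above, the lazy initialization is not synchronized.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.Supplier;

class LazyShellProxySketch {

    // Hypothetical remote interface.
    interface Greeter {
        String sayHello(String name);
    }

    // Counts how many real targets have been created.
    static int created = 0;

    static class LazyHandler implements InvocationHandler {
        private final Supplier<?> factory; // stands in for referenceBean
        private Object target;             // the "real" object, created on first use

        LazyHandler(Supplier<?> factory) {
            this.factory = factory;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (target == null) {
                target = factory.get();
            }
            try {
                // Delegate to the real target, not to the shell proxy.
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                // Surface the exception thrown by the target itself.
                throw e.getTargetException();
            }
        }
    }

    static <T> T lazyProxy(Class<T> type, Supplier<? extends T> factory) {
        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(), new Class<?>[] {type}, new LazyHandler(factory)));
    }

    public static void main(String[] args) {
        Greeter greeter = lazyProxy(Greeter.class, () -> {
            created++;
            return name -> "Hello, " + name;
        });
        System.out.println("created before first call: " + created);
        System.out.println(greeter.sayHello("world"));
        System.out.println("created after first call: " + created);
    }
}
```

Creating the shell costs almost nothing; the expensive work happens only when a method is first invoked on it.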

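ReferenceBean is a FactoryBean that caches a reference to the interface's proxy internally. Callers obtain it through getObject, and its init method creates the proxy. I do not fully understand why it was designed this way, but that is how it works. The proxy itself is ultimately built in createProxy: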

private T createProxy(Map<String, String> map) {
    if (shouldJvmRefer(map)) {
        URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
        invoker = REF_PROTOCOL.refer(interfaceClass, url);
        if (logger.isInfoEnabled()) {
            logger.info("Using injvm service " + interfaceClass.getName());
        }
    } else {
        urls.clear(); // reference retry init will add url to urls, lead to OOM
        if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
            String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
            if (us != null && us.length > 0) {
                for (String u : us) {
                    URL url = URL.valueOf(u);
                    if (StringUtils.isEmpty(url.getPath())) {
                        url = url.setPath(interfaceName);
                    }
                    if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    } else {
                        urls.add(ClusterUtils.mergeUrl(url, map));
                    }
                }
            }
        } else { // assemble URL from register center's configuration
            // if protocols not injvm checkRegistry
            if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                checkRegistry();
                List<URL> us = loadRegistries(false);
                if (CollectionUtils.isNotEmpty(us)) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
                if (urls.isEmpty()) {
                    throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
                }
            }
        }

        if (urls.size() == 1) {
            invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
        } else {
            List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
            URL registryURL = null;
            for (URL url : urls) {
                invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                    registryURL = url; // use last registry url
                }
            }
            if (registryURL != null) { // registry url is available
                // use RegistryAwareCluster only when register's CLUSTER is available
                URL u = registryURL.addParameter(CLUSTER_KEY, RegistryAwareCluster.NAME);
                // The invoker wrap relation would be: RegistryAwareClusterInvoker(StaticDirectory) -> FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                invoker = CLUSTER.join(new StaticDirectory(u, invokers));
            } else { // not a registry url, must be direct invoke.
                invoker = CLUSTER.join(new StaticDirectory(invokers));
            }
        }
    }

    if (shouldCheck() && !invoker.isAvailable()) {
        throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
    }
    if (logger.isInfoEnabled()) {
        logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
    }
    /**
     * @since 2.7.0
     * ServiceData Store
     */
    MetadataReportService metadataReportService = null;
    if ((metadataReportService = getMetadataReportService()) != null) {
        URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
        metadataReportService.publishConsumer(consumerURL);
    }
    // create service proxy
    return (T) PROXY_FACTORY.getProxy(invoker);
}