openFeign 和 ribbon 及其eureka源码整合

    技术2022-07-13  72

    1.容器启动 ribbon 的初始化

    1.1 ribbon 入口

    1.2 spring.factories 内容

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration

    1.3 进入 RibbonAutoConfiguration.java

    @Configuration @ConditionalOnClass({IClient.class, RestTemplate.class, AsyncRestTemplate.class, Ribbon.class}) @RibbonClients @AutoConfigureAfter( name = {"org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration"} ) @AutoConfigureBefore({LoadBalancerAutoConfiguration.class, AsyncLoadBalancerAutoConfiguration.class}) @EnableConfigurationProperties({RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class}) public class RibbonAutoConfiguration { @Bean public SpringClientFactory springClientFactory() { // 实例化一个工厂类进入这里 SpringClientFactory factory = new SpringClientFactory(); factory.setConfigurations(this.configurations); return factory; } }

    1.4 SpringClientFactory.java

    public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> { static final String NAMESPACE = "ribbon"; public SpringClientFactory() { //SpringClientFactory 实例化,注入RibbonClientConfiguration.class super(RibbonClientConfiguration.class, "ribbon", "ribbon.client.name"); } }

    1.5 NamedContextFactory.java

    /**
     * Base class for factories that keep one child context per name (excerpt:
     * only the fields set by the constructor are shown).
     *
     * @param <C> the specification type
     */
    public abstract class NamedContextFactory<C extends NamedContextFactory.Specification>
            implements DisposableBean, ApplicationContextAware {

        /** Configuration class registered in every context created by createContext(). */
        private Class<?> defaultConfigType;

        /** Name of the property source added to each created context. */
        private final String propertySourceName;

        /** Key under which the context name is published in that property source. */
        private final String propertyName;

        /**
         * Stores the three values; nothing else happens at construction time.
         *
         * @param defaultConfigType  default configuration class
         * @param propertySourceName property-source name
         * @param propertyName       property key that will carry the context name
         */
        public NamedContextFactory(Class<?> defaultConfigType,
                                   String propertySourceName,
                                   String propertyName) {
            this.defaultConfigType = defaultConfigType;
            this.propertySourceName = propertySourceName;
            this.propertyName = propertyName;
        }
    }

    小结:Spring IoC 容器启动的时候会实例化 SpringClientFactory,同时会执行父类 NamedContextFactory 的构造方法,并把 RibbonClientConfiguration.class 作为默认配置类(defaultConfigType)保存下来。这个类将来会在 ribbon 子容器中注册其他组件,后面详细说明。

    2.容器启动 openFeign 的初始化

    2.1 入口

    进入到@EnableFeignClients 这个注解类

    2.2 EnableFeignClients .java

    @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented // ioc 启动时候 会加载 FeignClientsRegistrar.class @Import(FeignClientsRegistrar.class) public @interface EnableFeignClients { }

    2.3 进入到 FeignClientsRegistrar.java

    class FeignClientsRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware, EnvironmentAware { // 实现了 ImportBeanDefinitionRegistrar 接口 在ioc 中必然执行registerBeanDefinitions @Override public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 会注入一些 属性信息比如Client ,编解码器 及其concat(参数绑定信息) registerDefaultConfiguration(metadata, registry); // 会生成一些jdk 代理 registerFeignClients(metadata, registry); } }

    2.3.1 registerDefaultConfiguration()

    private void registerDefaultConfiguration(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 获取元数据信息 Map<String, Object> defaultAttrs = metadata .getAnnotationAttributes(EnableFeignClients.class.getName(), true); if (defaultAttrs != null && defaultAttrs.containsKey("defaultConfiguration")) { String name; if (metadata.hasEnclosingClass()) { name = "default." + metadata.getEnclosingClassName(); } else { name = "default." + metadata.getClassName(); } // 跟进看一下 registerClientConfiguration(registry, name, defaultAttrs.get("defaultConfiguration")); } }

    2.3.1.1 registerClientConfiguration()

    private void registerClientConfiguration(BeanDefinitionRegistry registry, Object name, Object configuration) { BeanDefinitionBuilder builder = BeanDefinitionBuilder .genericBeanDefinition(FeignClientSpecification.class); builder.addConstructorArgValue(name); builder.addConstructorArgValue(configuration); // 往ioc 中注入 name= "default." + metadata.getClassName()+"."+name = "default." + metadata.getClassName(); // value =FeignClientSpecification 的bean 对象 registry.registerBeanDefinition( name + "." + FeignClientSpecification.class.getSimpleName(), builder.getBeanDefinition()); }

    2.3.2 registerFeignClients()

    回到 2.3 中的 registerFeignClients(metadata, registry) 方法。

    public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) { // 获取 扫描对象 和 eureka 中 server 中注入9个 resouce 一样 // 这里扫描 带@FeignClient 的接口 ClassPathScanningCandidateComponentProvider scanner = getScanner(); scanner.setResourceLoader(this.resourceLoader); Set<String> basePackages; Map<String, Object> attrs = metadata .getAnnotationAttributes(EnableFeignClients.class.getName()); AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter( FeignClient.class); final Class<?>[] clients = attrs == null ? null : (Class<?>[]) attrs.get("clients"); if (clients == null || clients.length == 0) { scanner.addIncludeFilter(annotationTypeFilter); // 获取@EnableFeignClients的扫描包路径 basePackages = getBasePackages(metadata); } else { final Set<String> clientClasses = new HashSet<>(); basePackages = new HashSet<>(); for (Class<?> clazz : clients) { basePackages.add(ClassUtils.getPackageName(clazz)); clientClasses.add(clazz.getCanonicalName()); } AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() { @Override protected boolean match(ClassMetadata metadata) { String cleaned = metadata.getClassName().replaceAll("\\$", "."); return clientClasses.contains(cleaned); } }; scanner.addIncludeFilter( new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter))); } for (String basePackage : basePackages) { Set<BeanDefinition> candidateComponents = scanner .findCandidateComponents(basePackage); for (BeanDefinition candidateComponent : candidateComponents) { if (candidateComponent instanceof AnnotatedBeanDefinition) { // 获取到带有 @FeignClient 的接口 AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent; AnnotationMetadata annotationMetadata = beanDefinition.getMetadata(); Assert.isTrue(annotationMetadata.isInterface(), "@FeignClient can only be specified on an interface"); Map<String, Object> attributes = annotationMetadata .getAnnotationAttributes( FeignClient.class.getCanonicalName()); String name = 
getClientName(attributes); registerClientConfiguration(registry, name, attributes.get("configuration")); //跟进 registerFeignClient(registry, annotationMetadata, attributes); } } } }

    2.3.2.1 registerFeignClient(registry, annotationMetadata, attributes)

    private void registerFeignClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); //实例化一个 FeignClientFactoryBean 的bean 实例。一下是注入一些属性 BeanDefinitionBuilder definition = BeanDefinitionBuilder .genericBeanDefinition(FeignClientFactoryBean.class); validate(attributes); definition.addPropertyValue("url", getUrl(attributes)); definition.addPropertyValue("path", getPath(attributes)); String name = getName(attributes); definition.addPropertyValue("name", name); definition.addPropertyValue("type", className); definition.addPropertyValue("decode404", attributes.get("decode404")); definition.addPropertyValue("fallback", attributes.get("fallback")); definition.addPropertyValue("fallbackFactory", attributes.get("fallbackFactory")); definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); String alias = name + "FeignClient"; AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); boolean primary = (Boolean)attributes.get("primary"); // has a default, won't be null beanDefinition.setPrimary(primary); String qualifier = getQualifier(attributes); if (StringUtils.hasText(qualifier)) { alias = qualifier; } // 会往ioc 中注册一个bean name=@FeignClient 的接口 value 为FeignClientFactoryBean.class BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias }); BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); }

    2.3.2.2 FeignClientFactoryBean.java

    class FeignClientFactoryBean implements FactoryBean<Object>, InitializingBean, ApplicationContextAware { // FeignClientFactoryBean 实现了FactoryBean 接口在@FeignClient 的接口 实例化的时候 //会调用getObject()方法 @Override public Object getObject() throws Exception { return getTarget(); } /** * @param <T> the target type of the Feign client * @return a {@link Feign} client created with the specified data and the context information */ <T> T getTarget() { // 从 spring 容器中获取FeignContext.class 对象 FeignContext context = applicationContext.getBean(FeignContext.class); // Feign.Builder builder = feign(context); if (!StringUtils.hasText(this.url)) { String url; if (!this.name.startsWith("http")) { url = "http://" + this.name; } else { url = this.name; } // 默认走到 这个分支 url += cleanPath(); return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type, this.name, url)); } if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) { this.url = "http://" + this.url; } String url = this.url + cleanPath(); Client client = getOptional(context, Client.class); if (client != null) { if (client instanceof LoadBalancerFeignClient) { // not load balancing because we have a url, // but ribbon is on the classpath, so unwrap client = ((LoadBalancerFeignClient)client).getDelegate(); } builder.client(client); } Targeter targeter = get(context, Targeter.class); return (T) targeter.target(this, builder, context, new HardCodedTarget<>( this.type, this.name, url)); } }

    2.3.2.3 feign()

    2.3.2.2 中的 Feign.Builder builder = feign(context);

    protected Feign.Builder feign(FeignContext context) { // 更进 在子容器中注册 FeignLoggerFactory 对象 FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class); Logger logger = loggerFactory.create(this.type); // 往子容器中注入其他属性 Feign.Builder builder = get(context, Feign.Builder.class) // required values .logger(logger) .encoder(get(context, Encoder.class)) .decoder(get(context, Decoder.class)) .contract(get(context, Contract.class)); // @formatter:on configureFeign(context, builder); return builder; }

    2.3.2.4 get()

    2.3.2.3 中的 FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);

    protected <T> T get(FeignContext context, Class<T> type) { // 继续跟进 T instance = context.getInstance(this.name, type); if (instance == null) { throw new IllegalStateException("No bean found of type " + type + " for " + this.name); } return instance; }

    2.3.2.5 getInstance()

    2.3.2.4 中的 T instance = context.getInstance(this.name, type)

    public <T> T getInstance(String name, Class<T> type) { // 首先获取到一个容器 完事从容器中获取到对应的bean ,继续跟进 AnnotationConfigApplicationContext context = getContext(name); if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0) { return context.getBean(type); } return null; }

    2.3.2.6 getContext()

    2.3.2.5 中的 AnnotationConfigApplicationContext context = getContext(name);

    // 这个name 是@FeignClient 的name 属性 说明不同的name 回创建不同的createContext(); protected AnnotationConfigApplicationContext getContext(String name) { if (!this.contexts.containsKey(name)) { synchronized (this.contexts) { if (!this.contexts.containsKey(name)) { //private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>(); // contexts 是一个 key 是 服务名 value 为一个容器的 map 对象 // 跟进 createContext() this.contexts.put(name, createContext(name)); } } } return this.contexts.get(name); }

    2.3.2.7 createContext()

    2.3.2.6 中的 createContext(name)

    protected AnnotationConfigApplicationContext createContext(String name) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); if (this.configurations.containsKey(name)) { for (Class<?> configuration : this.configurations.get(name) .getConfiguration()) { context.register(configuration); } } for (Map.Entry<String, C> entry : this.configurations.entrySet()) { if (entry.getKey().startsWith("default.")) { for (Class<?> configuration : entry.getValue().getConfiguration()) { context.register(configuration); } } } context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType); context.getEnvironment().getPropertySources().addFirst(new MapPropertySource( this.propertySourceName, Collections.<String, Object> singletonMap(this.propertyName, name))); if (this.parent != null) { // 这个类实现了 ApplicationContextAware 接口 说明他讲spring 的上下文容器设置成父容器 context.setParent(this.parent); } context.setDisplayName(generateDisplayName(name)); // 走的是spring 刷新那一套 context.refresh(); return context; }

    这个方法是 创建一个容器同时将一些配置文件 设置到该容器中 同时会刷新 容器

    2.3.2.8 loadBalance()

    回到2.3.2.2 (T) loadBalance(builder, context, new HardCodedTarget<>(this.type, this.name, url));

    protected <T> T loadBalance(Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) { // 会往容器中注入LoadBalancerFeignClient 这个Client 实例 Client client = getOptional(context, Client.class); if (client != null) { builder.client(client); Targeter targeter = get(context, Targeter.class); // 跟进 这个方法 return targeter.target(this, builder, context, target); } throw new IllegalStateException( "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?"); }

    2.3.2.9 target()

    2.3.2.8 targeter.target(this, builder, context, target);

    @Override public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context, Target.HardCodedTarget<T> target) { if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) { return feign.target(target); } feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign; SetterFactory setterFactory = getOptional(factory.getName(), context, SetterFactory.class); if (setterFactory != null) { builder.setterFactory(setterFactory); } Class<?> fallback = factory.getFallback(); if (fallback != void.class) { return targetWithFallback(factory.getName(), context, target, builder, fallback); } Class<?> fallbackFactory = factory.getFallbackFactory(); if (fallbackFactory != void.class) { return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory); } // 默认走这个分支 继续跟进去 return feign.target(target); }

    2.3.2.10 Feign.java ->target()

    2.3.2.9 feign.target(target);

    /**
     * Builds the {@code Feign} instance and asks it for a new instance bound to {@code target}.
     *
     * @param target the target to bind
     * @param <T>    target interface type
     * @return the instance returned by {@code newInstance(target)}
     */
    public <T> T target(Target<T> target) {
        Feign feign = build();
        return feign.newInstance(target);
    }

    2.3.2.11 HystrixFeign.java ->build()

    2.3.2.10 build()

    /**
     * Builds without a fallback factory by delegating to {@code build(FallbackFactory)}.
     */
    @Override
    public Feign build() {
        final FallbackFactory<?> noFallbackFactory = null;
        return build(noFallbackFactory);
    }

    2.3.2.12 HystrixFeign.java ->build()

    2.3.2.11 build()

    /**
     * Installs an {@code InvocationHandlerFactory} that creates
     * {@code HystrixInvocationHandler}s, wraps the contract in a
     * {@code HystrixDelegatingContract}, then delegates to the parent {@code build()}.
     *
     * @param nullableFallbackFactory fallback factory handed to each handler; may be {@code null}
     * @return the built {@code Feign}
     */
    Feign build(final FallbackFactory<?> nullableFallbackFactory) {
        InvocationHandlerFactory hystrixHandlerFactory = new InvocationHandlerFactory() {
            @Override
            public InvocationHandler create(Target target,
                                            Map<Method, MethodHandler> dispatch) {
                return new HystrixInvocationHandler(target, dispatch, setterFactory,
                        nullableFallbackFactory);
            }
        };
        super.invocationHandlerFactory(hystrixHandlerFactory);
        super.contract(new HystrixDelegatingContract(contract));
        return super.build();
    }

    这里使用了一个匿名内部类,实现 InvocationHandlerFactory,用来创建 HystrixInvocationHandler(动态代理的调用处理器);同时用 HystrixDelegatingContract 包装原有的 contract,contract 是负责解析接口方法及其参数绑定信息的组件。

    2.3.2.13 Feign.java ->newInstance()

    2.3.2.10 newInstance()

    public <T> T newInstance(Target<T> target) { // 获取到 带有@FeignClient 接口的所有方法 将参数类型进行绑定 ,并创建一个map对象 Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target); Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>(); List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>(); for (Method method : target.type().getMethods()) { if (method.getDeclaringClass() == Object.class) { continue; } else if(Util.isDefault(method)) { DefaultMethodHandler handler = new DefaultMethodHandler(method); defaultMethodHandlers.add(handler); methodToHandler.put(method, handler); } else { methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method))); } } InvocationHandler handler = factory.create(target, methodToHandler); // 通过工厂方法创建 HystrixInvocationHandler的代理对象 T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler); // 将方法的map 和改动态代理进行绑定 for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) { defaultMethodHandler.bindTo(proxy); } return proxy; }

    小结:容器启动后,会自动扫描指定包路径下带有 @FeignClient 的接口,并为不同的服务名(@FeignClient 的 name)各创建一个子容器,日志、编解码器、Contract(参数绑定)等组件都从这个子容器中获取。同时会为每个接口创建 JDK 动态代理,bean 的名称为 @FeignClient 接口的类名;接口的每个方法对应一个 MethodHandler(调用时由它构造 RequestTemplate),保存在 map 中,key 为完整的方法签名(configKey),最后将 JDK 动态代理和这个 map 绑定。

    3.服务的调用过程

    容器中注入HystrixInvocationHandler 代理类,同时调用的时候会执行 其invoke()

    3.1 HystrixInvocationHandler.java

    @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { // early exit if the invoked method is from java.lang.Object // code is the same as ReflectiveFeign.FeignInvocationHandler if ("equals".equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0] != null ? Proxy.getInvocationHandler(args[0]) : null; return equals(otherHandler); } catch (IllegalArgumentException e) { return false; } } else if ("hashCode".equals(method.getName())) { return hashCode(); } else if ("toString".equals(method.getName())) { return toString(); } HystrixCommand<Object> hystrixCommand = new HystrixCommand<Object>(setterMethodMap.get(method)) { @Override protected Object run() throws Exception { try { // 这个方法是通过 hystrixCommand.execute();触发的 跟进看一下invoke() return HystrixInvocationHandler.this.dispatch.get(method).invoke(args); } catch (Exception e) { throw e; } catch (Throwable t) { throw (Error) t; } } @Override protected Object getFallback() { if (fallbackFactory == null) { return super.getFallback(); } try { Object fallback = fallbackFactory.create(getExecutionException()); Object result = fallbackMethodMap.get(method).invoke(fallback, args); if (isReturnsHystrixCommand(method)) { return ((HystrixCommand) result).execute(); } else if (isReturnsObservable(method)) { // Create a cold Observable return ((Observable) result).toBlocking().first(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return ((Single) result).toObservable().toBlocking().first(); } else if (isReturnsCompletable(method)) { ((Completable) result).await(); return null; } else { return result; } } catch (IllegalAccessException e) { // shouldn't happen as method is public due to being an interface throw new AssertionError(e); } catch (InvocationTargetException e) { // Exceptions on fallback are tossed by Hystrix throw new AssertionError(e.getCause()); } } }; if (Util.isDefault(method)) { return 
hystrixCommand.execute(); } else if (isReturnsHystrixCommand(method)) { return hystrixCommand; } else if (isReturnsObservable(method)) { // Create a cold Observable return hystrixCommand.toObservable(); } else if (isReturnsSingle(method)) { // Create a cold Observable as a Single return hystrixCommand.toObservable().toSingle(); } else if (isReturnsCompletable(method)) { return hystrixCommand.toObservable().toCompletable(); } return hystrixCommand.execute(); }

    3.2 SynchronousMethodHandler.java ->invoke

    @Override public Object invoke(Object[] argv) throws Throwable { // 将入参塞入到方法中的 requestTemplate 中,同时返回一个RequestTemplate RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this.retryer.clone(); while (true) { try { // 跟进 这个方法 return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } }

    3.3 SynchronousMethodHandler.java ->executeAndDecode

    3.2 中的 executeAndDecode(template);

    /**
     * Sends the request through the configured client and turns the response into
     * the method's return value.
     *
     * <p>Outcomes: a raw {@code Response} when that is the declared return type
     * (buffered when its body length is known and within
     * {@code MAX_RESPONSE_BUFFER_SIZE}), the decoded body for 2xx responses (or
     * {@code null} for {@code void}), the decoded body for a 404 when
     * {@code decode404} is set, otherwise the exception produced by the error decoder.
     *
     * <p>Fix: the rebuilt response from {@code toBuilder().request(request).build()}
     * is now assigned back to {@code response}. Previously the result was
     * discarded, so the "ensure the request is set" step had no effect.
     *
     * @param template the resolved request template
     * @return the decoded result, a {@code Response}, or {@code null}
     * @throws Throwable the wrapped I/O failure or the error decoder's exception
     */
    Object executeAndDecode(RequestTemplate template) throws Throwable {
        Request request = targetRequest(template);

        if (logLevel != Logger.Level.NONE) {
            logger.logRequest(metadata.configKey(), logLevel, request);
        }

        Response response;
        long start = System.nanoTime();
        try {
            response = client.execute(request, options);
            // ensure the request is set. TODO: remove in Feign 10
            response = response.toBuilder().request(request).build();
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
            }
            throw errorExecuting(request, e);
        }
        long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        boolean shouldClose = true;
        try {
            if (logLevel != Logger.Level.NONE) {
                response = logger.logAndRebufferResponse(metadata.configKey(), logLevel,
                        response, elapsedTime);
                // ensure the request is set. TODO: remove in Feign 10
                response = response.toBuilder().request(request).build();
            }
            if (Response.class == metadata.returnType()) {
                if (response.body() == null) {
                    return response;
                }
                if (response.body().length() == null
                        || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                    // The caller receives the live body and must close it.
                    shouldClose = false;
                    return response;
                }
                // Ensure the response body is disconnected
                byte[] bodyData = Util.toByteArray(response.body().asInputStream());
                return response.toBuilder().body(bodyData).build();
            }
            if (response.status() >= 200 && response.status() < 300) {
                if (void.class == metadata.returnType()) {
                    return null;
                } else {
                    Object result = decode(response);
                    shouldClose = closeAfterDecode;
                    return result;
                }
            } else if (decode404 && response.status() == 404
                    && void.class != metadata.returnType()) {
                Object result = decode(response);
                shouldClose = closeAfterDecode;
                return result;
            } else {
                throw errorDecoder.decode(metadata.configKey(), response);
            }
        } catch (IOException e) {
            if (logLevel != Logger.Level.NONE) {
                logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
            }
            throw errorReading(request, response, e);
        } finally {
            if (shouldClose) {
                ensureClosed(response.body());
            }
        }
    }

    3.4 LoadBalancerFeignClient.java ->execute()

    3.3 中的 response = client.execute(request, options);

    @Override public Response execute(Request request, Request.Options options) throws IOException { try { URI asUri = URI.create(request.url()); String clientName = asUri.getHost(); URI uriWithoutHost = cleanUrl(request.url(), clientName); FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest( this.delegate, request, uriWithoutHost); //进入这里 这是和 ribbon 有关的入口 IClientConfig requestConfig = getClientConfig(options, clientName); return lbClient(clientName).executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse(); } catch (ClientException e) { IOException io = findIOException(e); if (io != null) { throw io; } throw new RuntimeException(e); } }

    3.4.1 LoadBalancerFeignClient.java->getClientConfig

    3.4 IClientConfig requestConfig = getClientConfig(options, clientName);

    IClientConfig getClientConfig(Request.Options options, String clientName) { IClientConfig requestConfig; if (options == DEFAULT_OPTIONS) { // 默认进入这个分支 clientName 为服务名称(注册到eureka上的) requestConfig = this.clientFactory.getClientConfig(clientName); } else { requestConfig = new FeignOptionsClientConfig(options); } return requestConfig; }

    3.4.1.1 SpringClientFactory.java ->getClientConfig

    3.4.1 requestConfig = this.clientFactory.getClientConfig(clientName);

    /**
     * Returns the {@code IClientConfig} instance for the named client.
     *
     * @param name client name
     * @return the client config
     */
    public IClientConfig getClientConfig(String name) {
        IClientConfig clientConfig = this.getInstance(name, IClientConfig.class);
        return clientConfig;
    }

    3.4.1.2 SpringClientFactory.java ->getInstance

    public <C> C getInstance(String name, Class<C> type) { // 默认走这个分支 C instance = super.getInstance(name, type); if (instance != null) { return instance; } else { IClientConfig config = (IClientConfig)this.getInstance(name, IClientConfig.class); return instantiateWithConfig(this.getContext(name), type, config); } }

    3.4.1.3 NamedContextFactory.java ->getInstance

    3.4.1.2 C instance = super.getInstance(name, type);

    public <T> T getInstance(String name, Class<T> type) { // 进入 AnnotationConfigApplicationContext context = getContext(name); if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0) { return context.getBean(type); } return null; }

    3.4.1.4 SpringClientFactory.java ->getContext

    3.4.1.3 中的 AnnotationConfigApplicationContext context = getContext(name);

    protected AnnotationConfigApplicationContext getContext(String name) { // 继续跟进 return super.getContext(name); }

    3.4.1.5 NamedContextFactory.java ->getContext

    3.4.1.4 super.getContext(name);

    protected AnnotationConfigApplicationContext getContext(String name) { if (!this.contexts.containsKey(name)) { synchronized (this.contexts) { if (!this.contexts.containsKey(name)) { // private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>(); // key 为注册到eueka的服务名称 value 为一个容器 // 第一次跟进这个容器看一下 this.contexts.put(name, createContext(name)); } } } return this.contexts.get(name); }

    3.4.1.6 NamedContextFactory.java ->createContext(name)

    3.4.1.5 createContext(name)

    protected AnnotationConfigApplicationContext createContext(String name) { AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); if (this.configurations.containsKey(name)) { for (Class<?> configuration : this.configurations.get(name) .getConfiguration()) { context.register(configuration); } } for (Map.Entry<String, C> entry : this.configurations.entrySet()) { if (entry.getKey().startsWith("default.")) { for (Class<?> configuration : entry.getValue().getConfiguration()) { context.register(configuration); } } } context.register(PropertyPlaceholderAutoConfiguration.class, this.defaultConfigType); // 这个propertyName 为1.4 RibbonClientConfiguration.class // ribbon 在启动时候 将 propertySourceName 属性赋值的 context.getEnvironment().getPropertySources().addFirst(new MapPropertySource( this.propertySourceName, Collections.<String, Object> singletonMap(this.propertyName, name))); if (this.parent != null) { // Uses Environment from parent as well as beans context.setParent(this.parent); } context.setDisplayName(generateDisplayName(name)); // 刷新容器 context.refresh(); return context; }

    从以上代码可以看出:每个服务名(即注册到 eureka 的服务)都会对应一个子容器,容器中是 ribbon 相关的组件,第一次调用的时候才会创建该容器。容器刷新的时候会实例化 RibbonClientConfiguration.class。下面进入 RibbonClientConfiguration.class 看一下里面有哪些 bean。

    @Configuration @EnableConfigurationProperties @Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class}) public class RibbonClientConfiguration { @Bean @ConditionalOnMissingBean public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) { return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name) ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name) : // 默认走的是 new ZoneAwareLoadBalancer 这个分支 new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater)); } }

    跟进 new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater)); 进入 ZoneAwareLoadBalancer.java 看一下实例化的过程

    /**
     * Passes every argument through to the superclass constructor; no extra work
     * is done here.
     */
    public ZoneAwareLoadBalancer(IClientConfig clientConfig,
                                 IRule rule,
                                 IPing ping,
                                 ServerList<T> serverList,
                                 ServerListFilter<T> filter,
                                 ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
    }

    跟进 super(clientConfig, rule, ping, serverList, filter, serverListUpdater); 实例化 ZoneAwareLoadBalancer 之前 必定先实例化 父类DynamicServerListLoadBalancer.java。

    /**
     * Initialises the base load balancer with config, rule and ping, stores the
     * server-list collaborators, shares the load-balancer stats with an
     * {@code AbstractServerListFilter}, and finishes with {@code restOfInit}.
     */
    public DynamicServerListLoadBalancer(IClientConfig clientConfig,
                                         IRule rule,
                                         IPing ping,
                                         ServerList<T> serverList,
                                         ServerListFilter<T> filter,
                                         ServerListUpdater serverListUpdater) {
        super(clientConfig, rule, ping);

        this.serverListImpl = serverList;
        this.filter = filter;
        this.serverListUpdater = serverListUpdater;

        if (filter instanceof AbstractServerListFilter) {
            AbstractServerListFilter statsAwareFilter = (AbstractServerListFilter) filter;
            statsAwareFilter.setLoadBalancerStats(getLoadBalancerStats());
        }

        // Remaining initialisation, run after the fields above are set.
        restOfInit(clientConfig);
    }

    跟进 super(clientConfig, rule, ping) BaseLoadBalancer.java 的实例化方法

    public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) { // 实例化 会执行他 看起来像是一个 配置 负载策略及其ping 的规则 initWithConfig(config, rule, ping); } void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) { this.config = clientConfig; String clientName = clientConfig.getClientName(); this.name = clientName; int pingIntervalTime = Integer.parseInt("" + clientConfig.getProperty( CommonClientConfigKey.NFLoadBalancerPingInterval, Integer.parseInt("30"))); int maxTotalPingTime = Integer.parseInt("" + clientConfig.getProperty( CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime, Integer.parseInt("2"))); setPingInterval(pingIntervalTime); setMaxTotalPingTime(maxTotalPingTime); // cross associate with each other // i.e. Rule,Ping meet your container LB // LB, these are your Ping and Rule guys ... // 默认配置 ZoneAvoidanceRule.java 这种负载均衡策略 setRule(rule); // setPing(ping); setLoadBalancerStats(new LoadBalancerStats(clientName)); rule.setLoadBalancer(this); if (ping instanceof AbstractLoadBalancerPing) { ((AbstractLoadBalancerPing) ping).setLoadBalancer(this); } logger.info("Client: {} instantiated a LoadBalancer: {}", name, this); boolean enablePrimeConnections = clientConfig.get( CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS); if (enablePrimeConnections) { this.setEnablePrimingConnections(true); PrimeConnections primeConnections = new PrimeConnections( this.getName(), clientConfig); this.setPrimeConnections(primeConnections); } init(); }

    跟进setPing(ping); BaseLoadBalancer.java->setping()

    public void setPing(IPing ping) { if (ping != null) { if (!ping.equals(this.ping)) { this.ping = ping; // 默认走这个分支 setupPingTask(); // since ping data changed } } else { this.ping = null; // cancel the timer task lbTimer.cancel(); } }

    setupPingTask();

    void setupPingTask() { // 默认走 false 分支 if (canSkipPing()) { return; } if (lbTimer != null) { lbTimer.cancel(); } lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); // 创建一个定时job lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); // 强制执行 forceQuickPing(); }

    跟进 new PingTask():这个定时任务默认每隔 30s 会执行一次 runPinger()

    class PingTask extends TimerTask { public void run() { try { new Pinger(pingStrategy).runPinger(); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error pinging", name, e); } } }

    setupPingTask() 中, forceQuickPing()

    public void forceQuickPing() { if (canSkipPing()) { return; } logger.debug("LoadBalancer [{}]: forceQuickPing invoking", name); try { // 也会执行 runPinger() new Pinger(pingStrategy).runPinger(); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e); } }

    说明:setupPingTask() 会启动一个定时任务(默认每隔 30s 执行一次 runPinger()),并且通过 forceQuickPing() 立即强制执行一次 runPinger()。

    public void runPinger() throws Exception { if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do } // we are "in" - we get to Ping Server[] allServers = null; boolean[] results = null; Lock allLock = null; Lock upLock = null; try { /* * The readLock should be free unless an addServer operation is * going on... */ allLock = allServerLock.readLock(); allLock.lock(); allServers = allServerList.toArray(new Server[allServerList.size()]); allLock.unlock(); int numCandidates = allServers.length; // 检查 allServers 中的状态,跟进一下 pingServers() results = pingerStrategy.pingServers(ping, allServers); final List<Server> newUpList = new ArrayList<Server>(); final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { boolean isAlive = results[i]; Server svr = allServers[i]; boolean oldIsAlive = svr.isAlive(); svr.setAlive(isAlive); if (oldIsAlive != isAlive) { changedServers.add(svr); logger.debug("LoadBalancer [{}]: Server [{}] status changed to {}", name, svr.getId(), (isAlive ? "ALIVE" : "DEAD")); } if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); notifyServerStatusChangeListener(changedServers); } finally { pingInProgress.set(false); } } }

    跟进 results = pingerStrategy.pingServers(ping, allServers);

    /** * Default implementation for <c>IPingStrategy</c>, performs ping * serially, which may not be desirable, if your <c>IPing</c> * implementation is slow, or you have large number of servers. */ private static class SerialPingStrategy implements IPingStrategy { @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates); for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */ try { // NOTE: IFF we were doing a real ping // assuming we had a large set of servers (say 15) // the logic below will run them serially // hence taking 15 times the amount of time it takes // to ping each server // A better method would be to put this in an executor // pool // But, at the time of this writing, we dont REALLY // use a Real Ping (its mostly in memory eureka call) // hence we can afford to simplify this design and run // this // serially if (ping != null) { // 逐一检查 查看状态 results[i] = ping.isAlive(servers[i]); } } catch (Exception e) { logger.error("Exception while pinging Server: '{}'", servers[i], e); } } return results; } }

    跟进 results[i] = ping.isAlive(servers[i]); NIWSDiscoveryPing.java -> isAlive()

    public boolean isAlive(Server server) { boolean isAlive = true; if (server!=null && server instanceof DiscoveryEnabledServer){ DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server; // eureka 实例 InstanceInfo instanceInfo = dServer.getInstanceInfo(); if (instanceInfo!=null){ InstanceStatus status = instanceInfo.getStatus(); if (status!=null){ isAlive = status.equals(InstanceStatus.UP); } } } return isAlive; }

    小结一下: 每隔30s 检查一次,查看每个服务在 eureka 中的状态信息,并把存活的服务放入 upServerList 中。

    回到 前面 DynamicServerListLoadBalancer.java -> restOfInit(clientConfig);

    void restOfInit(IClientConfig clientConfig) { boolean primeConnection = this.isEnablePrimingConnections(); // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList() this.setEnablePrimingConnections(false); // 第一次会拉取eureka 全量信息服务信息 enableAndInitLearnNewServersFeature(); // 跟新server 信息 server 那边服务的保护策略 有些下线的服务没有剔除 updateListOfServers(); if (primeConnection && this.getPrimeConnections() != null) { this.getPrimeConnections() .primeConnections(getReachableServers()); } this.setEnablePrimingConnections(primeConnection); LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString()); }

    跟进 enableAndInitLearnNewServersFeature() DynamicServerListLoadBalancer.java ->enableAndInitLearnNewServersFeature

    /**
     * Logs which server-list updater implementation is in use and starts it
     * with this load balancer's update action.
     */
    public void enableAndInitLearnNewServersFeature() {
        final String updaterType = serverListUpdater.getClass().getSimpleName();
        LOGGER.info("Using serverListUpdater {}", updaterType);
        serverListUpdater.start(updateAction);
    }

    PollingServerListUpdater.java ->start 跟进 serverListUpdater.start(updateAction);

    @Override public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { // 跟进看一下这个方法 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; // 启动后 1s 后执行 默认30s 会执行一次 updateAction.doUpdate(); 方法 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }

    跟进 updateAction.doUpdate(); DynamicServerListLoadBalancer.java ->doUpdate

    class NamelessClass_1 implements UpdateAction { NamelessClass_1() { } public void doUpdate() { DynamicServerListLoadBalancer.this.updateListOfServers(); } }

    跟进 updateListOfServers() DynamicServerListLoadBalancer.java ->updateListOfServers

    @VisibleForTesting public void updateListOfServers() { List<T> servers = new ArrayList<T>(); // 会进到这个分支 if (serverListImpl != null) { // 先进到这里 servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); }

    servers = serverListImpl.getUpdatedListOfServers(); DomainExtractingServerList.java ->getUpdatedListOfServers

    /**
     * Returns the delegate list's updated servers after passing them through
     * {@code setZones}.
     */
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        return setZones(this.list.getUpdatedListOfServers());
    }

    DiscoveryEnabledNIWSServerList.java->getUpdatedListOfServers()

    /** Returns the servers currently obtained through discovery. */
    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    /**
     * Builds the server list from the Eureka client: for every configured vip
     * address, each instance whose status is UP becomes a
     * {@code DiscoveryEnabledServer} (with the port overridden when
     * configured). Returns an empty list when no Eureka client is available.
     */
    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }

        final List<DiscoveryEnabledServer> discovered = new ArrayList<DiscoveryEnabledServer>();
        final EurekaClient client = eurekaClientProvider.get();
        if (vipAddresses == null) {
            return discovered;
        }

        for (String vip : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> instances = client.getInstancesByVipAddress(vip, isSecure, targetRegion);
            for (InstanceInfo instance : instances) {
                if (!instance.getStatus().equals(InstanceStatus.UP)) {
                    continue;
                }

                InstanceInfo effective = instance;
                if (shouldUseOverridePort) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                    }

                    // copy is necessary since the InstanceInfo builder just uses the original reference,
                    // and we don't want to corrupt the global eureka copy of the object which may be
                    // used by other clients in our system
                    InstanceInfo copy = new InstanceInfo(instance);
                    InstanceInfo.Builder builder = new InstanceInfo.Builder(copy);
                    effective = isSecure
                            ? builder.setSecurePort(overridePort).build()
                            : builder.setPort(overridePort).build();
                }

                DiscoveryEnabledServer des = new DiscoveryEnabledServer(effective, isSecure, shouldUseIpAddr);
                des.setZone(DiscoveryClient.getZone(effective));
                discovered.add(des);
            }

            if (discovered.size() > 0 && prioritizeVipAddressBasedServers) {
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
        return discovered;
    }

    小结: 每隔30s 会执行一次,通过 eureka client 获取 server 信息(可能直接来自客户端本地,也可能从 eureka server 拉取)。 回到DynamicServerListLoadBalancer.java -> restOfInit(clientConfig); 中的 updateListOfServers(); 它最后会调用下面的 updateAllServerList():

    protected void updateAllServerList(List<T> ls) { // other threads might be doing this - in which case, we pass if (serverListUpdateInProgress.compareAndSet(false, true)) { try { for (T s : ls) { s.setAlive(true); // set so that clients can start using these // servers right away instead // of having to wait out the ping cycle. } setServersList(ls); // 强制检查每一个server 看他的状态是不是down super.forceQuickPing(); } finally { serverListUpdateInProgress.set(false); } } }

    小结一下: 容器在刷新的时候会注入DynamicServerListLoadBalancer 对象到子容器,同时在实例化的时候,会创建两个定时任务,默认都是30s 执行一次:一个会通过 eureka client 获取实例信息,另外一个定时任务会刷新实例列表,检查是不是 down 状态。 回到3.4 中

    3.4.2 AbstractLoadBalancerAwareClient.java ->executeWithLoadBalancer

    回到 LoadBalancerFeignClient.java 中的 execute 中的executeWithLoadBalancer

    // 这个方法有两个目的 1; 根据服务名及其负载策略定位及其 // 实现远程调用 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { // 这里是一个匿名函数 return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }

    3.4.2.1 LoadBalancerCommand.java ->submit()

    这里是选择server

    // The whole method is built from RxJava 1.x reactive operators.
    /**
     * Builds an Observable that executes {@code operation} on a server.
     *
     * <p>The server is the preset {@code server} field when it is non-null,
     * otherwise one obtained from {@code selectServer()}. Each attempt is
     * reported to {@code listenerInvoker} (when present) and recorded in the
     * server stats. Failures are retried on the same server and, when no
     * server was preset, on a newly selected server, using the limits read
     * from {@code retryHandler}. When a retry limit is hit, the error is
     * wrapped in a {@code ClientException} before being emitted.
     *
     * @param operation the call to perform against the chosen server
     * @return an Observable emitting the operation's result or the final error
     */
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();

        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                // A listener aborted the execution before it started.
                return Observable.error(e);
            }
        }

        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        // Use the load balancer
        Observable<T> o =
                // selectServer() is where the load-balancing choice happens; it is
                // only used when no server was preset.
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);

                        // Called for each attempt and retry
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);

                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }

                                        // Times this single attempt; stopped in recordStats().
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            // Last value seen by onNext, reported when the attempt completes.
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }

                                            // Stops the timer and reports the attempt's outcome to the load balancer context.
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });

                        // Retry on the same server.
                        if (maxRetrysSame > 0)
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });

        // Retry on another server; only possible when the server was not preset.
        if (maxRetrysNext > 0 && server == null)
            o = o.retry(retryPolicy(maxRetrysNext, false));

        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    // Wrap the error when the attempt counters show a retry limit was reached.
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }

    3.4.2.2 LoadBalancerCommand.java ->selectServer()

    3.4.2.1 selectServer()

    private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { //跟进这个方法,loadBalancerKey 为null,loadBalancerURI:不带服务名的uri Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }

    3.4.2.3 LoadBalancerContext.java ->getServerFromLoadBalancer()

    public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException { String host = null; int port = -1; if (original != null) { host = original.getHost(); } if (original != null) { Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original); port = schemeAndPort.second(); } // 注入的DynamicServerListLoadBalancer.java bean对象 ILoadBalancer lb = getLoadBalancer(); if (host == null) { // Partial URI or no URI Case // well we have to just get the right instances from lb - or we fall back if (lb != null){ // 选server的部分 Server svc = lb.chooseServer(loadBalancerKey); if (svc == null){ throw new ClientException(ClientException.ErrorType.GENERAL, "Load balancer does not have available server for client: " + clientName); } host = svc.getHost(); if (host == null){ throw new ClientException(ClientException.ErrorType.GENERAL, "Invalid Server for :" + svc); } logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original}); return svc; } else { // No Full URL - and we dont have a LoadBalancer registered to // obtain a server // if we have a vipAddress that came with the registration, we // can use that else we // bail out if (vipAddresses != null && vipAddresses.contains(",")) { throw new ClientException( ClientException.ErrorType.GENERAL, "Method is invoked for client " + clientName + " with partial URI of (" + original + ") with no load balancer configured." + " Also, there are multiple vipAddresses and hence no vip address can be chosen" + " to complete this partial uri"); } else if (vipAddresses != null) { try { Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses); host = hostAndPort.first(); port = hostAndPort.second(); } catch (URISyntaxException e) { throw new ClientException( ClientException.ErrorType.GENERAL, "Method is invoked for client " + clientName + " with partial URI of (" + original + ") with no load balancer configured. 
" + " Also, the configured/registered vipAddress is unparseable (to determine host and port)"); } } else { throw new ClientException( ClientException.ErrorType.GENERAL, this.clientName + " has no LoadBalancer registered and passed in a partial URL request (with no host:port)." + " Also has no vipAddress registered"); } } } else { // Full URL Case // This could either be a vipAddress or a hostAndPort or a real DNS // if vipAddress or hostAndPort, we just have to consult the loadbalancer // but if it does not return a server, we should just proceed anyways // and assume its a DNS // For restClients registered using a vipAddress AND executing a request // by passing in the full URL (including host and port), we should only // consult lb IFF the URL passed is registered as vipAddress in Discovery boolean shouldInterpretAsVip = false; if (lb != null) { shouldInterpretAsVip = isVipRecognized(original.getAuthority()); } if (shouldInterpretAsVip) { Server svc = lb.chooseServer(loadBalancerKey); if (svc != null){ host = svc.getHost(); if (host == null){ throw new ClientException(ClientException.ErrorType.GENERAL, "Invalid Server for :" + svc); } logger.debug("using LB returned Server: {} for request: {}", svc, original); return svc; } else { // just fall back as real DNS logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port); } } else { // consult LB to obtain vipAddress backed instance given full URL //Full URL execute request - where url!=vipAddress logger.debug("Using full URL passed in by caller (not using load balancer): {}", original); } } // end of creating final URL if (host == null){ throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to"); } // just verify that at this point we have a full URL return new Server(host, port); }

    3.4.2.4 ZoneAwareLoadBalancer.java ->chooseServer

    3.4.2.3 Server svc = lb.chooseServer(loadBalancerKey);

    @Override public Server chooseServer(Object key) { if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); // 默认走这个分支 return super.chooseServer(key); } Server server = null; try { LoadBalancerStats lbStats = getLoadBalancerStats(); Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); if (triggeringLoad == null) { triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d); } if (triggeringBlackoutPercentage == null) { triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d); } Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); server = zoneLoadBalancer.chooseServer(key); } } } catch (Exception e) { logger.error("Error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } }

    3.4.2.5 BaseLoadBalancer.java ->chooseServer

    3.4.2.4 return super.chooseServer(key);

    public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { // 进入看一下 return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }

    3.4.2.6 PredicateBasedRule.java->choose

    /**
     * Picks a server from all servers of the load balancer, round robin among
     * those accepted by the predicate.
     *
     * @param key load-balancing key handed to the predicate
     * @return the selected server, or {@code null} when none qualifies
     */
    public Server choose(Object key) {
        final ILoadBalancer balancer = getLoadBalancer();
        final Optional<Server> picked =
                getPredicate().chooseRoundRobinAfterFiltering(balancer.getAllServers(), key);
        return picked.isPresent() ? picked.get() : null;
    }

    3.4.2.7 AbstractServerPredicate.java ->chooseRoundRobinAfterFiltering

    /** * Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key. */ public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } // 看一下 incrementAndGetModulo() return Optional.of(eligible.get(incrementAndGetModulo(eligible.size()))); }

    3.4.2.8 AbstractServerPredicate.java-> incrementAndGetModulo()

    /**
     * Returns the current round-robin index and advances the shared counter
     * to {@code (current + 1) % modulo}, retrying until the compare-and-set
     * succeeds with a current value below {@code modulo}.
     *
     * @param modulo number of eligible servers
     * @return an index smaller than {@code modulo}
     */
    private int incrementAndGetModulo(int modulo) {
        while (true) {
            final int observed = nextIndex.get();
            final int successor = (observed + 1) % modulo;
            if (nextIndex.compareAndSet(observed, successor) && observed < modulo) {
                return observed;
            }
        }
    }

    小结: 这里是负载均衡轮询的实现,代码很简单:每次把当前下标加 1 后对 server 列表的大小取余,作为下一次的下标,并用当前下标从 List 中获取 server。

    3.4.3 RetryableFeignLoadBalancer.java->execute

    3.4.2 AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)

    public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride) throws IOException { final Request.Options options; if (configOverride != null) { RibbonProperties ribbon = RibbonProperties.from(configOverride); options = new Request.Options( ribbon.connectTimeout(this.connectTimeout), ribbon.readTimeout(this.readTimeout)); } else { options = new Request.Options(this.connectTimeout, this.readTimeout); } final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this); RetryTemplate retryTemplate = new RetryTemplate(); BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName()); retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy); RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName()); if (retryListeners != null && retryListeners.length != 0) { retryTemplate.setListeners(retryListeners); } retryTemplate.setRetryPolicy(retryPolicy == null ? 
new NeverRetryPolicy() : new FeignRetryPolicy(request.toHttpRequest(), retryPolicy, this, this.getClientName())); return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() { @Override public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException { Request feignRequest = null; //on retries the policy will choose the server and set it in the context //extract the server and update the request being made if (retryContext instanceof LoadBalancedRetryContext) { ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance(); if (service != null) { feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest(); } } if (feignRequest == null) { feignRequest = request.toRequest(); } // 这里发起http 请求 Response response = request.client().execute(feignRequest, options); if (retryPolicy.retryableStatusCode(response.status())) { byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream()); response.close(); throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response, byteArray, request.getUri()); } return new RibbonResponse(request.getUri(), response); } }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() { @Override protected RibbonResponse createResponse(Response response, URI uri) { return new RibbonResponse(uri, response); } }); }

    3.4.3.1 Default.java ->execute

    /**
     * Sends the request over an {@code HttpURLConnection} and converts the
     * connection's reply into a {@code Response} that references the request.
     *
     * @throws IOException if connecting, writing the body or reading the status fails
     */
    @Override
    public Response execute(Request request, Options options) throws IOException {
        HttpURLConnection connection = convertAndSend(request, options);
        return convertResponse(connection).toBuilder().request(request).build();
    }

    /**
     * Opens a connection to the request URL, applies TLS settings, timeouts,
     * redirect behaviour, method and headers, and writes the body if there is one.
     *
     * @return the configured connection, with the request body already written
     * @throws IOException if the connection cannot be opened or the body cannot be written
     */
    HttpURLConnection convertAndSend(Request request, Options options) throws IOException {
        final HttpURLConnection connection =
                (HttpURLConnection) new URL(request.url()).openConnection();
        if (connection instanceof HttpsURLConnection) {
            // Apply the custom socket factory / hostname verifier only when configured.
            HttpsURLConnection sslCon = (HttpsURLConnection) connection;
            if (sslContextFactory != null) {
                sslCon.setSSLSocketFactory(sslContextFactory);
            }
            if (hostnameVerifier != null) {
                sslCon.setHostnameVerifier(hostnameVerifier);
            }
        }
        connection.setConnectTimeout(options.connectTimeoutMillis());
        connection.setReadTimeout(options.readTimeoutMillis());
        connection.setAllowUserInteraction(false);
        connection.setInstanceFollowRedirects(options.isFollowRedirects());
        connection.setRequestMethod(request.method());

        Collection<String> contentEncodingValues = request.headers().get(CONTENT_ENCODING);
        boolean gzipEncodedRequest =
                contentEncodingValues != null && contentEncodingValues.contains(ENCODING_GZIP);
        boolean deflateEncodedRequest =
                contentEncodingValues != null && contentEncodingValues.contains(ENCODING_DEFLATE);

        boolean hasAcceptHeader = false;
        Integer contentLength = null;
        for (String field : request.headers().keySet()) {
            if (field.equalsIgnoreCase("Accept")) {
                hasAcceptHeader = true;
            }
            for (String value : request.headers().get(field)) {
                if (field.equals(CONTENT_LENGTH)) {
                    // The Content-Length header is dropped when the body will be
                    // gzip/deflate encoded below.
                    if (!gzipEncodedRequest && !deflateEncodedRequest) {
                        contentLength = Integer.valueOf(value);
                        connection.addRequestProperty(field, value);
                    }
                } else {
                    connection.addRequestProperty(field, value);
                }
            }
        }
        // Some servers choke on the default accept string.
        if (!hasAcceptHeader) {
            connection.addRequestProperty("Accept", "*/*");
        }

        if (request.body() != null) {
            // Fixed-length streaming when the length is known, chunked otherwise.
            if (contentLength != null) {
                connection.setFixedLengthStreamingMode(contentLength);
            } else {
                connection.setChunkedStreamingMode(8196);
            }
            connection.setDoOutput(true);
            OutputStream out = connection.getOutputStream();
            if (gzipEncodedRequest) {
                out = new GZIPOutputStream(out);
            } else if (deflateEncodedRequest) {
                out = new DeflaterOutputStream(out);
            }
            try {
                out.write(request.body());
            } finally {
                try {
                    out.close();
                } catch (IOException suppressed) { // NOPMD
                }
            }
        }
        return connection;
    }

    /**
     * Reads status, reason, headers and body stream from the connection and
     * builds a {@code Response} from them.
     *
     * @throws IOException if the status cannot be read or is negative
     */
    Response convertResponse(HttpURLConnection connection) throws IOException {
        int status = connection.getResponseCode();
        String reason = connection.getResponseMessage();

        if (status < 0) {
            throw new IOException(format("Invalid status(%s) executing %s %s", status,
                    connection.getRequestMethod(), connection.getURL()));
        }

        Map<String, Collection<String>> headers = new LinkedHashMap<String, Collection<String>>();
        for (Map.Entry<String, List<String>> field : connection.getHeaderFields().entrySet()) {
            // response message
            // Entries with a null key are skipped.
            if (field.getKey() != null) {
                headers.put(field.getKey(), field.getValue());
            }
        }

        // A content length of -1 is passed on as null.
        Integer length = connection.getContentLength();
        if (length == -1) {
            length = null;
        }
        InputStream stream;
        // For status >= 400 the body is read from the error stream.
        if (status >= 400) {
            stream = connection.getErrorStream();
        } else {
            stream = connection.getInputStream();
        }
        return Response.builder()
                .status(status)
                .reason(reason)
                .headers(headers)
                .body(stream, length)
                .build();
    }
    } // closes the enclosing class, whose header lies outside this excerpt

    以上是client 发起请求的过程,包括序列化。 总结: 容器启动时候,会为每一个eureka 注册的实例 注册一个openfeign 的配置容器,同时创建一个jdk 生成的动态代理,同时将每个方法 放到map 中,同时每个方法就是一个requestTemplate. ribbon 启动时候 会在容器中 生成SpringClientFactory 对象。找到动态代理的invoke 方法,并且匹配map中的方法,在服务第一次调用的时候 对每个eureka实例,生成一个容器,容器在刷新的时候,会生成两个30s 执行一次的定时任务。一个是30s从eureka 拉取实例信息,另外一个将下线的服务剔除服务列表。最后根据ribbon 容器中的负载策略选择一个server,最后发起服务调用。

    Processed: 0.016, SQL: 9