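A key component of Ribbon is LoadBalancerClient, which lives in the spring-cloud-commons package. LoadBalancerClient is an interface that extends ServiceInstanceChooser, and its implementation class is RibbonLoadBalancerClient. The relationship among the three is shown in the following diagram:
LoadBalancerClient is wired up by the auto-configuration class RibbonAutoConfiguration, which registers the RibbonLoadBalancerClient implementation. That implementation is what handles load-balanced requests. Here is the source of RibbonAutoConfiguration: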
```java
@Configuration
@Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
@RibbonClients
@AutoConfigureAfter(
        name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
@AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,
        AsyncLoadBalancerAutoConfiguration.class })
@EnableConfigurationProperties({ RibbonEagerLoadProperties.class,
        ServerIntrospectorProperties.class })
public class RibbonAutoConfiguration {

    @Autowired(required = false)
    private List<RibbonClientSpecification> configurations = new ArrayList<>();

    @Autowired
    private RibbonEagerLoadProperties ribbonEagerLoadProperties;

    @Bean
    public HasFeatures ribbonFeature() {
        return HasFeatures.namedFeature("Ribbon", Ribbon.class);
    }

    @Bean
    @ConditionalOnMissingBean
    public SpringClientFactory springClientFactory() {
        SpringClientFactory factory = new SpringClientFactory();
        factory.setConfigurations(this.configurations);
        return factory;
    }

    @Bean
    @ConditionalOnMissingBean(LoadBalancerClient.class)
    public LoadBalancerClient loadBalancerClient() {
        return new RibbonLoadBalancerClient(springClientFactory());
    }

    @Bean
    @ConditionalOnClass(name = "org.springframework.retry.support.RetryTemplate")
    @ConditionalOnMissingBean
    public LoadBalancedRetryFactory loadBalancedRetryPolicyFactory(
            final SpringClientFactory clientFactory) {
        return new RibbonLoadBalancedRetryFactory(clientFactory);
    }

    @Bean
    @ConditionalOnMissingBean
    public PropertiesFactory propertiesFactory() {
        return new PropertiesFactory();
    }

    @Bean
    @ConditionalOnProperty("ribbon.eager-load.enabled")
    public RibbonApplicationContextInitializer ribbonApplicationContextInitializer() {
        return new RibbonApplicationContextInitializer(springClientFactory(),
                ribbonEagerLoadProperties.getClients());
    }

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(HttpRequest.class)
    @ConditionalOnRibbonRestClient
    protected static class RibbonClientHttpRequestFactoryConfiguration {

        @Autowired
        private SpringClientFactory springClientFactory;

        @Bean
        public RestTemplateCustomizer restTemplateCustomizer(
                final RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory) {
            return restTemplate -> restTemplate
                    .setRequestFactory(ribbonClientHttpRequestFactory);
        }

        @Bean
        public RibbonClientHttpRequestFactory ribbonClientHttpRequestFactory() {
            return new RibbonClientHttpRequestFactory(this.springClientFactory);
        }

    }

    // TODO: support for autoconfiguring restemplate to use apache http client or okhttp

    @Target({ ElementType.TYPE, ElementType.METHOD })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Conditional(OnRibbonRestClientCondition.class)
    @interface ConditionalOnRibbonRestClient {

    }

    private static class OnRibbonRestClientCondition extends AnyNestedCondition {

        OnRibbonRestClientCondition() {
            super(ConfigurationPhase.REGISTER_BEAN);
        }

        @Deprecated // remove in Edgware"
        @ConditionalOnProperty("ribbon.http.client.enabled")
        static class ZuulProperty {

        }

        @ConditionalOnProperty("ribbon.restclient.enabled")
        static class RibbonProperty {

        }

    }

    /**
     * {@link AllNestedConditions} that checks that either multiple classes are present.
     */
    static class RibbonClassesConditions extends AllNestedConditions {

        RibbonClassesConditions() {
            super(ConfigurationPhase.PARSE_CONFIGURATION);
        }

        @ConditionalOnClass(IClient.class)
        static class IClientPresent {

        }

        @ConditionalOnClass(RestTemplate.class)
        static class RestTemplatePresent {

        }

        @SuppressWarnings("deprecation")
        @ConditionalOnClass(AsyncRestTemplate.class)
        static class AsyncRestTemplatePresent {

        }

        @ConditionalOnClass(Ribbon.class)
        static class RibbonPresent {

        }

    }

}
```
Suppose we call LoadBalancerClient directly to make a load-balanced request, as in this example:
```java
ServiceInstance serviceInstance = loadBalancerClient.choose("order-demo");
String clientPath = "/orders";
// clientPath already starts with "/", so the format string must not add a second one
URI client = URI.create(String.format("http://%s:%s%s",
        serviceInstance.getHost(), serviceInstance.getPort(), clientPath));
```
The choose method returns a ServiceInstance, but what does it actually do? Here is the choose source in RibbonLoadBalancerClient:
```java
@Override
public ServiceInstance choose(String serviceId) {
    return choose(serviceId, null);
}

/**
 * New: Select a server using a 'key'.
 * @param serviceId of the service to choose an instance for
 * @param hint to specify the service instance
 * @return the selected {@link ServiceInstance}
 */
public ServiceInstance choose(String serviceId, Object hint) {
    Server server = getServer(getLoadBalancer(serviceId), hint);
    if (server == null) {
        return null;
    }
    return new RibbonServer(serviceId, server, isSecure(server, serviceId),
            serverIntrospector(serviceId).getMetadata(server));
}

protected ILoadBalancer getLoadBalancer(String serviceId) {
    return this.clientFactory.getLoadBalancer(serviceId);
}

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
    if (loadBalancer == null) {
        return null;
    }
    // Use 'default' on a null hint, or just pass it on?
    return loadBalancer.chooseServer(hint != null ? hint : "default");
}
```
1. Obtain a load balancer (ILoadBalancer) for the given serviceId
2. Obtain a concrete Server instance from that ILoadBalancer
3. Wrap the Server in a RibbonServer
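The three steps can be modeled without any Ribbon dependency. The sketch below is only an illustration under stated assumptions: ChooseFlowSketch and its nested Server, LoadBalancer and ServiceInstance types are hypothetical stand-ins for Ribbon's Server, ILoadBalancer and RibbonServer, not the real classes.

```java
import java.util.Map;

// Hypothetical, dependency-free model of the choose() flow described above.
class ChooseFlowSketch {

    // Stand-in for com.netflix.loadbalancer.Server (assumption: host and port only).
    static final class Server {
        final String host;
        final int port;

        Server(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    // Stand-in for ILoadBalancer, reduced to the one method used here.
    interface LoadBalancer {
        Server chooseServer(Object key);
    }

    // Stand-in for RibbonServer: the chosen server plus the service id.
    static final class ServiceInstance {
        final String serviceId;
        final String host;
        final int port;

        ServiceInstance(String serviceId, Server server) {
            this.serviceId = serviceId;
            this.host = server.host;
            this.port = server.port;
        }
    }

    private final Map<String, LoadBalancer> loadBalancers;

    ChooseFlowSketch(Map<String, LoadBalancer> loadBalancers) {
        this.loadBalancers = loadBalancers;
    }

    ServiceInstance choose(String serviceId) {
        // Step 1: look up the load balancer registered for this serviceId.
        LoadBalancer loadBalancer = loadBalancers.get(serviceId);
        if (loadBalancer == null) {
            return null;
        }
        // Step 2: ask the load balancer for a server, using "default" as the key.
        Server server = loadBalancer.chooseServer("default");
        if (server == null) {
            return null;
        }
        // Step 3: wrap the server so callers also see the service id.
        return new ServiceInstance(serviceId, server);
    }
}
```

As in the real method, a missing load balancer or a null server both yield null, so callers have to null-check the result before building a URI from it.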
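Now let's look at the ILoadBalancer implementations:
ILoadBalancer: the interface that defines load balancer operations. Its methods are addServers, chooseServer, markServerDown, getServerList, getReachableServers, and getAllServers.
AbstractLoadBalancer: contains the features that most load balancing implementations need, such as getting the server list for a given ServerGroup.
BaseLoadBalancer: a basic load balancer implementation in which an arbitrary list of servers can be set as the server pool. Internally, this class maintains an "all" server list and an "up" server list and uses them depending on what the caller asks for.
NoOpLoadBalancer: every method of this implementation returns an empty result; it does nothing.
DynamicServerListLoadBalancer: can obtain the candidate server list from a dynamic source, so the server list can change at runtime. It also contains facilities for passing the server list through a filter to remove servers that do not meet the desired criteria.
ZoneAwareLoadBalancer: can avoid an entire zone when choosing a server.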
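To make the "all" and "up" lists of BaseLoadBalancer more concrete, here is a deliberately simplified model. TwoListBalancerSketch is a hypothetical class that represents servers as plain strings; it borrows method names from ILoadBalancer but only illustrates the idea and is not Ribbon's actual bookkeeping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a balancer that tracks all servers and reachable servers separately.
class TwoListBalancerSketch {

    private final List<String> allServers = new ArrayList<>();
    private final List<String> upServers = new ArrayList<>();

    // New servers are assumed to be up until marked otherwise (a simplification).
    synchronized void addServers(List<String> newServers) {
        for (String server : newServers) {
            if (!allServers.contains(server)) {
                allServers.add(server);
                upServers.add(server);
            }
        }
    }

    // A server marked down stays in the "all" list but leaves the "up" list.
    synchronized void markServerDown(String server) {
        upServers.remove(server);
    }

    // Callers that only want healthy servers read the "up" list.
    synchronized List<String> getReachableServers() {
        return new ArrayList<>(upServers);
    }

    // Callers that want every known server read the "all" list.
    synchronized List<String> getAllServers() {
        return new ArrayList<>(allServers);
    }
}
```

After adding two servers and marking one down, getAllServers still returns both while getReachableServers returns only the remaining one.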
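RibbonAutoConfiguration defines a SpringClientFactory bean, whose constructor specifies RibbonClientConfiguration as its configuration class. When that configuration class is loaded, the load-balancer-related beans are initialized: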
```java
@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
// Order is important here, last should be the default, first should be optional
// see
// https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
        RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {

    /**
     * Ribbon client default connect timeout.
     */
    public static final int DEFAULT_CONNECT_TIMEOUT = 1000;

    /**
     * Ribbon client default read timeout.
     */
    public static final int DEFAULT_READ_TIMEOUT = 1000;

    /**
     * Ribbon client default Gzip Payload flag.
     */
    public static final boolean DEFAULT_GZIP_PAYLOAD = true;

    @RibbonClientName
    private String name = "client";

    // TODO: maybe re-instate autowired load balancers: identified by name they could be
    // associated with ribbon clients

    @Autowired
    private PropertiesFactory propertiesFactory;

    @Bean
    @ConditionalOnMissingBean
    public IClientConfig ribbonClientConfig() {
        DefaultClientConfigImpl config = new DefaultClientConfigImpl();
        config.loadProperties(this.name);
        config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
        config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
        config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
        return config;
    }

    @Bean
    @ConditionalOnMissingBean
    public IRule ribbonRule(IClientConfig config) {
        if (this.propertiesFactory.isSet(IRule.class, name)) {
            return this.propertiesFactory.get(IRule.class, config, name);
        }
        ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
        rule.initWithNiwsConfig(config);
        return rule;
    }

    @Bean
    @ConditionalOnMissingBean
    public IPing ribbonPing(IClientConfig config) {
        if (this.propertiesFactory.isSet(IPing.class, name)) {
            return this.propertiesFactory.get(IPing.class, config, name);
        }
        return new DummyPing();
    }

    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerList<Server> ribbonServerList(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerList.class, name)) {
            return this.propertiesFactory.get(ServerList.class, config, name);
        }
        ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
        serverList.initWithNiwsConfig(config);
        return serverList;
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
        return new PollingServerListUpdater(config);
    }

    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
            ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
            IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
            return this.propertiesFactory.get(ILoadBalancer.class, config, name);
        }
        return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
                serverListFilter, serverListUpdater);
    }

    @Bean
    @ConditionalOnMissingBean
    @SuppressWarnings("unchecked")
    public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
        if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
            return this.propertiesFactory.get(ServerListFilter.class, config, name);
        }
        ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
        filter.initWithNiwsConfig(config);
        return filter;
    }

    @Bean
    @ConditionalOnMissingBean
    public RibbonLoadBalancerContext ribbonLoadBalancerContext(ILoadBalancer loadBalancer,
            IClientConfig config, RetryHandler retryHandler) {
        return new RibbonLoadBalancerContext(loadBalancer, config, retryHandler);
    }

    @Bean
    @ConditionalOnMissingBean
    public RetryHandler retryHandler(IClientConfig config) {
        return new DefaultLoadBalancerRetryHandler(config);
    }

    @Bean
    @ConditionalOnMissingBean
    public ServerIntrospector serverIntrospector() {
        return new DefaultServerIntrospector();
    }

    @PostConstruct
    public void preprocess() {
        setRibbonProperty(name, DeploymentContextBasedVipAddresses.key(), name);
    }

    static class OverrideRestClient extends RestClient {

        private IClientConfig config;

        private ServerIntrospector serverIntrospector;

        protected OverrideRestClient(IClientConfig config,
                ServerIntrospector serverIntrospector) {
            super();
            this.config = config;
            this.serverIntrospector = serverIntrospector;
            initWithNiwsConfig(this.config);
        }

        @Override
        public URI reconstructURIWithServer(Server server, URI original) {
            URI uri = updateToSecureConnectionIfNeeded(original, this.config,
                    this.serverIntrospector, server);
            return super.reconstructURIWithServer(server, uri);
        }

        @Override
        protected Client apacheHttpClientSpecificInitialization() {
            ApacheHttpClient4 apache = (ApacheHttpClient4) super.apacheHttpClientSpecificInitialization();
            apache.getClientHandler().getHttpClient().getParams().setParameter(
                    ClientPNames.COOKIE_POLICY, CookiePolicy.IGNORE_COOKIES);
            return apache;
        }

    }

}
```
RibbonClientConfiguration initializes the following defaults:
ribbonRule: ZoneAvoidanceRule
ribbonPing: DummyPing
ribbonServerList: ConfigurationBasedServerList
ServerListUpdater: new PollingServerListUpdater(config)
ILoadBalancer: ZoneAwareLoadBalancer
ribbonServerListFilter: ZonePreferenceServerListFilter
ribbonLoadBalancerContext: RibbonLoadBalancerContext
serverIntrospector: DefaultServerIntrospector
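Several of the @Bean methods above share one pattern: if a class is configured for this client, instantiate it; otherwise fall back to a built-in default. The following is a minimal sketch of that pattern, assuming a plain Map as the property source. RuleSelectionSketch and its nested types are hypothetical, and the key format `<client>.ribbon.NFLoadBalancerRuleClassName` follows the convention Ribbon uses for per-client rule overrides.

```java
import java.util.Map;

// Hypothetical model of "configured class if set, otherwise the default".
class RuleSelectionSketch {

    // Stand-in for IRule.
    interface Rule {
    }

    // Stand-in for the default ZoneAvoidanceRule.
    static class ZoneAvoidanceStandIn implements Rule {
    }

    // Stand-in for an alternative rule a user might configure.
    static class RandomStandIn implements Rule {
    }

    static Rule resolveRule(Map<String, String> properties, String clientName) {
        // Assumption: properties are supplied as a simple map, not via Spring's Environment.
        String className = properties.get(clientName + ".ribbon.NFLoadBalancerRuleClassName");
        if (className == null) {
            // Nothing configured for this client: use the default rule.
            return new ZoneAvoidanceStandIn();
        }
        try {
            // A class name is configured: instantiate it reflectively.
            return (Rule) Class.forName(className).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate rule " + className, e);
        }
    }
}
```

Calling resolveRule with an empty map returns the default stand-in; putting the class name of RandomStandIn under the key for "order-demo" switches only that client to the alternative rule.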
So the default ILoadBalancer implementation is ZoneAwareLoadBalancer. Next, chooseServer is called on ZoneAwareLoadBalancer to obtain a concrete instance:
```java
@Override
public Server chooseServer(Object key) {
    if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
        logger.debug("Zone aware logic disabled or there is only one zone");
        return super.chooseServer(key);
    }
    Server server = null;
    try {
        LoadBalancerStats lbStats = getLoadBalancerStats();
        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
        logger.debug("Zone snapshots: {}", zoneSnapshot);
        if (triggeringLoad == null) {
            triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName()
                            + ".triggeringLoadPerServerThreshold", 0.2d);
        }

        if (triggeringBlackoutPercentage == null) {
            triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty(
                    "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName()
                            + ".avoidZoneWithBlackoutPercetage", 0.99999d);
        }
        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,
                triggeringLoad.get(), triggeringBlackoutPercentage.get());
        logger.debug("Available zones: {}", availableZones);
        if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
            String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
            logger.debug("Zone chosen: {}", zone);
            if (zone != null) {
                BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
                server = zoneLoadBalancer.chooseServer(key);
            }
        }
    } catch (Exception e) {
        logger.error("Error choosing server using zone aware logic for load balancer={}", name, e);
    }
    if (server != null) {
        return server;
    } else {
        logger.debug("Zone avoidance logic is not invoked.");
        return super.chooseServer(key);
    }
}
```
When zone-aware logic is disabled or there is only one zone, which is the default situation, the call falls through to super.chooseServer(key), that is, BaseLoadBalancer's chooseServer(key) method. That method in turn calls rule.choose(key):
```java
public Server chooseServer(Object key) {
    if (counter == null) {
        counter = createCounter();
    }
    counter.increment();
    if (rule == null) {
        return null;
    } else {
        try {
            return rule.choose(key);
        } catch (Exception e) {
            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
            return null;
        }
    }
}

private final Counter createCounter() {
    return Monitors.newCounter("LoadBalancer_ChooseServer");
}
```
As established above, the default rule is ZoneAvoidanceRule. That class does not implement choose itself; the call is handled by the choose method of its parent class, PredicateBasedRule:
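```java
public Server choose(Object key) {
    ILoadBalancer lb = getLoadBalancer();
    Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
    if (server.isPresent()) {
        return server.get();
    } else {
        return null;
    }
}
```
This is where a concrete instance is actually picked from the server list. The default algorithm is simple:
1. Get the list of instances and filter it down to the eligible ones
2. Read the current value of an AtomicInteger and use it as the index into the list, while atomically storing (value + 1) modulo the list size for the next call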
```java
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
    List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
    if (eligible.size() == 0) {
        return Optional.absent();
    }
    return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}

private int incrementAndGetModulo(int modulo) {
    for (;;) {
        int current = nextIndex.get();
        int next = (current + 1) % modulo;
        if (nextIndex.compareAndSet(current, next) && current < modulo)
            return current;
    }
}
```
Finally, the instance list and the index together yield a concrete instance, which is then wrapped in a RibbonServer.
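The round-robin index selection can be tried in isolation. The sketch below copies the compare-and-set loop from the source above into a hypothetical RoundRobinSketch class that works on any list; the class name and the generic choose helper are assumptions made for illustration, and no filtering step is modeled.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the round-robin selection.
class RoundRobinSketch {

    // Holds the index that the next call will return.
    private final AtomicInteger nextIndex = new AtomicInteger();

    // Returns the current index and atomically advances the counter, wrapping at modulo.
    int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            // Retry if another thread changed the counter first, or if the list
            // shrank so that the stored index is no longer in range.
            if (nextIndex.compareAndSet(current, next) && current < modulo) {
                return current;
            }
        }
    }

    // Picks the next element in rotation, or null when there is nothing to pick.
    <T> T choose(List<T> eligible) {
        if (eligible.isEmpty()) {
            return null;
        }
        return eligible.get(incrementAndGetModulo(eligible.size()));
    }
}
```

With a list of three servers, four consecutive calls to choose return the first, second, third, and then the first server again. The compare-and-set loop keeps the rotation consistent when several threads choose concurrently, without taking a lock.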
The following flow chart summarizes the whole process: