小结:Spring IoC 启动时会实例化 SpringClientFactory,同时会实例化其父类 NamedContextFactory,并注入 RibbonClientConfiguration.class。这个类将来在 ribbon 子容器中会注入其他组件,后面详细说明。
进入到@EnableFeignClients 这个注解类
回到 2.3 中的 registerFeignClients(metadata, registry) 方法。
// Scans for @FeignClient-annotated interfaces and registers a client configuration
// and a Feign client bean definition for each one found.
public void registerFeignClients(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    // Obtain the classpath scanner (the article compares this to how the Eureka server
    // registers its 9 resources).
    // Here it scans for interfaces carrying @FeignClient.
    ClassPathScanningCandidateComponentProvider scanner = getScanner();
    scanner.setResourceLoader(this.resourceLoader);
    Set<String> basePackages;
    Map<String, Object> attrs = metadata
            .getAnnotationAttributes(EnableFeignClients.class.getName());
    AnnotationTypeFilter annotationTypeFilter = new AnnotationTypeFilter(
            FeignClient.class);
    final Class<?>[] clients = attrs == null ? null
            : (Class<?>[]) attrs.get("clients");
    if (clients == null || clients.length == 0) {
        scanner.addIncludeFilter(annotationTypeFilter);
        // Resolve the packages to scan from @EnableFeignClients
        basePackages = getBasePackages(metadata);
    } else {
        // Explicit "clients" were listed: scan only their packages and accept only those classes.
        final Set<String> clientClasses = new HashSet<>();
        basePackages = new HashSet<>();
        for (Class<?> clazz : clients) {
            basePackages.add(ClassUtils.getPackageName(clazz));
            clientClasses.add(clazz.getCanonicalName());
        }
        AbstractClassTestingTypeFilter filter = new AbstractClassTestingTypeFilter() {
            @Override
            protected boolean match(ClassMetadata metadata) {
                // Replace '$' with '.' so the name can be compared with the canonical names above
                String cleaned = metadata.getClassName().replaceAll("\\$", ".");
                return clientClasses.contains(cleaned);
            }
        };
        scanner.addIncludeFilter(
                new AllTypeFilter(Arrays.asList(filter, annotationTypeFilter)));
    }
    for (String basePackage : basePackages) {
        Set<BeanDefinition> candidateComponents = scanner
                .findCandidateComponents(basePackage);
        for (BeanDefinition candidateComponent : candidateComponents) {
            if (candidateComponent instanceof AnnotatedBeanDefinition) {
                // A candidate annotated with @FeignClient; it must be an interface
                AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
                AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
                Assert.isTrue(annotationMetadata.isInterface(),
                        "@FeignClient can only be specified on an interface");
                Map<String, Object> attributes = annotationMetadata
                        .getAnnotationAttributes(
                                FeignClient.class.getCanonicalName());
                String name = getClientName(attributes);
                registerClientConfiguration(registry, name,
                        attributes.get("configuration"));
                // Follow this call
                registerFeignClient(registry, annotationMetadata, attributes);
            }
        }
    }
}
2.3.2.2 中的 Feign.Builder builder = feign(context);
// Assembles the Feign.Builder for this client from beans looked up through the FeignContext.
protected Feign.Builder feign(FeignContext context) {
    // Follow this call: looks up the FeignLoggerFactory (the article describes it as
    // registered in the child context)
    FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);
    Logger logger = loggerFactory.create(this.type);
    // Populate the builder with the remaining collaborators fetched from the context
    Feign.Builder builder = get(context, Feign.Builder.class)
            // required values
            .logger(logger)
            .encoder(get(context, Encoder.class))
            .decoder(get(context, Decoder.class))
            .contract(get(context, Contract.class));
    // @formatter:on
    configureFeign(context, builder);
    return builder;
}
2.3.2.3 中的 FeignLoggerFactory loggerFactory = get(context, FeignLoggerFactory.class);
// Fetches a bean of the given type for this client's name; throws IllegalStateException
// when the context returns null.
protected <T> T get(FeignContext context, Class<T> type) {
    // Keep following this call
    T instance = context.getInstance(this.name, type);
    if (instance == null) {
        throw new IllegalStateException("No bean found of type " + type + " for "
                + this.name);
    }
    return instance;
}
2.3.2.4 中的 T instance = context.getInstance(this.name, type)
// Returns the bean of the given type from the context belonging to `name`,
// or null when no bean of that type exists in the context or its ancestors.
public <T> T getInstance(String name, Class<T> type) {
    // First obtain the per-name context, then fetch the bean from it; keep following
    AnnotationConfigApplicationContext context = getContext(name);
    if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
            type).length > 0) {
        return context.getBean(type);
    }
    return null;
}
2.3.2.5 中的 AnnotationConfigApplicationContext context = getContext(name);
// 这个name 是@FeignClient 的name 属性 说明不同的name 回创建不同的createContext(); protected AnnotationConfigApplicationContext getContext(String name) { if (!this.contexts.containsKey(name)) { synchronized (this.contexts) { if (!this.contexts.containsKey(name)) { //private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>(); // contexts 是一个 key 是 服务名 value 为一个容器的 map 对象 // 跟进 createContext() this.contexts.put(name, createContext(name)); } } } return this.contexts.get(name); }2.3.2.6 中的 createContext(name)
// Creates, configures and refreshes a new AnnotationConfigApplicationContext for `name`.
protected AnnotationConfigApplicationContext createContext(String name) {
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
    // Register the configuration classes declared specifically for this name
    if (this.configurations.containsKey(name)) {
        for (Class<?> configuration : this.configurations.get(name)
                .getConfiguration()) {
            context.register(configuration);
        }
    }
    // Register configuration classes from every entry whose key starts with "default."
    for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
        if (entry.getKey().startsWith("default.")) {
            for (Class<?> configuration : entry.getValue().getConfiguration()) {
                context.register(configuration);
            }
        }
    }
    context.register(PropertyPlaceholderAutoConfiguration.class,
            this.defaultConfigType);
    // Expose `name` under this.propertyName as the highest-priority property source
    context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
            this.propertySourceName,
            Collections.<String, Object> singletonMap(this.propertyName, name)));
    if (this.parent != null) {
        // Per the article: this class implements ApplicationContextAware, so the main
        // Spring context ends up as the parent of this one
        context.setParent(this.parent);
    }
    context.setDisplayName(generateDisplayName(name));
    // Goes through the standard Spring refresh sequence
    context.refresh();
    return context;
}
这个方法是 创建一个容器同时将一些配置文件 设置到该容器中 同时会刷新 容器
回到 2.3.2.2 中的 (T) loadBalance(builder, context, new HardCodedTarget<>(this.type, this.name, url));
// Sets the optional Client bean on the builder and delegates to the Targeter;
// throws IllegalStateException when no Client bean is available.
protected <T> T loadBalance(Feign.Builder builder, FeignContext context,
        HardCodedTarget<T> target) {
    // Per the article: the Client found here is the LoadBalancerFeignClient instance
    Client client = getOptional(context, Client.class);
    if (client != null) {
        builder.client(client);
        Targeter targeter = get(context, Targeter.class);
        // Follow this call
        return targeter.target(this, builder, context, target);
    }

    throw new IllegalStateException(
            "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-netflix-ribbon?");
}
2.3.2.8 targeter.target(this, builder, context, target);
// Builds the client proxy. A non-Hystrix builder is targeted directly; a Hystrix builder
// optionally receives a SetterFactory and a fallback or fallback factory first.
@Override
public <T> T target(FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context,
        Target.HardCodedTarget<T> target) {
    if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) {
        return feign.target(target);
    }
    feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign;
    SetterFactory setterFactory = getOptional(factory.getName(), context,
            SetterFactory.class);
    if (setterFactory != null) {
        builder.setterFactory(setterFactory);
    }
    // void.class is treated as "no fallback configured"
    Class<?> fallback = factory.getFallback();
    if (fallback != void.class) {
        return targetWithFallback(factory.getName(), context, target, builder, fallback);
    }
    Class<?> fallbackFactory = factory.getFallbackFactory();
    if (fallbackFactory != void.class) {
        return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory);
    }

    // Per the article this is the branch taken by default; keep following
    return feign.target(target);
}
2.3.2.9 feign.target(target);
// Builds the Feign instance and asks it for a new instance bound to the given target.
public <T> T target(Target<T> target) {
    return build().newInstance(target);
}
2.3.2.10 build()
// Delegates to build(FallbackFactory) with a null fallback factory.
@Override
public Feign build() {
    return build(null);
}
2.3.2.11 build()
// Installs an InvocationHandlerFactory that creates HystrixInvocationHandler instances,
// wraps the contract in a HystrixDelegatingContract, then defers to the parent build().
Feign build(final FallbackFactory<?> nullableFallbackFactory) {
    super.invocationHandlerFactory(new InvocationHandlerFactory() {
        @Override
        public InvocationHandler create(Target target, Map<Method, MethodHandler> dispatch) {
            // nullableFallbackFactory may be null (the no-arg build() passes null)
            return new HystrixInvocationHandler(target, dispatch, setterFactory, nullableFallbackFactory);
        }
    });
    super.contract(new HystrixDelegatingContract(contract));
    return super.build();
}
这是一个 匿名方法 ,会创建一个 HystrixInvocationHandler 的动态代理 对象工厂 ,同时创建 contract 这个是方法入参和形参绑定的插件
2.3.2.10 newInstance()
// Creates a JDK dynamic proxy for the target interface whose InvocationHandler dispatches
// each interface method to its MethodHandler.
public <T> T newInstance(Target<T> target) {
    // Per the article: collects every method of the @FeignClient interface, binds its
    // parameter metadata and returns them as a map keyed by config key
    Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>();
    List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>();

    for (Method method : target.type().getMethods()) {
        if (method.getDeclaringClass() == Object.class) {
            // Methods declared on Object are not mapped
            continue;
        } else if(Util.isDefault(method)) {
            // Default interface methods get a dedicated handler, bound to the proxy below
            DefaultMethodHandler handler = new DefaultMethodHandler(method);
            defaultMethodHandlers.add(handler);
            methodToHandler.put(method, handler);
        } else {
            methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method)));
        }
    }
    InvocationHandler handler = factory.create(target, methodToHandler);
    // Per the article: the factory yields a HystrixInvocationHandler, used as the proxy's handler
    T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler);

    // Bind the default-method handlers to the newly created proxy
    for(DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) {
        defaultMethodHandler.bindTo(proxy);
    }
    return proxy;
}
小结: 容器启动后,会自动扫描指定目录下的带有@FeignClient 的接口,同时会为不同实例创建一个容器,并将 log,编解码器 参数绑定器 注入到容器中。同时会创建 jdk 代理类,bean 的那么为@FeignClient 的接口的bean,同时 每个方法会创建一个RequestTemplate ,保存在map 中,key 为完成的方法名称, 最后将jdk 动态代理和map 绑定。
容器中注入的是 HystrixInvocationHandler 代理类,调用接口方法时会执行其 invoke() 方法。
3.2 中的 executeAndDecode(template);
// Executes the request through the client, optionally logs it, and turns the response into
// the method's return value (raw Response, decoded object, null for void) or throws.
Object executeAndDecode(RequestTemplate template) throws Throwable {
    Request request = targetRequest(template);

    if (logLevel != Logger.Level.NONE) {
        logger.logRequest(metadata.configKey(), logLevel, request);
    }

    Response response;
    long start = System.nanoTime();
    try {
        // Per the article: client is the LoadBalancerFeignClient registered during initialisation
        response = client.execute(request, options);
        // ensure the request is set. TODO: remove in Feign 10
        // NOTE(review): the Response built on the next line is not assigned to anything
        response.toBuilder().request(request).build();
    } catch (IOException e) {
        if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start));
        }
        throw errorExecuting(request, e);
    }
    long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

    // Cleared on paths that hand the still-open body back to the caller
    boolean shouldClose = true;
    try {
        if (logLevel != Logger.Level.NONE) {
            response =
                    logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime);
            // ensure the request is set. TODO: remove in Feign 10
            // NOTE(review): as above, the built Response is discarded
            response.toBuilder().request(request).build();
        }
        if (Response.class == metadata.returnType()) {
            if (response.body() == null) {
                return response;
            }
            // Unknown or too-large bodies are returned unbuffered and left open
            if (response.body().length() == null ||
                    response.body().length() > MAX_RESPONSE_BUFFER_SIZE) {
                shouldClose = false;
                return response;
            }
            // Ensure the response body is disconnected
            byte[] bodyData = Util.toByteArray(response.body().asInputStream());
            return response.toBuilder().body(bodyData).build();
        }
        if (response.status() >= 200 && response.status() < 300) {
            if (void.class == metadata.returnType()) {
                return null;
            } else {
                Object result = decode(response);
                shouldClose = closeAfterDecode;
                return result;
            }
        } else if (decode404 && response.status() == 404 && void.class != metadata.returnType()) {
            Object result = decode(response);
            shouldClose = closeAfterDecode;
            return result;
        } else {
            throw errorDecoder.decode(metadata.configKey(), response);
        }
    } catch (IOException e) {
        if (logLevel != Logger.Level.NONE) {
            logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime);
        }
        throw errorReading(request, response, e);
    } finally {
        if (shouldClose) {
            ensureClosed(response.body());
        }
    }
}
3.3 中的 response = client.execute(request, options);
// Uses the request URL's host as the client name, builds a RibbonRequest against the
// host-stripped URI and executes it through that client's load balancer.
@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);

        // Step in here: this is the entry point into Ribbon
        IClientConfig requestConfig = getClientConfig(options, clientName);
        return lbClient(clientName).executeWithLoadBalancer(ribbonRequest,
                requestConfig).toResponse();
    }
    catch (ClientException e) {
        // Rethrow an IOException found by findIOException, otherwise wrap the ClientException
        IOException io = findIOException(e);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(e);
    }
}
3.4 IClientConfig requestConfig = getClientConfig(options, clientName);
// Returns the client factory's config for clientName when the default options instance
// is passed, otherwise a config wrapping the supplied options.
IClientConfig getClientConfig(Request.Options options, String clientName) {
    IClientConfig requestConfig;
    // Identity comparison against the shared DEFAULT_OPTIONS instance
    if (options == DEFAULT_OPTIONS) {
        // Per the article this branch is taken by default; clientName is the service
        // name registered in Eureka
        requestConfig = this.clientFactory.getClientConfig(clientName);
    } else {
        requestConfig = new FeignOptionsClientConfig(options);
    }
    return requestConfig;
}
3.4.1 requestConfig = this.clientFactory.getClientConfig(clientName);
// Looks up the IClientConfig bean for the named client via getInstance.
public IClientConfig getClientConfig(String name) {
    return (IClientConfig)this.getInstance(name, IClientConfig.class);
}
3.4.1.2 C instance = super.getInstance(name, type);
// Returns the bean of the given type from the context belonging to `name`,
// or null when no bean of that type exists in the context or its ancestors.
public <T> T getInstance(String name, Class<T> type) {
    // Step into getContext
    AnnotationConfigApplicationContext context = getContext(name);
    if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context,
            type).length > 0) {
        return context.getBean(type);
    }
    return null;
}
3.4.13 AnnotationConfigApplicationContext context = getContext(name);
// Pass-through override that delegates to the parent implementation.
protected AnnotationConfigApplicationContext getContext(String name) {
    // Keep following
    return super.getContext(name);
}
3.4.1.4 super.getContext(name);
// Returns the cached context for `name`, creating and caching it on first use.
protected AnnotationConfigApplicationContext getContext(String name) {
    if (!this.contexts.containsKey(name)) {
        synchronized (this.contexts) {
            // Checked again under the lock so only one thread creates the context
            if (!this.contexts.containsKey(name)) {
                // Field declaration, for reference:
                // private Map<String, AnnotationConfigApplicationContext> contexts = new ConcurrentHashMap<>();
                // Key: the service name registered in Eureka; value: its application context
                // On the first call, step into createContext to see how the context is built
                this.contexts.put(name, createContext(name));
            }
        }
    }
    return this.contexts.get(name);
}
3.4.1.5 createContext(name)
// Creates, configures and refreshes a new AnnotationConfigApplicationContext for `name`.
protected AnnotationConfigApplicationContext createContext(String name) {
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
    // Register the configuration classes declared specifically for this name
    if (this.configurations.containsKey(name)) {
        for (Class<?> configuration : this.configurations.get(name)
                .getConfiguration()) {
            context.register(configuration);
        }
    }
    // Register configuration classes from every entry whose key starts with "default."
    for (Map.Entry<String, C> entry : this.configurations.entrySet()) {
        if (entry.getKey().startsWith("default.")) {
            for (Class<?> configuration : entry.getValue().getConfiguration()) {
                context.register(configuration);
            }
        }
    }
    context.register(PropertyPlaceholderAutoConfiguration.class,
            this.defaultConfigType);
    // Per the article: the default config type here is the RibbonClientConfiguration.class
    // from section 1.4, and Ribbon assigns propertySourceName at startup
    context.getEnvironment().getPropertySources().addFirst(new MapPropertySource(
            this.propertySourceName,
            Collections.<String, Object> singletonMap(this.propertyName, name)));
    if (this.parent != null) {
        // Uses Environment from parent as well as beans
        context.setParent(this.parent);
    }
    context.setDisplayName(generateDisplayName(name));
    // Refresh the context
    context.refresh();
    return context;
}
从以上代码可以看书 每个eurek 注册的服务会增加一个 容器,容器中是ribbon 相关的属性,第一次调用的时候会创建 容器。容器刷新的时候 会实例化RibbonClientConfiguration.class 。暂时进入到RibbonClientConfiguration.class 看一下里面有哪些属性
// Excerpt of the Ribbon client configuration class: only the ILoadBalancer bean is shown.
@Configuration
@EnableConfigurationProperties
@Import({HttpClientConfiguration.class, OkHttpRibbonConfiguration.class, RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class})
public class RibbonClientConfiguration {

    // Provides the ILoadBalancer unless another bean of that type already exists.
    // If propertiesFactory reports an ILoadBalancer set for this.name, that one is used.
    @Bean
    @ConditionalOnMissingBean
    public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
        return (ILoadBalancer)(this.propertiesFactory.isSet(ILoadBalancer.class, this.name)
                ? (ILoadBalancer)this.propertiesFactory.get(ILoadBalancer.class, config, this.name)
                : // Per the article the default path is this new ZoneAwareLoadBalancer branch
                new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater));
    }
}
跟进 new ZoneAwareLoadBalancer(config, rule, ping, serverList, serverListFilter, serverListUpdater)); 进入 ZoneAwareLoadBalancer.java 看一下实例化的过程
// Constructor: passes every argument straight to the superclass constructor.
public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
                             IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
                             ServerListUpdater serverListUpdater) {
    super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
}
跟进 super(clientConfig, rule, ping, serverList, filter, serverListUpdater); 实例化 ZoneAwareLoadBalancer 之前 必定先实例化 父类DynamicServerListLoadBalancer.java。
// Constructor: initialises the base load balancer, stores the server-list collaborators,
// then runs the remaining initialisation in restOfInit.
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                     ServerList<T> serverList, ServerListFilter<T> filter,
                                     ServerListUpdater serverListUpdater) {
    // A lot happens in here; keep following
    super(clientConfig, rule, ping);
    this.serverListImpl = serverList;
    this.filter = filter;
    this.serverListUpdater = serverListUpdater;
    if (filter instanceof AbstractServerListFilter) {
        ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
    }
    // Bookmark: the article comes back to this call later
    restOfInit(clientConfig);
}
跟进 super(clientConfig, rule, ping) BaseLoadBalancer.java 的实例化方法
// Constructor: all work is delegated to initWithConfig.
public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
    // Runs on construction; it appears to configure the load-balancing rule and ping behaviour
    initWithConfig(config, rule, ping);
}

// Applies the client config: ping interval and max total ping time (fallback values 30 and 2),
// rule, ping, stats, and optional connection priming; finishes with init().
void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping) {
    this.config = clientConfig;
    String clientName = clientConfig.getClientName();
    this.name = clientName;
    int pingIntervalTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerPingInterval,
                    Integer.parseInt("30")));
    int maxTotalPingTime = Integer.parseInt(""
            + clientConfig.getProperty(
                    CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
                    Integer.parseInt("2")));

    setPingInterval(pingIntervalTime);
    setMaxTotalPingTime(maxTotalPingTime);

    // cross associate with each other
    // i.e. Rule,Ping meet your container LB
    // LB, these are your Ping and Rule guys ...
    // Per the article: the rule configured by default is ZoneAvoidanceRule
    setRule(rule);
    // (the article follows this call next)
    setPing(ping);
    setLoadBalancerStats(new LoadBalancerStats(clientName));
    rule.setLoadBalancer(this);
    if (ping instanceof AbstractLoadBalancerPing) {
        ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
    }
    logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
    boolean enablePrimeConnections = clientConfig.get(
            CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);

    if (enablePrimeConnections) {
        this.setEnablePrimingConnections(true);
        PrimeConnections primeConnections = new PrimeConnections(
                this.getName(), clientConfig);
        this.setPrimeConnections(primeConnections);
    }
    init();
}
跟进setPing(ping); BaseLoadBalancer.java->setping()
// Installs a new IPing and (re)starts the ping task when it differs from the current one;
// a null ping clears the field and cancels the timer.
public void setPing(IPing ping) {
    if (ping != null) {
        if (!ping.equals(this.ping)) {
            this.ping = ping;
            // Per the article this is the branch taken by default
            setupPingTask(); // since ping data changed
        }
    } else {
        this.ping = null;
        // cancel the timer task
        lbTimer.cancel();
    }
}
setupPingTask();
// Replaces any existing ping timer with a new one that runs PingTask repeatedly,
// then triggers one immediate ping. Does nothing when canSkipPing() is true.
void setupPingTask() {
    // Per the article canSkipPing() is false by default, so execution continues
    if (canSkipPing()) {
        return;
    }
    if (lbTimer != null) {
        lbTimer.cancel();
    }
    lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,
            true);
    // Schedule the recurring job: no initial delay, period of pingIntervalSeconds seconds
    lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
    // Force one ping run right away
    forceQuickPing();
}
跟进 new PingTask() 定时job 30s 回执行一次 runPinger()
// TimerTask that runs one Pinger pass; any exception is logged rather than propagated.
class PingTask extends TimerTask {
    public void run() {
        try {
            new Pinger(pingStrategy).runPinger();
        } catch (Exception e) {
            logger.error("LoadBalancer [{}]: Error pinging", name, e);
        }
    }
}
setupPingTask() 中, forceQuickPing()
// Runs one Pinger pass immediately unless canSkipPing() is true; exceptions are logged.
public void forceQuickPing() {
    if (canSkipPing()) {
        return;
    }
    logger.debug("LoadBalancer [{}]:  forceQuickPing invoking", name);

    try {
        // This path also ends up in runPinger()
        new Pinger(pingStrategy).runPinger();
    } catch (Exception e) {
        logger.error("LoadBalancer [{}]: Error running forceQuickPing()", name, e);
    }
}
说明 第一次会强制执行runPinger() ,完事启动一个定时job 每隔30s执行一次
// Pings a snapshot of all servers, updates each server's alive flag, replaces the up-server
// list and notifies listeners of servers whose status changed.
public void runPinger() throws Exception {
    // Only one ping pass runs at a time; a concurrent caller returns immediately
    if (!pingInProgress.compareAndSet(false, true)) {
        return; // Ping in progress - nothing to do
    }

    // we are "in" - we get to Ping

    Server[] allServers = null;
    boolean[] results = null;

    Lock allLock = null;
    Lock upLock = null;

    try {
        /*
         * The readLock should be free unless an addServer operation is
         * going on...
         */
        // NOTE(review): the unlock() calls in this method are not inside a finally block
        allLock = allServerLock.readLock();
        allLock.lock();
        allServers = allServerList.toArray(new Server[allServerList.size()]);
        allLock.unlock();

        int numCandidates = allServers.length;
        // Check the status of every server in allServers; follow pingServers()
        results = pingerStrategy.pingServers(ping, allServers);

        final List<Server> newUpList = new ArrayList<Server>();
        final List<Server> changedServers = new ArrayList<Server>();

        for (int i = 0; i < numCandidates; i++) {
            boolean isAlive = results[i];
            Server svr = allServers[i];
            boolean oldIsAlive = svr.isAlive();

            svr.setAlive(isAlive);

            if (oldIsAlive != isAlive) {
                changedServers.add(svr);
                logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}",
                        name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
            }

            if (isAlive) {
                newUpList.add(svr);
            }
        }
        upLock = upServerLock.writeLock();
        upLock.lock();
        upServerList = newUpList;
        upLock.unlock();

        notifyServerStatusChangeListener(changedServers);
    } finally {
        pingInProgress.set(false);
    }
}
// Closes the enclosing class, whose header is outside this excerpt
}
跟进 results = pingerStrategy.pingServers(ping, allServers);
/**
 * Default implementation for <c>IPingStrategy</c>, performs ping
 * serially, which may not be desirable, if your <c>IPing</c>
 * implementation is slow, or you have large number of servers.
 */
private static class SerialPingStrategy implements IPingStrategy {

    // Returns one result per server, in the same order as the input array;
    // an entry stays false when ping is null or the ping throws.
    @Override
    public boolean[] pingServers(IPing ping, Server[] servers) {
        int numCandidates = servers.length;
        boolean[] results = new boolean[numCandidates];

        logger.debug("LoadBalancer:  PingTask executing [{}] servers configured", numCandidates);

        for (int i = 0; i < numCandidates; i++) {
            results[i] = false; /* Default answer is DEAD. */
            try {
                // NOTE: IFF we were doing a real ping
                // assuming we had a large set of servers (say 15)
                // the logic below will run them serially
                // hence taking 15 times the amount of time it takes
                // to ping each server
                // A better method would be to put this in an executor
                // pool
                // But, at the time of this writing, we dont REALLY
                // use a Real Ping (its mostly in memory eureka call)
                // hence we can afford to simplify this design and run
                // this
                // serially
                if (ping != null) {
                    // Check each server's status one by one
                    results[i] = ping.isAlive(servers[i]);
                }
            } catch (Exception e) {
                logger.error("Exception while pinging Server: '{}'", servers[i], e);
            }
        }
        return results;
    }
}
跟进 results[i] = ping.isAlive(servers[i]); NIWSDiscoveryPing.java -> isAlive()
// Reports whether the server's instance status equals UP. Returns true when the server is
// not a DiscoveryEnabledServer, or when its InstanceInfo or status is null.
public boolean isAlive(Server server) {
    boolean isAlive = true;
    if (server!=null && server instanceof DiscoveryEnabledServer){
        DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;
        // The Eureka instance record for this server
        InstanceInfo instanceInfo = dServer.getInstanceInfo();
        if (instanceInfo!=null){
            InstanceStatus status = instanceInfo.getStatus();
            if (status!=null){
                isAlive = status.equals(InstanceStatus.UP);
            }
        }
    }
    return isAlive;
}
小结一下: 每隔30s 检查一次 ,查看每隔服务在eureka 中的状态信息。对于存活的就放在 servList 中。
回到 前面 DynamicServerListLoadBalancer.java -> restOfInit(clientConfig);
// Finishes initialisation: starts the server-list updater, refreshes the server list once,
// optionally primes connections, and restores the original priming flag.
void restOfInit(IClientConfig clientConfig) {
    boolean primeConnection = this.isEnablePrimingConnections();
    // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
    this.setEnablePrimingConnections(false);
    // Per the article: the first run pulls the full service registry from Eureka
    enableAndInitLearnNewServersFeature();

    // Refresh the server list; per the article, the Eureka server's self-preservation
    // can leave some offline instances unevicted
    updateListOfServers();
    if (primeConnection && this.getPrimeConnections() != null) {
        this.getPrimeConnections()
                .primeConnections(getReachableServers());
    }
    this.setEnablePrimingConnections(primeConnection);
    LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
}
跟进 enableAndInitLearnNewServersFeature() DynamicServerListLoadBalancer.java ->enableAndInitLearnNewServersFeature
// Logs which ServerListUpdater is in use and starts it with this balancer's update action.
public void enableAndInitLearnNewServersFeature() {
    LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
    serverListUpdater.start(updateAction);
}
PollingServerListUpdater.java ->start 跟进 serverListUpdater.start(updateAction);
// Starts the periodic update once; later calls while already active only log a message.
@Override
public synchronized void start(final UpdateAction updateAction) {
    if (isActive.compareAndSet(false, true)) {
        final Runnable wrapperRunnable = new Runnable() {
            @Override
            public void run() {
                // If the updater is no longer active, cancel the scheduled task and bail out
                if (!isActive.get()) {
                    if (scheduledFuture != null) {
                        scheduledFuture.cancel(true);
                    }
                    return;
                }
                try {
                    // Follow this call
                    updateAction.doUpdate();
                    lastUpdated = System.currentTimeMillis();
                } catch (Exception e) {
                    logger.warn("Failed one update cycle", e);
                }
            }
        };

        // Per the article: first run about 1s after start, then updateAction.doUpdate()
        // runs every 30s by default (values come from initialDelayMs / refreshIntervalMs)
        scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                wrapperRunnable,
                initialDelayMs,
                refreshIntervalMs,
                TimeUnit.MILLISECONDS
        );
    } else {
        logger.info("Already active, no-op");
    }
}
跟进 updateAction.doUpdate(); DynamicServerListLoadBalancer.java ->doUpdate
// UpdateAction whose doUpdate() refreshes the enclosing load balancer's server list.
class NamelessClass_1 implements UpdateAction {
    NamelessClass_1() {
    }

    public void doUpdate() {
        DynamicServerListLoadBalancer.this.updateListOfServers();
    }
}
跟进 updateListOfServers() DynamicServerListLoadBalancer.java ->updateListOfServers
// Fetches the current servers from serverListImpl, applies the optional filter, and hands
// the result to updateAllServerList. An empty list is used when serverListImpl is null.
@VisibleForTesting
public void updateListOfServers() {
    List<T> servers = new ArrayList<T>();
    // Per the article this branch is entered
    if (serverListImpl != null) {
        // Step in here first
        servers = serverListImpl.getUpdatedListOfServers();
        LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                getIdentifier(), servers);

        if (filter != null) {
            servers = filter.getFilteredListOfServers(servers);
            LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                    getIdentifier(), servers);
        }
    }
    updateAllServerList(servers);
}
servers = serverListImpl.getUpdatedListOfServers(); DomainExtractingServerList.java ->getUpdatedListOfServers
// Fetches the updated servers from the wrapped list and passes them through setZones.
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
    List<DiscoveryEnabledServer> servers = setZones(this.list
            .getUpdatedListOfServers());
    return servers;
}
DiscoveryEnabledNIWSServerList.java->getUpdatedListOfServers()
// Returns the servers currently obtained through discovery.
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
    return obtainServersViaDiscovery();
}

// obtainServersViaDiscovery is what the method above calls.
// Builds the list of UP instances for the configured vipAddresses from the Eureka client;
// returns an empty list when the Eureka client is not available.
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

    if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList<DiscoveryEnabledServer>();
    }

    EurekaClient eurekaClient = eurekaClientProvider.get();
    if (vipAddresses!=null){
        for (String vipAddress : vipAddresses.split(",")) {
            // if targetRegion is null, it will be interpreted as the same region of client
            List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
            for (InstanceInfo ii : listOfInstanceInfo) {
                // Only instances whose status is UP are added
                if (ii.getStatus().equals(InstanceStatus.UP)) {

                    if(shouldUseOverridePort){
                        if(logger.isDebugEnabled()){
                            logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                        }

                        // copy is necessary since the InstanceInfo builder just uses the original reference,
                        // and we don't want to corrupt the global eureka copy of the object which may be
                        // used by other clients in our system
                        InstanceInfo copy = new InstanceInfo(ii);

                        if(isSecure){
                            ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                        }else{
                            ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                        }
                    }

                    DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                    des.setZone(DiscoveryClient.getZone(ii));
                    serverList.add(des);
                }
            }
            if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
            }
        }
    }
    return serverList;
}
小结: 每隔30s 会执行一下 ,从eureka 拉取 server 信息,或从客户端直接获取或者从server 拉取。 回到DynamicServerListLoadBalancer.java -> restOfInit(clientConfig); 中的 updateListOfServers();
// Marks every given server alive, installs the list and forces a ping pass. The call is
// skipped entirely when another update is already in progress.
protected void updateAllServerList(List<T> ls) {
    // other threads might be doing this - in which case, we pass
    if (serverListUpdateInProgress.compareAndSet(false, true)) {
        try {
            for (T s : ls) {
                s.setAlive(true); // set so that clients can start using these
                                  // servers right away instead
                                  // of having to wait out the ping cycle.
            }
            setServersList(ls);
            // Force a check of every server to see whether it is down
            super.forceQuickPing();
        } finally {
            serverListUpdateInProgress.set(false);
        }
    }
}
小结一下: 容器在刷新的时候会注入DynamicServerListLoadBalancer 对象到子容器,同时在实例化的时候,会创建两个定时任务,默认都是30s 执行一次,会通过eurek client 获取实例信息,领完一个定时任务会 刷新实例列表,检查是不是down 状态。 回到3.4 中
回到 LoadBalancerFeignClient.java 中的 execute 中的executeWithLoadBalancer
// 这个方法有两个目的 1; 根据服务名及其负载策略定位及其 // 实现远程调用 public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig); try { // 这里是一个匿名函数 return command.submit( new ServerOperation<T>() { @Override public Observable<T> call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }这里是选择server
// Per the article: implemented throughout with RxJava 1.x reactive programming.
// Builds an Observable that picks a server (unless one is preset), runs the operation
// against it, records stats, and applies same-server and next-server retry policies.
public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // Use the load balancer
    Observable<T> o =
            // When the observable is created, selectServer() is where load balancing
            // happens; step into it
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Called for each server being selected
                public Observable<T> call(Server server) {
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    // Called for each attempt and retry
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    loadBalancerContext.noteOpenConnection(stats);

                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();

                                    // Observe the operation's events to record stats and notify listeners
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            recordStats(tracer, stats, entity, null);
                                            // TODO: What to do if onNext or onError are never called?
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    // Retry on the next server (only when no server was preset)
    if (maxRetrysNext > 0 && server == null)
        o = o.retry(retryPolicy(maxRetrysNext, false));

    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            // Wrap the error in a ClientException when a retry limit was reached
            if (context.getAttemptCount() > 0) {
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}
3.4.2.1 selectServer()
private Observable<Server> selectServer() { return Observable.create(new OnSubscribe<Server>() { @Override public void call(Subscriber<? super Server> next) { try { //跟进这个方法,loadBalancerKey 为null,loadBalancerURI:不带服务名的uri Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey); next.onNext(server); next.onCompleted(); } catch (Exception e) { next.onError(e); } } }); }3.4.2.3 Server svc = lb.chooseServer(loadBalancerKey);
@Override public Server chooseServer(Object key) { if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); // 默认走这个分支 return super.chooseServer(key); } Server server = null; try { LoadBalancerStats lbStats = getLoadBalancerStats(); Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); if (triggeringLoad == null) { triggeringLoad = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".triggeringLoadPerServerThreshold", 0.2d); } if (triggeringBlackoutPercentage == null) { triggeringBlackoutPercentage = DynamicPropertyFactory.getInstance().getDoubleProperty( "ZoneAwareNIWSDiscoveryLoadBalancer." + this.getName() + ".avoidZoneWithBlackoutPercetage", 0.99999d); } Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); server = zoneLoadBalancer.chooseServer(key); } } } catch (Exception e) { logger.error("Error choosing server using zone aware logic for load balancer={}", name, e); } if (server != null) { return server; } else { logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } }3.4.2.4 return super.chooseServer(key);
public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { // 进入看一下 return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } } }小结 ; 这里是负载均衡 轮训的实现 ,代码很简单 文字不好描述。大概就是 每次除以serverlist 求余,然后从List 中获取.
3.4.2 AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
public RibbonResponse execute(final RibbonRequest request, IClientConfig configOverride) throws IOException { final Request.Options options; if (configOverride != null) { RibbonProperties ribbon = RibbonProperties.from(configOverride); options = new Request.Options( ribbon.connectTimeout(this.connectTimeout), ribbon.readTimeout(this.readTimeout)); } else { options = new Request.Options(this.connectTimeout, this.readTimeout); } final LoadBalancedRetryPolicy retryPolicy = loadBalancedRetryFactory.createRetryPolicy(this.getClientName(), this); RetryTemplate retryTemplate = new RetryTemplate(); BackOffPolicy backOffPolicy = loadBalancedRetryFactory.createBackOffPolicy(this.getClientName()); retryTemplate.setBackOffPolicy(backOffPolicy == null ? new NoBackOffPolicy() : backOffPolicy); RetryListener[] retryListeners = this.loadBalancedRetryFactory.createRetryListeners(this.getClientName()); if (retryListeners != null && retryListeners.length != 0) { retryTemplate.setListeners(retryListeners); } retryTemplate.setRetryPolicy(retryPolicy == null ? 
new NeverRetryPolicy() : new FeignRetryPolicy(request.toHttpRequest(), retryPolicy, this, this.getClientName())); return retryTemplate.execute(new RetryCallback<RibbonResponse, IOException>() { @Override public RibbonResponse doWithRetry(RetryContext retryContext) throws IOException { Request feignRequest = null; //on retries the policy will choose the server and set it in the context //extract the server and update the request being made if (retryContext instanceof LoadBalancedRetryContext) { ServiceInstance service = ((LoadBalancedRetryContext) retryContext).getServiceInstance(); if (service != null) { feignRequest = ((RibbonRequest) request.replaceUri(reconstructURIWithServer(new Server(service.getHost(), service.getPort()), request.getUri()))).toRequest(); } } if (feignRequest == null) { feignRequest = request.toRequest(); } // 这里发起http 请求 Response response = request.client().execute(feignRequest, options); if (retryPolicy.retryableStatusCode(response.status())) { byte[] byteArray = response.body() == null ? new byte[]{} : StreamUtils.copyToByteArray(response.body().asInputStream()); response.close(); throw new RibbonResponseStatusCodeException(RetryableFeignLoadBalancer.this.clientName, response, byteArray, request.getUri()); } return new RibbonResponse(request.getUri(), response); } }, new LoadBalancedRecoveryCallback<RibbonResponse, Response>() { @Override protected RibbonResponse createResponse(Response response, URI uri) { return new RibbonResponse(uri, response); } }); }以上是client 发起请求,过程 包括序列化。 总结: 容器启动时候,会为每一个eureka 注册的实例 注册一个openfeigin 的配置容器,同时创建一个jdk 生成的动态代理,同时将每个方法 放到map 中,同时每个方法就是一个requestTemplate. ribbbon 启动时候 会在容器中 生成SpringClientFactory 对象。找到动态代理的invoke 方法,并且匹配map中的方法,在服务第一次调用的时候 对每个eureka实例,生成一个容器,容器在刷新的时候,会生成两个30s 执行一次的定时任务。一个是30s从eureka 拉取实例信息,另外一个将下线的服务剔除服务列表。最后根据ribbon 容器总的负载策略选择一个server,最后发起服务调用。