Ribbon从入门到源码解析
目录
1、简介
2、案例
2.1 搭建服务注册中心EurekaServer
2.2 搭建order-service服务
2.3 搭建user-service服务
2.4 服务启动
2.5 测试结果
3、Ribbon如何实现负载均衡
3.1 拦截http请求
3.2 解析请求中的服务名
3.3 根据服务名获取服务IP和Port信息
3.4 根据负载均衡策略发起http请求
4、简单源码解析
4.1 ILoadBalancer
4.2 AbstractLoadBalancer
4.3 BaseLoadBalancer
4.4 DynamicServerListLoadBalancer
4.5 ZoneAwareLoadBalancer
1、简介
在微服务架构中,服务拆分成一个个的微服务,并且以集群化的方式进行部署;此时服务与服务之间的调用变得复杂了起来,客户端需要自主选择调用服务端集群中的某个服务,这就是我们经常说到的客户端负载均衡,在Spring Cloud生态中使用的比较广泛的技术是Ribbon。
2、案例
无论是使用Fegin还是RestTemplate发起服务调用,客户端负载均衡均是通过Ribbon来实现,这里使用RestTemplate演示案例。
2.1 搭建服务注册中心EurekaServer
-
pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
-
application.yml
server:port: 18888spring:application:name: eurekaServereureka:client:
# fetch-registry: false
# register-with-eureka: falseservice-url:defaultZone: http://127.0.0.1:18888/eureka
-
启动类
@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {public static void main(String[] args) {SpringApplication.run(EurekaServerApplication.class, args);}}
2.2 搭建order-service服务
-
pom依赖
<!--web-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
-
application.yml
# server port
server:port: 18070# name
spring:application:name: order-service# eureka server
eureka:client:service-url:defaultZone: http://127.0.0.1:18888/eureka
-
模拟业务代码
@RestController
@RequestMapping("order")
public class OrderController {@Autowiredprivate OrderService orderService;@GetMapping("{orderId}")public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {// 根据id查询订单并返回return orderService.queryOrderById(orderId);}
}
@Service
public class OrderService {@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate OrderMapper orderMapper;public Order queryOrderById(Long orderId) {// 1.查询订单Order order = orderMapper.findById(orderId);// 2、查询用户信息if (Objects.nonNull(order)) {String url = String.format("http://user-service/user/%s", order.getUserId());User user = restTemplate.getForObject(url, User.class);// 3、封装用户信息order.setUser(user);}// 4.返回return order;}
}
-
启动类中注入RestTemplate并开启负载均衡
@MapperScan("com.lzb.order.mapper")
@SpringBootApplication
@EnableEurekaClient
public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);}/*** RestTemplate bean容器的注入* LoadBalanced 负载均衡注解* @return*/@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}
2.3 搭建user-service服务
-
pom依赖
<!--web-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
-
application.yml
# server port
server:port: 18080# name
spring:application:name: user-service# eureka server
eureka:client:service-url:defaultZone: http://127.0.0.1:18888/eureka
-
模拟业务代码
@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate UserService userService;@GetMapping("/{id}")public User queryById(@PathVariable("id") Long id) {return userService.queryById(id);}
}
-
启动类
@MapperScan("com.lzb.user.mapper")
@SpringBootApplication
@EnableEurekaClient
public class UserApplication {public static void main(String[] args) {SpringApplication.run(UserApplication.class, args);}
}
2.4 服务启动
在上述服务搭建之后,可以看出order-service服务调用了user-service服务,因此我将user-service服务集群部署,并且在order-service注入了RestTemplate且标注了LoadBalanced注解;启动顺序如下所示:
-
启动EurekaServer
-
启动user-service
-
启动user-service2
-
启动order-service
关于IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:
-
首先启动该服务,直至服务启动成功
-
右键启动的服务,选择Copy Configuration
-
Edit Configuration中修改服务Name;传入端口参数,在Environment中的VM options键入-Dserver.port=xxxx;点击Apply;点击OK即可;
-
启动服务,右上角选择刚刚编辑的服务信息,DEBUG启动即可。
-
服务启动后Eureka Server中服务注册信息如下所示
2.5 测试结果
清空user-service和user-service2的控制台日志,在浏览器中请求四次order-service,order-service中会通过RestTemplate调用order-service,由于RestTemplate使用了LoadBlanced注解修饰,因此Ribbon托管了RestTemplate,在发起调用之前会解析服务名获取服务Ip和port,然后根据负载均衡策略选择服务进行调用!
可以在console打印的日志中看出,第一次请求大到了user-service,第二次请求打到了user-service1,第三次请求大到了user-service,第四次请求打到了user-service1
3、Ribbon如何实现负载均衡
可以试想一下,如果是你本人去实现一个Ribbon的功能你会怎么做?我想大家的思路应该都差不多如下:
-
拦截Http请求
-
解析请求中的服务名
-
在Eureka Client拉取的Eureka Server中注册的可用服务信息中,根据服务名获取服务IP和Port信息
-
根据负载均衡策略选择服务提供者发起http请求
3.1 拦截http请求
在springboot中常用的拦截器有三个:
-
org.springframework.web.servlet.HandlerInterceptor
-
org.springframework.http.client.ClientHttpRequestInterceptor
-
feign.RequestInterceptor
三者均是对http请求进行拦截,但是3个拦截器应用的项目不同,HandlerInterceptor主要是处理http servlet请求;ClientHttpRequestInterceptor主要是处理HttpTemplate请求或者Ribbon请求;RequestInterceptor用于处理Fegin请求,Fegin本质上是http请求;因此很明显,Ribbon实现的是ClientHttpRequestInterceptor拦截器。
3.2 解析请求中的服务名
org.springframework.http.client.ClientHttpRequestInterceptor接口中只有一个方法intercept(),其子类均会重写该方法org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。并且在浏览器中访问order-service,order-service中会使用RestTemplate请求user-service
此时可以看到request.getURI()得到的是http://user-service/user/4 通过final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名
3.3 根据服务名获取服务IP和Port信息
在org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor类中重写的intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装http请求,不是很重要,最终的是org.springframework.cloudflix.ribbon.RibbonLoadBalancerClient类中execute()方法。
此处的serviceId即为服务名user-service,this.getLoadBalancer(serviceId);会根据服务名从eureka中解析中对应的服务地址和端口。 this.getLoadBalancer(serviceId)方法调用了org.springframework.cloudflix.ribbon.SpringClientFactory类中的getLoadBalancer()方法,随后调用了org.springframework.cloudflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是spring 容器中的ILoadBalancer.class实现类comflix.loadbalancer.DynamicServerListLoadBalancer实例。 那现在还有最后一个问题,DynamicServerListLoadBalancer实例中的服务信息是怎么来的呢?这里其实是Eureka Clinet从Eureka Server中拉取的服务列表。
3.4 根据负载均衡策略发起http请求
最后一步就是根据负载均衡策略选择服务提供者发起http请求,负载均衡策略的选择在comflix.loadbalancer.ZoneAwareLoadBalancer的chooseServer()方法中实现。在选择发起请求的服务之后执行org.springframework.cloudflix.ribbon.RibbonLoadBalancerClient中的execute()方法即完成整个Ribbon负载均衡过程。
4、简单源码解析
在Ribbon整个源码体系中,ILoadBalancer接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。
4.1 ILoadBalancer
comflix.loadbalancer.ILoadBalancer是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。
方法作用如下所示:
方法名 | 作用 |
---|---|
addServers | 1、服务器列表初始化 |
2、添加新的服务 | |
chooseServer | 从负载均衡器中选择服务器 |
markServerDown | 负载均衡客户端主动通知下机,否则不可用的服务将会存活到下一个ping周期 |
getServerList | @Deprecated |
getReachableServers | 获取能正常访问的服务器 |
getAllServers | 获取所有已知的服务器,包括可访问的和不可访问的 |
4.2 AbstractLoadBalancer
comflix.loadbalancer.AbstractLoadBalancer是一个抽象类,它实现了comflix.loadbalancer.ILoadBalancer接口;其源码非常少,如下所示:
public abstract class AbstractLoadBalancer implements ILoadBalancer {public enum ServerGroup{ALL,STATUS_UP,STATUS_NOT_UP }public Server chooseServer() {return chooseServer(null);}public abstract List<Server> getServerList(ServerGroup serverGroup);public abstract LoadBalancerStats getLoadBalancerStats();
}
AbstractLoadBalancer抽象类中定义类一个ServerGroup内部枚举类,ServerGroup用于标志服务实例的分组类型:
-
ALL 表示所有服务
-
STATUS_UP 表示正常服务
-
STATUS_NOT_UP 表示下线服务
4.3 BaseLoadBalancer
comflix.loadbalancer.BaseLoadBalancer类继承了comflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer类源码比较复杂,但是有几个点是比较重要的。
-
allServerList 用于保存所有服务实例
-
upServerList用于保存所有在线服务实例
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
-
定义负载均衡默认策略为轮询
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
-
IPingStrategy表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为SerialPingStrategy,SerialPingStrategy中的pingServers方法就是遍历所有服务实例,一个个发送ping请求,查看服务是否有效。
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
-
BaseLoadBalancer构造函数中启动了一个PingTask,PingTask每隔10秒钟会ping一次服务列表中的服务是否可用,PingTask中干的事情就是pingStrategy服务检查策略。
protected int pingIntervalSeconds = 10;public BaseLoadBalancer() {this.name = DEFAULT_NAME;this.ping = null;setRule(DEFAULT_RULE);setupPingTask();lbStats = new LoadBalancerStats(DEFAULT_NAME);
}void setupPingTask() {if (canSkipPing()) {return;}if (lbTimer != null) {lbTimer.cancel();}lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,true);lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);forceQuickPing();
}
4.4 DynamicServerListLoadBalancer
comflix.loadbalancer.DynamicServerListLoadBalancer类继承了comflix.loadbalancer.BaseLoadBalancer,因此DynamicServerListLoadBalancer类主要是对BaseLoadBalancer类功能进行扩展,DynamicServerListLoadBalancer类源码比较复杂,但是有几个点是比较重要的。
-
serverListImpl是DynamicServerListLoadBalancer中声明的ServerList类型的变量,ServerList接口中定义了两个方法
volatile ServerList<T> serverListImpl;
-
getInitialListOfServers方法用于获取所有初始化服务列表
-
getUpdatedListOfServers方法用于获取更新的服务实例列表
public interface ServerList<T extends Server> {public List<T> getInitialListOfServers();public List<T> getUpdatedListOfServers(); }
-
ServerList接口有5个实现类,DynamicServerListLoadBalancer默认实现是DomainExtractingServerList,但是DomainExtractingServerList构造函数中传入的是DiscoveryEnabledNIWSServerList(可以看我下面Debug的图),因此可以看出重点类其实是DiscoveryEnabledNIWSServerList
-
DiscoveryEnabledNIWSServerList类中一个比较重要的方法是obtainServersViaDiscovery方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient从服务注册中心中获取具体的服务实例InstanceInfo
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {logger.warn("EurekaClient has not been initialized yet, returning an empty list");return new ArrayList<DiscoveryEnabledServer>();}EurekaClient eurekaClient = eurekaClientProvider.get();if (vipAddresses!=null){for (String vipAddress : vipAddresses.split(",")) {// if targetRegion is null, it will be interpreted as the same region of clientList<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);for (InstanceInfo ii : listOfInstanceInfo) {if (ii.getStatus().equals(InstanceStatus.UP)) {if(shouldUseOverridePort){if(logger.isDebugEnabled()){logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);}// copy is necessary since the InstanceInfo builder just uses the original reference,// and we don't want to corrupt the global eureka copy of the object which may be// used by other clients in our systemInstanceInfo copy = new InstanceInfo(ii);if(isSecure){ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();}else{ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();}}DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);serverList.add(des);}}if (serverList.size()>0 && prioritizeVipAddressBasedServers){break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers}}}return serverList;}
-
DiscoveryEnabledNIWSServerList类中另一个比较重要点是定义了一个ServerListUpdater.UpdateAction更新器,该更新器用于更新服务信息。ServerListUpdater提供两个实现类comflix.niws.loadbalancer.EurekaNotificationServerListUpdater和comflix.loadbalancer.PollingServerListUpdater;其中EurekaNotificationServerListUpdater通过Eureka的事件监听机制来更新服务信息;而此处默认的是PollingServerListUpdater定时任务更新机制。
-
PollingServerListUpdater代码中可以看出定时任务延迟启动initialDelayMs为1秒,刷新频率refreshIntervalMs为30秒
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs; public PollingServerListUpdater() {this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {this.initialDelayMs = initialDelayMs;this.refreshIntervalMs = refreshIntervalMs;
}public synchronized void start(final UpdateAction updateAction) {if (isActivepareAndSet(false, true)) {final Runnable wrapperRunnable = new Runnable() {@Overridepublic void run() {if (!isActive.get()) {if (scheduledFuture != null) {scheduledFuture.cancel(true);}return;}try {updateAction.doUpdate();lastUpdated = System.currentTimeMillis();} catch (Exception e) {logger.warn("Failed one update cycle", e);}}};scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);} else {logger.info("Already active, no-op");}
}
-
在DynamicServerListLoadBalancer定义了一个变量ServerListFilter,可以看到在updateListOfServers方法中,会判断filter是否为空,然后对getUpdatedListOfServers获取到的服务列表servers执行getFilteredListOfServers方法,其实就是对服务列表根据ServerListFilter接口的实现类逻辑进行过滤。
volatile ServerListFilter<T> filter;public void updateListOfServers() {List<T> servers = new ArrayList<T>();if (serverListImpl != null) {servers = serverListImpl.getUpdatedListOfServers();LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);if (filter != null) {servers = filter.getFilteredListOfServers(servers);LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);}}updateAllServerList(servers);
}
-
ServerListFilter的实现类如下所示,默认的实现类是DefaultNIWSServerListFilter,但是DefaultNIWSServerListFilter啥也没有,仅仅继承了ZoneAffinityServerListFilter;因此具体的功能还是在ZoneAffinityServerListFilter中实现,而ZoneAffinityServerListFilter主要提供的是对服务提供者所处的Zone和服务消费者所在的Zone进行比较,过滤掉不在一个Zone的实例。
4.5 ZoneAwareLoadBalancer
comflix.loadbalancer.ZoneAwareLoadBalancer是comflix.loadbalancer.DynamicServerListLoadBalancer的唯一子类,在DynamicServerListLoadBalancer中还有一个非常重要的方法没有实现,那就是chooseServer方法。chooseServer用于负载均衡器选择服务器进行调用,因此ZoneAwareLoadBalancer的出现就是解决这个问题。此外ZoneAwareLoadBalancer重写了setServerListForZones方法,setServerListForZones方法getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含server的zone
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {super.setServerListForZones(zoneServersMap);if (balancers == null) {balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();}for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {String zone = entry.getKey().toLowerCase();getLoadBalancer(zone).setServersList(entry.getValue());}// check if there is any zone that no longer has a server// and set the list to empty so that the zone related metrics does not// contain stale datafor (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {existingLBEntry.getValue().setServersList(Collections.emptyList());}}
}
👇🏻 关注公众号 我们一起进大厂👇🏻
Ribbon从入门到源码解析
目录
1、简介
2、案例
2.1 搭建服务注册中心EurekaServer
2.2 搭建order-service服务
2.3 搭建user-service服务
2.4 服务启动
2.5 测试结果
3、Ribbon如何实现负载均衡
3.1 拦截http请求
3.2 解析请求中的服务名
3.3 根据服务名获取服务IP和Port信息
3.4 根据负载均衡策略发起http请求
4、简单源码解析
4.1 ILoadBalancer
4.2 AbstractLoadBalancer
4.3 BaseLoadBalancer
4.4 DynamicServerListLoadBalancer
4.5 ZoneAwareLoadBalancer
1、简介
在微服务架构中,服务拆分成一个个的微服务,并且以集群化的方式进行部署;此时服务与服务之间的调用变得复杂了起来,客户端需要自主选择调用服务端集群中的某个服务,这就是我们经常说到的客户端负载均衡,在Spring Cloud生态中使用的比较广泛的技术是Ribbon。
2、案例
无论是使用Fegin还是RestTemplate发起服务调用,客户端负载均衡均是通过Ribbon来实现,这里使用RestTemplate演示案例。
2.1 搭建服务注册中心EurekaServer
-
pom依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
-
application.yml
server:port: 18888spring:application:name: eurekaServereureka:client:
# fetch-registry: false
# register-with-eureka: falseservice-url:defaultZone: http://127.0.0.1:18888/eureka
-
启动类
@EnableEurekaServer
@SpringBootApplication
public class EurekaServerApplication {public static void main(String[] args) {SpringApplication.run(EurekaServerApplication.class, args);}}
2.2 搭建order-service服务
-
pom依赖
<!--web-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
-
application.yml
# server port
server:port: 18070# name
spring:application:name: order-service# eureka server
eureka:client:service-url:defaultZone: http://127.0.0.1:18888/eureka
-
模拟业务代码
@RestController
@RequestMapping("order")
public class OrderController {@Autowiredprivate OrderService orderService;@GetMapping("{orderId}")public Order queryOrderByUserId(@PathVariable("orderId") Long orderId) {// 根据id查询订单并返回return orderService.queryOrderById(orderId);}
}
@Service
public class OrderService {@Autowiredprivate RestTemplate restTemplate;@Autowiredprivate OrderMapper orderMapper;public Order queryOrderById(Long orderId) {// 1.查询订单Order order = orderMapper.findById(orderId);// 2、查询用户信息if (Objects.nonNull(order)) {String url = String.format("http://user-service/user/%s", order.getUserId());User user = restTemplate.getForObject(url, User.class);// 3、封装用户信息order.setUser(user);}// 4.返回return order;}
}
-
启动类中注入RestTemplate并开启负载均衡
@MapperScan("com.lzb.order.mapper")
@SpringBootApplication
@EnableEurekaClient
public class OrderApplication {public static void main(String[] args) {SpringApplication.run(OrderApplication.class, args);}/*** RestTemplate bean容器的注入* LoadBalanced 负载均衡注解* @return*/@Bean@LoadBalancedpublic RestTemplate restTemplate() {return new RestTemplate();}}
2.3 搭建user-service服务
-
pom依赖
<!--web-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--EurekaClient-->
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
-
application.yml
# server port
server:port: 18080# name
spring:application:name: user-service# eureka server
eureka:client:service-url:defaultZone: http://127.0.0.1:18888/eureka
-
模拟业务代码
@RestController
@RequestMapping("/user")
public class UserController {@Autowiredprivate UserService userService;@GetMapping("/{id}")public User queryById(@PathVariable("id") Long id) {return userService.queryById(id);}
}
-
启动类
@MapperScan("com.lzb.user.mapper")
@SpringBootApplication
@EnableEurekaClient
public class UserApplication {public static void main(String[] args) {SpringApplication.run(UserApplication.class, args);}
}
2.4 服务启动
在上述服务搭建之后,可以看出order-service服务调用了user-service服务,因此我将user-service服务集群部署,并且在order-service注入了RestTemplate且标注了LoadBalanced注解;启动顺序如下所示:
-
启动EurekaServer
-
启动user-service
-
启动user-service2
-
启动order-service
关于IDEA 如何集群启动某个服务,方式比较多,我这里介绍一种常用的方法,步骤如下:
-
首先启动该服务,直至服务启动成功
-
右键启动的服务,选择Copy Configuration
-
Edit Configuration中修改服务Name;传入端口参数,在Environment中的VM options键入-Dserver.port=xxxx;点击Apply;点击OK即可;
-
启动服务,右上角选择刚刚编辑的服务信息,DEBUG启动即可。
-
服务启动后Eureka Server中服务注册信息如下所示
2.5 测试结果
清空user-service和user-service2的控制台日志,在浏览器中请求四次order-service,order-service中会通过RestTemplate调用order-service,由于RestTemplate使用了LoadBlanced注解修饰,因此Ribbon托管了RestTemplate,在发起调用之前会解析服务名获取服务Ip和port,然后根据负载均衡策略选择服务进行调用!
可以在console打印的日志中看出,第一次请求大到了user-service,第二次请求打到了user-service1,第三次请求大到了user-service,第四次请求打到了user-service1
3、Ribbon如何实现负载均衡
可以试想一下,如果是你本人去实现一个Ribbon的功能你会怎么做?我想大家的思路应该都差不多如下:
-
拦截Http请求
-
解析请求中的服务名
-
在Eureka Client拉取的Eureka Server中注册的可用服务信息中,根据服务名获取服务IP和Port信息
-
根据负载均衡策略选择服务提供者发起http请求
3.1 拦截http请求
在springboot中常用的拦截器有三个:
-
org.springframework.web.servlet.HandlerInterceptor
-
org.springframework.http.client.ClientHttpRequestInterceptor
-
feign.RequestInterceptor
三者均是对http请求进行拦截,但是3个拦截器应用的项目不同,HandlerInterceptor主要是处理http servlet请求;ClientHttpRequestInterceptor主要是处理HttpTemplate请求或者Ribbon请求;RequestInterceptor用于处理Fegin请求,Fegin本质上是http请求;因此很明显,Ribbon实现的是ClientHttpRequestInterceptor拦截器。
3.2 解析请求中的服务名
org.springframework.http.client.ClientHttpRequestInterceptor接口中只有一个方法intercept(),其子类均会重写该方法org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor,在该方法入口处打上断点。并且在浏览器中访问order-service,order-service中会使用RestTemplate请求user-service
此时可以看到request.getURI()得到的是http://user-service/user/4 通过final URI originalUri = request.getURI(); String serviceName = originalUri.getHost();解析获得服务名
3.3 根据服务名获取服务IP和Port信息
在org.springframework.cloud.client.loadbalancer.LoadBalancerInterceptor类中重写的intercept()方法,最后一行代码至关重要,this.requestFactory.createRequest(request, body, execution)为包装http请求,不是很重要,最终的是org.springframework.cloudflix.ribbon.RibbonLoadBalancerClient类中execute()方法。
此处的serviceId即为服务名user-service,this.getLoadBalancer(serviceId);会根据服务名从eureka中解析中对应的服务地址和端口。 this.getLoadBalancer(serviceId)方法调用了org.springframework.cloudflix.ribbon.SpringClientFactory类中的getLoadBalancer()方法,随后调用了org.springframework.cloudflix.ribbon.SpringClientFactory.getInstance()方法,之后调用了其父类org.springframework.cloud.context.named.NamedContextFactory.getInstance()方法,最终返回org.springframework.context.annotation.AnnotationConfigApplicationContext,可以看到其实获取的是spring 容器中的ILoadBalancer.class实现类comflix.loadbalancer.DynamicServerListLoadBalancer实例。 那现在还有最后一个问题,DynamicServerListLoadBalancer实例中的服务信息是怎么来的呢?这里其实是Eureka Clinet从Eureka Server中拉取的服务列表。
3.4 根据负载均衡策略发起http请求
最后一步就是根据负载均衡策略选择服务提供者发起http请求,负载均衡策略的选择在comflix.loadbalancer.ZoneAwareLoadBalancer的chooseServer()方法中实现。在选择发起请求的服务之后执行org.springframework.cloudflix.ribbon.RibbonLoadBalancerClient中的execute()方法即完成整个Ribbon负载均衡过程。
4、简单源码解析
在Ribbon整个源码体系中,ILoadBalancer接口的类关系图十分重要,因此源码解析也会根据这张图的类关系图来。
4.1 ILoadBalancer
comflix.loadbalancer.ILoadBalancer是一个顶层接口类,该类中定义了几个未实现的方法,具体实现在子类中完成。
方法作用如下所示:
方法名 | 作用 |
---|---|
addServers | 1、服务器列表初始化 |
2、添加新的服务 | |
chooseServer | 从负载均衡器中选择服务器 |
markServerDown | 负载均衡客户端主动通知下机,否则不可用的服务将会存活到下一个ping周期 |
getServerList | @Deprecated |
getReachableServers | 获取能正常访问的服务器 |
getAllServers | 获取所有已知的服务器,包括可访问的和不可访问的 |
4.2 AbstractLoadBalancer
comflix.loadbalancer.AbstractLoadBalancer是一个抽象类,它实现了comflix.loadbalancer.ILoadBalancer接口;其源码非常少,如下所示:
public abstract class AbstractLoadBalancer implements ILoadBalancer {public enum ServerGroup{ALL,STATUS_UP,STATUS_NOT_UP }public Server chooseServer() {return chooseServer(null);}public abstract List<Server> getServerList(ServerGroup serverGroup);public abstract LoadBalancerStats getLoadBalancerStats();
}
AbstractLoadBalancer抽象类中定义类一个ServerGroup内部枚举类,ServerGroup用于标志服务实例的分组类型:
-
ALL 表示所有服务
-
STATUS_UP 表示正常服务
-
STATUS_NOT_UP 表示下线服务
4.3 BaseLoadBalancer
comflix.loadbalancer.BaseLoadBalancer类继承了comflix.loadbalancer.AbstractLoadBalancer,BaseLoadBalancer类源码比较复杂,但是有几个点是比较重要的。
-
allServerList 用于保存所有服务实例
-
upServerList用于保存所有在线服务实例
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());
-
定义负载均衡默认策略为轮询
private final static IRule DEFAULT_RULE = new RoundRobinRule();
protected IRule rule = DEFAULT_RULE;
-
IPingStrategy表示服务检查策略,用于检查服务是否可用;默认的服务检查策略为SerialPingStrategy,SerialPingStrategy中的pingServers方法就是遍历所有服务实例,一个个发送ping请求,查看服务是否有效。
private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
-
BaseLoadBalancer构造函数中启动了一个PingTask,PingTask每隔10秒钟会ping一次服务列表中的服务是否可用,PingTask中干的事情就是pingStrategy服务检查策略。
protected int pingIntervalSeconds = 10;public BaseLoadBalancer() {this.name = DEFAULT_NAME;this.ping = null;setRule(DEFAULT_RULE);setupPingTask();lbStats = new LoadBalancerStats(DEFAULT_NAME);
}void setupPingTask() {if (canSkipPing()) {return;}if (lbTimer != null) {lbTimer.cancel();}lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name,true);lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);forceQuickPing();
}
4.4 DynamicServerListLoadBalancer
comflix.loadbalancer.DynamicServerListLoadBalancer类继承了comflix.loadbalancer.BaseLoadBalancer,因此DynamicServerListLoadBalancer类主要是对BaseLoadBalancer类功能进行扩展,DynamicServerListLoadBalancer类源码比较复杂,但是有几个点是比较重要的。
-
serverListImpl是DynamicServerListLoadBalancer中声明的ServerList类型的变量,ServerList接口中定义了两个方法
volatile ServerList<T> serverListImpl;
-
getInitialListOfServers方法用于获取所有初始化服务列表
-
getUpdatedListOfServers方法用于获取更新的服务实例列表
public interface ServerList<T extends Server> {public List<T> getInitialListOfServers();public List<T> getUpdatedListOfServers(); }
-
ServerList接口有5个实现类,DynamicServerListLoadBalancer默认实现是DomainExtractingServerList,但是DomainExtractingServerList构造函数中传入的是DiscoveryEnabledNIWSServerList(可以看我下面Debug的图),因此可以看出重点类其实是DiscoveryEnabledNIWSServerList
-
DiscoveryEnabledNIWSServerList类中一个比较重要的方法是obtainServersViaDiscovery方法,可以从名字看出这是通过注册中心获取服务列表,代码中可以看出依赖 EurekaClient从服务注册中心中获取具体的服务实例InstanceInfo
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {logger.warn("EurekaClient has not been initialized yet, returning an empty list");return new ArrayList<DiscoveryEnabledServer>();}EurekaClient eurekaClient = eurekaClientProvider.get();if (vipAddresses!=null){for (String vipAddress : vipAddresses.split(",")) {// if targetRegion is null, it will be interpreted as the same region of clientList<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);for (InstanceInfo ii : listOfInstanceInfo) {if (ii.getStatus().equals(InstanceStatus.UP)) {if(shouldUseOverridePort){if(logger.isDebugEnabled()){logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);}// copy is necessary since the InstanceInfo builder just uses the original reference,// and we don't want to corrupt the global eureka copy of the object which may be// used by other clients in our systemInstanceInfo copy = new InstanceInfo(ii);if(isSecure){ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();}else{ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();}}DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);serverList.add(des);}}if (serverList.size()>0 && prioritizeVipAddressBasedServers){break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers}}}return serverList;}
-
DiscoveryEnabledNIWSServerList类中另一个比较重要点是定义了一个ServerListUpdater.UpdateAction更新器,该更新器用于更新服务信息。ServerListUpdater提供两个实现类comflix.niws.loadbalancer.EurekaNotificationServerListUpdater和comflix.loadbalancer.PollingServerListUpdater;其中EurekaNotificationServerListUpdater通过Eureka的事件监听机制来更新服务信息;而此处默认的是PollingServerListUpdater定时任务更新机制。
-
PollingServerListUpdater代码中可以看出定时任务延迟启动initialDelayMs为1秒,刷新频率refreshIntervalMs为30秒
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs; public PollingServerListUpdater() {this(LISTOFSERVERS_CACHE_UPDATE_DELAY, LISTOFSERVERS_CACHE_REPEAT_INTERVAL);
}public PollingServerListUpdater(final long initialDelayMs, final long refreshIntervalMs) {this.initialDelayMs = initialDelayMs;this.refreshIntervalMs = refreshIntervalMs;
}public synchronized void start(final UpdateAction updateAction) {if (isActivepareAndSet(false, true)) {final Runnable wrapperRunnable = new Runnable() {@Overridepublic void run() {if (!isActive.get()) {if (scheduledFuture != null) {scheduledFuture.cancel(true);}return;}try {updateAction.doUpdate();lastUpdated = System.currentTimeMillis();} catch (Exception e) {logger.warn("Failed one update cycle", e);}}};scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);} else {logger.info("Already active, no-op");}
}
-
在DynamicServerListLoadBalancer定义了一个变量ServerListFilter,可以看到在updateListOfServers方法中,会判断filter是否为空,然后对getUpdatedListOfServers获取到的服务列表servers执行getFilteredListOfServers方法,其实就是对服务列表根据ServerListFilter接口的实现类逻辑进行过滤。
volatile ServerListFilter<T> filter;public void updateListOfServers() {List<T> servers = new ArrayList<T>();if (serverListImpl != null) {servers = serverListImpl.getUpdatedListOfServers();LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);if (filter != null) {servers = filter.getFilteredListOfServers(servers);LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",getIdentifier(), servers);}}updateAllServerList(servers);
}
-
ServerListFilter的实现类如下所示,默认的实现类是DefaultNIWSServerListFilter,但是DefaultNIWSServerListFilter啥也没有,仅仅继承了ZoneAffinityServerListFilter;因此具体的功能还是在ZoneAffinityServerListFilter中实现,而ZoneAffinityServerListFilter主要提供的是对服务提供者所处的Zone和服务消费者所在的Zone进行比较,过滤掉不在一个Zone的实例。
4.5 ZoneAwareLoadBalancer
comflix.loadbalancer.ZoneAwareLoadBalancer是comflix.loadbalancer.DynamicServerListLoadBalancer的唯一子类,在DynamicServerListLoadBalancer中还有一个非常重要的方法没有实现,那就是chooseServer方法。chooseServer用于负载均衡器选择服务器进行调用,因此ZoneAwareLoadBalancer的出现就是解决这个问题。此外ZoneAwareLoadBalancer重写了setServerListForZones方法,setServerListForZones方法getLoadBalancer(zone)用于创建负载均衡器; existingLBEntry.getValue().setServersList(Collections.emptyList())用于清除不包含server的zone
protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) {super.setServerListForZones(zoneServersMap);if (balancers == null) {balancers = new ConcurrentHashMap<String, BaseLoadBalancer>();}for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) {String zone = entry.getKey().toLowerCase();getLoadBalancer(zone).setServersList(entry.getValue());}// check if there is any zone that no longer has a server// and set the list to empty so that the zone related metrics does not// contain stale datafor (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) {if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {existingLBEntry.getValue().setServersList(Collections.emptyList());}}
}
👇🏻 关注公众号 我们一起进大厂👇🏻
发布评论