客户端弹性、断路器模式
客户端弹性
客户端弹性模式
- 远程服务发生错误或表现不佳导致的问题:客户端长时间等待调用返回
- 客户端弹性模式要解决的重点:让客户端免于崩溃。
- 目标:让客户端快速失败,而不消耗数据库连接或线程池之类的宝贵资源,防止远程服务的问题向客户端上游传播。
4种客户端弹性模式
客户端负载均衡(client load banlance)模式
- Ribbon提供的负载均衡器,帮助发现问题,并删除实例
断路器模式(Circuit Breaker Patten)
后备(fallback)模式
舱壁隔离模式(Bulkhead Isolation Pattern)
线程池充当服务的舱壁
当船的某些船舱进水、不影响其他船舱

Hystrix
Hystrix是一个延迟和容错库,旨在隔离对远程系统,服务和第三方库的访问点,停止级联故障,并在不可避免发生故障的复杂分布式系统中实现弹性。
使用Hystrix(客户端)
客户端添加依赖
1 2 3 4
| <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-hystrix</artifactId> </dependency>
|
启动类加注解:@EnableCircuitBreaker
用断路器包装远程资源调用,方法加注解:@HystrixCommand
加了此注解之后,该方法的调用会由另外一个单独的线程来处理
Tomcat正常的处理逻辑:
1 2 3
| request -> filter chain -> controller -> service -> dao ... 然后从右往左看:response <- filter <- controller <- service <- dao ... 以上所有的处理都是由一个线程完成的
|
而加了注解的方法(一般在service中),会单独分配线程。
默认1秒超时,超时会抛异常
com.netflix.hystrix.exception.HystrixRuntimeException
(返回的状态码为500)
自定义参数
后备模式
1 2 3 4 5 6 7
| @HystrixCommand( fallbackMethod = "buildFallbackLicenseList", // 备用方法。值为方法名,该方法的签名必须与本方法一致且位于同类下 commandProperties = { // 设置超时时间,也可以在配置文件application.yml中配置 @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "12000"), } )
|
客户端负载均衡模式
启动类添加bean:
1 2 3 4
| @Bean public IRule ribbonRule() { return new RandomRule(); }
|
舱壁隔离模式(Bulkhead Isolation Pattern)
将同类的方法调用放在同一个线程池中。
1 2 3 4 5 6 7 8 9
| @HystrixCommand( // 线程池 // 缺省的线程池有10个所有请求共用的线程(所以需要自定义线程池key来隔离其他线程池) threadPoolKey = "licenseByOrgThreadPool", // 线程池key threadPoolProperties = { // 线程池的一些配置 @HystrixProperty(name = "coreSize", value = "30"), // 线程池大小 @HystrixProperty(name = "maxQueueSize", value = "10") // 请求队列的最大值 } )
|
传递关联ID(correlation ID)
- 关联ID是唯一标识符,可用于在单个事务中跨多个服务调用进行跟踪
- 通过HTTP Header传递
- 通过实现过滤器拦截rest服务请求获取上游来的header属性
- 调用rest服务前使用ClientHttpRequestInterceptor或RequestInterceptor添加header属性,使传递到下游
传递的本质就是用户在发送请求时必须携带一个ID,发送给客户端时,客户端通过在一个自定义的filter中保存起来
然后再定义一个request intercepter,让客户端向外(服务端)发送请求时自动携带这个ID,这样服务端就能收到了
Filter定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Component public class UserContextFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(UserContextFilter.class);
@Override public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {
HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;
UserContextHolder.getContext().setCorrelationId( httpServletRequest.getHeader(UserContext.CORRELATION_ID) ); UserContextHolder.getContext().setUserId(httpServletRequest.getHeader(UserContext.USER_ID)); UserContextHolder.getContext().setAuthToken(httpServletRequest.getHeader(UserContext.AUTH_TOKEN)); UserContextHolder.getContext().setOrgId(httpServletRequest.getHeader(UserContext.ORG_ID));
logger.debug("UserContextFilter Correlation id: {}", UserContextHolder.getContext().getCorrelationId());
filterChain.doFilter(httpServletRequest, servletResponse); }
@Override public void init(FilterConfig filterConfig) throws ServletException {}
@Override public void destroy() {} }
|
Intercepter定义(两种,二选一)
支持Ribbon的RestTemplate
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import org.springframework.http.client.ClientHttpRequestExecution; import org.springframework.http.client.ClientHttpRequestInterceptor; import org.springframework.http.client.ClientHttpResponse;
@Component public class UserContextInterceptor implements ClientHttpRequestInterceptor { private static final Logger logger = LoggerFactory.getLogger(UserContextInterceptor.class); @Override public ClientHttpResponse intercept( HttpRequest request, byte[] body, ClientHttpRequestExecution execution) throws IOException {
HttpHeaders headers = request.getHeaders(); headers.add(UserContext.CORRELATION_ID, UserContextHolder.getContext().getCorrelationId());
headers.add(UserContext.AUTH_TOKEN, UserContextHolder.getContext().getAuthToken()); logger.debug("=================UserContextInterceptor class Correlation id: {}" ,UserContextHolder.getContext().getCorrelationId());
return execution.execute(request, body); } }
|
Feign
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import feign.Request; import feign.RequestInterceptor; import feign.RequestTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class MyRequestInterceptor implements RequestInterceptor { private static final Logger logger = LoggerFactory.getLogger(MyRequestInterceptor.class);
@Autowired private UserContext userContext;
@Override public void apply(RequestTemplate requestTemplate) {
Request request = requestTemplate.request(); String url = request.url();
requestTemplate.header(UserContext.CORRELATION_ID, userContext.getCorrelationId()); requestTemplate.header(UserContext.AUTH_TOKEN, userContext.getAuthToken());
logger.debug("==========MyRequestInterceptor class Correlation id: {}" , UserContextHolder.getContext().getCorrelationId());
} }
|
问题:全局关联的ID会被其他线程复写*
解决:ThreadLocal
ThreadLocal是JDK包提供的,它提供线程本地变量,如果创建一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题

代码改写
将保存关联ID的filter用ThreadLocal保存即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class UserContextHolder { private static final ThreadLocal<UserContext> userContext = new ThreadLocal<UserContext>();
public static final UserContext getContext(){ UserContext context = userContext.get();
if (context == null) { context = createEmptyContext(); userContext.set(context);
} return userContext.get(); }
public static final void setContext(UserContext context) { Assert.notNull(context, "Only non-null UserContext instances are permitted"); userContext.set(context); }
public static final UserContext createEmptyContext(){ return new UserContext(); } }
|
ThreadLocal的新问题*
一个线程中存储的数据无法被其他线程读取。
即:原本filter->controller->...的线程
与加了@HystrixCommand
注解的方法所在的新线程之间无法通信,也就无法传递关联ID到其他的服务
解决:section14/licensing-service/src/main/java/.../hystrix
包下面的三个类,比较复杂,有兴趣的可自行查看。
本质是解决了一个ThreadLocal传递到另外一个ThreadLocal
断路器模式*
断路器模式**(Circuit Breaker Patten)**
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @HystrixCommand( commandProperties = { // 最小的失败请求数目,数目越多情况越严重 @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "3"), // 失败的比例(这里是10%,对应下图的第二个菱形) @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"), // 默认是5秒(第二行的窗口期) @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "7000"), // 默认是10秒(10 000毫秒)时间窗口,这里设置为30秒 @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "30000"), // 窗口期内安排多少个桶来收集数据 @HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "5") } )
|

@HystrixCommand注解配置
属性名称 |
默认值 |
说明 |
fallbackMethod |
None |
备用方法 |
threadPoolKey |
None |
线程池 |
threadPoolProperties |
None |
线程池key |
coreSize |
10 |
线程池相关属性 |
maxQueueSize |
-1 |
线程池维持队列 |
circuitBreaker.requestVolumeThreshold |
20 |
|
circuitBreaker.errorThresholdPercentage |
50 |
|
circuitBreaker.sleepWindowInMilliseconds |
5000 |
|
metricsRollingStats.timeInMilliseconds |
10000 |
|
metricsRollingStats.numBuckets |
10 |
|