客户端弹性、断路器模式

客户端弹性

客户端弹性模式

  • 远程服务发生错误或表现不佳导致的问题:客户端长时间等待调用返回
  • 客户端弹性模式要解决的重点:让客户端免于崩溃。
  • 目标:让客户端快速失败,而不消耗数据库连接或线程池之类的宝贵资源,防止远程服务的问题向客户端上游传播。

4种客户端弹性模式

  • 客户端负载均衡(client load banlance)模式

    • Ribbon提供的负载均衡器,帮助发现问题,并删除实例
  • 断路器模式(Circuit Breaker Patten)

    • 监视调用失败的次数,快速失败
  • 后备(fallback)模式

    • 远程服务调用失败,执行替代代码路径
  • 舱壁隔离模式(Bulkhead Isolation Pattern)

    • 线程池充当服务的舱壁

      当船的某些船舱进水、不影响其他船舱

    image-20220417215959994

Hystrix

Hystrix是一个延迟和容错库,旨在隔离对远程系统,服务和第三方库的访问点,停止级联故障,并在不可避免发生故障的复杂分布式系统中实现弹性。

使用Hystrix(客户端)

  1. 客户端添加依赖

    1
    2
    3
    4
    <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-hystrix</artifactId>
    </dependency>
  2. 启动类加注解:@EnableCircuitBreaker

  3. 用断路器包装远程资源调用,方法加注解:@HystrixCommand

    • 加了此注解之后,该方法的调用会由另外一个单独的线程来处理

      Tomcat正常的处理逻辑:

      1
      2
      3
      request -> filter chain -> controller -> service -> dao ...
      然后从右往左看:response <- filter <- controller <- service <- dao ...
      以上所有的处理都是由一个线程完成的

      而加了注解的方法(一般在service中),会单独分配线程。

    • 默认1秒超时,超时会抛异常

      com.netflix.hystrix.exception.HystrixRuntimeException(返回的状态码为500)

自定义参数

后备模式

1
2
3
4
5
6
7
@HystrixCommand(
fallbackMethod = "buildFallbackLicenseList", // 备用方法。值为方法名,该方法的签名必须与本方法一致且位于同类下
commandProperties = {
// 设置超时时间,也可以在配置文件application.yml中配置
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "12000"),
}
)

客户端负载均衡模式

启动类添加bean:

1
2
3
4
@Bean
public IRule ribbonRule() {
return new RandomRule(); // 随机策略,现有最好的策略
}

舱壁隔离模式(Bulkhead Isolation Pattern)

将同类的方法调用放在同一个线程池中。

1
2
3
4
5
6
7
8
9
@HystrixCommand(
// 线程池
// 缺省的线程池有10个所有请求共用的线程(所以需要自定义线程池key来隔离其他线程池)
threadPoolKey = "licenseByOrgThreadPool", // 线程池key
threadPoolProperties = { // 线程池的一些配置
@HystrixProperty(name = "coreSize", value = "30"), // 线程池大小
@HystrixProperty(name = "maxQueueSize", value = "10") // 请求队列的最大值
}
)

传递关联ID(correlation ID)

  • 关联ID是唯一标识符,可用于在单个事务中跨多个服务调用进行跟踪
  • 通过HTTP Header传递
  • 通过实现过滤器拦截rest服务请求获取上游来的header属性
  • 调用rest服务前使用ClientHttpRequestInterceptor或RequestInterceptor添加header属性,使传递到下游

传递的本质就是用户在发送请求时必须携带一个ID,发送给客户端时,客户端通过在一个自定义的filter中保存起来

然后再定义一个request intercepter,让客户端向外(服务端)发送请求时自动携带这个ID,这样服务端就能收到了

Filter定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Component
public class UserContextFilter implements Filter {
private static final Logger logger = LoggerFactory.getLogger(UserContextFilter.class);

@Override
public void doFilter(ServletRequest servletRequest,
ServletResponse servletResponse,
FilterChain filterChain) throws IOException, ServletException {

HttpServletRequest httpServletRequest = (HttpServletRequest) servletRequest;

UserContextHolder.getContext().setCorrelationId( httpServletRequest.getHeader(UserContext.CORRELATION_ID) );
UserContextHolder.getContext().setUserId(httpServletRequest.getHeader(UserContext.USER_ID));
UserContextHolder.getContext().setAuthToken(httpServletRequest.getHeader(UserContext.AUTH_TOKEN));
UserContextHolder.getContext().setOrgId(httpServletRequest.getHeader(UserContext.ORG_ID));

logger.debug("UserContextFilter Correlation id: {}", UserContextHolder.getContext().getCorrelationId());

filterChain.doFilter(httpServletRequest, servletResponse);
}

@Override
public void init(FilterConfig filterConfig) throws ServletException {}

@Override
public void destroy() {}
}

Intercepter定义(两种,二选一)

支持Ribbon的RestTemplate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;

@Component
public class UserContextInterceptor implements ClientHttpRequestInterceptor {
private static final Logger logger = LoggerFactory.getLogger(UserContextInterceptor.class);
@Override
public ClientHttpResponse intercept(
HttpRequest request, byte[] body, ClientHttpRequestExecution execution)
throws IOException {

HttpHeaders headers = request.getHeaders();
headers.add(UserContext.CORRELATION_ID, UserContextHolder.getContext().getCorrelationId());

headers.add(UserContext.AUTH_TOKEN, UserContextHolder.getContext().getAuthToken());
logger.debug("=================UserContextInterceptor class Correlation id: {}" ,UserContextHolder.getContext().getCorrelationId());

return execution.execute(request, body);
}
}
Feign
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import feign.Request;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MyRequestInterceptor implements RequestInterceptor {
private static final Logger logger = LoggerFactory.getLogger(MyRequestInterceptor.class);

@Autowired
private UserContext userContext;

@Override
public void apply(RequestTemplate requestTemplate) {

Request request = requestTemplate.request();
String url = request.url();

requestTemplate.header(UserContext.CORRELATION_ID, userContext.getCorrelationId());
requestTemplate.header(UserContext.AUTH_TOKEN, userContext.getAuthToken());

logger.debug("==========MyRequestInterceptor class Correlation id: {}" ,
UserContextHolder.getContext().getCorrelationId());

}
}

问题:全局关联的ID会被其他线程复写*

解决:ThreadLocal

ThreadLocal是JDK包提供的,它提供线程本地变量,如果创建一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题

image-20220417225900676

代码改写

将保存关联ID的filter用ThreadLocal保存即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class UserContextHolder {
private static final ThreadLocal<UserContext> userContext = new ThreadLocal<UserContext>();

public static final UserContext getContext(){
UserContext context = userContext.get();

if (context == null) {
context = createEmptyContext();
userContext.set(context);

}
return userContext.get();
}

public static final void setContext(UserContext context) {
Assert.notNull(context, "Only non-null UserContext instances are permitted");
userContext.set(context);
}

public static final UserContext createEmptyContext(){
return new UserContext();
}
}
ThreadLocal的新问题*

一个线程中存储的数据无法被其他线程读取。

即:原本filter->controller->...的线程与加了@HystrixCommand注解的方法所在的新线程之间无法通信,也就无法传递关联ID到其他的服务

解决:section14/licensing-service/src/main/java/.../hystrix 包下面的三个类,比较复杂,有兴趣的可自行查看。

本质是解决了一个ThreadLocal传递到另外一个ThreadLocal

断路器模式*

断路器模式**(Circuit Breaker Patten)**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@HystrixCommand(
commandProperties = {
// 最小的失败请求数目,数目越多情况越严重
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "3"),
// 失败的比例(这里是10%,对应下图的第二个菱形)
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "10"),
// 默认是5秒(第二行的窗口期)
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "7000"),
// 默认是10秒(10 000毫秒)时间窗口,这里设置为30秒
@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "30000"),
// 窗口期内安排多少个桶来收集数据
@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "5")
}
)

image-20220417231143990

@HystrixCommand注解配置

属性名称 默认值 说明
fallbackMethod None 备用方法
threadPoolKey None 线程池
threadPoolProperties None 线程池key
coreSize 10 线程池相关属性
maxQueueSize -1 线程池维持队列
circuitBreaker.requestVolumeThreshold 20
circuitBreaker.errorThresholdPercentage 50
circuitBreaker.sleepWindowInMilliseconds 5000
metricsRollingStats.timeInMilliseconds 10000
metricsRollingStats.numBuckets 10