Feign接口 多线程问题

Spring Cloud Feign传输Header,并保证多线程情况下也适用
一、现象
微服务在生产中,常遇到需要把 header 传递到下一子服务的情况(如服务A访问服务B的接口,需要传递header),网上大多数的方案是实现 RequestInterceptor 接口,在重写方法中,把 header 填进 Feign 的请求中。我们先按这种方式,简单实现代码如下:

1、继承RequestInterceptor
服务A新建类,继承 RequestInterceptor,把 header 设置到请求中,注意 header 的key若是大写时,请求中一般会被转为小写,所以建议header的key一般设置为小写。

 

package com.he.feign.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;/*** <b>@Desc</b>:   1、继承RequestInterceptor,把header设置到请求中,注意header的key若是大写时,请求中会被转为小写* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Configuration
public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();//当主线程的请求执行完毕后,Servlet容器会被销毁当前的Servlet,因此在这里需要做判空if (attributes != null) {HttpServletRequest request = attributes.getRequest();Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()) {String name = headerNames.nextElement();//不能把所有消息头都传递下去,否则会引起其他异常;header的name都是小写if (name.equals("feignheader")) {requestTemplate.header(name,request.getHeader(name));}}}}}


2、修改 hystrix 的隔离策略为 semaphore
RequestContextHolder.getRequestAttributes()方法,实际上是从ThreadLocal变量中取得相应信息的。hystrix断路器的默认隔离策略为THREAD,该策略是无法取得ThreadLocal值的,所以需要修改hystrix的隔离策略,一般是改为[semaphore],在服务A中的 yml 新增配置如下#2、hystrix 的隔离策略改为 SEMAPHORE

hystrix:command:default:execution:timeout:enable: trueisolation:strategy: SEMAPHOREthread:timeoutInMilleseconds: 60000


 
3、客户端A的测试代码
3.1、服务A的controller接口

 

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet容器不会销毁HttpServletRequest,//所以请求属性还保存在请求链路中,能被传递下去CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}


3.2、Feign类
feignclient的注解可以省略configuration配置,即configuration = FeignConfig.class可不声明
 

package com.he.feign.feign;import com.he.feign.feign.hystrix.HeaderFeignFallback;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;/*** <b>@Desc</b>:   TODO* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
//@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class,configuration = FeignConfig.class)//可以省略configuration配置
@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class)
public interface HeaderFeign {@GetMapping("/test")String test();
}


 

package com.he.feign.feign.hystrix;import com.he.feign.feign.HeaderFeign;
import org.springframework.stereotype.Component;/*** <b>@Desc</b>:   TODO* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/1* <b>@Modify</b>:*/
@Component
public class HeaderFeignFallback implements HeaderFeign {@Overridepublic String test() {return null;}
} 

 


4、服务端B的接口代码

 

package com.he.eurekaclient.controller;import com.he.eurekaclient.feign.HelloFeign;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;/*** <b>@Desc</b>:   测试header传递* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@RequestMapping("/header")
@RestController
public class HeaderController {@Value("${spring.application.name}")private String appName;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignHeader-test@GetMapping("/test")public String test() {StringBuffer sb = new StringBuffer("hello from ").append(appName).append("\n");StringBuffer requestURL = servletRequest.getRequestURL();sb.append("requestURL: ").append(requestURL).append("\n");boolean isContain = false;sb.append("headers: \n");Enumeration<String> headerNames = servletRequest.getHeaderNames();//header的name都是小写while (headerNames.hasMoreElements()){String headername = headerNames.nextElement();String headerValue = servletRequest.getHeader(headername);sb.append(headername).append("-").append(headerValue).append("\n");if (headername.equals("feignheader")) isContain = true;}if (!isContain) {sb.append("--error--").append("not contain required header!");}return sb.toString();}
}


5、启动服务,在postman中测试如下
5.1、调用接口 http://localhost:8060/test_header/main_thread,结果如下

在这里插入图片描述
5.2、调用接口 http://localhost:8060/test_header/sub_thread ,结果如下

在这里插入图片描述
5.3、调用 http://localhost:8060/test_header/sub_thread/block,结果如下

在这里插入图片描述
从5.1 – 5.3的查询结果,可以得到结论

经过上述的配置后,用户线程(主线程)中调用非feign请求,可把header传递到服务B中;
若在用户线程(主线程)中启动子线程,并在子线程中调用feign请求,header传递不到服务B中;
即是子线程最终异步转同步阻塞等待结果,header仍传递不到服务B中。
二、网络上大多数的解决方案
出现上面的原因, 主要是 RequestAttributes 默认不是线程共享的;主线程调用子线程时,没把 RequestAttributes 共享给子线程。因此,只要在主线程调用其他线程前将RequestAttributes对象设置为子线程共享,就能把header等信息传递下去。

1、因此,网络上大多数的解决方案如下,在主线程调用子线程前,增加下面配置
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承,线程共享
1
修改后的代码如下

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}

 
2、重新启动服务A,再次调用两个带子线程的接口,现象如下
调用 http://localhost:8060/test_header/sub_thread/block,结果如下

在这里插入图片描述

调用接口 http://localhost:8060/test_header/sub_thread ,结果如下

在这里插入图片描述
测试结果,有以下两种现象

在主线程get()阻塞等待子线程执行完毕时,每次请求都成功;
主线程直接启动子线程,且执行完自己逻辑后便结束不需理会子线程结果的,请求偶尔成功, 偶尔失败;
这是为什么呢,作者认为主要是以下原因

Servlet容器中Servlet属性生命周期与接收请求的用户线程(父线程)同步, 随着父线程执行完destroy()而销毁;
子线程虽然可以从父线程共享信息中获得了请求属性,但这个属性由父线程维护
当父线程比子线程执行完慢时,请求属性还在,子线程请求成功;当快时,请求属性随着父线程结束而销毁,子线程的请求属性变为null,请求失败。
由此可见,简单的设置 RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);

在多线程情况下, 并非是一劳永逸的。

三、作者的解决方案
针对上面的问题,及问题根本原因,我们团队的解决方案仍是使用 ThreadLocal,进行线程间的变量共享通信。

1、新建 ThreadLocalUtil

package com.he.feign.thread;import java.util.HashMap;
import java.util.Map;/*** <b>@Desc</b>:   线程共享* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/22* <b>@Modify</b>:*/
public class ThreadLocalUtil {//使用InheritableThreadLocal,使得共享变量可被子线程继承private static final InheritableThreadLocal<Map<String,String>> headerMap = new InheritableThreadLocal<Map<String, String>>(){@Overrideprotected Map<String, String> initialValue() {return new HashMap<>();}};public static Map<String,String> get(){return headerMap.get();}public static String get(String key) {return headerMap.get().get(key);}public static void set(String key, String value){headerMap.get().put(key,value);}
} 


2、修改服务A 的接口 TestHeaderController

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import com.he.feign.thread.ThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承Enumeration<String> headerNames = servletRequest.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,servletRequest.getHeader(name));}}new Thread(() -> {new Thread(() -> {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}).start();}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承Enumeration<String> headerNames = servletRequest.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,servletRequest.getHeader(name));}}CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
} 


3、修改服务A的 FeignConfig

package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;import java.util.Map;/*** <b>@Desc</b>:   1、继承RequestInterceptor,把header设置到请求中,注意header的key若是大写时,请求中会被转为小写* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@Configuration
public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {
//        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
//        //当主线程的请求执行完毕后,Servlet会被销毁,因此在这里需要做判空
//        if (attributes != null) {
//            HttpServletRequest request = attributes.getRequest();
//
//            Enumeration<String> headerNames = request.getHeaderNames();
//
//            while (headerNames.hasMoreElements()) {
//                String name = headerNames.nextElement();
//                //不能把所有消息头都传递下去,否则会引起其他异常;header的name都是小写
//                if (name.equals("feignheader")) {
//                    requestTemplate.header(name,request.getHeader(name));
//                }
//            }
//        }//读取设置的header信息,传递到下一个服务Map<String, String> headerMap = ThreadLocalUtil.get();for (String key : headerMap.keySet()) {log.info("--从ThreadLocal获取消息头传递到下一个服务:key-[{}],value-[{}]",key,headerMap.get(key));requestTemplate.header(key,headerMap.get(key));}}
}

 
4、重启服务A,测试结果如下
4.1、连续调用 http://localhost:8060/test_header/sub_thread 接口,日志打印如下

2020-06-22 23:18:23.658  INFO 18236 --- [     Thread-131] com.he.feign.config.FeignConfig          : --从ThreadLocal获取消息头传递到下一个服务:key-[feignheader],value-[test]
2020-06-22 23:18:23.662  INFO 18236 --- [     Thread-131] c.h.f.controller.TestHeaderController    : resp: hello from eureka-client
requestURL: http://192.168.56.1:8200/header/test
headers:
feignheader-test
accept-*/*
user-agent-Java/1.8.0_162
host-192.168.56.1:8200
connection-keep-alive 

在这里插入图片描述
结合执行日志可知,header信息通过feign成功传递到下一个服务,而且不再出现偶尔失败的情况!

4.2、连续调用接口 http://localhost:8060/test_header/sub_thread/block

在这里插入图片描述
综上可见,真正解决从网关或者上层链路,把header经过feign传递到另一个服务,既要配置feign,也需要结合threadlocal。

下一步的优化,可设置拦截器或者切面,把header信息统一设置到threadlocal中。

package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil;
import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Enumeration;
import java.util.Objects;/*** <b>@Desc</b>:   拦截器* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/22* <b>@Modify</b>:*/
public class MyInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//拦截请求,设置header到ThreadLocal中Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,request.getHeader(name));}}return true;}} 
package com.he.feign.config;import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;/*** <b>@Desc</b>:   web配置* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/25* <b>@Modify</b>:*/
@Configuration
public class WebConfig extends WebMvcConfigurationSupport {@Overrideprotected void addInterceptors(InterceptorRegistry registry) {//添加自定义的拦截器registry.addInterceptor(new MyInterceptor()).addPathPatterns("/**");}
}

 
TestHeaderController修改如下

package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>:   测试* <b>@Author</b>: hesh* <b>@Date</b>:   2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Value("${server.port}")private String port;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承//        Enumeration<String> headerNames = servletRequest.getHeaderNames();
//        while (headerNames.hasMoreElements()){
//            String name = headerNames.nextElement();
//            if (Objects.equals(name,"feignheader")){
//                ThreadLocalUtil.set(name,servletRequest.getHeader(name));
//            }
//        }new Thread(() -> {new Thread(() -> {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}).start();}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
//        RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承//        Enumeration<String> headerNames = servletRequest.getHeaderNames();
//        while (headerNames.hasMoreElements()){
//            String name = headerNames.nextElement();
//            if (Objects.equals(name,"feignheader")){
//                ThreadLocalUtil.set(name,servletRequest.getHeader(name));
//            }
//        }CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: {}", resp);return resp;}}


以上,便是作者针对spring cloud feign 传递 header 信息在多线程情况下失败问题的解决方式,若有错误请指正,欢迎交流指导。
————————————————
版权声明:本文为CSDN博主「HE-RUNNING」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。

 

以上是刚开始使用的文章,但是随着使用线程池,就出现了问题,由于线程池是把线程回收,不是新建,就出现了在变量传递的时候,下次取到线程是从上一次父线程提供的共享变量导致了变量错乱问题。经过研究 阿里的解决方案出现在眼前

 

加入以下pom依赖:

<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.2.0</version>
</dependency>

  

转载改造hystrix线程池方法:

改造线程池方式

上面介绍了改造线程的方式,并且通过建一个同样的Java类来覆盖Jar包中的实现,感觉有点投机取巧,其实不用这么麻烦,Hystrix默认提供了HystrixPlugins类,可以让用户自定义线程池,下面来看看怎么使用:

在启动之前调用进行注册自定义实现的逻辑:

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

ThreadLocalHystrixConcurrencyStrategy就是我们自定义的创建线程池的类,需要继承HystrixConcurrencyStrategy,前面也有讲到通过调试代码发现最终获取线程池的代码就在HystrixConcurrencyStrategy中。

我们只需要重写getThreadPool方法即可完成对线程池的改造,由于TtlExecutors只能修饰ExecutorService和Executor,而HystrixConcurrencyStrategy中返回的是ThreadPoolExecutor,我们需要对ThreadPoolExecutor进行包装一层,最终在execute方法中对线程修饰,也就相当于改造了线程池。

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import com.netflix.hystrix.util.PlatformSpecific;public class ThreadLocalHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {private final static Logger logger = LoggerFactory.getLogger(ThreadLocalHystrixConcurrencyStrategy.class);@Overridepublic ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);final int dynamicCoreSize = corePoolSize.get();final int dynamicMaximumSize = maximumPoolSize.get();if (dynamicCoreSize > dynamicMaximumSize) {logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize+ ".  Maximum size will be set to " + dynamicCoreSize+ ", the coreSize value, since it must be equal to or greater than the coreSize value");return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit,workQueue, threadFactory);} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit,workQueue, threadFactory);}}@Overridepublic ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,HystrixThreadPoolProperties threadPoolProperties) {final ThreadFactory threadFactory = getThreadFactory(threadPoolKey);final boolean allowMaximumSizeToDivergeFromCoreSize = threadPoolProperties.getAllowMaximumSizeToDivergeFromCoreSize().get();final int dynamicCoreSize = threadPoolProperties.coreSize().get();final int keepAliveTime = threadPoolProperties.keepAliveTimeMinutes().get();final int maxQueueSize = threadPoolProperties.maxQueueSize().get();final BlockingQueue<Runnable> workQueue = getBlockingQueue(maxQueueSize);if (allowMaximumSizeToDivergeFromCoreSize) {final int dynamicMaximumSize = threadPoolProperties.maximumSize().get();if (dynamicCoreSize > dynamicMaximumSize) {logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name()+ " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize+ ".  Maximum size will be set to " + dynamicCoreSize+ ", the coreSize value, since it must be equal to or greater than the coreSize value");return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime,TimeUnit.MINUTES, workQueue, threadFactory);} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime,TimeUnit.MINUTES, workQueue, threadFactory);}} else {return new ThreadLocalThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime, TimeUnit.MINUTES,workQueue, threadFactory);}}private static ThreadFactory getThreadFactory(final HystrixThreadPoolKey threadPoolKey) {if (!PlatformSpecific.isAppEngineStandardEnvironment()) {return new ThreadFactory() {private final AtomicInteger threadNumber = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r,"hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());thread.setDaemon(true);return thread;}};} else {return PlatformSpecific.getAppEngineThreadFactory();}}
}

ThreadLocalThreadPoolExecutor的代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;public class ThreadLocalThreadPoolExecutor extends ThreadPoolExecutor {private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();public static TransmittableThreadLocal<Long> THREAD_LOCAL = new TransmittableThreadLocal<Long>();public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler);}public ThreadLocalThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {super(maximumPoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);}@Overridepublic void execute(Runnable command) {super.execute(TtlRunnable.get(command));}
}

启动时加入插件

HystrixPlugins.getInstance().registerConcurrencyStrategy(new ThreadLocalHystrixConcurrencyStrategy());

 使用方法:调用feign client服务之前,设置线程变量

ThreadLocalThreadPoolExecutor.THREAD_LOCAL.set(10086L);

 在FeignAuthConfiguration里,调用appTokenHolder.get();之前加入设置租户id

Long tenantId = ThreadLocalThreadPoolExecutor.THREAD_LOCAL.get();
DefaultAppTokenHolder.TENANT_FOR_NO_SESSION.set(tenantId);

  

  

使用线程变量三种方式测试:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;import com.alibaba.ttl.TransmittableThreadLocal;
import com.alibaba.ttl.TtlRunnable;public class Test {public static void main(String[] args) throws InterruptedException, ExecutionException {
//		testThreadLocal1();// testThreadLocal2();testThreadLocal3();}private static void testThreadLocal1() throws InterruptedException, ExecutionException {final ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {local.set(i + "");System.out.println(local.get());Future<?> future = executorService.submit(new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":" + local.get());local.set(null);}});future.get();System.out.println(local.get());local.set(null);}}private static void testThreadLocal2() throws InterruptedException, ExecutionException {ThreadLocal<String> local = new java.lang.InheritableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {local.set(i + "");System.out.println(local.get());Future<?> future = executorService.submit(new ParamRunnable(i + ""));future.get();System.out.println(local.get());local.set(null);}}private static void testThreadLocal3() throws InterruptedException, ExecutionException {final TransmittableThreadLocal<String> context = new TransmittableThreadLocal<String>();ExecutorService executorService = Executors.newFixedThreadPool(1);for (int i = 0; i < 20; i++) {context.set(i + "");System.out.println(context.get());Future<?> future = executorService.submit(TtlRunnable.get(new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + ":" + context.get());context.set(null);}}));future.get();System.out.println(context.get());context.set(null);}}private static class ParamRunnable implements Runnable {private String param;public ParamRunnable(String param) {this.param = param;}@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + ":" + param);}}}

 


原文链接:https://blog.csdn.net/weishaoqi2/article/details/106964787

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/386453.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

我在华为做Android外包的真实经历!吊打面试官系列!

导语 本部分内容是关于Android进阶的一些知识总结&#xff0c;涉及到的知识点比较杂&#xff0c;不过都是面试中几乎常问的知识点&#xff0c;也是加分的点。 关于这部分内容&#xff0c;可能需要有一些具体的项目实践。在面试的过程中&#xff0c;结合具体自身实践经历&…

logstash windows

最新在研究elastic stack (elk) &#xff1a; logstash 安装&#xff0c;下载最新版本的logstash: 点击打开链接 解压到磁盘根目录下&#xff1a;在logstash>bin 1、目录下创建&#xff1a;logstash.conf 2、输入内容: # Sample Logstash configuration for creating …

H3C端口状态

转载于:https://www.cnblogs.com/fanweisheng/p/11153315.html

还有人不知道什么是AndroidX的吗?文末领取面试资料

谈起Android框架体系架构&#xff0c;我先提个问&#xff1a;什么是Android框架体系架构 &#xff1f; Android系统构架是安卓系统的体系结构&#xff0c;android的系统架构和其操作系统一样&#xff0c;采用了分层的架构&#xff0c;共分为四层&#xff0c;从高到低分别是And…

zookeeper+kafka+logstash+elasticsearc+kibana

研究背景 1、之所以选用kafka是因为量起来的话单台logstash的抗压能力比较差 2、为了解决整个链路查询的问题&#xff0c;多个Feign传层的话&#xff0c;可以按照一个ID进行穿层&#xff0c;所以采用logback的MDC进行对唯一标识存储并且在Feign的调用链放在Header里&#xff…

还没吃透内存缓存LruCache实现原理的看这篇文章,跳槽薪资翻倍

目前情况&#xff1a;10届某民办大学本科生&#xff0c;实际接触Android年限6年多了&#xff0c;工作年限五年半&#xff08;注意&#xff0c;我说的是工作年限&#xff0c;不是工作经验&#xff09;&#xff0c;今年1月份裸辞后歇了大半年&#xff0c;经常一周也收不到几个off…

利用 Docker 搭建单机的 Cloudera CDH 以及使用实践

利用 Docker 搭建单机的 Cloudera CDH 以及使用实践 想用 CDH 大礼包&#xff0c;于是先在 Mac 上和 Centos7.4 上分别搞个了单机的测试用。其实操作的流和使用到的命令差不多就一并说了: 首先前往官方下载包&#xff1a; https://www.cloudera.com/downloads/quickstart_vm…

还没吃透内存缓存LruCache实现原理的看这篇文章,面试必会

前言 这篇文章主要是分享今年上半年的面试心得&#xff0c;现已就职于某大厂有三个月了&#xff0c;近期有很多公司均已启动秋招&#xff0c;也祝大家在 2020 的下半年面试顺利&#xff0c;获得理想的offer&#xff01; 之前找工作的那段时间感想颇多&#xff0c;总结一点面试…

fastjson反序列化漏洞原理及利用

重要漏洞利用poc及版本 我是从github上的参考中直接copy的exp&#xff0c;这个类就是要注入的类 import java.lang.Runtime; import java.lang.Process; public class Exploit { public Exploit() { try{ // 要执行的命令 String commands "calc.exe"; Process pc …

这个回答让我错失offer!offer拿到手软

开头 每到“金三银四”的季节&#xff0c;总人很多人去寻找名叫“面经”一样的东西&#xff0c;其实就是一个个具体的题目&#xff0c;然后临阵磨枪&#xff0c;去“背”答案&#xff0c;如果一直是这样的话&#xff0c;我相信你的能力不会有任何提高&#xff0c;即使工作三年…

Spark Windows

本文主要是讲解Spark在Windows环境是如何搭建的 一、JDK的安装 1、1 下载JDK 首先需要安装JDK&#xff0c;并且将环境变量配置好&#xff0c;如果已经安装了的老司机可以忽略。JDK&#xff08;全称是JavaTM Platform Standard Edition Development Kit&#xff09;的安装&…

这个回答让我错失offer!成功收获美团,小米安卓offer

前言 我们移动开发程序员应该首先明白一个要点&#xff0c;能够学习的东西可以区分为『知识』和『技能』。 知识&#xff0c;就是你知道就知道、不知道就不知道的东西&#xff0c;比如『计算机系统中一个字节是包含8个bit』&#xff0c;你知道了之后就算掌握了。 技能&#…

这么香的技术还不快点学起来,不吃透都对不起自己

大家应该看过很多分享面试成功的经验&#xff0c;但根据幸存者偏差的理论&#xff0c;也许多看看别人面试失败在哪里&#xff0c;对自己才更有帮助。 最近跟一个朋友聊天&#xff0c;他准备了几个月&#xff0c;刚刚参加完字节跳动面试&#xff0c;第二面结束后&#xff0c;嗯&…

Unity3D热更新之LuaFramework篇[06]--Lua中是怎么实现脚本生命周期的

前言 用c#开发的时候&#xff0c;新建的脚本都默认继承自Monobehaviour, 因此脚本才有了自己的生命周期函数&#xff0c;如Awake,Start, Update, OnDestroy等。 在相应的方法中实现游戏逻辑&#xff0c;引擎会适时调用。 而Lua在这里做为c#的一个外延语言&#xff0c;自然是不受…

这么香的技术还不快点学起来,含BATJM大厂

前言 北京字节跳动科技有限公司成立于2012年3月&#xff0c;是最早将人工智能应用于移动互联网场景的科技企业之一。其独立研发的“今日头条”客户端&#xff0c;开创了一种全新的新闻阅读模式。 我一直很向往这样有创新精神&#xff0c;并做出了巨大成果的大公司&#xff0c…

这些Android高级必会知识点你能答出来几个?含BATJM大厂

前言 首先介绍一下自己&#xff0c;计算机水本&#xff0c;考研与我无缘。之前在帝都某公司算法部实习&#xff0c;公司算大公司吧&#xff0c;然而个人爱好偏开发&#xff0c;大二的时候写个一个app&#xff0c;主要是用各种框架。 一、掌握架构师筑基必备技能 二、掌握Andr…

这些年Android面试的那些套路,社招面试心得

前言 说不焦虑其实是假的&#xff0c;因为无论是现在还是最近几年&#xff0c;很早就有人察觉Android开发的野蛮生长时代已经过去。过去的优势是市场需要&#xff0c;这个技术少有人有&#xff0c;所以在抢占市场的时候&#xff0c;基本上满足需要就已经可以了。但是现在&…

安卓开发入门到精通!免费Android高级工程师学习资源,系列篇

前言 2017年进大学开始接触Android&#xff0c;从刚开始接触就不断地听到Android市场饱和&#xff0c;工作难找等消息。虽然当时也非常迷茫&#xff0c;不过由于第一次深入接触编程语言&#xff0c;再加上自己的一点兴趣&#xff0c;就一直坚持下来了。 到现在要毕业了&#…

安卓开发基础面试题,9次Android面试经验总结,面试必备

前言 上回承诺过大家&#xff0c;一定会出 HTTP 的系列文章&#xff0c;今天终于整理完成了。作为一个 web 开发&#xff0c;HTTP 几乎是天天要打交道的东西&#xff0c;但我发现大部分人对 HTTP 只是浅尝辄止&#xff0c;对更多的细节及原理就了解不深了&#xff0c;在面试的…

基于TCP的在线聊天程序

在线聊天服务端 import tkinter import tkinter.font as tkFont import socket import threading import time import sys class ServerUI():local127.0.0.1port5505global serverSock;flagFalsedef __init__(self):self.roottkinter.Tk()self.root.title(在线聊天-服务端v1.0)…