hystrix线程切换导致threadLocal丢失问题及延申seata与hystrix/feign的整合逻辑

使用场景,往往我们使用theadLocal存取用户登陆信息,但是当开启hystrix时使用线程隔离模式,会使用对应线程池内的线程执行feignClient的方法,那么就会导致threadLocal丢失

通过百度以及看源码可以发现hystrix提供了HystrixPlugins,可以看到他的方法

我们先来看看HystrixPlugins暴露的方法

// 可以看到 HystrixPlugins 提供了很多东西,包括线程策略,钩子,事件,等等。。
// 首先第一感觉 钩子是可以用的
         HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();

首先看看HystrixCommandExecutionHook 的方法,以及执行时机

 /**
     * Invoked before {@link HystrixInvokable} begins executing.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param failureType {@link FailureType} enum representing which type of error
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when {@link HystrixInvokable} finishes a successful execution.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at start of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked at completion of thread execution when {@link HystrixCommand} is executed using {@link ExecutionIsolationStrategy#THREAD}.
     * This will get invoked whenever the Hystrix thread is done executing, regardless of whether the thread finished
     * naturally, or was unsubscribed externally
     *
     * @param commandInstance The executing HystrixCommand instance.
     *
     * @since 1.2
     */
    public <T> void onThreadComplete(HystrixInvokable<T> commandInstance) {
        // do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onExecutionEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.4
     */
    public <T> Exception onExecutionError(HystrixInvokable<T> commandInstance, Exception e) {
        return e; //by default, just pass through
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onExecutionSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} starts.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.2
     */
    public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} emits a value.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param value value emitted
     *
     * @since 1.4
     */
    public <T> T onFallbackEmit(HystrixInvokable<T> commandInstance, T value) {
        return value; //by default, just pass through
    }

    /**
     * Invoked when the fallback method in {@link HystrixInvokable} fails with an Exception.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     * @param e exception object
     *
     * @since 1.2
     */
    public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
        return e;
    }

    /**
     * Invoked when the user-defined execution method in {@link HystrixInvokable} completes successfully.
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.4
     */
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked when the command response is found in the {@link com.netflix.hystrix.HystrixRequestCache}.
     *
     * @param commandInstance The executing HystrixCommand
     *
     * @since 1.4
     */
    public <T> void onCacheHit(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

    /**
     * Invoked with the command is unsubscribed before a terminal state
     *
     * @param commandInstance The executing HystrixInvokable instance.
     *
     * @since 1.5.9
     */
    public <T> void onUnsubscribe(HystrixInvokable<T> commandInstance) {
        //do nothing by default
    }

通过查找文档以及 测试确定了我们需要的方法,这是我们可以写一个自己实现的钩子,但是这个钩子是用不同的执行时机的回调来实现,那么我们需要想办法把需要传递的信息从前面钩子的方法传到后面的钩子方法,简单的方式就是用ConcurrentMap来进行映射,但是没有合适的key,那么我们找到了一个类HystrixRequestContext,其实使用ConcurrentMap映射是一个比较蠢的方式,也就是实在没办法了再这样,正常应该是像TTL那样做一个装饰器,在成员变量里操作,或者统一的地方,那么我们又继续找到了HystrixRequestVariableDefault 这个类在操作HystrixRequestContext,再看一下源码,它是通过包内方法来操作HystrixRequestContext的threadLocal变量

  public void set(T value) {
// 这个方法没有暴露给我们使用,而是提供了一个操作器
        HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
    }

下面看看我们的钩子模样

public class MyHystrixHook extends HystrixCommandExecutionHook {

     private final HystrixRequestVariableDefault <Long> reuqestVariable = new HystrixRequestVariableDefault<>();
    @Override
     public <T> void onStart(HystrixInvokable<T> commandInstance) {
        //这里是hystrix 执行对应feignClient方法时开始时的钩子,那么就是在当前主线程内操作
// 先初始化 hystrix的线程私有变量容器
        HystrixRequestContext.initializeContext();
// 你要传递的 变量,其实就是从当前主线程的threadLocal中获取到的
         reuqestVariable.set(111L);
        
    }
    @Override
    public <T> Exception onError(HystrixInvokable<T> commandInstance, FailureType failureType, Exception e) {
        // 这里是你的 FeignClient方法执行失败后的回调钩子,需要清理你操作的数据
// 需要清理 子线程的threadLocal以及HystrixRequestContext
        HystrixRequestContext.getContextForCurrentThread().shutdown();
        return e; //by default, just pass through
    }

    @Override
    public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
               // 这里是你的 FeignClient方法执行成功后的回调钩子,需要清理你操作的数据
// 需要清理 子线程的threadLocal以及HystrixRequestContext
        HystrixRequestContext.getContextForCurrentThread().shutdown();
    }
@Override
 public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
// 这里已经是子线程了,执行目标 feignClient方法前
       Long tenantId = requestVariableDefault.get();
// 这里你可以获取到想要的信息,可以存到threadLocal保证后续方法的一致性
    }
@Override
 public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
        // 这里是熔断回调方法开始,也可以管理threadLocal的信息传递, 和上述onStart方法一致

    }
@Override
   public <T> Exception onFallbackError(HystrixInvokable<T> commandInstance, Exception e) {
        //by default, just pass through
// 熔断回调失败 的钩子,清理即可
        return e;
    }
@Override
    public <T> void onFallbackSuccess(HystrixInvokable<T> commandInstance) {
// 熔断回调成功 的钩子,清理即可
    }
}

同时怎么才能将我们自定义的钩子让框架去调用呢?

// 直接在项目启动时 调用这个方法 会报错,那么直接这样是不行的。
// 而且我们也可以看到HystrixPlugins.reset() 方法重置,那么再看看getInstance方法内部
HystrixPlugins.getInstance().registerCommandExecutionHook(你的钩子);

hystrix的spi体系

下面来看钩子的实例获取方法

    public HystrixCommandExecutionHook getCommandExecutionHook() {
// 这里可以看到hystrix各种类型的插件默认只有一个,但是我们可以在自定义的插件套一层装饰器来实现多个相同类型的插件对方法进行增强(seata集成hystrix中使用到此方式)
        if (commandExecutionHook.get() == null) {
            // check for an implementation from Archaius first
// 内部spi的方式获取实例
            Object impl = getPluginImplementation(HystrixCommandExecutionHook.class);
            if (impl == null) {
                // cas的方式来保证多个组件或实现只有一个实例正常放入
                commandExecutionHook.compareAndSet(null, HystrixCommandExecutionHookDefault.getInstance());
                // we don't return from here but call get() again in case of thread-race so the winner will always get returned
            } else {
                // we received an implementation from Archaius so use it
                commandExecutionHook.compareAndSet(null, (HystrixCommandExecutionHook) impl);
            }
        }
        return commandExecutionHook.get();
    }

    private <T> T getPluginImplementation(Class<T> pluginClass) {
// hystrix 自己提供的 配置文件spi方式获取实例
        T p = getPluginImplementationViaProperties(pluginClass, dynamicProperties);
        if (p != null) return p;        
// 利用jdk 自动的 serviceLoader方式获取实例
// serviceLoader可以参考 //www.greatytc.com/p/ccfd80d407ef
        return findService(pluginClass, classLoader);
    }

我们来看看hystrix配置方式的spi, 其实就是读取hystrix-plugins.properties中文件读取到key value,例如我们替换hook的配置就为
hystrix.plugin.HystrixConcurrencyStrategy.implementation=xxx.xxx.MyTestHystrix

    private static <T> T getPluginImplementationViaProperties(Class<T> pluginClass, HystrixDynamicProperties dynamicProperties) {
        String classSimpleName = pluginClass.getSimpleName();
        // Check Archaius for plugin class.
        String propertyName = "hystrix.plugin." + classSimpleName + ".implementation";
        String implementingClass = dynamicProperties.getString(propertyName, null).get();
        if (implementingClass != null) {
            try {
                Class<?> cls = Class.forName(implementingClass);
                // narrow the scope (cast) to the type we're expecting
                cls = cls.asSubclass(pluginClass);
                return (T) cls.newInstance();
            } catch (ClassCastException e) {
                throw new RuntimeException(classSimpleName + " implementation is not an instance of " + classSimpleName + ": " + implementingClass);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(classSimpleName + " implementation class not found: " + implementingClass, e);
            } catch (InstantiationException e) {
                throw new RuntimeException(classSimpleName + " implementation not able to be instantiated: " + implementingClass, e);
            } catch (IllegalAccessException e) {
                throw new RuntimeException(classSimpleName + " implementation not able to be accessed: " + implementingClass, e);
            }
        } else {
            return null;
        }
    }

关于seata集成hystrix以及feign的实现

上述使用钩子 + spi 替换自己的钩子方式实现,但是我们还可以观察到HystrixConcurrencyStrategy#wrapCallable方法,这不就是妥妥的一个线程执行器的装饰器预留的方法嘛?很显然也可以通过这种方式来实现

如果你引入了spring-cloud-starter-alibaba-seata的话,可以看到线程策略的一个实现com.alibaba.cloud.seata.feign.hystrix.SeataHystrixConcurrencyStrategy

下面来看看代码

public class SeataHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
    private final Logger logger = LoggerFactory
            .getLogger(SeataHystrixConcurrencyStrategy.class);
        // 这里是seata自定义线程策略的被装饰的对象,那么其实是允许多个同样插件存在,不过是通过装饰器包裹后层层增强
    private HystrixConcurrencyStrategy delegate;
    public SeataHystrixConcurrencyStrategy() {
        try {
// 这里通过spi获取实例
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof SeataHystrixConcurrencyStrategy) {
// 如果已经是当前实现则不做任何操作
                return;
            }
// 这里会重新获取所有其它插件,然后重置后统一再注册进去
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins
                    .getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance()
                    .getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance()
                    .getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance()
                    .getPropertiesStrategy();
            logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                    propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
        HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
    HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        }
        catch (Exception ex) {
            logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
        }
    }
    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
            HystrixMetricsPublisher metricsPublisher,
            HystrixPropertiesStrategy propertiesStrategy) {
        if (logger.isDebugEnabled()) {
            logger.debug("Current Hystrix plugins configuration is ["
                    + "concurrencyStrategy [" + this.delegate + "]," + "eventNotifier ["
                    + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            logger.debug("Registering Seata Hystrix Concurrency Strategy.");
        }
    }
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixProperty<Integer> corePoolSize,
            HystrixProperty<Integer> maximumPoolSize,
            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue) {
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
                keepAliveTime, unit, workQueue);
    }
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
            HystrixThreadPoolProperties threadPoolProperties) {
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }
    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.delegate.getBlockingQueue(maxQueueSize);
    }
    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(
            HystrixRequestVariableLifecycle<T> rv) {
        return this.delegate.getRequestVariable(rv);
    }
// 这里是上面提到的 线程执行的装饰增强
    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) {
// 如果是已经装饰了 提前返回
        if (c instanceof SeataContextCallable) {
            return c;
        }
        Callable<K> wrappedCallable;
        if (this.delegate != null) {
// 如果有其它的自定义的插件,需要再被装饰一层
            wrappedCallable = this.delegate.wrapCallable(c);
        }
        else {
            wrappedCallable = c;
        }
// 如果是已经装饰了 提前返回
        if (wrappedCallable instanceof SeataContextCallable) {
            return wrappedCallable;
        }
// 真正的对其装饰
        return new SeataContextCallable<>(wrappedCallable,
                RequestContextHolder.getRequestAttributes());
    }
    private static class SeataContextCallable<K> implements Callable<K> {
        private final Callable<K> actual;
// 哈哈,我们看到了 seata对线程策略的线程执行装饰的真正目的,用于传递seata全局事务id
        private final String xid;
        private final RequestAttributes requestAttributes;
        SeataContextCallable(Callable<K> actual, RequestAttributes requestAttribute) {
            this.actual = actual;
            this.requestAttributes = requestAttribute;
                      // 当前还是主线程在执行,所以直接从当前线程获取全局事务id
            this.xid = RootContext.getXID();
        }
        @Override
        public K call() throws Exception {
                      // 典型的装饰增强,这里已经是子线程在执行了
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
                RootContext.bind(xid);
                return actual.call();
            }
            finally {
                RootContext.unbind();
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

那么我们自己可不可以用他这种方式实现threadLocal传递呢,答案是可以的,但是要注意的是循环创建问题,我们添加了一个volatile 变量防止自定义插件的构造方法中通过spi获取实例形成循环创建实例
// 逻辑和seata的自定义插件基本一致
public class MyTestHystrix extends HystrixConcurrencyStrategy {

    private final Logger logger = LoggerFactory
        .getLogger(SeataHystrixConcurrencyStrategy.class);
    private HystrixConcurrencyStrategy delegate;
    private static volatile boolean alreadyInit = false;
    public MyTestHystrix() {
// 这里添加一个 volatile变量防止构造方法和spi获取实例形成循环
        if (alreadyInit) {
            return;
        }
        alreadyInit = true;
        try {
            this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.delegate instanceof MyTestHystrix) {
                return;
            }
            HystrixConcurrencyStrategy hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
            logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher,
                propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance()
                .registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception ex) {
            logger.error("Failed to register Seata Hystrix Concurrency Strategy", ex);
        }
    }


    @Override
    public <K> Callable<K> wrapCallable(Callable<K> c) {
        if (c instanceof MyTestHystrix.MyContext) {
            return c;
        }

        Callable<K> wrappedCallable;
        if (this.delegate != null) {
            wrappedCallable = this.delegate.wrapCallable(c);
        } else {
            wrappedCallable = c;
        }
        if (wrappedCallable instanceof MyTestHystrix.MyContext) {
            return wrappedCallable;
        }

        return new MyTestHystrix.MyContext<>(wrappedCallable,
            RequestContextHolder.getRequestAttributes());
    }

    private static class MyContext<K> implements Callable<K> {

        private final Callable<K> actual;

        private final String tenantId;

        private final RequestAttributes requestAttributes;


        MyContext(Callable<K> actual, RequestAttributes requestAttribute) {
            this.actual = actual;
            this.requestAttributes = requestAttribute;
            this.tenantId = LoginInfoUtils.getTenantId();
        }

        @Override
        public K call() throws Exception {
            try {
                RequestContextHolder.setRequestAttributes(requestAttributes);
              // 这里就是手动的将当前子线程信息填充
                LoginInfoUtils.fillLoginInfo(null, this.tenantId, null);
                return actual.call();
            } finally {
              // 清理或者 还原(TTL的重放逻辑是还原)
                LoginInfoUtils.remove();
                RequestContextHolder.resetRequestAttributes();
            }
        }

    }
}

以上就是另一种方式来实现threadLocal的传递,但是据其它文章描述通过callable的wrap方式并不能覆盖到hystrix的fallback,而钩子的自定义实现可以,本人没有去考证,因为直接使用的钩子方式

seata集成feign以及hystrix

先看hystrix的集成
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(HystrixCommand.class)
public class SeataHystrixAutoConfiguration {
// 通过注入bean的方式 使用自定义的线程策略,这样的好处是可以通过装饰器来拼装多个自定义实现的插件
    @Bean
    SeataHystrixConcurrencyStrategy seataHystrixConcurrencyStrategy() {
        return new SeataHystrixConcurrencyStrategy();
    }

}
再来看feign的集成

再bean的注入会一层层的进行装饰器增强来完成我们需要的目标,即处理全局事务id


@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(Client.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class SeataFeignClientAutoConfiguration {
// 如果开启了hystrix,返回一个feign构造器
    @Bean
    @Scope("prototype")
    @ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")
    @ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")
    Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {
        return SeataHystrixFeignBuilder.builder(beanFactory);
    }
// 如果使用了阿里的 sentinal 返回一个feign构造器
    @Bean
    @Scope("prototype")
    @ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")
    @ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")
    Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {
        return SeataSentinelFeignBuilder.builder(beanFactory);
    }
// 如果没有开启hystrix和sentinal 返回一个只有feign装饰器的feign构造器实例
    @Bean
    @ConditionalOnMissingBean
    @Scope("prototype")
    Feign.Builder feignBuilder(BeanFactory beanFactory) {
        return SeataFeignBuilder.builder(beanFactory);
    }

    @Configuration(proxyBeanMethods = false)
    protected static class FeignBeanPostProcessorConfiguration {
// 一个 beanPostProcessor 对所有spring 管理的bean进行选择性的装饰
        @Bean
        SeataBeanPostProcessor seataBeanPostProcessor(
                SeataFeignObjectWrapper seataFeignObjectWrapper) {
            return new SeataBeanPostProcessor(seataFeignObjectWrapper);
        }
// 对于feignContext修饰的一个 拦截处理器
        @Bean
        SeataContextBeanPostProcessor seataContextBeanPostProcessor(
                BeanFactory beanFactory) {
            return new SeataContextBeanPostProcessor(beanFactory);
        }
// seata自己的真正选择某些bean进行装饰的类
        @Bean
        SeataFeignObjectWrapper seataFeignObjectWrapper(BeanFactory beanFactory) {
            return new SeataFeignObjectWrapper(beanFactory);
        }

    }

}
SeataContextBeanPostProcessor 和 SeataBeanPostProcessor 分别为feignContext,feignClient装饰,这两个都是利用了beanPostProcessor这个钩子所有spring管理的bean都会判断是否需要处理,如果是想要处理的bean则进行装饰

下面来看看SeataFeignObjectWrapper装饰逻辑

// 首先是一个包内可调用的方法,我们自己业务服务内是调用不到的噢,这是封装的一种保护机制
    Object wrap(Object bean) {
// 分别有两种feignClient进行装饰
        if (bean instanceof Client && !(bean instanceof SeataFeignClient)) {
            if (bean instanceof LoadBalancerFeignClient) {
                LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);
                return new SeataLoadBalancerFeignClient(client.getDelegate(), factory(),
                        clientFactory(), this);
            }
            if (bean instanceof FeignBlockingLoadBalancerClient) {
                FeignBlockingLoadBalancerClient client = (FeignBlockingLoadBalancerClient) bean;
                return new SeataFeignBlockingLoadBalancerClient(client.getDelegate(),
                        beanFactory.getBean(BlockingLoadBalancerClient.class), this);
            }
            return new SeataFeignClient(this.beanFactory, (Client) bean);
        }
        return bean;
    }

feignContext的装饰最后也是为了继续装饰feign,这种逻辑见到了很多,例如TTL 等一层层的装饰为了覆盖所有情况,保证目标要被装饰的实例各种情况一定被装饰到
下面看看装饰后干了些啥

public class SeataFeignClient implements Client {

    private final Client delegate;

    private final BeanFactory beanFactory;

    private static final int MAP_SIZE = 16;

    SeataFeignClient(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
        this.delegate = new Client.Default(null, null);
    }
    SeataFeignClient(BeanFactory beanFactory, Client delegate) {
// 被装饰的client
        this.delegate = delegate;
        this.beanFactory = beanFactory;
    }
    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
    // 调用时增强方法
        Request modifiedRequest = getModifyRequest(request);
        return this.delegate.execute(modifiedRequest, options);
    }

    private Request getModifyRequest(Request request) {
    // 目的明确还是处理 全局事务id
        String xid = RootContext.getXID();

        if (StringUtils.isEmpty(xid)) {
            return request;
        }

        Map<String, Collection<String>> headers = new HashMap<>(MAP_SIZE);
        headers.putAll(request.headers());

        List<String> seataXid = new ArrayList<>();
        seataXid.add(xid);
        headers.put(RootContext.KEY_XID, seataXid);

        return Request.create(request.method(), request.url(), headers, request.body(),
                request.charset());
    }

}

RestTemplate的拦截器,我们发现seata在装饰feignClient的同时也对RestTemplate进行了拦截,防止我们的项目直接使用RestTemplate进行接口调用
@Configuration(proxyBeanMethods = false)
public class SeataRestTemplateAutoConfiguration {

    @Bean
    public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {
// 注入我们的拦截器
        return new SeataRestTemplateInterceptor();
    }
      // RestTemplate可能存在多个
    @Autowired(required = false)
    private Collection<RestTemplate> restTemplates;

    @Autowired
    private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

    @PostConstruct
    public void init() {
        if (this.restTemplates != null) {
// 对所有注入的RestTemplate 进行拦截
            for (RestTemplate restTemplate : restTemplates) {
                List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(
                        restTemplate.getInterceptors());
                interceptors.add(this.seataRestTemplateInterceptor);
                restTemplate.setInterceptors(interceptors);
            }
        }
    }

}

我们再来看看SeataRestTemplateInterceptor

public class SeataRestTemplateInterceptor implements ClientHttpRequestInterceptor {

    @Override
    public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,
            ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
        HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);
              // 目标还是非常明确,对全局事务id进行处理,放入header中
        String xid = RootContext.getXID();

        if (!StringUtils.isEmpty(xid)) {
            requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);
        }
        return clientHttpRequestExecution.execute(requestWrapper, bytes);
    }

}
最后我们再看一眼 被调用接口如何处理吧,还是我们非常熟悉的HandlerInterceptor
public class SeataHandlerInterceptor implements HandlerInterceptor {

    private static final Logger log = LoggerFactory
            .getLogger(SeataHandlerInterceptor.class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
            Object handler) {
// 请求进入前取出header的 全局事务id放入 threadLocal中
        String xid = RootContext.getXID();
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (log.isDebugEnabled()) {
            log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);
        }

        if (StringUtils.isBlank(xid) && rpcXid != null) {
            RootContext.bind(rpcXid);
            if (log.isDebugEnabled()) {
                log.debug("bind {} to RootContext", rpcXid);
            }
        }

        return true;
    }
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
            Object handler, Exception e) {
// 请求完成清理 threadLocal信息
        if (StringUtils.isNotBlank(RootContext.getXID())) {
            String rpcXid = request.getHeader(RootContext.KEY_XID);
            if (StringUtils.isEmpty(rpcXid)) {
                return;
            }
            String unbindXid = RootContext.unbind();
            if (log.isDebugEnabled()) {
                log.debug("unbind {} from RootContext", unbindXid);
            }
// 如果解绑和绑定的 全局事务id不同,则对后面的全局事务id再次进行绑定,存入threadLocal
// 这里可能是处理全局事务冲突的特殊情况,目前不太了解
            if (!rpcXid.equalsIgnoreCase(unbindXid)) {
                log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);
                if (unbindXid != null) {
                    RootContext.bind(unbindXid);
                    log.warn("bind {} back to RootContext", unbindXid);
                }
            }
        }
    }

}

解决hystrix跨线程threadLocal丢失总结

  1. 通过HystrixCommandExecutionHook + spi 配置让自定义钩子被使用,然后通过不同的执行时机处理threadLocal
  2. 具体传递threadLocal利用 hystrix内部的threadLocal重放机制,即HystrixRequestContext和HystrixRequestVariableDefault 的使用
  3. 也可以学习seata集成的方式,利用线程策略HystrixConcurrencyStrategy 的Callable 装饰方法进行装饰增强
  4. 通过@Bean注入自定义HystrixConcurrencyStrategy 插件后重置原有的注册,并且留了一个可装饰的口子,可以让多个自定义插件层层装饰(hystrix的本身的插件只允许存在一个)

seata继承hystrix和feign

  1. 如果开启了hystrix/sentinal等注入自定义的client构造装饰器,否则使用默认的装饰器
  2. 对hystrix线程切换定义了一个可多层装饰的 自定义HystrixConcurrencyStrategy 插件,通过HystrixConcurrencyStrategy#wrapCallable 进行增强
  3. 对feignContext,feignClient进行装饰,最终目标装饰为SeataFeignClient类将全局事务id放入header
  4. 对所有注入到spring容器内的RestTemplate进行拦截,将全局事务id放入header
  5. 通过spring mvc的HandlerInterceptor取出header的事务id放入threadLocal
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,277评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,689评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,624评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,356评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,402评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,292评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,135评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,992评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,429评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,636评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,785评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,492评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,092评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,723评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,858评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,891评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,713评论 2 354