当前位置: 移动技术网 > IT编程>开发语言>Java > Hystrix工作流程解析

Hystrix工作流程解析

2019年11月04日  | 移动技术网IT编程  | 我要评论
搭建hystrix源码阅读环境

引入依赖

        <dependency>
            <groupid>com.netflix.hystrix</groupid>
            <artifactid>hystrix-core</artifactid>
            <version>1.5.12</version>
        </dependency>

创建command

public class hellocommand extends hystrixcommand<string> {

    public hellocommand() {
        super(hystrixcommandgroupkey.factory.askey("test"));
    }

    @override
    protected string run() throws exception {
        thread.sleep(800);
        return "sucess";
    }

    @override
    protected string getfallback() {
        system.out.println("执行了回退方法");
        return "error";
    }

}

创建测试类

public class commandtest {
    public static void main(string[] args) {
        hellocommand command = new hellocommand();
        string result = command.execute();
        system.out.println(result);
    }
}
hystrix工作流程

file

首先我们看一下上方的这张图,这个图完整的描述了hystrix的工作流程:
1.每次调用都会创建一个hystrixcommand
2.执行execute或queue做同步\异步调用
3.判断熔断器是否打开,如果打开跳到步骤8,否则进入步骤4
4.判断线程池/信号量是否跑满,如果跑满进入步骤8,否则进入步骤5
5.调用hystrixcommand的run方法,如果调用超时进入步骤8
6.判断是否调用成功,返回成功调用结果,如果失败进入步骤8
7.计算熔断器状态,所有的运行状态(成功, 失败, 拒绝,超时)上报给熔断器,用于统计从而判断熔断器状态
8.降级处理逻辑,根据上方的步骤可以得出以下四种情况会进入降级处理:

  1. 熔断器打开
  2. 线程池/信号量跑满
  3. 调用超时
  4. 调用失败

9.返回执行成功结果

创建hystrixcommand

接着我们结合源码看一下这个调用流程,直接执行测试类的main方法,可以看到入口就在execute方法上

    public r execute() {
        try {
            return queue().get();
        } catch (exception e) {
            throw exceptions.sneakythrow(decomposeexception(e));
        }
    }
执行同步方法
public future<r> queue() {
        final future<r> delegate = toobservable().toblocking().tofuture();
        //省略。。。
};

接着看toobservable()方法

 public observable<r> toobservable() {
        //省略。。。
        return observable.defer(new func0<observable<r>>() {
            @override
            public observable<r> call() {
                if (!commandstate.compareandset(commandstate.not_started, commandstate.observable_chain_created)) {
                    illegalstateexception ex = new illegalstateexception("this instance can only be executed once. please instantiate a new instance.");
                    //todo make a new error type for this
                    throw new hystrixruntimeexception(failuretype.bad_request_exception, _cmd.getclass(), getlogmessageprefix() + " command executed multiple times - this is not permitted.", ex, null);
                }

                commandstarttimestamp = system.currenttimemillis();

                if (properties.requestlogenabled().get()) {
                    // log this command execution regardless of what happened
                    if (currentrequestlog != null) {
                        currentrequestlog.addexecutedcommand(_cmd);
                    }
                }

                final boolean requestcacheenabled = isrequestcachingenabled();
                final string cachekey = getcachekey();
                //如果开启请求缓存则查询缓存是否存在
                if (requestcacheenabled) {
                    hystrixcommandresponsefromcache<r> fromcache = (hystrixcommandresponsefromcache<r>) requestcache.get(cachekey);
                    if (fromcache != null) {
                        isresponsefromcache = true;
                        return handlerequestcachehitandemitvalues(fromcache, _cmd);
                    }
                }

                observable<r> hystrixobservable =
                        observable.defer(applyhystrixsemantics)
                                .map(wrapwithallonnexthooks);

                observable<r> aftercache;

                if (requestcacheenabled && cachekey != null) { 
                    hystrixcachedobservable<r> tocache = hystrixcachedobservable.from(hystrixobservable, _cmd);
                    hystrixcommandresponsefromcache<r> fromcache = (hystrixcommandresponsefromcache<r>) requestcache.putifabsent(cachekey, tocache);
                    if (fromcache != null) {
                        // another thread beat us so we'll use the cached value instead
                        tocache.unsubscribe();
                        isresponsefromcache = true;
                        return handlerequestcachehitandemitvalues(fromcache, _cmd);
                    } else {
                        // we just created an observablecommand so we cast and return it
                        aftercache = tocache.toobservable();
                    }
                } else {
                    aftercache = hystrixobservable;
                }

                return aftercache
                        .doonterminate(terminatecommandcleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                        .doonunsubscribe(unsubscribecommandcleanup) // perform cleanup once
                        .dooncompleted(fireoncompletedhook);
            }
        });
    }

在上面这个方法中会有一个缓存的判断,如果存在缓存的话直接返回结果,否则进入方法applyhystrixsemantics方法

判断熔断器和线程池以及信号量
private observable<r> applyhystrixsemantics(final abstractcommand<r> _cmd) {
        executionhook.onstart(_cmd);

        /* determine if we're allowed to execute */
        //判断是否开启熔断器
        if (circuitbreaker.attemptexecution()) {
            //获取信号量实例
            final tryablesemaphore executionsemaphore = getexecutionsemaphore();
            final atomicboolean semaphorehasbeenreleased = new atomicboolean(false);
            final action0 singlesemaphorerelease = new action0() {
                @override
                public void call() {
                    if (semaphorehasbeenreleased.compareandset(false, true)) {
                        executionsemaphore.release();
                    }
                }
            };

            final action1<throwable> markexceptionthrown = new action1<throwable>() {
                @override
                public void call(throwable t) {
                    eventnotifier.markevent(hystrixeventtype.exception_thrown, commandkey);
                }
            };
            //尝试获取信号量
            if (executionsemaphore.tryacquire()) {
                try {
                    /* used to track userthreadexecutiontime */
                    executionresult = executionresult.setinvocationstarttime(system.currenttimemillis());
                    return executecommandandobserve(_cmd)
                            .doonerror(markexceptionthrown)
                            .doonterminate(singlesemaphorerelease)
                            .doonunsubscribe(singlesemaphorerelease);
                } catch (runtimeexception e) {
                    return observable.error(e);
                }
            } else {
                //拒绝
                return handlesemaphorerejectionviafallback();
            }
        } else {
            //失败
            return handleshortcircuitviafallback();
        }
    }

applyhystrixsemantics方法中,首先会判断是否开启熔断器,如果开启则直接进入失败处理的逻辑。否则会尝试获取信号量(如果使用的是线程池的模式则默认获取成功),获取成功进入executecommandandobserve方法

判断超时
private observable<r> executecommandandobserve(final abstractcommand<r> _cmd) {
        final hystrixrequestcontext currentrequestcontext = hystrixrequestcontext.getcontextforcurrentthread();
        //省略...

        //判断是否开启超时设置
        if (properties.executiontimeoutenabled().get()) {
           //list添加超时操作
            execution = executecommandwithspecifiedisolation(_cmd)
                    .lift(new hystrixobservabletimeoutoperator<r>(_cmd));
        } else {
            execution = executecommandwithspecifiedisolation(_cmd);
        }

这里如果设置超时时间的话则会加上一个超时的处理,接着看executecommandwithspecifiedisolation方法

private observable<r> executecommandwithspecifiedisolation(final abstractcommand<r> _cmd) {
        if (properties.executionisolationstrategy().get() == executionisolationstrategy.thread) {
            return observable.defer(new func0<observable<r>>() {
                @override
                public observable<r> call() {
                    executionresult = executionresult.setexecutionoccurred();
                    if (!commandstate.compareandset(commandstate.observable_chain_created, commandstate.user_code_executed)) {
                        return observable.error(new illegalstateexception("execution attempted while in state : " + commandstate.get().name()));
                    }
                    metrics.markcommandstart(commandkey, threadpoolkey, executionisolationstrategy.thread);

                    if (iscommandtimedout.get() == timedoutstatus.timed_out) {
                        return observable.error(new runtimeexception("timed out before executing run()"));
                    }
                    if (threadstate.compareandset(threadstate.not_using_thread, threadstate.started)) {
                        hystrixcounters.incrementglobalconcurrentthreads();
                        threadpool.markthreadexecution();
                        // store the command that is being run
                        endcurrentthreadexecutingcommand = hystrix.startcurrentthreadexecutingcommand(getcommandkey());
                        executionresult = executionresult.setexecutedinthread();

                        try {
                            executionhook.onthreadstart(_cmd);
                            executionhook.onrunstart(_cmd);
                            executionhook.onexecutionstart(_cmd);
                            return getuserexecutionobservable(_cmd);
                        } catch (throwable ex) {
                            return observable.error(ex);
                        }
                    } else {
                        return observable.empty();
                    }
                }
            }).doonterminate(new action0() {
                @override
                public void call() {
                    if (threadstate.compareandset(threadstate.started, threadstate.terminal)) {
                        handlethreadend(_cmd);
                    }
                    if (threadstate.compareandset(threadstate.not_using_thread, threadstate.terminal)) {
                    }
                }
            }).doonunsubscribe(new action0() {
                @override
                public void call() {
                    if (threadstate.compareandset(threadstate.started, threadstate.unsubscribed)) {
                        handlethreadend(_cmd);
                    }
                    if (threadstate.compareandset(threadstate.not_using_thread, threadstate.unsubscribed)) {
                    }
                }
            }).subscribeon(threadpool.getscheduler(new func0<boolean>() {
                @override
                public boolean call() {
                    return properties.executionisolationthreadinterruptontimeout().get() && _cmd.iscommandtimedout.get() == timedoutstatus.timed_out;
                }
            }));
        } else {
            return observable.defer(new func0<observable<r>>() {
                @override
                public observable<r> call() {
                    executionresult = executionresult.setexecutionoccurred();
                    if (!commandstate.compareandset(commandstate.observable_chain_created, commandstate.user_code_executed)) {
                        return observable.error(new illegalstateexception("execution attempted while in state : " + commandstate.get().name()));
                    }

                    metrics.markcommandstart(commandkey, threadpoolkey, executionisolationstrategy.semaphore);
                    // semaphore isolated
                    // store the command that is being run
                    endcurrentthreadexecutingcommand = hystrix.startcurrentthreadexecutingcommand(getcommandkey());
                    try {
                        executionhook.onrunstart(_cmd);
                        executionhook.onexecutionstart(_cmd);
                        return getuserexecutionobservable(_cmd);  //the getuserexecutionobservable method already wraps sync exceptions, so this shouldn't throw
                    } catch (throwable ex) {
                        //if the above hooks throw, then use that as the result of the run method
                        return observable.error(ex);
                    }
                }
            });
        }
    }

这段代码比较长,具体的执行逻辑为:

  1. 进入方法会首先判断隔离策略,如果是使用的信号量模式则在当前线程上执行,否则进入下方的线程池逻辑
  2. 更改hystrixcommand的状态为user_code_executed
  3. 判断hystrixcommand的超时状态,如果超时则抛出异常
  4. 更改当前command的线程执行状态为started
  5. 调用getuserexecutionobservable执行具体的业务逻辑,也就是我们实现的那个run方法
  6. doonterminate:执行完毕后更改线程状态为terminal
  7. doonunsubscribe:当observable被取消订阅,更改线程状态为terminal
  8. subscribeon:指定scheduler

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网