当前位置: 移动技术网 > IT编程>开发语言>Java > Tomcat源码分析 (八)----- HTTP请求处理过程(一)

Tomcat源码分析 (八)----- HTTP请求处理过程(一)

2019年08月20日  | 移动技术网IT编程  | 我要评论

终于进行到connector的分析阶段了,这也是tomcat里面最复杂的一块功能了。connector中文名为连接器,既然是连接器,它肯定会连接某些东西,连接些什么呢?

connector用于接受请求并将请求封装成request和response,然后交给container进行处理,container处理完之后再交给connector返回给客户端。

要理解connector,我们需要问自己4个问题。

  • (1)connector如何接受请求的?
  • (2)如何将请求封装成request和response的?
  • (3)封装完之后的request和response如何交给container进行处理的?
  • (4)container处理完之后如何交给connector并返回给客户端的?

先来一张connector的整体结构图

【注意】:不同的协议、不同的通信方式,protocolhandler会有不同的实现。在tomcat8.5中,protocolhandler的类继承层级如下图所示。

 

针对上述的类继承层级图,我们做如下说明:

  1. ajp和http11是两种不同的协议
  2. nio、nio2和apr是不同的通信方式
  3. 协议和通信方式可以相互组合。

protocolhandler包含三个部件:endpointprocessoradapter

  1. endpoint用来处理底层socket的网络连接,processor用于将endpoint接收到的socket封装成request,adapter用于将request交给container进行具体的处理。
  2. endpoint由于是处理底层的socket网络连接,因此endpoint是用来实现tcp/ip协议的,而processor用来实现http协议的,adapter将请求适配到servlet容器进行具体的处理。
  3. endpoint的抽象实现类abstractendpoint里面定义了acceptorasynctimeout两个内部类和一个handler接口acceptor用于监听请求,asynctimeout用于检查异步request的超时,handler用于处理接收到的socket,在内部调用processor进行处理。

至此,我们已经明白了问题(1)、(2)和(3)。至于(4),当我们了解了container自然就明白了,前面章节内容已经详细分析过了。

connector源码分析入口

 我们在service标准实现standardservice的源码中发现,其init()start()stop()destroy()方法分别会对connectors的同名方法进行调用。而一个service对应着多个connector

service.init()

@override
protected void initinternal() throws lifecycleexception {
    super.initinternal();

    if (engine != null) {
        engine.init();
    }

    // initialize any executors
    for (executor executor : findexecutors()) {
        if (executor instanceof jmxenabled) {
            ((jmxenabled) executor).setdomain(getdomain());
        }
        executor.init();
    }

    // initialize mapper listener
    mapperlistener.init();

    // initialize our defined connectors
    synchronized (connectorslock) {
        for (connector connector : connectors) {
            try {
                connector.init();
            } catch (exception e) {
                string message = sm.getstring(
                        "standardservice.connector.initfailed", connector);
                log.error(message, e);

                if (boolean.getboolean("org.apache.catalina.startup.exit_on_init_failure"))
                    throw new lifecycleexception(message);
            }
        }
    }
}

service.start()

@override
protected void startinternal() throws lifecycleexception {
    if(log.isinfoenabled())
        log.info(sm.getstring("standardservice.start.name", this.name));
    setstate(lifecyclestate.starting);

    // start our defined container first
    if (engine != null) {
        synchronized (engine) {
            engine.start();
        }
    }

    synchronized (executors) {
        for (executor executor: executors) {
            executor.start();
        }
    }

    mapperlistener.start();

    // start our defined connectors second
    synchronized (connectorslock) {
        for (connector connector: connectors) {
            try {
                // if it has already failed, don't try and start it
                if (connector.getstate() != lifecyclestate.failed) {
                    connector.start();
                }
            } catch (exception e) {
                log.error(sm.getstring(
                        "standardservice.connector.startfailed",
                        connector), e);
            }
        }
    }
}

我们知道connector实现了lifecycle接口,所以它是一个生命周期组件。所以connector的启动逻辑入口在于init()start()

connector构造方法

在分析之前,我们看看server.xml,该文件已经体现出了tomcat中各个组件的大体结构。

<?xml version='1.0' encoding='utf-8'?>
<server port="8005" shutdown="shutdown">
  <listener classname="org.apache.catalina.startup.versionloggerlistener" />
  <listener classname="org.apache.catalina.core.aprlifecyclelistener" sslengine="on" />
  <listener classname="org.apache.catalina.core.jrememoryleakpreventionlistener" />
  <listener classname="org.apache.catalina.mbeans.globalresourceslifecyclelistener" />
  <listener classname="org.apache.catalina.core.threadlocalleakpreventionlistener" />

  <globalnamingresources>
    <resource name="userdatabase" auth="container"
              type="org.apache.catalina.userdatabase"
              description="user database that can be updated and saved"
              factory="org.apache.catalina.users.memoryuserdatabasefactory"
              pathname="conf/tomcat-users.xml" />
  </globalnamingresources>

  <service name="catalina">
    <connector port="8080" protocol="http/1.1" connectiontimeout="20000" redirectport="8443" />
    <connector port="8009" protocol="ajp/1.3" redirectport="8443" />

    <engine name="catalina" defaulthost="localhost">
      <realm classname="org.apache.catalina.realm.lockoutrealm">
        <realm classname="org.apache.catalina.realm.userdatabaserealm"
               resourcename="userdatabase"/>
      </realm>

      <host name="localhost"  appbase="webapps"
            unpackwars="true" autodeploy="true">
        <valve classname="org.apache.catalina.valves.accesslogvalve" directory="logs"
               prefix="localhost_access_log" suffix=".txt"
               pattern="%h %l %u %t &quot;%r&quot; %s %b" />
      </host>
    </engine>
  </service>
</server>

在这个文件中,我们看到一个connector有几个关键属性,portprotocol是其中的两个。server.xml默认支持两种协议:http/1.1ajp/1.3。其中http/1.1用于支持http1.1协议,而ajp/1.3用于支持对apache服务器的通信。

接下来我们看看构造方法。

public connector() {
    this(null); // 1. 无参构造方法,传入参数为空协议,会默认使用`http/1.1`
}

public connector(string protocol) {
    setprotocol(protocol);
    // instantiate protocol handler
    // 5. 使用protocolhandler的类名构造protocolhandler的实例
    protocolhandler p = null;
    try {
        class<?> clazz = class.forname(protocolhandlerclassname);
        p = (protocolhandler) clazz.getconstructor().newinstance();
    } catch (exception e) {
        log.error(sm.getstring(
                "coyoteconnector.protocolhandlerinstantiationfailed"), e);
    } finally {
        this.protocolhandler = p;
    }

    if (globals.strict_servlet_compliance) {
        uricharset = standardcharsets.iso_8859_1;
    } else {
        uricharset = standardcharsets.utf_8;
    }
}

@deprecated
public void setprotocol(string protocol) {
    boolean aprconnector = aprlifecyclelistener.isapravailable() &&
            aprlifecyclelistener.getuseaprconnector();

    // 2. `http/1.1`或`null`,protocolhandler使用`org.apache.coyote.http11.http11nioprotocol`,不考虑apr
    if ("http/1.1".equals(protocol) || protocol == null) {
        if (aprconnector) {
            setprotocolhandlerclassname("org.apache.coyote.http11.http11aprprotocol");
        } else {
            setprotocolhandlerclassname("org.apache.coyote.http11.http11nioprotocol");
        }
    }
    // 3. `ajp/1.3`,protocolhandler使用`org.apache.coyote.ajp.ajpnioprotocol`,不考虑apr
    else if ("ajp/1.3".equals(protocol)) {
        if (aprconnector) {
            setprotocolhandlerclassname("org.apache.coyote.ajp.ajpaprprotocol");
        } else {
            setprotocolhandlerclassname("org.apache.coyote.ajp.ajpnioprotocol");
        }
    }
    // 4. 其他情况,使用传入的protocol作为protocolhandler的类名
    else {
        setprotocolhandlerclassname(protocol);
    }
}

从上面的代码我们看到构造方法主要做了下面几件事情:

  1. 无参构造方法,传入参数为空协议,会默认使用http/1.1
  2. http/1.1null,protocolhandler使用org.apache.coyote.http11.http11nioprotocol,不考虑apr
  3. ajp/1.3,protocolhandler使用org.apache.coyote.ajp.ajpnioprotocol,不考虑apr
  4. 其他情况,使用传入的protocol作为protocolhandler的类名
  5. 使用protocolhandler的类名构造protocolhandler的实例

connector.init()

@override
protected void initinternal() throws lifecycleexception {
    super.initinternal();

    // initialize adapter
    // 1. 初始化adapter
    adapter = new coyoteadapter(this);
    protocolhandler.setadapter(adapter);

    // make sure parsebodymethodsset has a default
    // 2. 设置接受body的method列表,默认为post
    if (null == parsebodymethodsset) {
        setparsebodymethods(getparsebodymethods());
    }

    if (protocolhandler.isaprrequired() && !aprlifecyclelistener.isapravailable()) {
        throw new lifecycleexception(sm.getstring("coyoteconnector.protocolhandlernoapr",
                getprotocolhandlerclassname()));
    }
    if (aprlifecyclelistener.isapravailable() && aprlifecyclelistener.getuseopenssl() &&
            protocolhandler instanceof abstracthttp11jsseprotocol) {
        abstracthttp11jsseprotocol<?> jsseprotocolhandler =
                (abstracthttp11jsseprotocol<?>) protocolhandler;
        if (jsseprotocolhandler.issslenabled() &&
                jsseprotocolhandler.getsslimplementationname() == null) {
            // openssl is compatible with the jsse configuration, so use it if apr is available
            jsseprotocolhandler.setsslimplementationname(opensslimplementation.class.getname());
        }
    }

    // 3. 初始化protocolhandler
    try {
        protocolhandler.init();
    } catch (exception e) {
        throw new lifecycleexception(
                sm.getstring("coyoteconnector.protocolhandlerinitializationfailed"), e);
    }
}

init()方法做了3件事情

  1. 初始化adapter
  2. 设置接受body的method列表,默认为post
  3. 初始化protocolhandler

protocolhandler类继承层级我们知道protocolhandler的子类都必须实现abstractprotocol抽象类,而protocolhandler.init();的逻辑代码正是在这个抽象类里面。我们来分析一下。

@override
public void init() throws exception {
    if (getlog().isinfoenabled()) {
        getlog().info(sm.getstring("abstractprotocolhandler.init", getname()));
    }

    if (oname == null) {
        // component not pre-registered so register it
        oname = createobjectname();
        if (oname != null) {
            registry.getregistry(null, null).registercomponent(this, oname, null);
        }
    }

    if (this.domain != null) {
        rgoname = new objectname(domain + ":type=globalrequestprocessor,name=" + getname());
        registry.getregistry(null, null).registercomponent(
                gethandler().getglobal(), rgoname, null);
    }

    // 1. 设置endpoint的名字,默认为:http-nio-{port}
    string endpointname = getname();
    endpoint.setname(endpointname.substring(1, endpointname.length()-1));
    endpoint.setdomain(domain);
    
    // 2. 初始化endpoint
    endpoint.init();
}

我们接着分析一下endpoint.init()里面又做了什么。该方法位于abstactendpoint抽象类,该类是基于模板方法模式实现的,主要调用了子类的bind()方法。

public abstract void bind() throws exception;
public abstract void unbind() throws exception;
public abstract void startinternal() throws exception;
public abstract void stopinternal() throws exception;

public void init() throws exception {
    // 执行bind()方法
    if (bindoninit) {
        bind();
        bindstate = bindstate.bound_on_init;
    }
    if (this.domain != null) {
        // register endpoint (as threadpool - historical name)
        oname = new objectname(domain + ":type=threadpool,name=\"" + getname() + "\"");
        registry.getregistry(null, null).registercomponent(this, oname, null);

        objectname socketpropertiesoname = new objectname(domain +
                ":type=threadpool,name=\"" + getname() + "\",subtype=socketproperties");
        socketproperties.setobjectname(socketpropertiesoname);
        registry.getregistry(null, null).registercomponent(socketproperties, socketpropertiesoname, null);

        for (sslhostconfig sslhostconfig : findsslhostconfigs()) {
            registerjmx(sslhostconfig);
        }
    }
}

继续分析bind()方法,我们终于看到了我们想要看的东西了。关键的代码在于serversock.socket().bind(addr,getacceptcount());,用于绑定serversocket到指定的ip和端口。

@override
public void bind() throws exception {

    if (!getuseinheritedchannel()) {
        serversock = serversocketchannel.open();
        socketproperties.setproperties(serversock.socket());
        inetsocketaddress addr = (getaddress()!=null?new inetsocketaddress(getaddress(),getport()):new inetsocketaddress(getport()));
        //绑定serversocket到指定的ip和端口
        serversock.socket().bind(addr,getacceptcount());
    } else {
        // retrieve the channel provided by the os
        channel ic = system.inheritedchannel();
        if (ic instanceof serversocketchannel) {
            serversock = (serversocketchannel) ic;
        }
        if (serversock == null) {
            throw new illegalargumentexception(sm.getstring("endpoint.init.bind.inherited"));
        }
    }

    serversock.configureblocking(true); //mimic apr behavior

    // initialize thread count defaults for acceptor, poller
    if (acceptorthreadcount == 0) {
        // fixme: doesn't seem to work that well with multiple accept threads
        acceptorthreadcount = 1;
    }
    if (pollerthreadcount <= 0) {
        //minimum one poller thread
        pollerthreadcount = 1;
    }
    setstoplatch(new countdownlatch(pollerthreadcount));

    // initialize ssl if needed
    initialisessl();

    selectorpool.open();
}

好了,我们已经分析完了init()方法,接下来我们分析start()方法。关键代码就一行,调用protocolhandler.start()方法。

connector.start()

@override
protected void startinternal() throws lifecycleexception {

    // validate settings before starting
    if (getport() < 0) {
        throw new lifecycleexception(sm.getstring(
                "coyoteconnector.invalidport", integer.valueof(getport())));
    }

    setstate(lifecyclestate.starting);

    try {
        protocolhandler.start();
    } catch (exception e) {
        throw new lifecycleexception(
                sm.getstring("coyoteconnector.protocolhandlerstartfailed"), e);
    }
}

我们深入protocolhandler.start()方法。

  1. 调用endpoint.start()方法
  2. 开启异步超时线程,线程执行单元为asynctimeout
@override
public void start() throws exception {
    if (getlog().isinfoenabled()) {
        getlog().info(sm.getstring("abstractprotocolhandler.start", getname()));
    }

    // 1. 调用`endpoint.start()`方法
    endpoint.start();

    // start async timeout thread
    // 2. 开启异步超时线程,线程执行单元为`asynctimeout`
    asynctimeout = new asynctimeout();
    thread timeoutthread = new thread(asynctimeout, getnameinternal() + "-asynctimeout");
    int priority = endpoint.getthreadpriority();
    if (priority < thread.min_priority || priority > thread.max_priority) {
        priority = thread.norm_priority;
    }
    timeoutthread.setpriority(priority);
    timeoutthread.setdaemon(true);
    timeoutthread.start();
}

这儿我们重点关注endpoint.start()方法

public final void start() throws exception {
    // 1. `bind()`已经在`init()`中分析过了
    if (bindstate == bindstate.unbound) {
        bind();
        bindstate = bindstate.bound_on_start;
    }
    startinternal();
}

@override
public void startinternal() throws exception {
    if (!running) {
        running = true;
        paused = false;

        processorcache = new synchronizedstack<>(synchronizedstack.default_size,
                socketproperties.getprocessorcache());
        eventcache = new synchronizedstack<>(synchronizedstack.default_size,
                        socketproperties.geteventcache());
        niochannels = new synchronizedstack<>(synchronizedstack.default_size,
                socketproperties.getbufferpool());

        // create worker collection
        // 2. 创建工作者线程池
        if ( getexecutor() == null ) {
            createexecutor();
        }
        
        // 3. 初始化连接latch,用于限制请求的并发量
        initializeconnectionlatch();

        // start poller threads
        // 4. 开启poller线程。poller用于对接受者线程生产的消息(或事件)进行处理,poller最终调用的是handler的代码
        pollers = new poller[getpollerthreadcount()];
        for (int i=0; i<pollers.length; i++) {
            pollers[i] = new poller();
            thread pollerthread = new thread(pollers[i], getname() + "-clientpoller-"+i);
            pollerthread.setpriority(threadpriority);
            pollerthread.setdaemon(true);
            pollerthread.start();
        }
        // 5. 开启acceptor线程
        startacceptorthreads();
    }
}

protected final void startacceptorthreads() {
    int count = getacceptorthreadcount();
    acceptors = new acceptor[count];

    for (int i = 0; i < count; i++) {
        acceptors[i] = createacceptor();
        string threadname = getname() + "-acceptor-" + i;
        acceptors[i].setthreadname(threadname);
        thread t = new thread(acceptors[i], threadname);
        t.setpriority(getacceptorthreadpriority());
        t.setdaemon(getdaemon());
        t.start();
    }
}
  1. bind()已经在init()中分析过了
  2. 创建工作者线程池
  3. 初始化连接latch,用于限制请求的并发量
  4. 创建轮询poller线程。poller用于对接受者线程生产的消息(或事件)进行处理,poller最终调用的是handler的代码
  5. 创建acceptor线程

connector请求逻辑

分析完了connector的启动逻辑之后,我们就需要进一步分析一下http的请求逻辑,当请求从客户端发起之后,需要经过哪些操作才能真正地得到执行?

acceptor

acceptor线程主要用于监听套接字,将已连接套接字转给poller线程。acceptor线程数由abstracendpoint的acceptorthreadcount成员变量控制,默认值为1

abstractendpoint.acceptor是abstractendpoint类的静态抽象类,实现了runnable接口,部分代码如下:
public abstract static class acceptor implements runnable {
    public enum acceptorstate {
        new, running, paused, ended
    }

    protected volatile acceptorstate state = acceptorstate.new;
    public final acceptorstate getstate() {
        return state;
    }

    private string threadname;
    protected final void setthreadname(final string threadname) {
        this.threadname = threadname;
    }
    protected final string getthreadname() {
        return threadname;
    }
}

nioendpoint的acceptor成员内部类继承了abstractendpoint.acceptor:

protected class acceptor extends abstractendpoint.acceptor {
    @override
    public void run() {
        int errordelay = 0;

        // loop until we receive a shutdown command
        while (running) {

            // loop if endpoint is paused
            // 1. 运行过程中,如果`endpoint`暂停了,则`acceptor`进行自旋(间隔50毫秒) `       
            while (paused && running) {
                state = acceptorstate.paused;
                try {
                    thread.sleep(50);
                } catch (interruptedexception e) {
                    // ignore
                }
            }
            // 2. 如果`endpoint`终止运行了,则`acceptor`也会终止
            if (!running) {
                break;
            }
            state = acceptorstate.running;

            try {
                //if we have reached max connections, wait
                // 3. 如果请求达到了最大连接数,则wait直到连接数降下来
                countuporawaitconnection();

                socketchannel socket = null;
                try {
                    // accept the next incoming connection from the server
                    // socket
                    // 4. 接受下一次连接的socket
                    socket = serversock.accept();
                } catch (ioexception ioe) {
                    // we didn't get a socket
                    countdownconnection();
                    if (running) {
                        // introduce delay if necessary
                        errordelay = handleexceptionwithdelay(errordelay);
                        // re-throw
                        throw ioe;
                    } else {
                        break;
                    }
                }
                // successful accept, reset the error delay
                errordelay = 0;

                // configure the socket
                if (running && !paused) {
                    // setsocketoptions() will hand the socket off to
                    // an appropriate processor if successful
                    // 5. `setsocketoptions()`这儿是关键,会将socket以事件的方式传递给poller
                    if (!setsocketoptions(socket)) {
                        closesocket(socket);
                    }
                } else {
                    closesocket(socket);
                }
            } catch (throwable t) {
                exceptionutils.handlethrowable(t);
                log.error(sm.getstring("endpoint.accept.fail"), t);
            }
        }
        state = acceptorstate.ended;
    }
}

从以上代码可以看到:

  • countuporawaitconnection函数检查当前最大连接数,若未达到maxconnections则加一,否则等待;
  • socket = serversock.accept()这一行中的serversock正是nioendpoint的bind函数中打开的serversocketchannel。为了引用这个变量,nioendpoint的acceptor类是成员而不再是静态类;
  • setsocketoptions函数调用上的注释表明该函数将已连接套接字交给poller线程处理。

setsocketoptions方法接着处理已连接套接字:

protected boolean setsocketoptions(socketchannel socket) {
    // process the connection
    try {
        //disable blocking, apr style, we are gonna be polling it
        socket.configureblocking(false);
        socket sock = socket.socket();
        socketproperties.setproperties(sock);

        niochannel channel = niochannels.pop();
        if (channel == null) {
            socketbufferhandler bufhandler = new socketbufferhandler(
                    socketproperties.getappreadbufsize(),
                    socketproperties.getappwritebufsize(),
                    socketproperties.getdirectbuffer());
            if (issslenabled()) {
                channel = new secureniochannel(socket, bufhandler, selectorpool, this);
            } else {
                channel = new niochannel(socket, bufhandler);
            }
        } else {
            channel.setiochannel(socket);
            channel.reset();
        }
        // 将channel注册到poller,注意关键的两个方法,`getpoller0()`和`poller.register()`
        getpoller0().register(channel);
    } catch (throwable t) {
        exceptionutils.handlethrowable(t);
        try {
            log.error("",t);
        } catch (throwable tt) {
            exceptionutils.handlethrowable(tt);
        }
        // tell to close the socket
        return false;
    }
    return true;
}
  • 从niochannel栈中出栈一个,若能重用(即不为null)则重用对象,否则新建一个niochannel对象;
  • getpoller0方法利用轮转法选择一个poller线程,利用poller类的register方法将上述niochannel对象注册到该poller线程上;
  • 若成功转给poller线程该函数返回true,否则返回false。返回false后,acceptor类的closesocket函数会关闭通道和底层socket连接并将当前最大连接数减一。

poller

poller线程主要用于以较少的资源轮询已连接套接字以保持连接,当数据可用时转给工作线程。

poller线程数由nioendpoint的pollerthreadcount成员变量控制,默认值为2与可用处理器数二者之间的较小值。
poller实现了runnable接口,可以看到构造函数为每个poller打开了一个新的selector。

public class poller implements runnable {
    private selector selector;
    private final synchronizedqueue<pollerevent> events =
            new synchronizedqueue<>();
    // 省略一些代码
    public poller() throws ioexception {
        this.selector = selector.open();
    }

    public selector getselector() { return selector;}
    // 省略一些代码
}

将channel注册到poller,注意关键的两个方法,getpoller0()poller.register()。先来分析一下getpoller0(),该方法比较关键的一个地方就是以取模的方式对poller数量进行轮询获取。

/**
 * the socket poller.
 */
private poller[] pollers = null;
private atomicinteger pollerrotater = new atomicinteger(0);
/**
 * return an available poller in true round robin fashion.
 *
 * @return the next poller in sequence
 */
public poller getpoller0() {
    int idx = math.abs(pollerrotater.incrementandget()) % pollers.length;
    return pollers[idx];
}

接下来我们分析一下poller.register()方法。因为poller维持了一个events同步队列,所以acceptor接受到的channel会放在这个队列里面,放置的代码为events.offer(event);

public class poller implements runnable {

    private final synchronizedqueue<pollerevent> events = new synchronizedqueue<>();

    /**
     * registers a newly created socket with the poller.
     *
     * @param socket    the newly created socket
     */
    public void register(final niochannel socket) {
        socket.setpoller(this);
        niosocketwrapper ka = new niosocketwrapper(socket, nioendpoint.this);
        socket.setsocketwrapper(ka);
        ka.setpoller(this);
        ka.setreadtimeout(getsocketproperties().getsotimeout());
        ka.setwritetimeout(getsocketproperties().getsotimeout());
        ka.setkeepaliveleft(nioendpoint.this.getmaxkeepaliverequests());
        ka.setsecure(issslenabled());
        ka.setreadtimeout(getconnectiontimeout());
        ka.setwritetimeout(getconnectiontimeout());
        pollerevent r = eventcache.pop();
        ka.interestops(selectionkey.op_read);//this is what op_register turns into.
        if ( r==null) r = new pollerevent(socket,ka,op_register);
        else r.reset(socket,ka,op_register);
        addevent(r);
    }

    private void addevent(pollerevent event) {
        events.offer(event);
        if ( wakeupcounter.incrementandget() == 0 ) selector.wakeup();
    }
}

pollerevent

接下来看一下pollerevent,pollerevent实现了runnable接口,用来表示一个轮询事件,代码如下:

public static class pollerevent implements runnable {
    private niochannel socket;
    private int interestops;
    private niosocketwrapper socketwrapper;

    public pollerevent(niochannel ch, niosocketwrapper w, int intops) {
        reset(ch, w, intops);
    }

    public void reset(niochannel ch, niosocketwrapper w, int intops) {
        socket = ch;
        interestops = intops;
        socketwrapper = w;
    }

    public void reset() {
        reset(null, null, 0);
    }

    @override
    public void run() {
        if (interestops == op_register) {
            try {
                socket.getiochannel().register(
                        socket.getpoller().getselector(), selectionkey.op_read, socketwrapper);
            } catch (exception x) {
                log.error(sm.getstring("endpoint.nio.registerfail"), x);
            }
        } else {
            final selectionkey key = socket.getiochannel().keyfor(socket.getpoller().getselector());
            try {
                if (key == null) {
                    socket.socketwrapper.getendpoint().countdownconnection();
                    ((niosocketwrapper) socket.socketwrapper).closed = true;
                } else {
                    final niosocketwrapper socketwrapper = (niosocketwrapper) key.attachment();
                    if (socketwrapper != null) {
                        //we are registering the key to start with, reset the fairness counter.
                        int ops = key.interestops() | interestops;
                        socketwrapper.interestops(ops);
                        key.interestops(ops);
                    } else {
                        socket.getpoller().cancelledkey(key);
                    }
                }
            } catch (cancelledkeyexception ckx) {
                try {
                    socket.getpoller().cancelledkey(key);
                } catch (exception ignore) {}
            }
        }
    }

}

在run函数中:

  • 若感兴趣集是自定义的op_register,则说明该事件表示的已连接套接字通道尚未被轮询线程处理过,那么将该通道注册到poller线程的selector上,感兴趣集是op_read,通道注册的附件是一个niosocketwrapper对象。从poller的register方法添加事件即是这样的过程;
  • 否则获得已连接套接字通道注册到poller线程的selector上的selectionkey,为key添加新的感兴趣集。

重访poller

上文提到poller类实现了runnable接口,其重写的run方法如下所示。

public boolean events() {
    boolean result = false;
    pollerevent pe = null;
    for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) {
        result = true;
        try {
            //直接调用run方法
            pe.run();
            pe.reset();
            if (running && !paused) {
                eventcache.push(pe);
            }
        } catch ( throwable x ) {
            log.error("",x);
        }
    }
    return result;
}

@override
public void run() {
    // loop until destroy() is called
    while (true) {
        boolean hasevents = false;

        try {
            if (!close) {
                /执行pollerevent的run方法
                hasevents = events();
                if (wakeupcounter.getandset(-1) > 0) {
                    //if we are here, means we have other stuff to do
                    //do a non blocking select
                    keycount = selector.selectnow();
                } else {
                    keycount = selector.select(selectortimeout);
                }
                wakeupcounter.set(0);
            }
            if (close) {
                events();
                timeout(0, false);
                try {
                    selector.close();
                } catch (ioexception ioe) {
                    log.error(sm.getstring("endpoint.nio.selectorclosefail"), ioe);
                }
                break;
            }
        } catch (throwable x) {
            exceptionutils.handlethrowable(x);
            log.error("",x);
            continue;
        }
        //either we timed out or we woke up, process events first
        if ( keycount == 0 ) hasevents = (hasevents | events());

        // 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
        iterator<selectionkey> iterator =
            keycount > 0 ? selector.selectedkeys().iterator() : null;
        // walk through the collection of ready keys and dispatch
        // any active event.
        // 对已经准备好的key进行处理
        while (iterator != null && iterator.hasnext()) {
            selectionkey sk = iterator.next();
            niosocketwrapper attachment = (niosocketwrapper)sk.attachment();
            // attachment may be null if another thread has called
            // cancelledkey()
            if (attachment == null) {
                iterator.remove();
            } else {
                iterator.remove();
                // 真正处理key的地方
                processkey(sk, attachment);
            }
        }//while

        //process timeouts
        timeout(keycount,hasevents);
    }//while

    getstoplatch().countdown();
}
  • 若队列里有元素则会先把队列里的事件均执行一遍,pollerevent的run方法会将通道注册到poller的selector上;
  • 对select返回的selectionkey进行处理,由于在pollerevent中注册通道时带上了niosocketwrapper附件,因此这里可以用selectionkey的attachment方法得到,接着调用processkey去处理已连接套接字通道。

我们接着分析processkey(),该方法又会根据key的类型,来分别处理读和写。

  1. 处理读事件,比如生成request对象
  2. 处理写事件,比如将生成的response对象通过socket写回客户端
protected void processkey(selectionkey sk, niosocketwrapper attachment) {
    try {
        if ( close ) {
            cancelledkey(sk);
        } else if ( sk.isvalid() && attachment != null ) {
            if (sk.isreadable() || sk.iswritable() ) {
                if ( attachment.getsendfiledata() != null ) {
                    processsendfile(sk,attachment, false);
                } else {
                    unreg(sk, attachment, sk.readyops());
                    boolean closesocket = false;
                    // 1. 处理读事件,比如生成request对象
                    // read goes before write
                    if (sk.isreadable()) {
                        if (!processsocket(attachment, socketevent.open_read, true)) {
                            closesocket = true;
                        }
                    }
                    // 2. 处理写事件,比如将生成的response对象通过socket写回客户端
                    if (!closesocket && sk.iswritable()) {
                        if (!processsocket(attachment, socketevent.open_write, true)) {
                            closesocket = true;
                        }
                    }
                    if (closesocket) {
                        cancelledkey(sk);
                    }
                }
            }
        } else {
            //invalid key
            cancelledkey(sk);
        }
    } catch ( cancelledkeyexception ckx ) {
        cancelledkey(sk);
    } catch (throwable t) {
        exceptionutils.handlethrowable(t);
        log.error("",t);
    }
}

我们继续来分析方法processsocket()

  1. processorcache里面拿一个processor来处理socket,processor的实现为socketprocessor
  2. processor放到工作线程池中执行
public boolean processsocket(socketwrapperbase<s> socketwrapper,
        socketevent event, boolean dispatch) {
    try {
        if (socketwrapper == null) {
            return false;
        }
        // 1. 从`processorcache`里面拿一个`processor`来处理socket,`processor`的实现为`socketprocessor`
        socketprocessorbase<s> sc = processorcache.pop();
        if (sc == null) {
            sc = createsocketprocessor(socketwrapper, event);
        } else {
            sc.reset(socketwrapper, event);
        }
        // 2. 将`processor`放到工作线程池中执行
        executor executor = getexecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (rejectedexecutionexception ree) {
        getlog().warn(sm.getstring("endpoint.executor.fail", socketwrapper) , ree);
        return false;
    } catch (throwable t) {
        exceptionutils.handlethrowable(t);
        // this means we got an oom or similar creating a thread, or that
        // the pool and its queue are full
        getlog().error(sm.getstring("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

dispatch参数表示是否要在另外的线程中处理,上文processkey各处传递的参数都是true。

  • dispatch为true且工作线程池存在时会执行executor.execute(sc),之后是由工作线程池处理已连接套接字;
  • 否则继续由poller线程自己处理已连接套接字。

abstractendpoint类的createsocketprocessor是抽象方法,nioendpoint类实现了它:

@override
protected socketprocessorbase<niochannel> createsocketprocessor(
        socketwrapperbase<niochannel> socketwrapper, socketevent event) {
    return new socketprocessor(socketwrapper, event);
}

接着我们分析socketprocessor.dorun()方法(socketprocessor.run()方法最终调用此方法)。该方法将处理逻辑交给handler处理,当event为null时,则表明是一个open_read事件。

该类的注释说明socketprocessor与worker的作用等价。

/**
 * this class is the equivalent of the worker, but will simply use in an
 * external executor thread pool.
 */
protected class socketprocessor extends socketprocessorbase<niochannel> {

    public socketprocessor(socketwrapperbase<niochannel> socketwrapper, socketevent event) {
        super(socketwrapper, event);
    }

    @override
    protected void dorun() {
        niochannel socket = socketwrapper.getsocket();
        selectionkey key = socket.getiochannel().keyfor(socket.getpoller().getselector());

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    if (socket.ishandshakecomplete()) {
                        // no tls handshaking required. let the handler
                        // process this socket / event combination.
                        handshake = 0;
                    } else if (event == socketevent.stop || event == socketevent.disconnect ||
                            event == socketevent.error) {
                        // unable to complete the tls handshake. treat it as
                        // if the handshake failed.
                        handshake = -1;
                    } else {
                        handshake = socket.handshake(key.isreadable(), key.iswritable());
                        // the handshake process reads/writes from/to the
                        // socket. status may therefore be open_write once
                        // the handshake completes. however, the handshake
                        // happens when the socket is opened so the status
                        // must always be open_read after it completes. it
                        // is ok to always set this as it is only used if
                        // the handshake completes.
                        event = socketevent.open_read;
                    }
                }
            } catch (ioexception x) {
                handshake = -1;
                if (log.isdebugenabled()) log.debug("error during ssl handshake",x);
            } catch (cancelledkeyexception ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                socketstate state = socketstate.open;
                // process the request from this socket
                // 将处理逻辑交给`handler`处理,当event为null时,则表明是一个`open_read`事件
                if (event == null) {
                    state = gethandler().process(socketwrapper, socketevent.open_read);
                } else {
                    state = gethandler().process(socketwrapper, event);
                }
                if (state == socketstate.closed) {
                    close(socket, key);
                }
            } else if (handshake == -1 ) {
                close(socket, key);
            } else if (handshake == selectionkey.op_read){
                socketwrapper.registerreadinterest();
            } else if (handshake == selectionkey.op_write){
                socketwrapper.registerwriteinterest();
            }
        } catch (cancelledkeyexception cx) {
            socket.getpoller().cancelledkey(key);
        } catch (virtualmachineerror vme) {
            exceptionutils.handlethrowable(vme);
        } catch (throwable t) {
            log.error("", t);
            socket.getpoller().cancelledkey(key);
        } finally {
            socketwrapper = null;
            event = null;
            //return to cache
            if (running && !paused) {
                processorcache.push(this);
            }
        }
    }
}

handler的关键方法是process(),虽然这个方法有很多条件分支,但是逻辑却非常清楚,主要是调用processor.process()方法。

@override
public socketstate process(socketwrapperbase<s> wrapper, socketevent status) {
    try {
     
        if (processor == null) {
            processor = getprotocol().createprocessor();
            register(processor);
        }

        processor.setsslsupport(
                wrapper.getsslsupport(getprotocol().getclientcertprovider()));

        // associate the processor with the connection
        connections.put(socket, processor);

        socketstate state = socketstate.closed;
        do {
            // 关键的代码,终于找到你了
            state = processor.process(wrapper, status);

        } while ( state == socketstate.upgrading);
        return state;
    } 
    catch (throwable e) {
        exceptionutils.handlethrowable(e);
        // any other exception or error is odd. here we log it
        // with "error" level, so it will show up even on
        // less-than-verbose logs.
        getlog().error(sm.getstring("abstractconnectionhandler.error"), e);
    } finally {
        containerthreadmarker.clear();
    }

    // make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    release(processor);
    return socketstate.closed;
}

processor

createprocessor 

protected http11processor createprocessor() {                          
    // 构建 http11processor
    http11processor processor = new http11processor(
            proto.getmaxhttpheadersize(), (jioendpoint)proto.endpoint, // 1. http header 的最大尺寸
            proto.getmaxtrailersize(),proto.getmaxextensionsize());
    processor.setadapter(proto.getadapter());
    // 2. 默认的 keepalive 情况下, 每个 socket 处理的最多的 请求次数
    processor.setmaxkeepaliverequests(proto.getmaxkeepaliverequests());
    // 3. 开启 keepalive 的 timeout
    processor.setkeepalivetimeout(proto.getkeepalivetimeout());      
    // 4. http 当遇到文件上传时的 默认超时时间 (300 * 1000)    
    processor.setconnectionuploadtimeout(
            proto.getconnectionuploadtimeout());                      
    processor.setdisableuploadtimeout(proto.getdisableuploadtimeout());
    // 5. 当 http 请求的 body size超过这个值时, 通过 gzip 进行压缩
    processor.setcompressionminsize(proto.getcompressionminsize());  
    // 6. http 请求是否开启 compression 处理    
    processor.setcompression(proto.getcompression());                  
    processor.setnocompressionuseragents(proto.getnocompressionuseragents());
    // 7. http body里面的内容是 "text/html,text/xml,text/plain" 才会进行 压缩处理
    processor.setcompressablemimetypes(proto.getcompressablemimetypes());
    processor.setrestricteduseragents(proto.getrestricteduseragents());
    // 8. socket 的 buffer, 默认 9000
    processor.setsocketbuffer(proto.getsocketbuffer());       
    // 9. 最大的 post 处理尺寸的大小 4 * 1000    
    processor.setmaxsavepostsize(proto.getmaxsavepostsize());          
    processor.setserver(proto.getserver());
    processor.setdisablekeepalivepercentage(
            proto.getdisablekeepalivepercentage());                    
    register(processor);                                               
    return processor;
}

这儿我们主要关注的是processor对于读的操作,也只有一行代码。调用service()方法。

public abstract class abstractprocessorlight implements processor {

    @override
    public socketstate process(socketwrapperbase<?> socketwrapper, socketevent status)
            throws ioexception {

        socketstate state = socketstate.closed;
        iterator<dispatchtype> dispatches = null;
        do {
            if (dispatches != null) {
                dispatchtype nextdispatch = dispatches.next();
                state = dispatch(nextdispatch.getsocketstatus());
            } else if (status == socketevent.disconnect) {
                // do nothing here, just wait for it to get recycled
            } else if (isasync() || isupgrade() || state == socketstate.async_end) {
                state = dispatch(status);
                if (state == socketstate.open) {
                    // there may be pipe-lined data to read. if the data isn't
                    // processed now, execution will exit this loop and call
                    // release() which will recycle the processor (and input
                    // buffer) deleting any pipe-lined data. to avoid this,
                    // process it now.
                    state = service(socketwrapper);
                }
            } else if (status == socketevent.open_write) {
                // extra write event likely after async, ignore
                state = socketstate.long;
            } else if (status == socketevent.open_read){
                // 调用`service()`方法
                state = service(socketwrapper);
            } else {
                // default to closing the socket if the socketevent passed in
                // is not consistent with the current state of the processor
                state = socketstate.closed;
            }

            if (getlog().isdebugenabled()) {
                getlog().debug("socket: [" + socketwrapper +
                        "], status in: [" + status +
                        "], state out: [" + state + "]");
            }

            if (state != socketstate.closed && isasync()) {
                state = asyncpostprocess();
                if (getlog().isdebugenabled()) {
                    getlog().debug("socket: [" + socketwrapper +
                            "], state after async post processing: [" + state + "]");
                }
            }

            if (dispatches == null || !dispatches.hasnext()) {
                // only returns non-null iterator if there are
                // dispatches to process.
                dispatches = getiteratorandcleardispatches();
            }
        } while (state == socketstate.async_end ||
                dispatches != null && state != socketstate.closed);

        return state;
    }
}

processor.service()方法比较重要的地方就两点。该方法非常得长,也超过了200行,在此我们不再拷贝此方法的代码。

  1. 生成request和response对象
  2. 调用adapter.service()方法,将生成的request和response对象传进去

adapter

adapter用于连接connectorcontainer,起到承上启下的作用。processor会调用adapter.service()方法。我们来分析一下,主要做了下面几件事情:

  1. 根据coyote框架的request和response对象,生成connector的request和response对象(是httpservletrequest和httpservletresponse的封装)
  2. 补充header
  3. 解析请求,该方法会出现代理服务器、设置必要的header等操作
  4. 真正进入容器的地方,调用engine容器下pipeline的阀门
  5. 通过request.finishrequest 与 response.finishresponse(刷outputbuffer中的数据到浏览器) 来完成整个请求
@override
public void service(org.apache.coyote.request req, org.apache.coyote.response res)
        throws exception {

    // 1. 根据coyote框架的request和response对象,生成connector的request和response对象(是httpservletrequest和httpservletresponse的封装)
    request request = (request) req.getnote(adapter_notes);
    response response = (response) res.getnote(adapter_notes);

    if (request == null) {
        // create objects
        request = connector.createrequest();
        request.setcoyoterequest(req);
        response = connector.createresponse();
        response.setcoyoteresponse(res);

        // link objects
        request.setresponse(response);
        response.setrequest(request);

        // set as notes
        req.setnote(adapter_notes, request);
        res.setnote(adapter_notes, response);

        // set query string encoding
        req.getparameters().setquerystringcharset(connector.geturicharset());
    }

    // 2. 补充header
    if (connector.getxpoweredby()) {
        response.addheader("x-powered-by", powered_by);
    }

    boolean async = false;
    boolean postparsesuccess = false;

    req.getrequestprocessor().setworkerthreadname(thread_name.get());

    try {
        // parse and set catalina and configuration specific
        // request parameters
        // 3. 解析请求,该方法会出现代理服务器、设置必要的header等操作
        // 用来处理请求映射 (获取 host, context, wrapper, uri 后面的参数的解析, sessionid )
        postparsesuccess = postparserequest(req, request, res, response);
        if (postparsesuccess) {
            //check valves if we support async
            request.setasyncsupported(
                    connector.getservice().getcontainer().getpipeline().isasyncsupported());
            // calling the container
            // 4. 真正进入容器的地方,调用engine容器下pipeline的阀门
            connector.getservice().getcontainer().getpipeline().getfirst().invoke(
                    request, response);
        }
        if (request.isasync()) {
            async = true;
            readlistener readlistener = req.getreadlistener();
            if (readlistener != null && request.isfinished()) {
                // possible the all data may have been read during service()
                // method so this needs to be checked here
                classloader oldcl = null;
                try {
                    oldcl = request.getcontext().bind(false, null);
                    if (req.sendalldatareadevent()) {
                        req.getreadlistener().onalldataread();
                    }
                } finally {
                    request.getcontext().unbind(false, oldcl);
                }
            }

            throwable throwable =
                    (throwable) request.getattribute(requestdispatcher.error_exception);

            // if an async request was started, is not going to end once
            // this container thread finishes and an error occurred, trigger
            // the async error process
            if (!request.isasynccompleting() && throwable != null) {
                request.getasynccontextinternal().seterrorstate(throwable, true);
            }
        } else {
            //5. 通过request.finishrequest 与 response.finishresponse(刷outputbuffer中的数据到浏览器) 来完成整个请求
            request.finishrequest();
            //将 org.apache.catalina.connector.response对应的 outputbuffer 中的数据 刷到 org.apache.coyote.response 对应的 internaloutputbuffer 中, 并且最终调用 socket对应的 outputstream 将数据刷出去( 这里会组装 http response 中的 header 与 body 里面的数据, 并且刷到远端 )
            response.finishresponse();
        }

    } catch (ioexception e) {
        // ignore
    } finally {
        atomicboolean error = new atomicboolean(false);
        res.action(actioncode.is_error, error);

        if (request.isasynccompleting() && error.get()) {
            // connection will be forcibly closed which will prevent
            // completion happening at the usual point. need to trigger
            // call to oncomplete() here.
            res.action(actioncode.async_post_process,  null);
            async = false;
        }

        // access log
        if (!async && postparsesuccess) {
            // log only if processing was invoked.
            // if postparserequest() failed, it has already logged it.
            context context = request.getcontext();
            // if the context is null, it is likely that the endpoint was
            // shutdown, this connection closed and the request recycled in
            // a different thread. that thread will have updated the access
            // log so it is ok not to update the access log here in that
            // case.
            if (context != null) {
                context.logaccess(request, response,
                        system.currenttimemillis() - req.getstarttime(), false);
            }
        }

        req.getrequestprocessor().setworkerthreadname(null);

        // recycle the wrapper request and response
        if (!async) {
            request.recycle();
            response.recycle();
        }
    }
}

请求预处理

postparserequest方法对请求做预处理,如对路径去除分号表示的路径参数、进行uri解码、规格化(点号和两点号)

 

protected boolean postparserequest(org.apache.coyote.request req, request request,
        org.apache.coyote.response res, response response) throws ioexception, servletexception {
    // 省略部分代码
    messagebytes decodeduri = req.decodeduri();

    if (undecodeduri.gettype() == messagebytes.t_bytes) {
        // copy the raw uri to the decodeduri
        decodeduri.duplicate(undecodeduri);

        // parse the path parameters. this will:
        //   - strip out the path parameters
        //   - convert the decodeduri to bytes
        parsepathparameters(req, request);

        // uri decoding
        // %xx decoding of the url
        try {
            req.geturldecoder().convert(decodeduri, false);
        } catch (ioexception ioe) {
            res.setstatus(400);
            res.setmessage("invalid uri: " + ioe.getmessage());
            connector.getservice().getcontainer().logaccess(
                    request, response, 0, true);
            return false;
        }
        // normalization
        if (!normalize(req.decodeduri())) {
            res.setstatus(400);
            res.setmessage("invalid uri");
            connector.getservice().getcontainer().logaccess(
                    request, response, 0, true);
            return false;
        }
        // character decoding
        converturi(decodeduri, request);
        // check that the uri is still normalized
        if (!checknormalize(req.decodeduri())) {
            res.setstatus(400);
            res.setmessage("invalid uri character encoding");
            connector.getservice().getcontainer().logaccess(
                    request, response, 0, true);
            return false;
        }
    } else {
        /* the uri is chars or string, and has been sent using an in-memory
            * protocol handler. the following assumptions are made:
            * - req.requesturi() has been set to the 'original' non-decoded,
            *   non-normalized uri
            * - req.decodeduri() has been set to the decoded, normalized form
            *   of req.requesturi()
            */
        decodeduri.tochars();
        // remove all path parameters; any needed path parameter should be set
        // using the request object rather than passing it in the url
        charchunk uricc = decodeduri.getcharchunk();
        int semicolon = uricc.indexof(';');
        if (semicolon > 0) {
            decodeduri.setchars
                (uricc.getbuffer(), uricc.getstart(), semicolon);
        }
    }

    // request mapping.
    messagebytes servername;
    if (connector.getuseipvhosts()) {
        servername = req.localname();
        if (servername.isnull()) {
            // well, they did ask for it
            res.action(actioncode.req_local_name_attribute, null);
        }
    } else {
        servername = req.servername();
    }

    // version for the second mapping loop and
    // context that we expect to get for that version
    string version = null;
    context versioncontext = null;
    boolean maprequired = true;

    while (maprequired) {
        // this will map the the latest version by default
        connector.getservice().getmapper().map(servername, decodeduri,
                version, request.getmappingdata());
        // 省略部分代码
    }
    // 省略部分代码
}

以messagebytes的类型是t_bytes为例:

  • parsepathparameters方法去除uri中分号表示的路径参数;
  • req.geturldecoder()得到一个udecoder实例,它的convert方法对uri解码,这里的解码只是移除百分号,计算百分号后两位的十六进制数字值以替代原来的三位百分号编码;
  • normalize方法规格化uri,解释路径中的“.”和“..”;
  • converturi方法利用connector的uriencoding属性将uri的字节转换为字符表示;
  • 注意connector.getservice().getmapper().map(servername, decodeduri, version, request.getmappingdata()) 这行,之前service启动时mapperlistener注册了该service内的各host和context。根据uri选择context时,mapper的map方法采用的是converturi方法解码后的uri与每个context的路径去比较

容器处理

如果请求可以被传给容器的pipeline即当postparserequest方法返回true时,则由容器继续处理,在service方法中有connector.getservice().getcontainer().getpipeline().getfirst().invoke(request, response)这一行:

  • connector调用getservice返回standardservice;
  • standardservice调用getcontainer返回standardengine;
  • standardengine调用getpipeline返回与其关联的standardpipeline;

 后续处理流程请看下一篇文章

 

 

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网