当前位置: 移动技术网 > IT编程>开发语言>Java > 深入理解MyBatis-Executor执行体系

深入理解MyBatis-Executor执行体系

2020年07月21日  | 移动技术网IT编程  | 我要评论

深入理解MyBatis-Executor执行体系


序言

​ MyBatis做为一个半ORM框架,现阶段在国内可谓如日中天,不论是前几年的SSM框架体系还是如今的微服务,企业级系统与数据库的交互基本使用的都是MyBatis。MyBatis作为一款轻量级框架,其架构设计与代码质量上,都十分的优秀。学习难度与成本均不算高,对于想学习源代码的同学来说,选择MyBatis作为入门十分适合。出于兴趣与爱好,同时也作为学习,提升自己。笔者将开始以深入理解MyBatis为主题的连载,经过一段时间的学习和规划,笔者将关于MyBatis的系列文章分为8个章节,作为对这段时间的学习与源码阅读的总结。若有感兴趣的同学看到这里,欢迎留言讨论。如果文章中有任何错误的地方,也感谢同学们帮忙指正。


概要

  • 回顾JDBC
  • MyBatis执行过程
  • Executor执行器体系

1、回顾JDBC

1、概述

​ JDBC(Java Data Base Connectivity)是一种用于执行SQL语句的Java API,从根本上来说JDBC是一种规范,为多种关系数据库提供统一的访问接口,允许便携式访问底层数据库。对于从业Java的同学来说,都不会对JDBC感到陌生。

2、执行流程

​ 以往不论是学习还是使用JDBC时,我们的代码基本都根据以下流程来完成一次与数据库的交互。

  1. 通过DriverManager获取数据库连接
  2. 预编译SQL语句,进行SQL参数填充
  3. 执行PrepareStatement并获取执行结果
  4. 将SQL的执行结果解析成我们需要的Bean对象

在这里插入图片描述

​ 下面的代码演示了使用JDBC执行一次简单的查询操作

/**
 * @author Emove
 * @date 2020/5/30
 */
public class JdbcTest {

    private final static String URL = "jdbc:mysql://localhost:3306/test";
    private final static String USER = "test";
    private final static String PWD = "123456";

    private Connection connection;

    @Before
    public void connect() throws SQLException {
        connection = DriverManager.getConnection(URL, USER, PWD);
    }

    @After
    public void close() throws SQLException {
        connection.close();
    }

    @Test
    public void run() throws SQLException {
        String sql = "SELECT * FROM users WHERE `name`=?";
        PreparedStatement psm = connection.prepareStatement(sql);
        psm.setString(1, "Emove");
        psm.execute();
        ResultSet resultSet = psm.getResultSet();
        while (resultSet.next()) {
            System.out.println(resultSet.getString(1));
            System.out.println(resultSet.getString(2));
        }
        resultSet.close();
        psm.close();
    }
}

​ 但是如果每次使用数据库,都需要使用这样的一段代码,自己获取一次数据库连接,拼接SQL、设置参数、处理结果集、再关闭连接。如果参数一多,整个流程下来,时间与人工成功都非常的高,也非常的繁琐,特别是在复杂SQL的情况下,同时使代码看起来十分的臃肿。MyBatis的出现,为的就是解决上述的问题,MyBatis通过封装上述的所有步骤,允许程序员自己编写SQL,配置好参数映射等,即可随时完成与数据库的交互,将程序员从繁琐的查询与处理SQL中解放出来。

3、JDBC中的三种执行器

​ 在Java的sql包中,有三个SQL执行器接口Statement(简单执行器)、PreparedStatement(预处理执行器)和CallableStatement(存储过程执行器)。

  1. Statement:执行静态SQL、批处理和设置加载行数
  2. PreparedStatement:设置预编译参数,用于实现防止SQL注入
  3. Callable Statement:与存储过程相关,可设置出参,读取出参
    在这里插入图片描述

2、MyBatis执行过程

查询流程

​ 上图为MyBatis查询执行过程图,图中包含一次查询过程中,MyBatis的逻辑流程及分支。将以上流程进行划分,可大致分为以下三个部分:

  1. 从查询节点开始,到会话查询节点为止,为dao层与MyBatis交互逻辑,在此过程中,会获取一个SqlSession对象,该对象作用域为一次查询会话。
  2. 从会话查询节点开始,到查询数据库节点为止,为MyBatis的执行器(Executor)逻辑执行过程。在这段流程中,主要为MyBatis以执行的statementid(即执行的sql映射ID)为目标对象进行的缓存、事务管理。(本文主要就是讲解Executor的执行体系,至于具体的逻辑代码,下文会有源码分析)
  3. 从查询数据库节点开始,到剩余的其他分支。为第一小节中,对JDBC的执行流程的封装及处理。

下图是MyBatis查询的一次执行流程

执行流程

SqlSession

​ SqlSession是MyBatis中的最重要的构建之一,是MyBatis对外提供的门面,用于提供增删改查、事务管理等功能,类似于JDBC里的Connection。它是应用层及MyBatis与DB之间执行交互操作的一个单线程对象,也是MyBatis执行持久化操作的关键对象。但由于SqlSession的实例不是线程安全的,因此是不能被共享的,所以它的最佳的作用域是一次请求或方法作用域。

​ 在MyBatis中,DefaultSqlSession继承了SqlSession接口,是SqlSession的默认实现,在该实现类中,包含了两个重要对象ConfigurationExecutor。Configuration也被称为MyBatis的大管家,贯穿了MyBatis的一次执行流程。而Executor则是后续查询逻辑操作的主要载体对象之一,下面将主要分析Executor的执行体系。

/**
 * The default implementation for {@link SqlSession}. 
 * Note that this class is not Thread-Safe.
 *
 * @author Clinton Begin
 */
public class DefaultSqlSession implements SqlSession {

  private final Configuration configuration;
  private final Executor executor;

  private final boolean autoCommit;
  private boolean dirty;
  private List<Cursor<?>> cursorList;

  public DefaultSqlSession(Configuration configuration, Executor executor, boolean autoCommit) {
    this.configuration = configuration;
    this.executor = executor;
    this.dirty = false;
    this.autoCommit = autoCommit;
  }

  public DefaultSqlSession(Configuration configuration, Executor executor) {
    this(configuration, executor, false);
  }
  
  ...
  
}

3、Executor执行器体系

​ 在前文的简单介绍中,我们已经知道,SqlSession对象只是一个门面,其后续的执行工作都交给了Executor。

​ SqlSession作为MyBatis的门面,用于提供MyBatis对应用提供的操作接口,其中除了基本的增删改查之外,还有事务相关的提交回滚,会话关闭,清除缓存,获取configuration和获取数据连接等辅助API,主要是方便我们对方法的调用。在SqlSession对外提供的这些接口中,大部分都是Executor在提供支持。下面我们来看下Executor接口定义的方法。

Executor接口

​ 可以看到,Executor中定义的方法大致都是与数据交互相关的方法,大致可以分为以下三类:

  1. 增删改查:增删改都包含在update方法中统一处理
  2. 事务相关:获取事务管理、提交回滚等
  3. 缓存相关:其中包含了MyBatis提供的一级、二级缓存等。

​ 接下来,我们简单看下Executor的继承关系:

Executor继承关系

​ MyBatis中,Executor提供了四个实现,其中ClosedExecutor为一个内部类,只实现了isClosed方法,忽略不计。先简单介绍下各个处理器的功能:

  1. BaseExecutor:基础执行器,是一个抽象类,提供了关于SimpleExecutor、ReuseExecutor、BatchExecutor共性相关的功能实现,此外,BaseExecutor还与MyBatis的一级缓存和事务管理相关
  2. SimpleExecutor:简单执行器,每次都会创建一个新的Statement(默认是PreparedStatement)
  3. ReuseExecutor:重用执行器,相同的Sql语句会使用同一个Statement
  4. BatchExecutor:批处理执行器,批处理提交修改,必须执行flushStatement才会生效
  5. CachingExecutor:缓存执行器,与MyBatis二级缓存相关

1、BaseExecutor

​ 通过前文第二小节的介绍,我们可以知道,在每次通过代理调用SqlSession时,获取到的SqlSession都是一个新的对象。SqlSession中的属性Executor,与SqlSession是一对一的关系,同样是线程不安全的。

​ BaseExecutor作为SimpleExecutor、ReuseExecutor、BatchExecutor的直接父类,其实现了子类中的一些共性功能,下面先来简单的分析下BaseExecutor的源码。BaseExecutor中代码较多,先挑比较简单和重要的代码讲解,目前不是很重要的代码均删除以减轻看代码时的压力。之后有必要时会详述,如果感兴趣也可以阅读源码。

public abstract class BaseExecutor implements Executor {
  // 事务管理接口、定义了与JDBC执行相关的功能,如获取连接、提交、回滚、关闭连接等操作
  protected Transaction transaction;
  protected Executor wrapper;

  /** 下方三个对象均是与缓存相关的属性, 作用域此次执行对象的生命周期范围 */
  protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
  protected PerpetualCache localCache;
  protected PerpetualCache localOutputParameterCache;
    
  protected Configuration configuration;

  /** queryStack为查询栈,与动态sql中的嵌套子查询相关 */
  protected int queryStack;
  private boolean closed;
    
  protected BaseExecutor(Configuration configuration, Transaction transaction) {
    ...
  }

  @Override
  public Transaction getTransaction() {
  	...
  }

  /**
   * 关闭当前对象、关闭之前判断是否需要回滚
   */
  @Override
  public void close(boolean forceRollback) {
    try {
      try {
        rollback(forceRollback);
      } finally {
        if (transaction != null) {
          transaction.close();
        }
      }
    } catch (SQLException e) {
      // Ignore.  There's nothing that can be done at this point.
    } finally {
      // 清空本地属性
    }
  }

  @Override
  public boolean isClosed() {
    return closed;
  }

  @Override
  public int update(MappedStatement ms, Object parameter) throws SQLException {
    // 省略与错误记录相关的代码, 详情请看ErrorContext类的实现,其使用LocalThread记录线程执行时的错误信息。同样逻辑在query方法中也有
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    clearLocalCache();
    return doUpdate(ms, parameter);
  }

  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return flushStatements(false);
  }

  public List<BatchResult> flushStatements(boolean isRollBack) throws SQLException {
    // ...
    return doFlushStatements(isRollBack);
  }

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameter);
    CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
    return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
  }

  @SuppressWarnings("unchecked")
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    if (closed) {
      throw new ExecutorException("Executor was closed.");
    }
    if (queryStack == 0 && ms.isFlushCacheRequired()) {
      // 如果不是嵌套查询,并且配置了flushCache为true,清空一级缓存
      clearLocalCache();
    }
    List<E> list;
    try {
      queryStack++;
      // 如果resultHandler为空,尝试从一级缓存中获取数据
      list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
      if (list != null) {
        // 该方法实现与存储过程相关,开发中基本不会用到
        handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
      } else {
        // 一级缓存中没有对应数据,从数据库加载
        list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
      }
    } finally {
      queryStack--;
    }
    if (queryStack == 0) {
      // 延迟装载相关逻辑,后文会讲
      for (DeferredLoad deferredLoad : deferredLoads) {
        deferredLoad.load();
      }
      deferredLoads.clear();
      if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
        // 如果全局配置的一级缓存作用域为statement,清除本地缓存
        clearLocalCache();
      }
    }
    return list;
  }

  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    // 创建CacheKey逻辑,主要与mappedStatement的ID,参数,分页参数和sql有关
    ...
    return cacheKey;
  }

  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return localCache.getObject(key) != null;
  }

  @Override
  public void commit(boolean required) throws SQLException {
    if (closed) {
      throw new ExecutorException("Cannot commit, transaction is already closed");
    }
    clearLocalCache();
    flushStatements();
    if (required) {
      transaction.commit();
    }
  }

  @Override
  public void rollback(boolean required) throws SQLException {
    if (!closed) {
      try {
        clearLocalCache();
        flushStatements(true);
      } finally {
        if (required) {
          transaction.rollback();
        }
      }
    }
  }

  /**
   * 清空本地缓存、在BaseExecutor中,update、query、commit、rollback方法中,均有调用回滚的逻辑,具体请看上述对应方法的实现
   */
  @Override
  public void clearLocalCache() {
    if (!closed) {
      localCache.clear();
      localOutputParameterCache.clear();
    }
  }

  /** 执行更新操作 */  
  protected abstract int doUpdate(MappedStatement ms, Object parameter)
      throws SQLException;
  /** 执行Statem,填充结果、与批量执行相关 */  
  protected abstract List<BatchResult> doFlushStatements(boolean isRollback)
      throws SQLException;
  /** 执行查询操作 */
  protected abstract <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException;
    
  /** 关闭Statement对象 */
  protected void closeStatement(Statement statement) {
    if (statement != null) {
      try {
        statement.close();
      } catch (SQLException e) {
        // ignore
      }
    }
  }

  /**
   * Apply a transaction timeout. 设置事务超时
   * @param statement a current statement
   * @throws SQLException if a database access error occurs, this method is called on a closed <code>Statement</code>
   * @since 3.4.0
   * @see StatementUtil#applyTransactionTimeout(Statement, Integer, Integer)
   */
  protected void applyTransactionTimeout(Statement statement) throws SQLException {
    StatementUtil.applyTransactionTimeout(statement, statement.getQueryTimeout(), transaction.getTimeout());
  }

  private void handleLocallyCachedOutputParameters(MappedStatement ms, CacheKey key, Object parameter, BoundSql boundSql) {
    // 
  }

   /** 查询数据库 */
  private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
    List<E> list;
    // 插入key和占位符,为一级缓存做准备
    localCache.putObject(key, EXECUTION_PLACEHOLDER);
    try {
      list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
    } finally {
      localCache.removeObject(key);
    }
    // 保存执行结果作为以及缓存
    localCache.putObject(key, list);
    if (ms.getStatementType() == StatementType.CALLABLE) {
      localOutputParameterCache.putObject(key, parameter);
    }
    return list;
  }
 
  /** 获取连接 */  
  protected Connection getConnection(Log statementLog) throws SQLException {
    Connection connection = transaction.getConnection();
    if (statementLog.isDebugEnabled()) {
      return ConnectionLogger.newInstance(connection, statementLog, queryStack);
    } else {
      return connection;
    }
  }
}

​ 从代码中可以看出来,BaseExecutor主要为了子类提供了一些共性方法的实现,如事务相关的提交和回滚,一级缓存和获取数据库连接等。同时,定义了doQuery、doUpdate、doFlushStatement等抽象方法,由对应的子类去实现具体逻辑。以query方法为例,在queryFormDatabase中最终调用doQuery执行查询。除此之外,BaseExecutor中的方法实现逻辑都比较的简单清晰,建议仔细多看几遍。关于一级缓存、嵌套子查询等,后续会有文章单独的仔细介绍。这节将重点放在Executor的整体实现上,不必放大某些细节实现。

2、SimpleExecutor

​ 顾名思义,SimpleExecutor为简单执行器,MyBatis默认配置的执行器,实现上,在很多实际开发中,如果没有配置指定执行器,我们使用的都是SimpleExecutor。

​ 下面我们先来看下SimpleExecutor的代码。

public class SimpleExecutor extends BaseExecutor {

  public SimpleExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  //update
  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Statement stmt = null;
    try {
      // 通过MappedStatement对象来获取configuration,但是获取到的configuration对象与SimpleExecutor构造器传进来的configuration是同一个对象
      Configuration configuration = ms.getConfiguration();
      // 新建一个StatementHandler
      // 这里看到ResultHandler传入的是null
      StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
      // 准备语句
      stmt = prepareStatement(handler, ms.getStatementLog());
      // StatementHandler.update
      return handler.update(stmt);
    } finally {
      closeStatement(stmt);
    }
  }

  //select
  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Statement stmt = null;
    try {
      Configuration configuration = ms.getConfiguration();
      // 新建一个StatementHandler
      // 这里看到ResultHandler传入了
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
      // 准备语句
      stmt = prepareStatement(handler, ms.getStatementLog());
      // StatementHandler.query
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
	// doFlushStatements只是给batch用的,所以这里返回空
    return Collections.emptyList();
  }

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    Connection connection = getConnection(statementLog);
    // 调用StatementHandler.prepare
    stmt = handler.prepare(connection);
    // 调用StatementHandler.parameterize
    handler.parameterize(stmt);
    return stmt;
  }
}

​ 以查询为例,当调用query方法时,经过BaseExecutor的处理后,调用queryFormDataBase时,其内部又调用了doQuery方法。所以实际上,真正去执行查询的方法为对应执行器的duQuery实现。但是在SimpleExecutor的doQuery方法中,我们可以看到,在executor层其实只做了三件事。

  1. 通过当前绑定执行的MappedStatement去获取一个对应的StatementHandler对象
  2. 创建Connection连接并生成Statement对象,并进行参数处理,每次使用SimpleExecutor执行查询或更新时,都会创建一个Statement对象
  3. 交由对应的StatementHandler去执行Statement

3、ReuseExecutor

​ ReuseExecutor名为重用执行器。重用执行器的作用是在一次会话中,重复使用同一个Statement对象以提升性能。以查询为例,每次执行查询时,都会去Statement缓存中查找,如果已经存在相同SQL的Statement对象,就会重复使用同一个Statement对象。那么ReuseExecutor是如何实现的呢?让我们来看下ReuseExecutor的代码。

public class ReuseExecutor extends BaseExecutor {

  //可重用的执行器内部用了一个map,用来缓存SQL语句对应的Statement,即key为SQL
  private final Map<String, Statement> statementMap = new HashMap<String, Statement>();

  public ReuseExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    //和SimpleExecutor一样,新建一个StatementHandler
    //这里看到ResultHandler传入的是null
    StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
    //准备语句
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.update(stmt);
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
    Configuration configuration = ms.getConfiguration();
    StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
    Statement stmt = prepareStatement(handler, ms.getStatementLog());
    return handler.<E>query(stmt, resultHandler);
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    // flush的时候清除缓存
    for (Statement stmt : statementMap.values()) {
      closeStatement(stmt);
    }
    statementMap.clear();
    return Collections.emptyList();
  }

  private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
    Statement stmt;
    //得到绑定的SQL语句
    BoundSql boundSql = handler.getBoundSql();
    String sql = boundSql.getSql();
    //如果缓存中已经有了,直接得到Statement
    if (hasStatementFor(sql)) {
      stmt = getStatement(sql);
    } else {
      //如果缓存没有找到,则和SimpleExecutor处理完全一样,然后加入缓存
      Connection connection = getConnection(statementLog);
      stmt = handler.prepare(connection);
      putStatement(sql, stmt);
    }
    handler.parameterize(stmt);
    return stmt;
  }

  private boolean hasStatementFor(String sql) {
    try {
      return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
    } catch (SQLException e) {
      return false;
    }
  }

  private Statement getStatement(String s) {
    return statementMap.get(s);
  }

  private void putStatement(String sql, Statement stmt) {
    statementMap.put(sql, stmt);
  }

}

​ ReuseExecutor实现Statement对象的复用逻辑很简单,在类内部维护一个Map,使用对应的SQL作为key来缓存同个会话中的Statement对象,每次需要执行时,都先去本地的缓存Map中查找,如果存在则直接使用之前的Statement对象,如果不存在再去创建Statement对象。

​ 那么在同一个会话中,如果StatementId不同,能否命中ReuseExecutor中的Statement缓存?答案是可以的,只要最终的SQL一致,就可以复用缓存中的Statement对象,但也仅限于同一会话内。

4、BatchExecutor

​ BatchExecutor名为批量处理器,用来批量执行SQL,但是注意,批量处理只对更新(增删改)语句有效,对查询无效。在同一个会话内,增删改SQL会一次性处理好SQL和参数然后发送给数据库,从而减少与数据库的交互次数,减少IO消耗,提升性能。

public class BatchExecutor extends BaseExecutor {

  public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;

  /** 批量执行的Statement对象 **/
  private final List<Statement> statementList = new ArrayList<Statement>();
  /** 批量执行的结果对象 **/
  private final List<BatchResult> batchResultList = new ArrayList<BatchResult>();
  /** 上一次处理的SQL语句 **/
  private String currentSql;
  /** 上一次处理的MappedStatement对象 **/
  private MappedStatement currentStatement;

  public BatchExecutor(Configuration configuration, Transaction transaction) {
    super(configuration, transaction);
  }

  @Override
  public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
    final Configuration configuration = ms.getConfiguration();
    final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
    final BoundSql boundSql = handler.getBoundSql();
    // 获取SQL语句
    final String sql = boundSql.getSql();
    final Statement stmt;
    // 判断当前要处理的sql语句是否等于上一次执行的sql,MappedStatement也是同理
    // 只有这两个对象都满足时,才能复用上一次的Statement对象
    if (sql.equals(currentSql) && ms.equals(currentStatement)) {
      int last = statementList.size() - 1;
      // 即使满足上述的两个条件,也只能从statement缓存list中获取最后一个对象
      // 由此也可知,想要实现复用,相同的sql还必须满足连贯顺序
      stmt = statementList.get(last);
      BatchResult batchResult = batchResultList.get(last);
      // 将参数保存到当前BatchResult中,稍后会处理
      batchResult.addParameterObject(parameterObject);
    } else {
      // 创建和保存新的Statement对象和BatchResult对象
      // 并将当前sql和MappedStatement设置为该次执行的对应对象
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      currentSql = sql;
      currentStatement = ms;
      statementList.add(stmt);
      batchResultList.add(new BatchResult(ms, sql, parameterObject));
    }
    handler.parameterize(stmt);
    // 底层使用的时Statement接口来实现批量处理
    handler.batch(stmt); 
    return BATCH_UPDATE_RETURN_VALUE;
  }

  @Override
  public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
      throws SQLException {
    Statement stmt = null;
    try {
      flushStatements();
      Configuration configuration = ms.getConfiguration();
      StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
      Connection connection = getConnection(ms.getStatementLog());
      stmt = handler.prepare(connection);
      handler.parameterize(stmt);
      return handler.<E>query(stmt, resultHandler);
    } finally {
      closeStatement(stmt);
    }
  }

  @Override
  public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
    try {
      List<BatchResult> results = new ArrayList<BatchResult>();
      if (isRollback) {
        return Collections.emptyList();
      }
      for (int i = 0, n = statementList.size(); i < n; i++) {
        Statement stmt = statementList.get(i);
        BatchResult batchResult = batchResultList.get(i);
        try {
          // 执行并记录批量执行的数量
          batchResult.setUpdateCounts(stmt.executeBatch());
          MappedStatement ms = batchResult.getMappedStatement();
          List<Object> parameterObjects = batchResult.getParameterObjects();
          KeyGenerator keyGenerator = ms.getKeyGenerator();
          if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
            // 以下逻辑与执行insert时为映射的pojo对象设置自增长ID相关
            // 核心是使用JDBC3的Statement.getGeneratedKeys
            Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
            jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
          } else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { 
            for (Object parameter : parameterObjects) {
              keyGenerator.processAfter(this, ms, stmt, parameter);
            }
          }
        } catch (BatchUpdateException e) {
          StringBuilder message = new StringBuilder();
          message.append(batchResult.getMappedStatement().getId())
              .append(" (batch index #")
              .append(i + 1)
              .append(")")
              .append(" failed.");
          if (i > 0) {
            message.append(" ")
                .append(i)
                .append(" prior sub executor(s) completed successfully, but will be rolled back.");
          }
          throw new BatchExecutorException(message.toString(), e, results, batchResult);
        }
        results.add(batchResult);
      }
      return results;
    } finally {
      for (Statement stmt : statementList) {
        closeStatement(stmt);
      }
      currentSql = null;
      statementList.clear();
      batchResultList.clear();
    }
  }
}

​ 可以看出BatchExecutor类的代码相对与前面的两个类的实现要稍微复杂一些,复杂的地方主要集中在如何实现批量处理SQL及参数与执行insert语句后的填充key值处理。

​ 在以上代码中,值得注意的时,在执行BatchResult的时候,如果是相同的SQL语句,会进行合并使用同一个Statement对象,但是有一个前提是,相同的SQL语句必须是相同的MappedStatement,并且SQL必须满足连贯的先后顺序。

​ 说到这里,相信不少人都会有疑问,那么ReuseExecutor和BatchExecutor的区别是什么呢?

  1. ReuseExecutor是使用同一个Statement对象,多次设置参数,多次执行,多次处理结果。BatchExecutor是多次设置参数,一次执行、一次性把SQL及参数发送给数据库,并进行一次结果处理

  2. ReuseExecutor对查询、更新均有效,BatchExecutor只作用于更新

  3. 前面代码中注释到,BatchExecutor执行时,使用的时Statement接口的批量处理实现的。这是JDBC规范提供的批量处理支持。下面以PreparedStatement的batch方法实现为例。

    @Override
      public void batch(Statement statement) throws SQLException {
        PreparedStatement ps = (PreparedStatement) statement;
        ps.addBatch();
      }
    

5、CachingExecutor

​ CachingExecutor是缓存执行器,用于实现MyBatis的二级缓存,直接继承与Executor。关于二级缓存到底是怎么实现的,我将在后续单独写一篇文章介绍。在这个章节中,我们先来看看CachingExecutor中做了哪些事情,以及它是如何实现的。

public class CachingExecutor implements Executor {

  /** 内部持有一个Executor对象,通过装饰着模式来增强原有executor的功能 **/
  private Executor delegate;
  /** 事务缓存管理器 **/
  private TransactionalCacheManager tcm = new TransactionalCacheManager();

  public CachingExecutor(Executor delegate) {
    this.delegate = delegate;
    delegate.setExecutorWrapper(this);
  }

  @Override
  public Transaction getTransaction() {
    return delegate.getTransaction();
  }

  @Override
  public void close(boolean forceRollback) {
    try {
      //issues #499, #524 and #573
      if (forceRollback) { 
        tcm.rollback();
      } else {
        tcm.commit();
      }
    } finally {
      delegate.close(forceRollback);
    }
  }

  @Override
  public boolean isClosed() {
    return delegate.isClosed();
  }

  @Override
  public int update(MappedStatement ms, Object parameterObject) throws SQLException {
	// 刷新缓存完再update
    flushCacheIfRequired(ms);
    return delegate.update(ms, parameterObject);
  }

  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
    BoundSql boundSql = ms.getBoundSql(parameterObject);
	// query时传入一个cachekey参数
    CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
    return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  //被ResultLoader.selectList调用
  @Override
  public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
      throws SQLException {
    Cache cache = ms.getCache();
    // 默认情况下是没有开启缓存的(二级缓存).要开启二级缓存,你需要在你的 SQL 映射文件中添加一行: <cache/>
    // 简单的说,就是先查CacheKey,查不到再委托给实际的执行器去查
    if (cache != null) {
      flushCacheIfRequired(ms);
      if (ms.isUseCache() && resultHandler == null) {
        ensureNoOutParams(ms, parameterObject, boundSql);
        @SuppressWarnings("unchecked")
        List<E> list = (List<E>) tcm.getObject(cache, key);
        if (list == null) {
          list = delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
          tcm.putObject(cache, key, list); 
        }
        return list;
      }
    }
    return delegate.<E> query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
  }

  @Override
  public List<BatchResult> flushStatements() throws SQLException {
    return delegate.flushStatements();
  }

  @Override
  public void commit(boolean required) throws SQLException {
    delegate.commit(required);
    tcm.commit();
  }

  @Override
  public void rollback(boolean required) throws SQLException {
    try {
      delegate.rollback(required);
    } finally {
      if (required) {
        tcm.rollback();
      }
    }
  }

  private void ensureNoOutParams(MappedStatement ms, Object parameter, BoundSql boundSql) {
    if (ms.getStatementType() == StatementType.CALLABLE) {
      for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
        if (parameterMapping.getMode() != ParameterMode.IN) {
          throw new ExecutorException("Caching stored procedures with OUT params is not supported.  Please configure useCache=false in " + ms.getId() + " statement.");
        }
      }
    }
  }

  @Override
  public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
    return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
  }

  @Override
  public boolean isCached(MappedStatement ms, CacheKey key) {
    return delegate.isCached(ms, key);
  }

  @Override
  public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
    delegate.deferLoad(ms, resultObject, property, key, targetType);
  }

  @Override
  public void clearLocalCache() {
    delegate.clearLocalCache();
  }

  private void flushCacheIfRequired(MappedStatement ms) {
    Cache cache = ms.getCache();
    if (cache != null && ms.isFlushCacheRequired()) {
      // 如果配置了flushCache, 则清空缓存
      tcm.clear(cache);
    }
  }

  @Override
  public void setExecutorWrapper(Executor executor) {
    throw new UnsupportedOperationException("This method should not be called");
  }

}

​ CachingExecutor的实现,在设计上通过装饰者模式,在内部持有一个Executor对象,来给Executor的方法实现包装上缓存实现逻辑。同时,CachingExecutor中还有一个事务缓存管理器来实现事务数据的隔离性。

4、总结

​ 在看完以上Executor执行体系的设计与实现后,我们来捋一捋一次查询时MyBatis的Executor方法执行栈。

  1. 当查询时,通过SqlSession的selectList方法进行开始执行(不管是执行selectOne方法还是selectList方法,最终调用的方法都是selectList)。SqlSession再调用内部持有的Executor的query方法执行。

    @Override
    public <E> List<E> selectList(String statement, Object parameter, RowBounds rowBounds) {
      try {
        //根据statement id找到对应的MappedStatement
        MappedStatement ms = configuration.getMappedStatement(statement);
        //转而用执行器来查询结果,注意这里传入的ResultHandler是null
        return executor.query(ms, wrapCollection(parameter), rowBounds, Executor.NO_RESULT_HANDLER);
      } catch (Exception e) {
        throw ExceptionFactory.wrapException("Error querying database.  Cause: " + e, e);
      } finally {
        ErrorContext.instance().reset();
      }
    }
    
  2. 在开启二级缓存的时候,SqlSession内部的Executor对象实际上时CachingExecutor对象,在执行完相应的缓存逻辑后,调用CachingExecutor内部Executor对象的query方法。

  3. 由于在query方法的实现只在BaseExecutor中,也就是说,当CachingExecutor调用query方法时,实际上是在调用BaseExecutor的query方法。在前文我们说过BaseExecutor最终执行数据库查询时,调用的是子类的doQuery方法实现。

  4. 所以在最终执行查询时,会进入SimpleExecutor的doQuery方法,去完成一次真正的数据库查询

​ 最后我们通过一张图来对本章节的Executor执行体系做一个回顾。

Executor继承体系

5、原文地址

深入理解MyBatis-Executor执行体系

本文地址:https://blog.csdn.net/weixin_40516653/article/details/107449258

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网