当前位置: 移动技术网 > IT编程>开发语言>c# > c#几种数据库的大数据批量插入(SqlServer、Oracle、SQLite和MySql)

c#几种数据库的大数据批量插入(SqlServer、Oracle、SQLite和MySql)

2019年07月18日  | 移动技术网IT编程  | 我要评论
在之前只知道sqlserver支持数据批量插入,殊不知道oracle、sqlite和mysql也是支持的,不过oracle需要使用orace.dataaccess驱动,今天

在之前只知道sqlserver支持数据批量插入,殊不知道oracle、sqlite和mysql也是支持的,不过oracle需要使用orace.dataaccess驱动,今天就贴出几种数据库的批量插入解决方法。

首先说一下,iprovider里有一个用于实现批量插入的插件服务接口ibatcherprovider,此接口在前一篇文章中已经提到过了。

/// <summary>
  /// 提供数据批量处理的方法。
  /// </summary>
  public interface ibatcherprovider : iproviderservice
  {
    /// <summary>
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
    /// </summary>
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
    /// <param name="batchsize">每批次写入的数据量。</param>
    void insert(datatable datatable, int batchsize = 10000);
  }

一、sqlserver数据批量插入

sqlserver的批量插入很简单,使用sqlbulkcopy就可以,以下是该类的实现:

/// <summary>
  /// 为 system.data.sqlclient 提供的用于批量操作的方法。
  /// </summary>
  public sealed class mssqlbatcher : ibatcherprovider
  {
    /// <summary>
    /// 获取或设置提供者服务的上下文。
    /// </summary>
    public servicecontext servicecontext { get; set; }

    /// <summary>
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
    /// </summary>
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
    /// <param name="batchsize">每批次写入的数据量。</param>
    public void insert(datatable datatable, int batchsize = 10000)
    {
      checker.argumentnull(datatable, "datatable");
      if (datatable.rows.count == 0)
      {
        return;
      }
      using (var connection = (sqlconnection)servicecontext.database.createconnection())
      {
        try
        {
          connection.tryopen();
          //给表名加上前后导符
          var tablename = dbutility.formatbyquote(servicecontext.database.provider.getservice<isyntaxprovider>(), datatable.tablename);
          using (var bulk = new sqlbulkcopy(connection, sqlbulkcopyoptions.keepidentity, null)
            {
              destinationtablename = tablename, 
              batchsize = batchsize
            })
          {
            //循环所有列,为bulk添加映射
            datatable.eachcolumn(c => bulk.columnmappings.add(c.columnname, c.columnname), c => !c.autoincrement);
            bulk.writetoserver(datatable);
            bulk.close();
          }
        }
        catch (exception exp)
        {
          throw new batcherexception(exp);
        }
        finally
        {
          connection.tryclose();
        }
      }
    }
  }

以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置sqlbulkcopyoptions.useinternaltransaction。

二、oracle数据批量插入

system.data.oracleclient不支持批量插入,因此只能使用oracle.dataaccess组件来作为提供者。

/// <summary>
  /// oracle.data.access 组件提供的用于批量操作的方法。
  /// </summary>
  public sealed class oracleaccessbatcher : ibatcherprovider
  {
    /// <summary>
    /// 获取或设置提供者服务的上下文。
    /// </summary>
    public servicecontext servicecontext { get; set; }

    /// <summary>
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
    /// </summary>
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
    /// <param name="batchsize">每批次写入的数据量。</param>
    public void insert(datatable datatable, int batchsize = 10000)
    {
      checker.argumentnull(datatable, "datatable");
      if (datatable.rows.count == 0)
      {
        return;
      }
      using (var connection = servicecontext.database.createconnection())
      {
        try
        {
          connection.tryopen();
          using (var command = servicecontext.database.provider.dbproviderfactory.createcommand())
          {
            if (command == null)
            {
              throw new batcherexception(new argumentexception("command"));
            }
            command.connection = connection;
            command.commandtext = generateinsersql(servicecontext.database, command, datatable);
            command.executenonquery();
          }
        }
        catch (exception exp)
        {
          throw new batcherexception(exp);
        }
        finally
        {
          connection.tryclose();
        }
      }
    }

    /// <summary>
    /// 生成插入数据的sql语句。
    /// </summary>
    /// <param name="database"></param>
    /// <param name="command"></param>
    /// <param name="table"></param>
    /// <returns></returns>
    private string generateinsersql(idatabase database, dbcommand command, datatable table)
    {
      var names = new stringbuilder();
      var values = new stringbuilder();
      //将一个datatable的数据转换为数组的数组
      var data = table.toarray();

      //设置arraybindcount属性
      command.gettype().getproperty("arraybindcount").setvalue(command, table.rows.count, null);

      var syntax = database.provider.getservice<isyntaxprovider>();
      for (var i = 0; i < table.columns.count; i++)
      {
        var column = table.columns[i];

        var parameter = database.provider.dbproviderfactory.createparameter();
        if (parameter == null)
        {
          continue;
        }
        parameter.parametername = column.columnname;
        parameter.direction = parameterdirection.input;
        parameter.dbtype = column.datatype.getdbtype();
        parameter.value = data[i];

        if (names.length > 0)
        {
          names.append(",");
          values.append(",");
        }
        names.appendformat("{0}", dbutility.formatbyquote(syntax, column.columnname));
        values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname);

        command.parameters.add(parameter);
      }
      return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values);
    }
  }

 以上最重要的一步,就是将datatable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环columns将后数组作为parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。

三、sqlite数据批量插入

sqlite的批量插入只需开启事务就可以了,这个具体的原理不得而知。

public sealed class sqlitebatcher : ibatcherprovider
  {
    /// <summary>
    /// 获取或设置提供者服务的上下文。
    /// </summary>
    public servicecontext servicecontext { get; set; }

    /// <summary>
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
    /// </summary>
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
    /// <param name="batchsize">每批次写入的数据量。</param>
    public void insert(datatable datatable, int batchsize = 10000)
    {
      checker.argumentnull(datatable, "datatable");
      if (datatable.rows.count == 0)
      {
        return;
      }
      using (var connection = servicecontext.database.createconnection())
      {
        dbtransaction transcation = null;
        try
        {
          connection.tryopen();
          transcation = connection.begintransaction();
          using (var command = servicecontext.database.provider.dbproviderfactory.createcommand())
          {
            if (command == null)
            {
              throw new batcherexception(new argumentexception("command"));
            }
            command.connection = connection;

            command.commandtext = generateinsersql(servicecontext.database, datatable);
            if (command.commandtext == string.empty)
            {
              return;
            }

            var flag = new assertflag();
            datatable.eachrow(row =>
              {
                var first = flag.asserttrue();
                processcommandparameters(datatable, command, row, first);
                command.executenonquery();
              });
          }
          transcation.commit();
        }
        catch (exception exp)
        {
          if (transcation != null)
          {
            transcation.rollback();
          }
          throw new batcherexception(exp);
        }
        finally
        {
          connection.tryclose();
        }
      }
    }

    private void processcommandparameters(datatable datatable, dbcommand command, datarow row, bool first)
    {
      for (var c = 0; c < datatable.columns.count; c++)
      {
        dbparameter parameter;
        //首次创建参数,是为了使用缓存
        if (first)
        {
          parameter = servicecontext.database.provider.dbproviderfactory.createparameter();
          parameter.parametername = datatable.columns[c].columnname;
          command.parameters.add(parameter);
        }
        else
        {
          parameter = command.parameters[c];
        }
        parameter.value = row[c];
      }
    }

    /// <summary>
    /// 生成插入数据的sql语句。
    /// </summary>
    /// <param name="database"></param>
    /// <param name="table"></param>
    /// <returns></returns>
    private string generateinsersql(idatabase database, datatable table)
    {
      var syntax = database.provider.getservice<isyntaxprovider>();
      var names = new stringbuilder();
      var values = new stringbuilder();
      var flag = new assertflag();
      table.eachcolumn(column =>
        {
          if (!flag.asserttrue())
          {
            names.append(",");
            values.append(",");
          }
          names.append(dbutility.formatbyquote(syntax, column.columnname));
          values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname);
        });
      return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values);
    }
  }

四、mysql数据批量插入

/// <summary>
  /// 为 mysql.data 组件提供的用于批量操作的方法。
  /// </summary>
  public sealed class mysqlbatcher : ibatcherprovider
  {
    /// <summary>
    /// 获取或设置提供者服务的上下文。
    /// </summary>
    public servicecontext servicecontext { get; set; }

    /// <summary>
    /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。
    /// </summary>
    /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param>
    /// <param name="batchsize">每批次写入的数据量。</param>
    public void insert(datatable datatable, int batchsize = 10000)
    {
      checker.argumentnull(datatable, "datatable");
      if (datatable.rows.count == 0)
      {
        return;
      }
      using (var connection = servicecontext.database.createconnection())
      {
        try
        {
          connection.tryopen();
          using (var command = servicecontext.database.provider.dbproviderfactory.createcommand())
          {
            if (command == null)
            {
              throw new batcherexception(new argumentexception("command"));
            }
            command.connection = connection;

            command.commandtext = generateinsersql(servicecontext.database, command, datatable);
            if (command.commandtext == string.empty)
            {
              return;
            }
            command.executenonquery();
          }
        }
        catch (exception exp)
        {
          throw new batcherexception(exp);
        }
        finally
        {
          connection.tryclose();
        }
      }
    }

    /// <summary>
    /// 生成插入数据的sql语句。
    /// </summary>
    /// <param name="database"></param>
    /// <param name="command"></param>
    /// <param name="table"></param>
    /// <returns></returns>
    private string generateinsersql(idatabase database, dbcommand command, datatable table)
    {
      var names = new stringbuilder();
      var values = new stringbuilder();
      var types = new list<dbtype>();
      var count = table.columns.count;
      var syntax = database.provider.getservice<isyntaxprovider>();
      table.eachcolumn(c =>
        {
          if (names.length > 0)
          {
            names.append(",");
          }
          names.appendformat("{0}", dbutility.formatbyquote(syntax, c.columnname));
          types.add(c.datatype.getdbtype());
        });

      var i = 0;
      foreach (datarow row in table.rows)
      {
        if (i > 0)
        {
          values.append(",");
        }
        values.append("(");
        for (var j = 0; j < count; j++)
        {
          if (j > 0)
          {
            values.append(", ");
          }
          var isstrtype = isstringtype(types[j]);
          var parameter = createparameter(database.provider, isstrtype, types[j], row[j], syntax.parameterprefix, i, j);
          if (parameter != null)
          {
            values.append(parameter.parametername);
            command.parameters.add(parameter);
          }
          else if (isstrtype)
          {
            values.appendformat("'{0}'", row[j]);
          }
          else
          {
            values.append(row[j]);
          }
        }
        values.append(")");
        i++;
      }
      return string.format("insert into {0}({1}) values {2}", dbutility.formatbyquote(syntax, table.tablename), names, values);
    }

    /// <summary>
    /// 判断是否为字符串类别。
    /// </summary>
    /// <param name="dbtype"></param>
    /// <returns></returns>
    private bool isstringtype(dbtype dbtype)
    {
      return dbtype == dbtype.ansistring || dbtype == dbtype.ansistringfixedlength || dbtype == dbtype.string || dbtype == dbtype.stringfixedlength;
    }

    /// <summary>
    /// 创建参数。
    /// </summary>
    /// <param name="provider"></param>
    /// <param name="isstrtype"></param>
    /// <param name="dbtype"></param>
    /// <param name="value"></param>
    /// <param name="parprefix"></param>
    /// <param name="row"></param>
    /// <param name="col"></param>
    /// <returns></returns>
    private dbparameter createparameter(iprovider provider, bool isstrtype, dbtype dbtype, object value, char parprefix, int row, int col)
    {
      //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数
      if ((isstrtype && value.tostring().indexof('\'') != -1) || dbtype == dbtype.datetime)
      {
        var name = string.format("{0}p_{1}_{2}", parprefix, row, col);
        var parameter = provider.dbproviderfactory.createparameter();
        parameter.parametername = name;
        parameter.direction = parameterdirection.input;
        parameter.dbtype = dbtype;
        parameter.value = value;
        return parameter;
      }
      return null;
    }
  }

mysql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。

五、测试

接下来写一个测试用例来看一下使用批量插入的效果。    

 public void testbatchinsert()
    {
      console.writeline(timewatcher.watch(() =>
        invoketest(database =>
          {
            var table = new datatable("batcher");
            table.columns.add("id", typeof(int));
            table.columns.add("name1", typeof(string));
            table.columns.add("name2", typeof(string));
            table.columns.add("name3", typeof(string));
            table.columns.add("name4", typeof(string));

            //构造100000条数据
            for (var i = 0; i < 100000; i++)
            {
              table.rows.add(i, i.tostring(), i.tostring(), i.tostring(), i.tostring());
            }

            //获取 ibatcherprovider
            var batcher = database.provider.getservice<ibatcherprovider>();
            if (batcher == null)
            {
              console.writeline("不支持批量插入。");
            }
            else
            {
              batcher.insert(table);
            }

            //输出batcher表的数据量
            var sql = new sqlcommand("select count(1) from batcher");
            console.writeline("当前共有 {0} 条数据", database.executescalar(sql));

          })));
    }

以下表中列出了四种数据库生成10万条数据各耗用的时间

数据库

耗用时间

mssql 00:00:02.9376300
oracle 00:00:01.5155959
sqlite 00:00:01.6275634
mysql 00:00:05.4166891

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网