C#中几种数据库的大数据批量插入

C#语言中对SqlServer、Oracle、SQLite和MySql中的数据批量插入是支持的,不过Oracle需要使用Orace.DataAccess驱动。

 

 IProvider里有一个用于实现批量插入的插件服务接口IBatcherProvider。批量插入的实现可以通过实现该接口来实现。

    /// <summary>/// 提供数据批量处理的方法。/// </summary>public interface IBatcherProvider : IProviderService{/// <summary>///<see cref="DataTable"/> 的数据批量插入到数据库中。/// </summary>/// <param name="dataTable">要批量插入的 <see cref="DataTable"/></param>/// <param name="batchSize">每批次写入的数据量。</param>void Insert(DataTable dataTable, int batchSize = 10000);}

 

一、SqlServer数据批量插入

    SqlServer的批量插入可使用SqlBulkCopy实现:

    /// <summary>/// 为 System.Data.SqlClient 提供的用于批量操作的方法。/// </summary>public sealed class MsSqlBatcher : IBatcherProvider{/// <summary>/// 获取或设置提供者服务的上下文。/// </summary>public ServiceContext ServiceContext { get; set; }/// <summary>///<see cref="DataTable"/> 的数据批量插入到数据库中。/// </summary>/// <param name="dataTable">要批量插入的 <see cref="DataTable"/></param>/// <param name="batchSize">每批次写入的数据量。</param>public void Insert(DataTable dataTable, int batchSize = 10000){Checker.ArgumentNull(dataTable, "dataTable");if (dataTable.Rows.Count == 0){return;}using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()){try{connection.TryOpen();//给表名加上前后导符var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName);using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null){DestinationTableName = tableName, BatchSize = batchSize}){//循环所有列,为bulk添加映射dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement);bulk.WriteToServer(dataTable);bulk.Close();}}catch (Exception exp){throw new BatcherException(exp);}finally{connection.TryClose();}}}}

    SqlBulkCopy的ColumnMappings中列的名称受大小写敏感限制,因此在构造DataTable的时候应请注意列名要与表一致。

     以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置SqlBulkCopyOptions.UseInternalTransaction。

 

     二、Oracle数据批量插入

     System.Data.OracleClient不支持批量插入,因此只能使用Oracle.DataAccess组件来作为提供者。

    /// <summary>/// Oracle.Data.Access 组件提供的用于批量操作的方法。/// </summary>public sealed class OracleAccessBatcher : IBatcherProvider{/// <summary>/// 获取或设置提供者服务的上下文。/// </summary>public ServiceContext ServiceContext { get; set; }/// <summary>///<see cref="DataTable"/> 的数据批量插入到数据库中。/// </summary>/// <param name="dataTable">要批量插入的 <see cref="DataTable"/></param>/// <param name="batchSize">每批次写入的数据量。</param>public void Insert(DataTable dataTable, int batchSize = 10000){Checker.ArgumentNull(dataTable, "dataTable");if (dataTable.Rows.Count == 0){return;}using (var connection = ServiceContext.Database.CreateConnection()){try{connection.TryOpen();using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()){if (command == null){throw new BatcherException(new ArgumentException("command"));}command.Connection = connection;command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);command.ExecuteNonQuery();}}catch (Exception exp){throw new BatcherException(exp);}finally{connection.TryClose();}}}/// <summary>/// 生成插入数据的sql语句。/// </summary>/// <param name="database"></param>/// <param name="command"></param>/// <param name="table"></param>/// <returns></returns>private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table){var names = new StringBuilder();var values = new StringBuilder();//将一个DataTable的数据转换为数组的数组var data = table.ToArray();//设置ArrayBindCount属性command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null);var syntax = database.Provider.GetService<ISyntaxProvider>();for (var i = 0; i < table.Columns.Count; i++){var column = table.Columns[i];var parameter = database.Provider.DbProviderFactory.CreateParameter();if (parameter == null){continue;}parameter.ParameterName = column.ColumnName;parameter.Direction = ParameterDirection.Input;parameter.DbType = column.DataType.GetDbType();parameter.Value = data[i];if (names.Length > 0){names.Append(",");values.Append(",");}names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName));values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);command.Parameters.Add(parameter);}return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);}}

    以上最重要的一步,就是将DataTable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环Columns将后数组作为

    Parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。

 

     三、SQLite数据批量插入

     SQLite的批量插入只需开启事务就可以了。

 public sealed class SQLiteBatcher : IBatcherProvider{/// <summary>/// 获取或设置提供者服务的上下文。/// </summary>public ServiceContext ServiceContext { get; set; }/// <summary>///<see cref="DataTable"/> 的数据批量插入到数据库中。/// </summary>/// <param name="dataTable">要批量插入的 <see cref="DataTable"/></param>/// <param name="batchSize">每批次写入的数据量。</param>public void Insert(DataTable dataTable, int batchSize = 10000){Checker.ArgumentNull(dataTable, "dataTable");if (dataTable.Rows.Count == 0){return;}using (var connection = ServiceContext.Database.CreateConnection()){DbTransaction transcation = null;try{connection.TryOpen();transcation = connection.BeginTransaction();using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()){if (command == null){throw new BatcherException(new ArgumentException("command"));}command.Connection = connection;command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable);if (command.CommandText == string.Empty){return;}var flag = new AssertFlag();dataTable.EachRow(row =>{var first = flag.AssertTrue();ProcessCommandParameters(dataTable, command, row, first);command.ExecuteNonQuery();});}transcation.Commit();}catch (Exception exp){if (transcation != null){transcation.Rollback();}throw new BatcherException(exp);}finally{connection.TryClose();}}}private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first){for (var c = 0; c < dataTable.Columns.Count; c++){DbParameter parameter;//首次创建参数,是为了使用缓存if (first){parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter();parameter.ParameterName = dataTable.Columns[c].ColumnName;command.Parameters.Add(parameter);}else{parameter = command.Parameters[c];}parameter.Value = row[c];}}/// <summary>/// 生成插入数据的sql语句。/// </summary>/// <param name="database"></param>/// <param name="table"></param>/// <returns></returns>private string GenerateInserSql(IDatabase database, DataTable table){var syntax = database.Provider.GetService<ISyntaxProvider>();var names = new StringBuilder();var values = new StringBuilder();var flag = new AssertFlag();table.EachColumn(column =>{if (!flag.AssertTrue()){names.Append(",");values.Append(",");}names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName));values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName);});return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values);}}

   四、MySql数据批量插入

 /// <summary>/// 为 MySql.Data 组件提供的用于批量操作的方法。/// </summary>public sealed class MySqlBatcher : IBatcherProvider{/// <summary>/// 获取或设置提供者服务的上下文。/// </summary>public ServiceContext ServiceContext { get; set; }/// <summary>///<see cref="DataTable"/> 的数据批量插入到数据库中。/// </summary>/// <param name="dataTable">要批量插入的 <see cref="DataTable"/></param>/// <param name="batchSize">每批次写入的数据量。</param>public void Insert(DataTable dataTable, int batchSize = 10000){Checker.ArgumentNull(dataTable, "dataTable");if (dataTable.Rows.Count == 0){return;}using (var connection = ServiceContext.Database.CreateConnection()){try{connection.TryOpen();using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()){if (command == null){throw new BatcherException(new ArgumentException("command"));}command.Connection = connection;command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable);if (command.CommandText == string.Empty){return;}command.ExecuteNonQuery();}}catch (Exception exp){throw new BatcherException(exp);}finally{connection.TryClose();}}}/// <summary>/// 生成插入数据的sql语句。/// </summary>/// <param name="database"></param>/// <param name="command"></param>/// <param name="table"></param>/// <returns></returns>private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table){var names = new StringBuilder();var values = new StringBuilder();var types = new List<DbType>();var count = table.Columns.Count;var syntax = database.Provider.GetService<ISyntaxProvider>();table.EachColumn(c =>{if (names.Length > 0){names.Append(",");}names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName));types.Add(c.DataType.GetDbType());});var i = 0;foreach (DataRow row in table.Rows){if (i > 0){values.Append(",");}values.Append("(");for (var j = 0; j < count; j++){if (j > 0){values.Append(", ");}var isStrType = IsStringType(types[j]);var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j);if (parameter != null){values.Append(parameter.ParameterName);command.Parameters.Add(parameter);}else if (isStrType){values.AppendFormat("'{0}'", row[j]);}else{values.Append(row[j]);}}values.Append(")");i++;}return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values);}/// <summary>/// 判断是否为字符串类别。/// </summary>/// <param name="dbType"></param>/// <returns></returns>private bool IsStringType(DbType dbType){return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength;}/// <summary>/// 创建参数。/// </summary>/// <param name="provider"></param>/// <param name="isStrType"></param>/// <param name="dbType"></param>/// <param name="value"></param>/// <param name="parPrefix"></param>/// <param name="row"></param>/// <param name="col"></param>/// <returns></returns>private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col){//如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime){var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col);var parameter = provider.DbProviderFactory.CreateParameter();parameter.ParameterName = name;parameter.Direction = ParameterDirection.Input;parameter.DbType = dbType;parameter.Value = value;return parameter;}return null;}}

    MySql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。

 

     五、测试

     接下来写一个测试用例来看一下使用批量插入的效果。 

 [Test]public void TestBatchInsert(){Console.WriteLine(TimeWatcher.Watch(() =>InvokeTest(database =>{var table = new DataTable("Batcher");table.Columns.Add("Id", typeof(int));table.Columns.Add("Name1", typeof(string));table.Columns.Add("Name2", typeof(string));table.Columns.Add("Name3", typeof(string));table.Columns.Add("Name4", typeof(string));//构造100000条数据for (var i = 0; i < 100000; i++){table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString());}//获取 IBatcherProvidervar batcher = database.Provider.GetService<IBatcherProvider>();if (batcher == null){Console.WriteLine("不支持批量插入。");}else{batcher.Insert(table);}//输出batcher表的数据量var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher");Console.WriteLine("当前共有 {0} 条数据", database.ExecuteScalar(sql));})));}

  以下表中列出了四种数据库生成10万条数据各耗用的时间

 

数据库

耗用时间

MsSql00:00:02.9376300
Oracle00:00:01.5155959
SQLite00:00:01.6275634
MySql00:00:05.4166891

转载于:https://www.cnblogs.com/kurt/p/3840018.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/461133.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

求背包问题所有解(C++实现)

这是我学习数据结构时的一道上机作业&#xff0c;那时还没养成写注释的习惯&#xff0c;所以各位得受点苦了。 只是简易背包问题。 代码&#xff1a; 展开 1 // 背包问题所有解2 // 作者:王锦 3 // 邮箱:jinkswvip.qq.com4 5 #include "stdafx.h"6 #include <iost…

I2C子系统详解3——I2C总线驱动层代码分析

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、前言 由I2C总线设备的驱动框架可知&#xff0c;I2C总线设备的驱动框架涉及的文件如下&#xff1a; &#xff08;1&#xff09;I2C设备驱动层相关的文件 x210开发板的电容触摸屏gslX680采用I2C接…

java join 异常_Java:守护进程:thread.join()没有完成,当在一个线程中抛出异常时...

我写了一个Java守护进程(一个实现守护进程和Runnable的类)&#xff0c;现在我遇到了以下问题&#xff1a;在init()中&#xff0c;我创建了一个新线程 . Thread thread new Thread(this); 在start()中我启动新线程 . thread.start() . 在运行中我做了很多不同的事情......然后发…

硬链接与符号链接的比较?

今天就说说硬链接&#xff08;实体链接&#xff09;与符号链接&#xff08;类似Windows的快捷方式&#xff09;的不同?首先我们应知道&#xff1a;每个档案都会占用一个inode ,档案内容由 inode记录来指向;想要读取该档案&#xff0c;必须要经过目录记录的文件名来指向正确的i…

JAVA--自制斐波那契数列输出

累了&#xff0c;写点简单的。 1 public class hello {2 3 /**4 * param args5 */6 public static void main(String[] args) {7 int Fabnum 10;8 int sum 0;9 System.out.print("Serial:\t"); 10 for(int i 1…

I2C子系统详解4——I2C设备驱动层代码分析

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 一、前言 由I2C总线设备的驱动框架可知&#xff0c;I2C总线设备的驱动框架涉及的文件如下&#xff1a; &#xff08;1&#xff09;I2C设备驱动层相关的文件 x210开发板的电容触摸屏gslX680采用I2C接…

golang java rpc_golang两种调用rpc的方法

本文实例讲述了golang两种调用rpc的方法。分享给大家供大家参考&#xff0c;具体如下&#xff1a;golang的rpc有两种方法进行调用&#xff0c;一种是rpc例子中给的&#xff1a;package mainimport ("net/rpc""net/http""log""net"&quo…

9、C语言 —— 指针的用处

为什么80%的码农都做不了架构师&#xff1f;>>> 1、用函数实现两个数的交换 ‍‍在没用函数之前&#xff0c;可以这样实现‍‍#include <stdio.h>int main() {int a 3;int b 7;int c;printf("交换前&#xff0c;a%d&#xff0c;b%d\n", a, b); …

内核中的竞争状态和互斥(简述)

以下内容源于朱有鹏《物联网大讲堂》课程的学习&#xff0c;如有侵权&#xff0c;请告知删除。 一、概念 &#xff08;1&#xff09;竞争状态&#xff08;简称竟态&#xff09;&#xff1b; &#xff08;2&#xff09;临界段&#xff08;某一段代码&#xff0c;该代码有可能…

MenuetOS

MenuetOS由芬兰人Ville Turjanmaa开发&#xff0c;是一个操作系统&#xff0c;用于和IBM PC兼容的电脑。 它由汇编语言写成&#xff0c;可以存入一只1.44MB的软盘中。 Menuet OS 的32位版本Menuet32使用GNU通用公共许可证发放&#xff0c;但64位版本Menuet64使用自己的协议发放…

php是一种,PHP是一种什么型的语言:()

案例分析一&#xff1a;假定CPU的主频是500MHz。硬盘采用DMA方式进行数据传送&#xff0c;其数据传输率为4MB/s, 每次DMA传输的数据量为8KB, 要求没有任何数据传输被错过。如果CPU在DMA初始化设置和启动硬盘操作等方面用了1000个时钟周期&#xff0c;并且在DMA传送完成后的中断…

java动态代理二cglib

2019独角兽企业重金招聘Python工程师标准>>> java动态代理 转载于:https://my.oschina.net/u/1430510/blog/290215

spring心得6--自动装配知识点讲解及案例分析

1.自动装配&#xff1a; spring3.2以上版本有四种自动装配类型&#xff1a; 1&#xff09;.byName:寻找和属性名相同的bean,若找不到&#xff0c;则装不上。 2&#xff09;.byType:寻找和属性类型相同的bean,找不到,装不上,找到多个抛异常。 3&#xff09;.constructor:按照参数…

中断的上下半部

以下内容源于朱有鹏嵌入式课程的学习与整理&#xff0c;如有侵权请告知删除。 前言 因为输入类设备的输入都是异步事件&#xff0c;因此一般使用中断来处理和响应。 中断处理程序处于中断上下文中&#xff0c;不能和用户空间数据交互&#xff08;不能使用copy_to(from)_usr函数…

arrayPointer

1,分别使用指针加减 int wages[2] {100000000,20000000}; int *pw wages or int *pw &wages[0] 表示指针指向数组的首地址; pw表示地址,*pw表示取值,new分配的动态数组时 指针名称当数组名称使用eg pw[0],pw[2]分别表示指向数组wages的2个数组的元素值; 我们已知pw表示此…

php抓取动态数据,php+ajax实现无刷新动态加载数据技术

我们浏览有些网页的时候&#xff0c;当拉动浏览器的滚动条时到页底时&#xff0c;页面会继续自动加载更多内容供用户浏览。这种技术我暂且称它为滚屏加载技术。我们发现很多网站用到这种技术&#xff0c;必应图片搜索、新浪微博、QQ空间等将该技术应用得淋漓尽致。滚屏加载技术…

图片播放器小项目(详解)

以下内容源于朱有鹏《物联网大讲堂》课程的学习整理&#xff0c;如有侵权&#xff0c;请告知删除。一、开始动手写代码 1、Makefile介绍 &#xff08;1&#xff09;这是一个通用的项目管理的Makefile体系&#xff0c;自己写的&#xff08;有子文件夹组织的&#xff09;项目可以…

基于linux-2.6.32.2的servfox移植

说明&#xff1a;这篇文章是本人在做基于web客户端的远程监控系统课题期间&#xff0c;在移植servfox应用服务程序费了很大周折&#xff0c;所以写下的。 只是介绍了基于arm的servfox移植【因为其他部分的移植没有遇到过错误】 前言&#xff1a; 如何移植基于linux的USB摄像头驱…

Telnet远程访问思科交换机、路由器

一、实验目的Telnet远程访问思科交换机、路由器二、实验拓扑三、实验步骤1、PC1远程管理S11&#xff09;配置交换机的管理IPS1(config)#int vlan 1S1(config-if)#ip add 192.168.1.100 255.255.255.0S1(config-if)#no shu2&#xff09;开启S1的telnet远程管理服务S1(config)#li…

php redis 队列,Redis 实现队列

## Redis 实现队列Redis 实现队列场景说明&#xff1a;用于处理比较耗时的请求&#xff0c;例如批量发送邮件&#xff0c;如果直接在网页触发执行发送&#xff0c;程序会出现超时高并发场景&#xff0c;当某个时刻请求瞬间增加时&#xff0c;可以把请求写入到队列&#xff0c;后…