EF批量插入太慢?那是你的姿势不对

大概所有的程序员应该都接触过批量插入的场景,我也相信任何的程序员都能写出可正常运行的批量插入的代码。但怎样实现一个高效、快速插入的批量插入功能呢?

由于每个人的工作履历,工作年限的不同,在实现这样的一个需求时,可能技术选型各有不同,有直接生成insert语句的,有用EF的或者其他的orm框架的。其实不管是手写insert还是使用EF,最终交给数据库执行的还是insert语句。下面是EF批量插入的示例代码:

var list = new List<Student>();for (int i = 0; i < 100; i++){list.Add(new Student { CreateTime = DateTime.Now, Name = "zjjjjjj" });}await _context.Students.AddRangeAsync(list);await _context.SaveChangesAsync();

生成的脚本截图如下:

这种实现方式在数据量100以内时,耗时还算可以。但如果要批量导入的数据达到万级的时候,那耗时简直是灾难。我测试的数据如下(测试数据库为mysql,具体配置不详):

数据量耗时(s)
100.028
1w3.929
10w31.280

10w的数据已经耗时超过了30s,我没有勇气测试100w数据的耗时,有兴趣的可以自行测试下。

下面就应该进入正题了,对于较大数据量(1000以上)场景下的批量插入,各个数据库应该都提供了相关的解决方案,由于工作所限,目前笔者仅接触过mysql和mssql。

mysql的实现方案是LOAD DATA命令,此命令接收一个csv文件,然后将文件上传到数据库服务器后,解析数据后插入。好在MySqlConnector提供了相关的封装,不用咱们去熟悉那么复杂的命令参数。

mssql实现的方案是使用SqlBulkCopy类,不过此类仅接收DataTable类型的数据,所以,在批量插入的时候,需要将数据源转换成DataTable。

综上所示,不管是mysql,还是mssql,均需要将数据源转换成指定的格式才可以使用批量导入的功能,所以这一块的主要核心就是转换数据源格式。mysql需要转换成csv,mssql需要转换成DataTable。下面就来一起看看具体的转换的方法。

以下代码是转换csv和DataTable相关方法:

namespace FL.DbBulk{public static class Extension{/// <summary>/// 获取实体影射的表名/// </summary>/// <param name="type"></param>/// <returns></returns>public static string GetMappingName(this System.Type type){var key = $"batch{type.FullName}";var tableName = CacheService.Get(key);if (string.IsNullOrEmpty(tableName)){var tableAttr = type.GetCustomAttribute<TableAttribute>();if (tableAttr != null){tableName = tableAttr.Name;}else{tableName = type.Name;}CacheService.Add(key, tableName);}return tableName;}public static List<EntityInfo> GetMappingProperties(this System.Type type){var key = $"ICH.King.DbBulk{type.Name}";var list = CacheService.Get<List<EntityInfo>>(key);if (list == null){list = new List<EntityInfo>();foreach (var propertyInfo in type.GetProperties()){if (!propertyInfo.PropertyType.IsValueType &&propertyInfo.PropertyType.Name != "Nullable`1" && propertyInfo.PropertyType != typeof(string)) continue;var temp = new EntityInfo();temp.PropertyInfo = propertyInfo;temp.FieldName = propertyInfo.Name;var attr = propertyInfo.GetCustomAttribute<ColumnAttribute>();if (attr != null){temp.FieldName = attr.Name;}temp.GetMethod = propertyInfo.CreateGetter();list.Add(temp);}CacheService.Add(key, list);}return list;}/// <summary>/// 创建cvs字符串/// </summary>/// <typeparam name="T"></typeparam>/// <param name="entities"></param>/// <param name="primaryKey"></param>/// <returns></returns>public static string CreateCsv<T>(this IEnumerable<T> entities, string primaryKey = ""){var sb = new StringBuilder();var properties = typeof(T).GetMappingProperties().ToArray();foreach (var entity in entities){for (int i = 0; i < properties.Length; i++){var ele = properties[i];if (i != 0) sb.Append(",");var value = ele.Get(entity);if (ele.PropertyInfo.PropertyType.Name == "Nullable`1"){if (ele.PropertyInfo.PropertyType.GenericTypeArguments[0] == typeof(DateTime)){if (value == null){sb.Append("NULL");}else{sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));}continue;}}if (ele.PropertyInfo.PropertyType == typeof(DateTime)){sb.Append(Convert.ToDateTime(value).ToString("yyyy-MM-dd HH:mm:ss"));continue;}//如果是主键&&string类型,且值不为空if (ele.FieldName == primaryKey && ele.PropertyInfo.PropertyType == typeof(string)){sb.Append(Guid.NewGuid().ToString());continue;}if (value == null){continue;}if (ele.PropertyInfo.PropertyType == typeof(string)){var vStr = value.ToString();if (vStr.Contains("\"")){vStr = vStr.Replace("\"", "\"\"");}if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n")){vStr = $"\"{vStr}\"";}sb.Append(vStr);}else sb.Append(value);}sb.Append(IsWin() ? "\r\n" : "\n");//sb.AppendLine();}return sb.ToString();}public static bool IsWin(){return RuntimeInformation.IsOSPlatform(OSPlatform.Windows);}public static string CreateCsv(this DataTable table){StringBuilder sb = new StringBuilder();DataColumn colum;foreach (DataRow row in table.Rows){for (int i = 0; i < table.Columns.Count; i++){colum = table.Columns[i];if (i != 0) sb.Append(",");if (colum.DataType == typeof(string)){var vStr = row[colum].ToString();if (vStr.Contains("\"")){vStr = vStr.Replace("\"", "\"\"");}if (vStr.Contains(",") || vStr.Contains("\r\n") || vStr.Contains("\n")){vStr = $"\"{vStr}\"";}sb.Append(vStr);}else sb.Append(row[colum]);}sb.Append(IsWin() ? "\r\n" : "\n");}return sb.ToString();}public static DataTable ToDataTable<T>(this IEnumerable<T> list, string primaryKey = ""){var type = typeof(T);//获取实体映射的表名var mappingName = type.GetMappingName();var dt = new DataTable(mappingName);//获取实体映射的属性列表var columns = type.GetMappingProperties();dt.Columns.AddRange(columns.Select(x => new DataColumn(x.FieldName)).ToArray());foreach (var data in list){var row = dt.NewRow();foreach (var entityInfo in columns){var value = entityInfo.Get(data);if (primaryKey == entityInfo.FieldName && entityInfo.PropertyInfo.PropertyType == typeof(string)){row[entityInfo.FieldName] = value ?? Guid.NewGuid().ToString();}else{row[entityInfo.FieldName] = value;}}dt.Rows.Add(row);}return dt;}}}

转换成DataTable方法相对简单,但这里我做了个优化下,当判断主键是string类型,且值为空时,会自动生成一个GUID,并给其赋值,这样做的目的是为了和EF原生的插入功能兼容。

生成Csv的相对比较麻烦,因为Csv是用逗号以及其他符号来区分每一行、每一列数据,但经常会存在要插入的数据包含了csv的特殊符号,这样情况下就需要做转义。另外,还有一个需要考虑的问题,linux和windows默认的换行符是有区别的,windows的换行符为\r\n,而linux默认的是\n,所以在生成csv时,需要根据不同的系统进行处理。

下面来看下具体怎么调用相关的插入方法,首先看下mysql的,主要代码如下所示:

private async Task InsertCsvAsync(string csv, string tableName, List<string> columns){var fileName = Path.GetTempFileName();await File.WriteAllTextAsync(fileName, csv);var conn = _context.Database.GetDbConnection() as MySqlConnection;var loader = new MySqlBulkLoader(conn){FileName = fileName,Local = true,LineTerminator = Extension.IsWin() ? "\r\n" : "\n",FieldTerminator = ",",TableName = tableName,FieldQuotationCharacter = '"',EscapeCharacter = '"',CharacterSet = "UTF8"};loader.Columns.AddRange(columns);await loader.LoadAsync();}

在上述的代码中,首先创建一个临时文件,然后将其他数据源转换的csv内容写入到文件中,获取数据库连接,再然后创建MySqlBulkLoader类的实例,将相关参数进行复制后,还需要配置字段列表,最后执行LoadAsync命令。

下面是mssql的批量插入的核心代码:

public async Task InsertAsync(DataTable table){if (table == null){throw new ArgumentNullException();}if (string.IsNullOrEmpty(table.TableName)){throw new ArgumentNullException("DataTable的TableName属性不能为空");}var conn = (SqlConnection)_context.Database.GetDbConnection();await conn.OpenAsync();using (var bulk = new SqlBulkCopy(conn)){bulk.DestinationTableName = table.TableName;foreach (DataColumn column in table.Columns){bulk.ColumnMappings.Add(column.ColumnName, column.ColumnName);}await bulk.WriteToServerAsync(table);}}

以上方法相对简单,在此不做更多解释。

至此,mysql和mssql批量的导入的方案已经介绍完毕,但可能就会有人说了,这跟EF好像也没什么关系呀。
其实如果你有仔细看的话,或许能发现,我在代码中使用了一个名为_context字段,此字段其实就是EF的DbContext的实例。但文章内容到此时也没有完全的和EF结合,下面就来介绍下如何更优雅的将此功能集成到EF中。

在.net core中,接入EF的时候其实已经指定了使用的数据库类型,实例代码如下:

services.AddDbContext<MyDbContext>(opt => opt.UseMySql("server=10.0.0.146;Database=demo;Uid=root;Pwd=123456;Port=3306;AllowLoadLocalInfile=true"))

既然以及指定了数据库类型,那么在调用批量插入的时候,应该就不需要让调用者判断是使用mysql的方法,还是mssql的方法。具体怎么设计呢?且耐心往下看。

首先分别定义接口ISqlBulk,IMysqlBulk,ISqlServerBulk代码如下:

namespace FL.DbBulk{public interface ISqlBulk{/// <summary>/// 批量导入数据/// </summary>/// <param name="table">数据源</param>void Insert(DataTable table);/// <summary>/// 批量导入数据/// </summary>/// <param name="table">数据源</param>Task InsertAsync(DataTable table);void Insert<T>(IEnumerable<T> enumerable) where T : class;Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class;}}

IMysqlBulk,ISqlServerBulk接口继承ISqlBulk,代码如下:

namespace FL.DbBulk{public interface IMysqlBulk : ISqlBulk{Task InsertAsync<T>(string csvPath, string tableName = "") where T : class;}}
namespace FL.DbBulk{public interface ISqlServerBulk:ISqlBulk{}}

然后创建ISqlBulk实现类:

namespace FL.DbBulk{public class SqlBulk : ISqlBulk{private ISqlBulk _bulk;public SqlBulk(DbContext context, IServiceProvider provider){if (context.Database.IsMySql()){_bulk = provider.GetService<IMysqlBulk>();}else if (context.Database.IsSqlServer()){_bulk = provider.GetService<ISqlServerBulk>();}}public void Insert(DataTable table){_bulk.Insert(table);}public async Task InsertAsync(DataTable table){await _bulk.InsertAsync(table);}public void Insert<T>(IEnumerable<T> enumerable) where T : class{_bulk.Insert(enumerable);}public async Task InsertAsync<T>(IEnumerable<T> enumerable) where T : class{await _bulk.InsertAsync(enumerable);}}}

在SqlBulk的构造函数中,通过context.Database的扩展方法判断数据库的类型,然后再获取相应的接口的实例。再然后就是实现IMysqlBulk和ISqlServerBulk的实现类。上文已经把核心代码贴出,再此为了篇幅,就不贴完整代码了。

再然后,就是提供一个注入services的方法,代码如下:

namespace Microsoft.Extensions.DependencyInjection{public static class ServiceCollectionExtension{public static IServiceCollection AddBatchDB<T>(this IServiceCollection services) where  T:DbContext{services.TryAddScoped<IMysqlBulk, MysqlBulk>();services.TryAddScoped<ISqlServerBulk, SqlServerBulk>();services.TryAddScoped<ISqlBulk, SqlBulk>();services.AddScoped<DbContext, T>();return services;}}}

有了以上代码,我们就可以通过在Startup中很方便的启用批量插入的功能了。

最后,贴出两种插入方式对比的测试数据:

数据量EF默认耗时(s)ISqlBulk耗时(s)
100.0280.030
1w3.9291.581
10w31.28015.408

以上测试数据均是使用同一个mysql数据库,不同配置以及网络环境下,测试的数据会有差异,有兴趣的可以自己试试。

至此,本人内容已完毕。


最后,贴出git地址,如果思路或代码可以帮到你,欢迎点赞,点star
https://github.com/fuluteam/FL.DbBulk.git

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/308678.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[RabbitMQ]什么是MQ

什么是MQ MQ(message queue)&#xff0c;从字面意思上看&#xff0c;本质是个队列&#xff0c;FIFO 先入先出&#xff0c;只不过队列中存放的内容是message 而已&#xff0c;还是一种跨进程的通信机制&#xff0c;用于上下游传递消息。在互联网架构中&#xff0c;MQ 是一种非常…

.NET Core 实现基于Websocket的在线聊天室

什么是Websocket我们在传统的客户端程序要实现实时双工通讯第一想到的技术就是socket通讯&#xff0c;但是在web体系是用不了socket通讯技术的&#xff0c;因为http被设计成无状态&#xff0c;每次跟服务器通讯完成后就会断开连接。在没有websocket之前web系统如果要做双工通讯…

用 Natasha 写个类型调用的架子

一、想法自上篇文章&#xff0c;我一直琢磨整个好点的例子来展示 Natasha 动态编程能力, 于是就写了一个简单的类型调用的架子&#xff0c;耗时40分钟左右, 项目地址&#xff1a;https://github.com/NMSAzulX/TypeCaller二、功能特点a)、简单的注入功能支持无参构造注入支持递…

Natasha v4.0.0.0 动态编程新篇章

一、简介Natasha 基于 Roslyn 的 C# 动态程序集构建库&#xff0c;该库允许开发者在运行时使用 C# 代码构建域 / 程序集 / 类 / 结构体 / 枚举 / 接口 / 方法等&#xff0c;使得程序在运行的时候可以增加新的模块及功能。Natasha 集成了域管理/插件管理&#xff0c;可以实现域隔…

[RabbitMQ]RabbitMQ概念_四大核心概念

RabbitMQ RabbitMQ 的概念 RabbitMQ 是一个消息中间件&#xff1a;它接受并转发消息。你可以把它当做一个快递站点&#xff0c;当你要发送一个包裹时&#xff0c;你把你的包裹放到快递站&#xff0c;快递员最终会把你的快递送到收件人那里&#xff0c;按照这种逻辑 RabbitMQ …

.Net Core in Docker极简入门(下篇)

点击上方蓝字"小黑在哪里"关注我吧Docker-Compose代码修改yml fileup & down镜像仓库前言上一篇【.Net Core in Docker极简入门&#xff08;上篇&#xff09;】讲解了docker的一些基本命令和操作&#xff0c;并成功构建了自己的asp.net core web应用的镜像&#…

这么多Apache顶级项目,SkyWalking为何一枝独秀?

吴晟读完需要5分钟速读仅需 2 分钟吴晟Apache基金会会员&#xff0c;Apache SkyWalking创始人、项目VP和PMC成员&#xff0c;Apache孵化器PMC成员&#xff0c;Apache ShardingSphere PMC成员&#xff0c;Apache APISIX (incubating) PPMC成员&#xff0c;Apache ECharts (incub…

[RabbitMQ]工作原理_原理名词解释

RabbitMQ 核心部分 各个名词介绍 RabbitMQ工作原理 Broker&#xff1a; 接收和分发消息的应用&#xff0c;RabbitMQ Server 就是 Message Broker Virtual host&#xff1a; 出于多租户和安全因素设计的&#xff0c;把 AMQP 的基本组件划分到一个虚拟的分组中&#xff0c;类…

Istio 中的授权策略详解

本文节选自 ServiceMesher 社区出品的开源电子书《Istio Handbook——Istio 服务网格进阶实践》&#xff0c;阅读地址&#xff1a;https://www.servicemesher.com/istio-handbook/授权功能是 Istio 中安全体系的一个重要组成部分&#xff0c;它用来实现访问控制的功能&#xff…

[RabbitMQ]创建Java开发环境_消费者_生产者

我们将用 Java 编写两个程序。发送单个消息的生产者和接收消息并打印出来的消费者。我们将介绍 Java API 中的一些细节。 在下图中&#xff0c;“ P”是我们的生产者&#xff0c;“ C”是我们的消费者。中间的框是一个队列-RabbitMQ 代表使用者保留的消息缓冲区 引入依赖 <…

如何利用Gitlab-CI持续部署到远程机器?

长话短说&#xff0c;今天聊一聊使用Gitlab-CI 自动部署到远程服务器。如果看过《基于docker-compose的Gitlab CI/CD实践&排坑指南》这篇文章的朋友&#xff0c;会注意到我是在 Gitlab-Runner服务器上自动部署的站点&#xff0c;本次我们结合ssh部署到远程机器(将CI服务器和…

[RabbitMQ]工作队列原理_代码实现

Work Queues 工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务&#xff0c;而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时&#xff0c;这些工作…

使用ImpromptuInterface反射方便的创建自定义DfaGraphWriter

在本文中&#xff0c;我为创建的自定义的DfaGraphWriter实现奠定了基础。DfaGraphWriter是公开的&#xff0c;因此您可以如上一篇文章《将终结点图添加到你的ASP.NET Core应用程序中》中所示在应用程序中使用它&#xff0c;但它使用的所有类均已标记为internal。这使得创建自己…

[RabbitMQ]消息应答概念_消息手动应答代码

消息应答 概念 消费者完成一个任务可能需要一段时间&#xff0c;如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了&#xff0c;会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息&#xff0c;便立即将该消 息标记为删除。在这种情况下&#xff0c;突然有…

rust火箭基地主楼开启方法_Rust 为什么能成为 Stack Overflow 最受欢迎的语言?

每年&#xff0c;开发者问答网站 Stack Overflow 都会对程序员社区展开年度调查&#xff0c;包括他们最喜爱的技术到工作偏好的所有内容。 在2017 年和2018 年Stack Overflow 年度开发者调查中&#xff0c;Rust语言已经连续两年成为最受欢迎语言Top 1。2018 年 Stack Overflow …

[RabbitMQ]队列持久化

RabbitMQ持久化 概念 如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时&#xff0c;它忽视队列和消息&#xff0c;除非告知它不要这样做。确保消息不会丢失需要做两件事&#xff1a;我们需要将队列和消息都标…

微服务认证架构如何演进来的?

【答疑解惑】| 作者 / Edison Zhou这是恰童鞋骚年的第267篇原创内容之前有同事问为何要用基于JWT令牌的认证架构&#xff0c;然后近期又有童鞋在后台留言问微服务安全认证架构的实践&#xff0c;因此我决定花两篇推文来解答一下。为了答好这个话题&#xff0c;我们先来看看微服…

maskrcnn还可以加网络吗_绿茶加蜂蜜的功效,绿茶可以加蜂蜜吗?

绿茶是我国的主要茶类之一&#xff0c;是一种天然健康的饮料&#xff0c;蜂蜜也是一种营养丰富的滋补食品&#xff0c;有些人不喜欢绿茶的苦味&#xff0c;想放点蜂蜜中和一下&#xff0c;但是不知道能不能这样做。那么绿茶能不能加蜂蜜呢?蜂蜜的主要成分是葡萄糖、果糖&#…

三分钟Docker-镜像、容器实战篇

本文主要内容&#xff1a;Docker 镜像、容器 常用命令整理使用Docker常见命令&#xff0c;搭建Consul集群通过创建自定义镜像&#xff0c;把.NetCore Api运行在Docker中1.镜像、容器命令镜像序号命令描述1docker image build基于Dockerfile创建镜像2docker image history显示镜…

手机键鼠映射软件_吃鸡,我最专业!---盖世小鸡键鼠吃鸡套装评测

Hello大家好&#xff0c;欢迎浏览这篇评测贴。首先很荣幸能够参与本期的评测&#xff0c;毕竟如此炫酷富有科技感的装备是可遇而不可求的&#xff0c;所以不论是得知入选还是收到快递开箱的时候&#xff0c;心情都是无比激动。话不多说&#xff0c;接下来就让我带你走进这个不一…