作为开发人员,能学会简单地对历史数据迁移是日常基本功。在上篇文章中,我们初步地解释了历史数据迁移的基本概念,以及如何使用SQL SERVER存储过程实现对历史数据迁移。
一般来说直接在数据库中写SQL语句(insert into ... select from)的方式进行迁移,仅适用于“停服状态”下的数据迁移场景,也就是数据库处于无用户使用的情况下,而且迁移场景有限。
当数据库在生产环境中实时在用,而且数据量较大的前提下,很显然会影响性能,上述方法并不可取。
今天我们来介绍一个新的思路,可将历史数据迁移对现有实时在用数据库的性能影响降到较低水平。
简单描述下思路:
1、先利用程序根据创建时间升序排序,直接查询TopN条数据,记录到程序缓存中。
【查询后就和正式数据库没关系了,只要TopN不大,这个简单的查询几乎对数据库没有任何影响。】
2、然后程序将缓存中的TopN条数据写入到历史库。
【这个阶段和正式数据库没有半点关系,哪怕写的再慢,也不影响正式数据库】
3、核对下历史数据库中的数据,确保没有问题,就可以删除正式环境中的TopN条数据了。
【这里删除操作稍微比查询影响大些,但是仅仅是小批量的数据删除同样影响不大。】
4、如果需要大批量数据迁移怎么办? 非常简单,分批执行,比如循环执行CountX次,那么迁移的总数据量就为TopN×CountX ,所以根本不怕数据量大,开启程序自动执行即可,也就是用耗时间来减少对数据库性能的影响。【执行时间换性能】
这个思路和insert into ... select from的方式,最大的区别就在用写入历史数据库的过程不影响正式在用的数据库,只剩下TopN条数据的查询和删除操作,影响很小。
特别说明:TopN不能太大,这个越小越好,根据表字段的多少,数据库的性能,以及用户连接数情况综合考虑,建议TopN取值在1000到5000条之间,这样对数据库影响几乎可以忽略。
为了实现这个思路,我这里也随便写个简单程序来试试效果,做个例子,仅供大家参考。
由于是最基础的例子,我就不用通用的底层框架了,免得大家看起来吃力。同时为了运行演示方便,直接写个exe可执行程序好了。
这个讲解的例子,用到的技术主要包括:
数据库:SQL SERVER,
数据库访问组件:Entity Framework Core
日志记录:Serialog.AspNetCore
开发语言:C#
技术框架:.NET 5
项目模版:控制台应用程序
按照EntityFramework Core的思路,先建个Model吧。
1 using System; 2 using System.Collections.Generic; 3 using System.ComponentModel.DataAnnotations; 4 using System.ComponentModel.DataAnnotations.Schema; 5 using System.Text; 6 7 namespace Tyingtech_glps.Entities.HDM 8 { 9 /// <summary> 10 /// 接口请求记录 11 /// </summary> 12 [Table("GLPS_APIREQUEST")] 13 public class GLPS_APIREQUEST 14 { 15 [Key] 16 public string FID { get; set; } 17 18 /// <summary> 19 /// 接口编号【固定】 20 /// </summary> 21 public string FAPICODE { get; set; } 22 23 /// <summary> 24 /// 请求方身份ID 25 /// </summary> 26 public string FAPPID { get; set; } 27 28 /// <summary> 29 /// 接口请求方URL地址 30 /// </summary> 31 public string FFORMURL { get; set; } 32 33 /// <summary> 34 /// 接口请求方IP 35 /// </summary> 36 public string FIP { get; set; } 37 38 /// <summary> 39 /// 请求参数(JSON字符串) 40 /// </summary> 41 public string FREQUESTDATA { get; set; } 42 43 /// <summary> 44 /// 请求时间点 45 /// </summary> 46 public DateTime FREQTIME { get; set; } 47 48 /// <summary> 49 ///响应参数(JSON字符串) 50 /// </summary> 51 public string FRESPONSE { get; set; } 52 53 /// <summary> 54 /// 响应时间点 55 /// </summary> 56 public DateTime FRESTIME { get; set; } 57 58 /// <summary> 59 /// 总毫秒数 60 /// </summary> 61 public int FMILLISECOND { get; set; } 62 63 /// <summary> 64 /// 接口请求结果(1:成功;0:失败;-1:接口内部异常) 65 /// </summary> 66 public int FISSUCCESS { get; set; } 67 68 /// <summary> 69 /// 失败详情,仅内部使用(如:内部报错异常信息;AppId错误;非法请求;参数不全;...) 70 /// </summary> 71 public string FRESULT { get; set; } 72 73 /// <summary> 74 /// 约定格式数据1 (如:车牌号) 75 /// </summary> 76 public string FDATA1 { get; set; } 77 78 /// <summary> 79 /// 约定格式数据2 (如:进场、离场) 80 /// </summary> 81 public string FDATA2 { get; set; } 82 83 /// <summary> 84 /// 约定格式数据3 (如:...) 85 /// </summary> 86 public string FDATA3 { get; set; } 87 88 /// <summary> 89 /// 约定格式数据4(如:...) 90 /// </summary> 91 public string FDATA4 { get; set; } 92 93 /// <summary> 94 /// 约定格式数据5(如:...) 95 /// </summary> 96 public string FDATA5 { get; set; } 97 98 /// <summary> 99 /// 创建时间 100 /// </summary> 101 public DateTime FCREATETIME { get; set; } 102 } 103 }
1 /// <summary> 2 /// 历史数据迁移【一次性迁移】 3 /// </summary> 4 /// <param name="n">数据迁移量(条数) top N</param> 5 /// <returns>实际迁移成功数量</returns> 6 public static int DataToHisExecOne(int n) 7 { 8 n = n > 10000 ? 10000 : n; //一次最多1万(再多可能会对性能有影响【迁移第一条:尽量不能影响现有在用数据库的业务】) 9 10 //测试环境 11 EnumWhichDB whichDB = EnumWhichDB.DevGlps; 12 EnumWhichDB whichDBHis = EnumWhichDB.DevGlpsHis; 13 14 string connStr = DBConnectionString.GetConnStr(whichDB); //当前需要迁移的数据库 15 string connStrHis = DBConnectionString.GetConnStr(whichDBHis); //历史数据库 16 17 //Serialog 记录日志 18 var logFileName = string.Format("ToHisOne_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH")); 19 var log = new LoggerConfiguration() 20 .WriteTo.Console() 21 .WriteTo.File(logFileName) 22 .CreateLogger(); 23 24 log.Information($"开始迁移数据,计划迁移条数为:{n}"); 25 26 int beginCount = GetRowCount(whichDB); //当前数据库数据迁移前的记录行数 27 int beginCountHis = GetRowCount(whichDBHis); //历史数据库数据迁移前的记录行数 28 29 Stopwatch sw = new Stopwatch(); //检测运行时间(对每个阶段) 30 Stopwatch swAll = new Stopwatch(); //总耗时 31 swAll.Start(); 32 33 sw.Start(); 34 DateTime maxCreateTime = GetAscMaxCreateTimeTopN(whichDB,n); 35 sw.Stop(); 36 log.Information("对应迁移数据FCREATETIME为:" + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fffff") + $",查询maxCreateTime耗时:{sw.ElapsedMilliseconds} 毫秒。"); 37 38 sw.Restart(); //重新开始计时 39 List<GLPS_APIREQUEST> records = new List<GLPS_APIREQUEST>(); 40 //从数据库查最早的数据 41 using (var db = new GlpsDbContext(connStr)) 42 { 43 records = db.DS_ApiRequest 44 .Where(t => t.FCREATETIME <= maxCreateTime) 45 .OrderBy(t => t.FCREATETIME) 46 .AsNoTracking() //非跟踪查询(只读,提升效率) 47 .ToListAsync().Result; 48 } 49 sw.Stop(); //计时结束 50 log.Information($"按时间点实际查询到 {records.Count} 条数据,耗时:{sw.ElapsedMilliseconds} 毫秒。"); 51 52 sw.Restart(); //重新开始计时 53 //写数据到历史数据库 54 int newCount = 0; 55 using (var db = new GlpsDbContext(connStrHis)) 56 { 57 db.DS_ApiRequest.AddRange(records); 58 newCount = db.SaveChanges(); //最后保存数据 59 log.Information($"实际成功写入到历史数据库条数: {newCount}"); 60 } 61 sw.Stop(); //计时结束 62 log.Information($"实际成功写入到历史数据库条数:{newCount} , 写入耗时:{sw.ElapsedMilliseconds} 毫秒。"); 63 64 65 //最后删除数据 66 DateTime maxCreateTimeHis = GetMaxCreateTime(whichDBHis); //历史数据库最大的FCreateTime 67 68 //两个时间相同,则可以删除数据,否则不删除,直接预警(中间数据可能出错,需要人工干预) 69 if (maxCreateTime == maxCreateTimeHis) 70 { 71 sw.Restart(); 72 var rowCount = DeleteByFCreateTime(whichDB, maxCreateTime); 73 sw.Stop(); 74 log.Information($"迁移后删除数据条数:{rowCount} , 删除耗时:{sw.ElapsedMilliseconds} 毫秒。"); 75 } 76 else if (newCount < n && newCount == records.Count) //就是实际小于n,那么是:C#的datetime和数据库的datetime精度不同 77 { 78 sw.Restart(); 79 var rowCount = DeleteByFCreateTime(whichDB,maxCreateTimeHis); //需按历史数据库的日期来删除 80 sw.Stop(); 81 log.Information($"迁移后删除数据条数:{rowCount} , 删除耗时:{sw.ElapsedMilliseconds} 毫秒。"); 82 } 83 else 84 { 85 log.Error("数据对比出错:maxCreateTime != maxCreateTimeHis。 未执行最后的删除数据!!!请开发人员核对数据。"); 86 log.Information("maxCreateTime = " + maxCreateTime.ToString("yyyy-MM-dd HH:mm:ss.fff")); 87 log.Information("maxCreateTimeHis = " + maxCreateTimeHis.ToString("yyyy-MM-dd HH:mm:ss.fff")); 88 } 89 90 int endCount = GetRowCount(whichDB); //当前数据库数据迁移后的记录行数 91 int endCountHis = GetRowCount(whichDBHis); //历史数据库数据迁移后的记录行数 92 log.Information($"迁移前GLPS_APIREQUEST的数据条数:{beginCount} , 迁移后数据条数:{endCount}"); 93 log.Information($"迁移前GLPS_APIREQUESTHis的数据条数:{beginCountHis} , 迁移后数据条数:{endCountHis}"); 94 95 swAll.Stop(); 96 log.Information($"swAll:迁移总耗时:{swAll.ElapsedMilliseconds} 毫秒。"); 97 log.Information("------------------------------------------------"); 98 99 return newCount; 100 }
中间用到的几个单独逻辑的方法
1 /// <summary> 2 /// 获取总记录行数 3 /// </summary> 4 /// <param name="whichDB"></param> 5 /// <returns></returns> 6 private static int GetRowCount(EnumWhichDB whichDB) 7 { 8 string connStr = DBConnectionString.GetConnStr(whichDB); 9 int totalCount = 0; 10 using (var db = new GlpsDbContext(connStr)) 11 { 12 totalCount = db.DS_ApiRequest.Count(); 13 } 14 return totalCount; 15 } 16 17 18 /// <summary> 19 /// 查询数据库中最大的FCreateTime 20 /// </summary> 21 /// <param name="whichDB"></param> 22 /// <returns></returns> 23 private static DateTime GetMaxCreateTime(EnumWhichDB whichDB) 24 { 25 string connStr = DBConnectionString.GetConnStr(whichDB); 26 DateTime maxCreateTime = DateTime.MinValue; 27 using (var db = new GlpsDbContext(connStr)) 28 { 29 //查询历史数据库,最大的FCREATETIME 30 var record = db.DS_ApiRequest.OrderByDescending(t => t.FCREATETIME).Take(1).SingleOrDefault(); 31 if (record != null) 32 { 33 maxCreateTime = record.FCREATETIME; 34 } 35 } 36 return maxCreateTime; 37 } 38 39 /// <summary> 40 /// 按创建时间从小到大排序(FCreateTime Asc),取前N条数据的最大FCreateTime 41 /// 【即:取最早N条数据中,最大的创建时间】 42 /// </summary> 43 /// <param name="whichDB"></param> 44 /// <param name="topN"></param> 45 /// <returns></returns> 46 private static DateTime GetAscMaxCreateTimeTopN(EnumWhichDB whichDB,int topN) 47 { 48 string connStr = DBConnectionString.GetConnStr(whichDB); 49 DateTime maxCreateTime = DateTime.MinValue; 50 using (var db = new GlpsDbContext(connStr)) 51 { 52 //查询totalCount对应最大的FCREATETIME 53 var record = db.DS_ApiRequest.OrderBy(t => t.FCREATETIME) 54 .Skip(topN - 1).Take(1).SingleOrDefault(); 55 if (record != null) 56 { 57 maxCreateTime = record.FCREATETIME; 58 } 59 } 60 return maxCreateTime; 61 } 62 63 /// <summary> 64 /// 删除小于等于【某个创建时间】的数据 65 /// </summary> 66 /// <param name="whichDB"></param> 67 /// <param name="maxCreateTime"></param> 68 /// <returns>删除记录数</returns> 69 private static int DeleteByFCreateTime(EnumWhichDB whichDB, DateTime maxCreateTime) 70 { 71 int rowCount = 0; 72 string connStr = DBConnectionString.GetConnStr(whichDB); 73 using (var db = new GlpsDbContext(connStr)) 74 { 75 List<SqlParameter> listParams = new List<SqlParameter>{ 76 new SqlParameter("FCREATETIME", maxCreateTime) 77 }; 78 rowCount = db.Database.ExecuteSqlRaw(@"delete from GLPS_APIREQUEST where FCREATETIME<=@FCREATETIME", listParams); 79 } 80 return rowCount; 81 }
1 /// <summary> 2 /// 历史数据迁移【分批迁移】 3 /// </summary> 4 /// <param name="totalCount">任务总迁移条数</param> 5 /// <param name="prePageCount">分批迁移,单次查询数量</param> 6 public static void DataToHis(int totalCount=100,int prePageCount=10) 7 { 8 //临界值设定 9 totalCount = totalCount > 1000000 ? 1000000 : totalCount; //单次任务100万 10 prePageCount = prePageCount > 10000 ? 10000 : prePageCount; //每次1万 11 12 //Serialog 记录日志 13 var logFileName = string.Format("ToHis_ApiRequest_{0}.txt", DateTime.Now.ToString("yyyyMMddHH")); 14 var log = new LoggerConfiguration() 15 .WriteTo.Console() 16 .WriteTo.File(logFileName) 17 .CreateLogger(); 18 19 Stopwatch swAll = new Stopwatch(); //总耗时 20 swAll.Start(); 21 22 log.Information($"本次计划总迁移数据条数:{totalCount},分批单次执行条数:{prePageCount}"); 23 int okCount = 0; //迁移成功的条数 24 int runTimes = 0; 25 while (okCount < totalCount) 26 { 27 runTimes++; 28 if (runTimes % 20 == 0) 29 { 30 Console.Clear(); //每执行20次的时候,清除控制台 31 Console.WriteLine("控制台已被清理。"); 32 } 33 if (totalCount - okCount < prePageCount) 34 { 35 prePageCount = totalCount - okCount; //最后一次如果没有一页数据,只迁移部分 36 if (prePageCount == 1) break; //如果是1条的话,日期精度容易出现问题,特意控制不执行 37 } 38 okCount += DataToHisExecOne(prePageCount); 39 log.Information(@"已执行累计条数{0},累计耗时:{1}分{2}秒{3},累计执行次数{4}", okCount, swAll.Elapsed.TotalMinutes, swAll.Elapsed.Seconds, swAll.Elapsed.Milliseconds, runTimes); 40 } 41 42 swAll.Stop(); 43 log.Information($"本次实际总迁移数据条数:{okCount},共分批执行次数:{runTimes}"); 44 log.Information("================================================"); 45 46 }
配置的目的是为了方便执行,免得改程序。
1 "TyUseEnv": "0", //使用环境(0:测试环境;1:正式环境(沙箱)) 2 "TyTopN": "10", //每次迁移的数据条数 3 "TyTotalCount": "25", //分批总迁移数据量 4 "TyForTable": "GLPS_APIREQUEST", //需要迁移数据的表名(测试已支持:GLPS_APIREQUEST、GLPS_GATEENTRYREC)
其他的表以此类推,可以进行多表切换。
在控制台Main方法中通过依赖注入的方式,动态实例化要迁移的表结构。
历史数据迁移,只要支持此接口(IExecDataToHis)即可,说白就2个迁移方法而已:单次执行和分批循环执行。
其他的表以此类推,可以进行多表切换。
在控制台Main方法中通过依赖注入的方式,动态实例化要迁移的表结构。
历史数据迁移,只要支持此接口(IExecDataToHis)即可,说白就2个迁移方法而已:单次执行和分批循环执行。
1 using Twi.NET5.Core; 2 3 namespace Tyingtech_glps.Interface.HDM 4 { 5 /// <summary> 6 /// 可执行的数据迁移接口(单个和分批) 7 /// </summary> 8 public interface IExecDataToHis : IWhoAmI 9 { 10 /// <summary> 11 /// 历史数据迁移【分批迁移】 12 /// </summary> 13 /// <param name="totalCount">任务总迁移条数</param> 14 /// <param name="perPageCount">分批迁移,单次查询每页数量</param> 15 public TwiReturnBase DataToHis(int totalCount = 10000, int perPageCount = 1000); 16 17 /// <summary> 18 /// 历史数据迁移【一次性迁移】 19 /// </summary> 20 /// <param name="n">数据迁移量(条数) top N</param> 21 public TwiReturnBase DataToHisExecOne(int n); 22 } 23 }
TwiReturnBase 就是一个统一封装的返回类型,不用管它。
最后exe程序的界面效果如下。
然后就是执行命令了,单次迁移输入1,分批执行迁移输入2。
迁移哪张表,每次单次迁移多少条,总共迁移多少数据量,都可在appsettings.json中配置。
直接输入命令1,开始执行。
(哈哈,看来我直接本地电脑还是非常卡的,不过不影响思路的效果,等最后我们换台测试服务器看看1000万数据的效果。)
自动执行多次的效果,只要配置好,就会自动执行,分配执行不影响性能。
如果中间报错,会自动停止执行。
我们换台测试服务器,用此方法测试下有数据在跑的沙箱环境一千万数据量的效果,每天源源不断执行,测试迁移了快939万的数据问题都不大。
由于沙箱环境中也有数据在跑,所以业务数据库是实时增加数据量的。
这个思路能解决哪怕是生产环境都可以用,有事没事迁移点数据量,对数据库性能没有多大影响,而且支持不同数据库的迁移,比如SQL SERVER 到MySQL等等,但是缺点就是迁移起来比较慢些。适用于小企业小项目的常见场景,这个思路基本就够了。
【以后有空我还会再开篇文章讲别的大数据迁移思路】
当然最后要说的是,这个例子不是很靠谱,为什么这么说呢?首先这个是硬编码方式,局限性太大,如果要通用的话,程序应该再抽下,直接配置表名相关的即可。
实现通用工具模式,任何时候无需修改代码,直接简单改配置即可。这个我们下期教程继续改进思路。
原文:https://www.cnblogs.com/feitianxinhong/p/15310956.html