官方网站建设进度表店铺seo是什么意思
C#百万数据处理
在我们经验的不断增长中不可避免的会遇到一些数据量很大操作也复杂的业务
这种情况我们如何取优化如何去处理呢?一般都要根据业务逻辑和背景去进行合理的改进。
文章目录
- C#百万数据处理
- 前言
- 一、项目业务需求和开发背景
- 项目开发背景
- 数据量计算
- 业务需求
- 业务分析
- 二、老代码分析
- 运行逻辑
- 代码展示
- 伪代码展示
- 分析性能低原因
- 三、优化后的代码分析
- 运行逻辑
- 代码展示
- 伪代码展示
- 优化了那些问题
- 总结
前言
这里我给大家带来一个我自己所经历的百万数据处理案例,该案例中会拿优化前和优化后的代码进行对比,让大家更直观的感受优化给项目带来的效率提升。该项目优化用到了:线程同步,多线程,sqlSugar,异步,委托等知识。
一、项目业务需求和开发背景
项目开发背景
随着连锁门店数量的不断增加,门店商品的库存数据和商品信息的维护业务压力不断增加。数据量越来越大。
数据量计算
假如数据如下:
- 门店数量:160家
- 门店商品数量:6000+
- 还需要更新商品对码信息:6000+
- 数据接口最多每页查询1000条
- 那么按以上数据要处理的数据最少就为:966000条
业务需求
- 新的商品要做插入操作
- 旧的商品要做更新操作(更新库存信息和商品编码厂家等信息)
- 门店库存数据变更要在半小时内完成
业务分析
按以上条件我们可以分析出:需要有Insert和Update操作。其中经验丰富的程序员就会发现,其实像这种业务一般 Insert操作最多只占所有业务的20%,Update操作占80%。
注意:
很多小白程序员都是不进行业务分析就直接撸代码,这种方式是不可取的,等你撸完代码之后你就会发现效率根本就不行。后面又花大量时间去优化,还如不体现分析了再进行开发。
二、老代码分析
运行逻辑
老代码的操作流程如下
- 查询出所有门店
- 循环门店,将门店数据传入操作函数中
- 操作函数中再根据传入的门店信息循环去调用接口
- 当传入的这个门店的数据拉取完成之后再去操作数据库
代码展示
代码如下:
//查询所有门店商品信息List<StoreCheckCodeDO> storeInfos = b_Stores.GetStoreCheckCodes();//这里是用于更新对码的信息List<ProductInfoDO> infoDOs = new List<ProductInfoDO>();foreach (var storeInfo in storeInfos ){//GetProductInfo的作用是调用总部接口获取并更新商品数据InventoryHelper.GetProductInfo(storeInfo , ref infoDOs );}if(infoDOs.count>0){//更新对码信息InventoryHelper.ChangeProductCheckCode(infoDOs);}
GetProductInfo 的具体实现:
public static bool GetProductInfo(string orgid, ref List<ProductInfoDO> infoDOs, string goodsNo = "", string goodsName = ""){bool rel = false;int pageSize = 1000;int pageNum = 1;List<DslStoreStockDO> list =根据门店ID查询它的商品库存信息Console.WriteLine("开始更新门店:{0}", orgid);//定义个集合来存储拉取到的数据List<StoreStockDO> Stores = new List<StoreStockDO>();while (true){try{DslERPRes<QueryStockRes> res3 =调用总部数据接口每页查询1000条,返回的数据if (res3 != null && res3.code == 0){if (res3.data != null && res3.data.list != null){if (res3.data.total > 0){#region 模型转换//循环总部返回的数据集合,并转成我们需要的DO是实体foreach (var product in res3.data.list){DslStoreStockDO dsl = new DslStoreStockDO();把总部数据转为我们需要的实体存储我们的集合中dslStores.Add(dsl);ProductInfoDO infoDO = 根据总部商品ID查询本地对码表返回商品对码信息实体if (infoDO == null){//如果没有则new一个对象infoDO = new ProductInfoDO();infoDO.LocalSku = product.goodsNo.ToString();}//如果有这个商品就对其数据更新,没有就用新的对象存储后续做新增操作infoDO.Name = product.goodsName;infoDO.DslSku = product.goodsNo.ToString();infoDO.goodsType = product.goodsType;infoDO.proarea = product.prodArea;infoDO.barcode = product.barcode;infoDO.CreateTime = DateTime.Now;infoDO.UpdateTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");infoDO.Operator = "系统自动更新";infoDO.IsDelete = false;//查询集合中已存在的该商品ProductInfoDO b_there = infoDOs.Where(t => t.DslSku == product.goodsNo.ToString()).FirstOrDefault();if (b_there != null){//如果有则更新为现在从总部拉取到的商品信息if (infoDO.goodsType != b_there.goodsType){b_there.goodsType = infoDO.goodsType;}if (infoDO.proarea != b_there.proarea){b_there.proarea = infoDO.proarea;}if (infoDO.barcode != b_there.barcode){b_there.barcode = infoDO.barcode;}}else{//插入实体集合中infoDOs.Add(infoDO);}}#endregion//判断该门店商品信息是否拉完if (res3.data.pages == pageNum){//利用list的 Except 方法进行数据筛选var different = list.Except(dslStores, new StoreStockDOComparer()).ToList();//找到没拉到库存的different.ForEach(goodsInfo =>{//把没有从总部拉到商品的库存数据变更为0goodsInfo.goodsQty = 0;goodsInfo.warehouseGoodsQty = 0;});//清除容器内数据从新插入信息变更数据//list 这个list就是我们开头根据门店id查询该门店所有商品库存信息的数据,//所以要清空重新写入我们调整好的数据list = new List<DslStoreStockDO>();list.AddRange(dslStores);list.AddRange(different);//获取完毕,正常结束Console.WriteLine("请稍等正在提交事务...");//_BulkCopyModel方法是做insertOrUpadate操作的。//根据后面传入的主键【goodsNo,placePointNo 】去操作数据库的数据int count = _sqlsugar_DSL._BulkCopyModel(list, "门店库存表", t => new { t.goodsNo, t.placePointNo });rel = count > 0;Console.WriteLine("门店:{0}数据更新成功,操作数据条数:{1}", orgid, count);break;}pageNum++;}else{Console.WriteLine("未查询到更新数据");break;}}else{///获取完毕,正常结束Console.WriteLine("门店:{0}第{1}页数据拉取失败", orgid, pageNum.ToString());break;}}else{//获取异常,强制结束Console.WriteLine("门店:{0}第{1}页数据发起请求失败:{2}", orgid, pageNum.ToString(), res3.msg);break;}}catch (Exception ex){Console.WriteLine("门店:{0}第{1}页数据处理失败:{2}", orgid, pageNum.ToString(), ex.Message);break;}}return rel;}
伪代码展示
上述代码还是比较复杂很长,不太能直观的一眼看到问题所在下面我就写一段伪造代码来给大家展示一下逻辑
代码如下(示例):
门店集合=查询到所有门店商品对码信息收集集合 =new List<T>();foreach (var 单个门店实体 in 门店集合 ){//GetProductInfo的作用是调用总部接口获取并更新商品数据InventoryHelper.GetProductInfo(门店ID, ref 商品对码信息收集集合 );}GetProductInfo(门店ID,ref 门店对码信息收集集合){List<库存数据实体> 旧库存数据=根据门店ID获取库存信息(门店ID);List<库存数据实体> 新库存数据=new List<库存数据实体>();while(true){页码=1;每页查询数量=1000;总部数据返回数据集合=请求总部数据接口(页码,每页查询数量)foreach (var 数据返回数据实体 in 总部数据返回数据集合 ){数据返回数据实体 转为 库存数据实体 新库存数据.add(库存数据实体)数据返回数据实体数据 取出所需数据组成 商品对码实体门店对码信息收集集合.add(商品对码实体)}if(页码=总部返回数据集合.总页数){新库存数据 做Insert OR Update 操作break;}页码++;}}
分析性能低原因
- 单线程,需要一个门店一个门店的处理
- 每处理一个门店都需要去操作一次数据库,做Insert OR Update 操作
- 单线程请求总部接口线程要等待接口返回才继续往下处理没有达到效率最大化
- 每次都要去查询门店的所有商品库存,导致不必要的查询开销。
三、优化后的代码分析
运行逻辑
- 查询出所有门店,根据传入线程数做分组
- 多线程异步获取总部接口库存数据
- 当分组的总部数据获取完成后以组为单位更新数据库数据
代码展示
代码如下:
public static void GetProductInfo(List<DslStoreCheckCodeDO> orgids, int ThreadNumber = 1){if (ThreadNumber > orgids.Count){ThreadNumber = 1;}string startTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");List<List<DslStoreCheckCodeDO>> groupedLists = orgids.Select((value, index) => new { Index = index, Value = value }).GroupBy(x => x.Index / (orgids.Count() / ThreadNumber)).Select(x => x.Select(v => v.Value).ToList()).ToList();List<ProductInfoDO> outval = new List<ProductInfoDO>();int _ThreadCount = groupedLists.Count();int finishcount = 0;object locker = new object();foreach (var dslStores in groupedLists){new Thread(async () =>{string TID = Thread.CurrentThread.ManagedThreadId.ToString("00");int count = 0;List<ProductInfoDO> productInfos = new List<ProductInfoDO>();List<DslStoreStockDO> dslStock = new List<DslStoreStockDO>();foreach (var StoreInfo in dslStores){count++;Console.WriteLine("线程ID[{0}]剩余【{1}/{2}】", TID, dslStores.Count, count);string orgid = StoreInfo.DslStoreNo;List<ProductS> products = await GetProductInfo(orgid);#region 模型转换foreach (var product in products){DslStoreStockDO dsl = new DslStoreStockDO();dsl.placePointNo = orgid;dsl.goodsNo = product.goodsNo;dsl.goodsId = product.goodsId;dsl.goodsName = product.goodsName;dsl.goodsType = string.IsNullOrWhiteSpace(product.goodsType) ? "无" : product.goodsType;dsl.prodArea = product.prodArea ?? "无";dsl.barcode = string.IsNullOrWhiteSpace(product.barcode) ? "无" : product.barcode;dsl.goodsQty = product.goodsQty;dsl.warehouseGoodsQty = product.warehouseGoodsQty ?? 0;dsl.purPrice = product.purPrice ?? 0;dsl.retailPrice = product.retailPrice;dsl.salesTaxRate = product.salesTaxRate;dsl.cretime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");dsl.isChange = true;dslStock.Add(dsl);ProductInfoDO infoDO = dSL_ERP.GetCodeCahe(product.goodsNo.ToString());if (infoDO == null){infoDO = new ProductInfoDO();infoDO.LocalSku = product.goodsNo.ToString();}infoDO.Name = product.goodsName;infoDO.DslSku = product.goodsNo.ToString();infoDO.goodsType = product.goodsType;infoDO.proarea = product.prodArea;infoDO.barcode = product.barcode;infoDO.CreateTime = DateTime.Now;infoDO.UpdateTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");infoDO.Operator = "系统自动更新";infoDO.IsDelete = false;productInfos.Add(infoDO);}#endregion}if (count == dslStores.Count){ //分配的门店处理完成后再同一操作数据库List<string> SotoreIds = dslStores.Select(t => t.DslStoreNo).ToList();List<DslStoreStockDO> Oldstocks = _sqlsugar_DSL.GetModelList<DslStoreStockDO>("StoreStock", t => SotoreIds.Contains(t.placePointNo)); List<DslStoreStockDO> newStocks = dslStock.Except(Oldstocks, new StoreStockDOComparer()).ToList();List<DslStoreStockDO> commonStocks = dslStock.Except(newStocks, new StoreStockDOComparer()).ToList();try{if (Oldstocks.Count > 0){var diffent = Oldstocks.Except(dslStock, new StoreStockDOComparer()).ToList();diffent.ForEach(goodsInfo =>{goodsInfo.goodsQty = 0;goodsInfo.warehouseGoodsQty = 0;});commonStocks.AddRange(diffent);}Console.WriteLine("请稍等正在提交...");int InsNumber = 0;if (newStocks.Count > 0){InsNumber = await _sqlsugar_DSL.AsynBulkCopyModel(newStocks, "StoreStock", t => new { t.goodsNo, t.placePointNo });}int UpdNumber = await _sqlsugar_DSL.AsynBulkUpdateModel(commonStocks, "StoreStock", new string[] { "goodsNo", "placePointNo" }, new string[] { "placePointNo", "goodsNo", "goodsId", "goodsName", "goodsType", "prodArea", "barcode", "goodsQty", "warehouseGoodsQty", "purPrice", "retailPrice", "salesTaxRate", "cretime", "isChange" });Console.WriteLine("提交成功,更新{0}条数据,插入{1}条数据", UpdNumber, InsNumber);}catch (Exception ex){Console.WriteLine("线程{0}内部报错:{1}", TID, ex.Message);}}lock (locker){outval.AddRange(productInfos);finishcount++;Monitor.Pulse(locker); //完成,通知等待队列,告知已完,执行下一个。}}).Start();}lock (locker){while (finishcount != _ThreadCount){Monitor.Wait(locker);//等待}}IEnumerable<ProductInfoDO> infoDOs = outval.OrderByDescending(x => x.UpdateTime).GroupBy(x => new { x.DslSku, x.barcode }).Select(y => y.First());Console.WriteLine("开始更新商品对码表,待更新数:{0}", infoDOs.Count());if (infoDOs.Count() > 0){Console.WriteLine("正在提交...");ChangeProductCheckCode(infoDOs.ToList());Dictionary<string, string> pairs = new Dictionary<string, string>();pairs.Add("JsonStr", CommonFun.Base64Encode(JsonConvert.SerializeObject(infoDOs)));string rel = HttpHelper.HttpPost("...", pairs);Console.WriteLine("商品对码表跟新成功,商品信息表更新:{0}", rel);}Console.WriteLine("【全量更新所有门店】执行完毕线程挂起24小时...[Start:{0}|End:{1}]", startTime, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));#region 异步获取商品信息async Task<List<ProductS>> GetProductInfo(string orgid){List<ProductS> products = new List<ProductS>();string res = await Task.Run(() =>{int pageNum = 1;int pageSize = 1000;while (true){QueryStockReq req3 = new QueryStockReq();req3.goodsno = "";req3.goodsName = "";req3.pageNum = pageNum;req3.pageSize = pageSize;req3.placePointNo = orgid;Console.WriteLine("门店【{1}】开始获取第{0}页数据-线程ID【{2}】", req3.pageNum.ToString(), orgid, Thread.CurrentThread.ManagedThreadId.ToString("00"));DslERPReq dslERP23 = new DslERPReq(req3);DslERPRes<QueryStockRes> res3 = dSL_ERP.queryStock(dslERP23);if (res3 != null && res3.code == 0){if (res3.data != null && res3.data.list != null){if (res3.data.total > 0){products.AddRange(res3.data.list);if (res3.data.pages == pageNum){break;}pageNum++;}else{Console.WriteLine("未查询到更新数据,返回数据{0},数据JSON:【{1}】", res3.msg, JsonConvert.SerializeObject(res3));break;}}else{Console.WriteLine("门店:{0}第{1}页数据拉取失败返回信息{2}", orgid, req3.pageNum.ToString(), res3.msg);break;}}else{Console.WriteLine("门店:{0}第{1}页数据发起请求失败:{2}", orgid, req3.pageNum.ToString(), res3.msg);}}return "";});return products;}#endregion}
伪代码展示
public static void GetProductInfo(List<门店信息实体> 门店信息实体集合, int 处理线程数量= 1){if (处理线程数量 > 门店信息实体集合.Count){处理线程数量 = 1;}string startTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");List<List<门店信息实体>> 门店分组集合= 门店信息实体集合.Select((value, index) => new { Index = index, Value = value }).GroupBy(x => x.Index / (门店信息实体集合.Count() / 处理线程数量)).Select(x => x.Select(v => v.Value).ToList()).ToList();List<商品对码实体> outval = new List<商品对码实体>();int _ThreadCount = 门店分组集合.Count();int finishcount = 0;object locker = new object();//线程锁foreach (var 门店实体集合 in 门店分组集合){new Thread(async () =>{string TID = Thread.CurrentThread.ManagedThreadId.ToString("00");int count = 0;List<ProductInfoDO> productInfos = new List<ProductInfoDO>();List<库存数据实体> 总部库存数据集合= new List<库存数据实体>();foreach (var StoreInfo in 门店实体集合){count++;Console.WriteLine("线程ID[{0}]剩余【{1}/{2}】", TID, 门店实体集合.Count, count);string orgid = StoreInfo.门店ID;//这里是异步从总部库存信息接口获取商品库存信息List<ProductS> products = await GetProductInfo(orgid);#region 模型转换foreach (var product in products){库存数据实体dsl = new 库存数据实体();dsl.placePointNo = orgid;dsl.goodsNo = product.goodsNo;dsl.goodsId = product.goodsId;dsl.goodsName = product.goodsName;dsl.goodsType = string.IsNullOrWhiteSpace(product.goodsType) ? "无" : product.goodsType;dsl.prodArea = product.prodArea ?? "无";dsl.barcode = string.IsNullOrWhiteSpace(product.barcode) ? "无" : product.barcode;dsl.goodsQty = product.goodsQty;dsl.warehouseGoodsQty = product.warehouseGoodsQty ?? 0;dsl.purPrice = product.purPrice ?? 0;dsl.retailPrice = product.retailPrice;dsl.salesTaxRate = product.salesTaxRate;dsl.cretime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");dsl.isChange = true;总部库存数据集合.Add(dsl);ProductInfoDO infoDO = dSL_ERP.GetCodeCahe(product.goodsNo.ToString());if (infoDO == null){infoDO = new ProductInfoDO();infoDO.LocalSku = product.goodsNo.ToString();}infoDO.Name = product.goodsName;infoDO.DslSku = product.goodsNo.ToString();infoDO.goodsType = product.goodsType;infoDO.proarea = product.prodArea;infoDO.barcode = product.barcode;infoDO.CreateTime = DateTime.Now;infoDO.UpdateTime = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");infoDO.Operator = "系统自动更新";infoDO.IsDelete = false;productInfos.Add(infoDO);}#endregion}//注意这里我为什么要用count来判断这组是否执行完成,因为我们操作数据库的语句优化为了异步的//而Look语句内是不支持await 语句的,所有利用组合里的门店数和已处理数来判断if (count == 门店实体集合.Count){ //分配的门店处理完成后再同一操作数据库List<string> SotoreIds = 门店实体集合.Select(t => t.门店ID).ToList();// 查询出这个批次里所有门店的老的库存数据List<库存数据实体> Oldstocks = _sqlsugar_DSL.GetModelList<库存数据实体>("StoreStock", t => SotoreIds.Contains(t.门店ID)); //List<库存数据实体> newStocks = 总部库存数据集合.Except(Oldstocks, new StoreStockDOComparer()).ToList();List<库存数据实体> commonStocks = 总部库存数据集合.Except(newStocks, new StoreStockDOComparer()).ToList();try{if (Oldstocks.Count > 0){var diffent = Oldstocks.Except(总部库存数据集合, new StoreStockDOComparer()).ToList();diffent.ForEach(goodsInfo =>{goodsInfo.goodsQty = 0;goodsInfo.warehouseGoodsQty = 0;});commonStocks.AddRange(diffent);}Console.WriteLine("请稍等正在提交...");int InsNumber = 0;if (newStocks.Count > 0){InsNumber = await _sqlsugar_DSL.AsynBulkCopyModel(newStocks, "StoreStock", t => new { t.goodsNo, t.placePointNo });}int UpdNumber = await _sqlsugar_DSL.AsynBulkUpdateModel(commonStocks, "StoreStock", new string[] { "goodsNo", "placePointNo" }, new string[] { "placePointNo", "goodsNo", "goodsId", "goodsName", "goodsType", "prodArea", "barcode", "goodsQty", "warehouseGoodsQty", "purPrice", "retailPrice", "salesTaxRate", "cretime", "isChange" });Console.WriteLine("提交成功,更新{0}条数据,插入{1}条数据", UpdNumber, InsNumber);}catch (Exception ex){Console.WriteLine("线程{0}内部报错:{1}", TID, ex.Message);}}lock (locker){outval.AddRange(productInfos);finishcount++;Monitor.Pulse(locker); //完成,通知等待队列,告知已完,执行下一个。}}).Start();}lock (locker){while (finishcount != _ThreadCount){Monitor.Wait(locker);//等待}}IEnumerable<ProductInfoDO> infoDOs = outval.OrderByDescending(x => x.UpdateTime).GroupBy(x => new { x.DslSku, x.barcode }).Select(y => y.First());Console.WriteLine("开始更新商品对码表,待更新数:{0}", infoDOs.Count());if (infoDOs.Count() > 0){Console.WriteLine("正在提交...");ChangeProductCheckCode(infoDOs.ToList());Dictionary<string, string> pairs = new Dictionary<string, string>();pairs.Add("JsonStr", CommonFun.Base64Encode(JsonConvert.SerializeObject(infoDOs)));string rel = HttpHelper.HttpPost("...", pairs);Console.WriteLine("商品对码表跟新成功,商品信息表更新:{0}", rel);}Console.WriteLine("【全量更新所有门店】执行完毕线程挂起24小时...[Start:{0}|End:{1}]", startTime, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"));#region 异步获取商品信息async Task<List<ProductS>> GetProductInfo(string orgid){List<ProductS> products = new List<ProductS>();string res = await Task.Run(() =>{int pageNum = 1;int pageSize = 1000;while (true){QueryStockReq req3 = new QueryStockReq();req3.goodsno = "";req3.goodsName = "";req3.pageNum = pageNum;req3.pageSize = pageSize;req3.placePointNo = orgid;Console.WriteLine("门店【{1}】开始获取第{0}页数据-线程ID【{2}】", req3.pageNum.ToString(), orgid, Thread.CurrentThread.ManagedThreadId.ToString("00"));DslERPReq dslERP23 = new DslERPReq(req3);DslERPRes<QueryStockRes> res3 = dSL_ERP.queryStock(dslERP23);if (res3 != null && res3.code == 0){if (res3.data != null && res3.data.list != null){if (res3.data.total > 0){products.AddRange(res3.data.list);if (res3.data.pages == pageNum){break;}pageNum++;}else{Console.WriteLine("未查询到更新数据,返回数据{0},数据JSON:【{1}】", res3.msg, JsonConvert.SerializeObject(res3));break;}}else{Console.WriteLine("门店:{0}第{1}页数据拉取失败返回信息{2}", orgid, req3.pageNum.ToString(), res3.msg);break;}}else{Console.WriteLine("门店:{0}第{1}页数据发起请求失败:{2}", orgid, req3.pageNum.ToString(), res3.msg);}}return "";});return products;}#endregion}
优化了那些问题
- 从总部取数据变为了异步多线程避免了线程阻塞
- 门店按组合处理减小了数据库操作次数
- 操作数据库改为异步避免阻塞
总结
以上就是我个人所经历的大数据处理,虽然优化的并不算完美但是还是总结出了不少经验,也从中学习到了很多,比如,单线程和多线程的运用以及委托和线程同步等知识。