Вы отмечаете check all нажимаете Save и
В данной статье мы поговорим о том как можно оптимзировать процесс вставки большого количества данных, большого значит > 1 000. Хотя вы сможете применять это по своим нуждам.
В MS SQL 2000 появилась команда BULK INSERT, которая возволяет загрузить данные из файла в SQL Server "пачкой". Внутри эта команда использует утилиту BCP, копирует данные из файла в указанную таблицу. В ADO.NET 2.0 появился класс SqlBulkCopy, который является обверткой над командой BULK INSERT. В его использовании нет ничего сложного. Вы указываете строку соединения, таблицу в которую хотите вставить данные и сами данные.
Это нам подходит! Надо бы сделать это гламурненько в стиле L2S.
Погуглив, я нашел пример как это делается в стиле L2S, но как по мне он сложноват как в исполнении так и в чтении. Если хотите, можете взять его, он рабочий, я проверил. Но! У меня получилось быстрей :). Почему - не знаю. В конце статьи я приведу примеры по скорости выполнения.
Для реализации своего примера я почерпнул пару штрихов из источника, поэтому желательно если вы с ним ознакомитесь, хотя и необязательно. Примеры я буду приводить на всем известной Northwind базе данных. В результате у нас получится слудующее:
IEnumerable<Customer> dataToInsert = GenerateCustomers(100000); int batchSize = 5000; context.Customers.BulkInsert(dataToInsert, batchSize);
Для того, чтобы сделать вставку, необходимо вызвать метод WriteToServer у класса SqlBulkCopy. У него есть несколько перегрузок:
мы будем использовать метод, который принимает IDataReader. У интерфейса IDataReader очень много методов, но нам необходимо только 4. Создадим класс реализующий этот интерфейс. Методы, которые нам нужны пометим как abstract, остальные как virtual, они нам не нужны и я их не буду показывать в коде.
public abstract class SqlBulkCopyReader : IDataReader { public abstract bool Read(); public abstract object GetValue(int i); public abstract int GetOrdinal(string name); public abstract int FieldCount { get; } #region // Not required ... .... #endregion }
Отнаследуем класс от SqlBulkCopyReader:
public class LinqEntityDataReader<T> : SqlBulkCopyReader where T : class { private DataTable _sourceTable; private readonly IEnumeratort<T> _enumerator; private DataRow _current; public LinqEntityDataReader(IEnumerable<T> source) { MapTableName(); MapColumns(); _enumerator = source.GetEnumerator(); } public string DestenationTable { get { return _sourceTable.TableName; } } public IEnumerable<string> Columns { get { foreach (DataColumn column in _sourceTable.Columns) { yield return column.ColumnName; } } } public override object GetValue(int columnIndex) { return _current[columnIndex]; } public override int GetOrdinal(string name) { return _sourceTable.Columns[name].Ordinal; } public override bool Read() { bool next = _enumerator.MoveNext(); if (next) { _current = _sourceTable.NewRow(); foreach (DataColumn column in _sourceTable.Columns) { _current[column] = typeof(T).GetProperty(column.ColumnName) .GetValue(_enumerator.Current, null); } } return next; } public override int FieldCount { get { return _sourceTable.Columns.Count; } } private void MapTableName() { Type entityType = typeof(T); TableAttribute destenationTable = entityType .GetCustomAttributes(typeof(TableAttribute), false) .Cast<TableAttribute>() .SingleOrDefault(); if (destenationTable == null) { throw new ArgumentNullException("destenationTable"); } _sourceTable = new DataTable(destenationTable.Name); } private void MapColumns() { Type entityType = typeof(T); int columnIndex = 0; foreach (PropertyInfo property in entityType.GetProperties()) { ColumnAttribute column = property .GetCustomAttributes(typeof(ColumnAttribute), false) .Cast<columnattribute>() .SingleOrDefault(); } if (column == null) { return; } if (!column.IsVersion && !column.DbType.Contains("IDENTITY") && !column.IsDbGenerated) { _sourceTable.Columns.Add(column.Name ?? property.Name); } columnIndex++; } }
Начнем с конструктора. Для начала мы должны знать куда нам вставлять данные. Это мы можем узнать у сущности из аттрибута Table, что и происходит в методе MapTableName. Затем нам нужно знать в какие колонки можно вставлять данные. Во вставке могу участвовать лубые колонки кроме IDENTITY, TIMESTAMP и тп. Это можно узнать у свойства сущности, которое помечено аттрибутом Column, что и происходит в методе MapColumns.
SqlBulkCopy начинает свою работу с получения количества колонок, которые участвую во вставке и если указан мапинг, то вызывается метод GetOrdinal, чтоб получить индекс каждой колонки по имени.
Затем SqlBulkCopy идет построчно вызывая метод Read, а потом вызывает GetValue для каждой колонки у текущей записи.
Мое "ноу-хау" - это получение значений колонок рефлексией, когда запрашивается очередная строка, и сохранение этих значений в DataRow. Во-первых это проще, во-вторых читабельней и понятней, а в-третьих - думаете рефлексия в методе Read будет все тормозить ? - Нет! Получилось только быстрей, на удивление (может кто-то скажет почему ?).
Ну и завершающий этап, сама обвертка над SqlBulkCopy в виде Extenstion Method для таблицы-сущности:
public static class TableExtension { public static void BulkInsert<T>(this Table<T> entity, IEnumerable<T> data, int batchSize) where T : class { LinqEntityDataReader<T> reader = new LinqEntityDataReader<T>(data); using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(DBContext.ConnectionString)) { foreach (string columnName in reader.Columns) { sqlBulkCopy.ColumnMappings.Add(columnName, columnName); } sqlBulkCopy.BatchSize = batchSize; sqlBulkCopy.DestinationTableName = reader.DestenationTable; sqlBulkCopy.WriteToServer(new LinqEntityDataReader<T>(data)); sqlBulkCopy.Close(); } } }
Ну тут вроде ничего военного, все должно быть ясно. Ну и в результате:
IEnumerable<Customer> dataToInsert = GenerateCustomers(100000); int batchSize = 5000; context.Customers.BulkInsert(dataToInsert, batchSize);
Ну и как я обещал грубый тест на производительность. Тест производился на вставке 100 000 записей. BatchSize я подобрал оптимальную для данного количества записей. Забыл сказать, что от этого значения зависит много, вы можете как выиграть в производительности, так и не очень выиграть. Для того чтобы подобрать оптимальное значение надо поэксперементировать. Рекомендации мелкософта не помогли, все зависит от ситуации. Для создания фейковых данных использовался метод:
public static IEnumerable<Customer> GenerateCustomers(int count) { for (int i = 0; i < count; i++) { yield return new Customer { CustomerID = i.ToString("00000"), CompanyName = "Company" + i, ContactName = "Frederique Citeaux", ContactTitle = "Marketing Manager", Address = "24, place Kleber", Phone = "(604) 555-4729", Region = "WA" }; } }
Вот что у меня получилось:
- Мое решение ~ 8 секунд
- Решение из пример ~ 11секунд
- Решение из пример + динамический вызов GetValue, который ему посоветовали в коментах ~ 9.5 секунд
- Стандартный InsertAllOnSubmit ~ 1 минута 23 секунды
В любой случае я получил, что хотел. В моем конкретном случае мапинг всех товаров на категорию (10 000 записей) теперь выполняется ~ 1.1 секунды. Думаю этот пример вас когда-то спасет.
Ответственность Table<T> за вставку записей - спорный момент. Ты юзаешь UnitOfWork и именно он должен управлять вставками. Должно это выглядеть примерно так:
ОтветитьУдалитьcontext.Customers.AddRange(customers);
context.EnableBulkInsert = true;
context.SubmitChanges();
да, это сложнее и требует своего контекста, но это в стиле UnitOfWork.
На красивый костыль похоже, а на "..гламурненько в стиле L2S." нет.
пиши исче интересно :)
ОтветитьУдалитьПосмотрите пожалуйста мой вопрос http://csharp.foxhelp.ru/csforum/viewtopic.php?f=6&t=35&p=294#p294
ОтветитьУдалитьчто посоветуете?
У вас проблема со вставкой или с удалением ? Нужно больше деталей, какими объемами данных работате и тп
ОтветитьУдалить