Недавно я столкнулся с проблемой произовдительности вставки данных с помощью L2S. Представьте себе следующую ситуацию: у нас e-commerce сайт, в котором можно создавать продукты, категории и мапить продукты на категорию (ии). Остановимся на функционале, где можно мапить продукты на категорию. У вас есть форма для редактирования категории, на которой в таблице отображаются продукты. Таблица пейжирована. Также есть 2 чекбокса - check\ unckeck all и check\unckeck current page ну и кнопка Save.
Вы отмечаете check all нажимаете Save и
жопа ждете пока сохраниться мап всех товаров, которые у вас есть на категорию. Не важно большой у вас ассортимент или нет, это будет выполнятся долго, т.к. будет дергаться INSERT INTO CategoryToProductMap столько раз, сколько у вас товаров. В моем случаей это было 10 000 раз. Потом я встретил еще чудо-код, который выполнял подобные действия 130 000 раз ! Вы представляете это !? А если хотя бы 2 человека попадут на эту страницу...
В данной статье мы поговорим о том как можно оптимзировать процесс вставки большого количества данных, большого значит > 1 000. Хотя вы сможете применять это по своим нуждам.
В MS SQL 2000 появилась команда
BULK INSERT, которая возволяет загрузить данные из файла в SQL Server "пачкой". Внутри эта команда использует утилиту
BCP, копирует данные из файла в указанную таблицу. В ADO.NET 2.0 появился класс
SqlBulkCopy, который является обверткой над командой BULK INSERT. В его использовании нет ничего сложного. Вы указываете строку соединения, таблицу в которую хотите вставить данные и сами данные.
Это нам подходит! Надо бы сделать это гламурненько в стиле L2S.
Погуглив, я нашел пример как это делается в стиле L2S, но как по мне он сложноват как в исполнении так и в чтении. Если хотите, можете взять его, он рабочий, я проверил. Но! У меня получилось быстрей :). Почему - не знаю. В конце статьи я приведу примеры по скорости выполнения.
Для реализации своего примера я почерпнул пару штрихов из источника, поэтому желательно если вы с ним ознакомитесь, хотя и необязательно. Примеры я буду приводить на всем известной Northwind базе данных. В результате у нас получится слудующее:
IEnumerable<Customer> dataToInsert = GenerateCustomers(100000);
int batchSize = 5000;
context.Customers.BulkInsert(dataToInsert, batchSize);
Для того, чтобы сделать вставку, необходимо вызвать метод WriteToServer у класса
SqlBulkCopy. У него есть несколько перегрузок:
мы будем использовать метод, который принимает IDataReader. У интерфейса IDataReader очень много методов, но нам необходимо только 4. Создадим класс реализующий этот интерфейс. Методы, которые нам нужны пометим как
abstract, остальные как
virtual, они нам не нужны и я их не буду показывать в коде.
public abstract class SqlBulkCopyReader : IDataReader
{
public abstract bool Read();
public abstract object GetValue(int i);
public abstract int GetOrdinal(string name);
public abstract int FieldCount { get; }
#region // Not required ...
....
#endregion
}
Отнаследуем класс от SqlBulkCopyReader:
public class LinqEntityDataReader<T> : SqlBulkCopyReader where T : class
{
private DataTable _sourceTable;
private readonly IEnumeratort<T> _enumerator;
private DataRow _current;
public LinqEntityDataReader(IEnumerable<T> source)
{
MapTableName();
MapColumns();
_enumerator = source.GetEnumerator();
}
public string DestenationTable
{
get { return _sourceTable.TableName; }
}
public IEnumerable<string> Columns
{
get
{
foreach (DataColumn column in _sourceTable.Columns)
{
yield return column.ColumnName;
}
}
}
public override object GetValue(int columnIndex)
{
return _current[columnIndex];
}
public override int GetOrdinal(string name)
{
return _sourceTable.Columns[name].Ordinal;
}
public override bool Read()
{
bool next = _enumerator.MoveNext();
if (next)
{
_current = _sourceTable.NewRow();
foreach (DataColumn column in _sourceTable.Columns)
{
_current[column] = typeof(T).GetProperty(column.ColumnName)
.GetValue(_enumerator.Current, null);
}
}
return next;
}
public override int FieldCount
{
get { return _sourceTable.Columns.Count; }
}
private void MapTableName()
{
Type entityType = typeof(T);
TableAttribute destenationTable = entityType
.GetCustomAttributes(typeof(TableAttribute), false)
.Cast<TableAttribute>()
.SingleOrDefault();
if (destenationTable == null)
{
throw new ArgumentNullException("destenationTable");
}
_sourceTable = new DataTable(destenationTable.Name);
}
private void MapColumns()
{
Type entityType = typeof(T);
int columnIndex = 0;
foreach (PropertyInfo property in entityType.GetProperties())
{
ColumnAttribute column = property
.GetCustomAttributes(typeof(ColumnAttribute), false)
.Cast<columnattribute>()
.SingleOrDefault();
}
if (column == null)
{
return;
}
if (!column.IsVersion && !column.DbType.Contains("IDENTITY") && !column.IsDbGenerated)
{
_sourceTable.Columns.Add(column.Name ?? property.Name);
}
columnIndex++;
}
}
Начнем с конструктора. Для начала мы должны знать куда нам вставлять данные. Это мы можем узнать у сущности из аттрибута Table, что и происходит в методе MapTableName. Затем нам нужно знать в какие колонки можно вставлять данные. Во вставке могу участвовать лубые колонки кроме IDENTITY, TIMESTAMP и тп. Это можно узнать у свойства сущности, которое помечено аттрибутом Column, что и происходит в методе MapColumns.
SqlBulkCopy начинает свою работу с получения количества колонок, которые участвую во вставке и если указан мапинг, то вызывается метод GetOrdinal, чтоб получить индекс каждой колонки по имени.
Затем SqlBulkCopy идет построчно вызывая метод Read, а потом вызывает GetValue для каждой колонки у текущей записи.
Мое "ноу-хау" - это получение значений колонок рефлексией, когда запрашивается очередная строка, и сохранение этих значений в DataRow. Во-первых это проще, во-вторых читабельней и понятней, а в-третьих - думаете рефлексия в методе Read будет все тормозить ? - Нет! Получилось только быстрей, на удивление (может кто-то скажет почему ?).
Ну и завершающий этап, сама обвертка над SqlBulkCopy в виде Extenstion Method для таблицы-сущности:
public static class TableExtension
{
public static void BulkInsert<T>(this Table<T> entity, IEnumerable<T> data, int batchSize)
where T : class
{
LinqEntityDataReader<T> reader = new LinqEntityDataReader<T>(data);
using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(DBContext.ConnectionString))
{
foreach (string columnName in reader.Columns)
{
sqlBulkCopy.ColumnMappings.Add(columnName, columnName);
}
sqlBulkCopy.BatchSize = batchSize;
sqlBulkCopy.DestinationTableName = reader.DestenationTable;
sqlBulkCopy.WriteToServer(new LinqEntityDataReader<T>(data));
sqlBulkCopy.Close();
}
}
}
Ну тут вроде ничего военного, все должно быть ясно. Ну и в результате:
IEnumerable<Customer> dataToInsert = GenerateCustomers(100000);
int batchSize = 5000;
context.Customers.BulkInsert(dataToInsert, batchSize);
Ну и как я обещал грубый тест на производительность. Тест производился на вставке 100 000 записей. BatchSize я подобрал оптимальную для данного количества записей. Забыл сказать, что от этого значения зависит много, вы можете как выиграть в производительности, так и не очень выиграть. Для того чтобы подобрать оптимальное значение надо поэксперементировать. Рекомендации мелкософта не помогли, все зависит от ситуации. Для создания фейковых данных использовался метод:
public static IEnumerable<Customer> GenerateCustomers(int count)
{
for (int i = 0; i < count; i++)
{
yield return new Customer
{
CustomerID = i.ToString("00000"),
CompanyName = "Company" + i,
ContactName = "Frederique Citeaux",
ContactTitle = "Marketing Manager",
Address = "24, place Kleber",
Phone = "(604) 555-4729",
Region = "WA"
};
}
}
Вот что у меня получилось:
- Мое решение ~ 8 секунд
- Решение из пример ~ 11секунд
- Решение из пример + динамический вызов GetValue, который ему посоветовали в коментах ~ 9.5 секунд
- Стандартный InsertAllOnSubmit ~ 1 минута 23 секунды
Я не думаю, что это совпадение, все-таки 100 000 данных - это немало. Почему так, я еще не узнал.
В любой случае я получил, что хотел. В моем конкретном случае мапинг всех товаров на категорию (10 000 записей) теперь выполняется ~ 1.1 секунды. Думаю этот пример вас когда-то спасет.