воскресенье, 8 ноября 2009 г.

LINQ Bulk Insert

Недавно я столкнулся с проблемой произовдительности вставки данных с помощью L2S. Представьте себе следующую ситуацию: у нас e-commerce сайт, в котором можно создавать продукты, категории и мапить продукты на категорию (ии). Остановимся на функционале, где можно мапить продукты на категорию. У вас есть форма для редактирования категории, на которой в таблице отображаются продукты. Таблица пейжирована. Также есть 2 чекбокса - check\ unckeck all и check\unckeck current page ну и кнопка Save.




Вы отмечаете check all нажимаете Save и жопа ждете пока сохраниться мап всех товаров, которые у вас есть на категорию. Не важно большой у вас ассортимент или нет, это будет выполнятся долго, т.к. будет дергаться INSERT INTO CategoryToProductMap столько раз, сколько у вас товаров. В моем случаей это было 10 000 раз. Потом я встретил еще чудо-код, который выполнял подобные действия 130 000 раз ! Вы представляете это !? А если хотя бы 2 человека попадут на эту страницу...

В данной статье мы поговорим о том как можно оптимзировать процесс вставки большого количества данных, большого значит > 1 000. Хотя вы сможете применять это по своим нуждам.

В MS SQL 2000 появилась команда BULK INSERT, которая возволяет загрузить данные из файла в SQL Server "пачкой". Внутри эта команда использует утилиту BCP, копирует данные из файла в указанную таблицу. В ADO.NET 2.0 появился класс SqlBulkCopy, который является обверткой над командой BULK INSERT. В его использовании нет ничего сложного. Вы указываете строку соединения, таблицу в которую хотите вставить данные и сами данные.
Это нам подходит! Надо бы сделать это гламурненько в стиле L2S.
Погуглив, я нашел пример как это делается в стиле L2S, но как по мне он сложноват как в исполнении так и в чтении. Если хотите, можете взять его, он рабочий, я проверил. Но! У меня получилось быстрей :). Почему - не знаю. В конце статьи я приведу примеры по скорости выполнения.
Для реализации своего примера я почерпнул пару штрихов из источника, поэтому желательно если вы с ним ознакомитесь, хотя и необязательно. Примеры я буду приводить на всем известной Northwind базе данных. В результате у нас получится слудующее:

IEnumerable<Customer> dataToInsert = GenerateCustomers(100000);
int batchSize = 5000;
context.Customers.BulkInsert(dataToInsert, batchSize);

Для того, чтобы сделать вставку, необходимо вызвать метод WriteToServer у класса SqlBulkCopy. У него есть несколько перегрузок:


мы будем использовать метод, который принимает IDataReader. У интерфейса IDataReader очень много методов, но нам необходимо только 4. Создадим класс реализующий этот интерфейс. Методы, которые нам нужны пометим как  abstract, остальные как virtual, они нам не нужны и я их не буду показывать в коде.

public abstract class SqlBulkCopyReader : IDataReader
{
     public abstract bool Read();
     public abstract object GetValue(int i);
     public abstract int GetOrdinal(string name);
     public abstract int FieldCount { get; }

     #region // Not required ...
     ....
     #endregion 
}

Отнаследуем класс от SqlBulkCopyReader:

public class LinqEntityDataReader<T> : SqlBulkCopyReader where T : class
{
     private DataTable _sourceTable;
     private readonly IEnumeratort<T> _enumerator;
     private DataRow _current;

     public LinqEntityDataReader(IEnumerable<T> source)
     {
           MapTableName();
           MapColumns();
           _enumerator = source.GetEnumerator();
     }

     public string DestenationTable 
     {
          get { return _sourceTable.TableName; }
     } 

     public IEnumerable<string> Columns
     {
          get
          {
               foreach (DataColumn column in _sourceTable.Columns)
               {
                    yield return column.ColumnName;
               }
           }
      }

      public override object GetValue(int columnIndex)
      {
           return _current[columnIndex];
      }

      public override int GetOrdinal(string name)
      {
           return _sourceTable.Columns[name].Ordinal;
      }

      public override bool Read()
      {
           bool next = _enumerator.MoveNext();

           if (next)
           {
                _current = _sourceTable.NewRow();

                foreach (DataColumn column in _sourceTable.Columns)
                {
                     _current[column] = typeof(T).GetProperty(column.ColumnName)
                           .GetValue(_enumerator.Current, null);
                }
           }

           return next;
     }

     public override int FieldCount
     {
          get { return _sourceTable.Columns.Count; }
     } 

     private void MapTableName()
     {
          Type entityType = typeof(T);

          TableAttribute destenationTable = entityType
              .GetCustomAttributes(typeof(TableAttribute), false)
              .Cast<TableAttribute>()
              .SingleOrDefault();

          if (destenationTable == null)
          {
               throw new ArgumentNullException("destenationTable");
          }

          _sourceTable = new DataTable(destenationTable.Name);
     }

     private void MapColumns()
     {
          Type entityType = typeof(T);
          int columnIndex = 0;

          foreach (PropertyInfo property in entityType.GetProperties())
          {
               ColumnAttribute column = property
                   .GetCustomAttributes(typeof(ColumnAttribute), false)
                   .Cast<columnattribute>()
                   .SingleOrDefault();
          }

          if (column == null)
          {
               return;
          }

          if (!column.IsVersion && !column.DbType.Contains("IDENTITY") && !column.IsDbGenerated)
          {
               _sourceTable.Columns.Add(column.Name ?? property.Name);
          }

          columnIndex++;
     }
}

Начнем с конструктора. Для начала мы должны знать куда нам вставлять данные. Это мы можем узнать у сущности из аттрибута Table, что и происходит в методе MapTableName. Затем нам нужно знать в какие колонки можно вставлять данные. Во вставке могу участвовать лубые колонки кроме IDENTITY, TIMESTAMP и тп. Это можно узнать у свойства сущности, которое помечено аттрибутом Column, что и происходит в методе MapColumns.
SqlBulkCopy начинает свою работу с получения количества колонок, которые участвую во вставке и если указан мапинг, то вызывается метод GetOrdinal, чтоб получить индекс каждой колонки по имени.
Затем SqlBulkCopy идет построчно вызывая метод Read, а потом вызывает GetValue для каждой колонки у текущей записи.
Мое "ноу-хау" - это получение значений колонок рефлексией, когда запрашивается очередная строка, и сохранение этих значений в DataRow. Во-первых это проще, во-вторых читабельней и понятней, а в-третьих - думаете рефлексия в методе Read будет все тормозить ? - Нет! Получилось только быстрей, на удивление (может кто-то скажет почему ?).

Ну и завершающий этап, сама обвертка над SqlBulkCopy в виде Extenstion Method для таблицы-сущности:

public static class TableExtension
{
 public static void BulkInsert<T>(this Table<T> entity, IEnumerable<T> data, int batchSize) 
  where T : class
 {
  LinqEntityDataReader<T> reader = new LinqEntityDataReader<T>(data);

  using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(DBContext.ConnectionString))
  {
   foreach (string columnName in reader.Columns)
   {
    sqlBulkCopy.ColumnMappings.Add(columnName, columnName);
   }

   sqlBulkCopy.BatchSize = batchSize;
   sqlBulkCopy.DestinationTableName = reader.DestenationTable;
   sqlBulkCopy.WriteToServer(new LinqEntityDataReader<T>(data));
   sqlBulkCopy.Close();
  }
 }
}

Ну тут вроде ничего военного, все должно быть ясно. Ну и в результате:

IEnumerable<Customer> dataToInsert = GenerateCustomers(100000);
int batchSize = 5000;
context.Customers.BulkInsert(dataToInsert, batchSize);

Ну и как я обещал грубый тест на производительность. Тест производился на вставке 100 000 записей. BatchSize я подобрал оптимальную для данного количества записей. Забыл сказать, что от этого значения зависит много, вы можете как выиграть в производительности, так и не очень выиграть. Для того чтобы подобрать оптимальное значение надо поэксперементировать. Рекомендации мелкософта не помогли, все зависит от ситуации. Для создания фейковых данных использовался метод:

public static IEnumerable<Customer> GenerateCustomers(int count)
{
 for (int i = 0; i < count; i++)
 {
  yield return new Customer
              {
           CustomerID = i.ToString("00000"), 
   CompanyName = "Company" + i, 
   ContactName = "Frederique Citeaux",
                        ContactTitle = "Marketing Manager",
   Address = "24, place Kleber",
   Phone = "(604) 555-4729",
   Region = "WA"
         };
 }
}

Вот что у меня получилось:

  • Мое решение ~ 8 секунд
  • Решение из пример ~ 11секунд
  • Решение из пример + динамический вызов GetValue, который ему посоветовали в коментах ~ 9.5 секунд
  • Стандартный InsertAllOnSubmit ~ 1 минута 23 секунды
Я не думаю, что это совпадение, все-таки 100 000 данных - это немало. Почему так, я еще не узнал.
В любой случае я получил, что хотел. В моем конкретном случае мапинг всех товаров на категорию (10 000 записей) теперь выполняется ~ 1.1 секунды. Думаю этот пример вас когда-то спасет.

4 комментария:

  1. Ответственность Table<T> за вставку записей - спорный момент. Ты юзаешь UnitOfWork и именно он должен управлять вставками. Должно это выглядеть примерно так:

    context.Customers.AddRange(customers);
    context.EnableBulkInsert = true;
    context.SubmitChanges();

    да, это сложнее и требует своего контекста, но это в стиле UnitOfWork.

    На красивый костыль похоже, а на "..гламурненько в стиле L2S." нет.

    ОтветитьУдалить
  2. Посмотрите пожалуйста мой вопрос http://csharp.foxhelp.ru/csforum/viewtopic.php?f=6&t=35&p=294#p294
    что посоветуете?

    ОтветитьУдалить
  3. У вас проблема со вставкой или с удалением ? Нужно больше деталей, какими объемами данных работате и тп

    ОтветитьУдалить