Высокопроизводительная вставка с помощью SqlBulkCopy с IDataReader

Kate

Administrator
Команда форума
SqlBulkCopy можно использовать в трех вариантах: вставка данных, представленных в виде DataTable, массива DataRowили IDataReaderэкземпляра. В этой статье я продемонстрирую две реализации IDataReaderинтерфейса, которые используются в сочетании с SqlBulkCopyвысокопроизводительной вставкой в базу данных. Два других варианта похожи друг на друга и могут использоваться для относительно небольших объемов данных, поскольку они требуют, чтобы все записи были предварительно загружены в память перед их передачей SqlBulkCopy. Напротив, этот IDataReaderподход более гибкий и позволяет работать с неограниченным количеством записей в «ленивом» режиме, что означает, что данные могут передаваться SqlBulkCopyна лету так быстро, как сервер может их использовать. Это аналогично подходу IList<T>vs.IEnumerable<T>.

Использование Demo​

Прилагаемый демонстрационный проект состоит из предварительно скомпилированного консольного приложения с файлом конфигурации и Dataподпапкой, содержащей CSVфайл образца . Перед запуском демонстрации обязательно настройте файл конфигурации, указав правильное соединение с stringименем " DefaultDb". Другой параметр " MaxRecordCount" равен 100,000по умолчанию, что должно быть подходящим для этой демонстрации. Обратите внимание, что соединение stringможет указывать на любую существующую базу данных. Все демонстрационные таблицы будут созданы автоматически, поэтому нет необходимости настраивать базу данных вручную.

После запуска демонстрации она появится в окне консоли с просьбой нажать Enterперед инициализацией базы данных и перед выполнением каждого демонстрационного действия. Завершенная демонстрация на моей машине выглядит следующим образом:

b61d6dd612bfaebad33e2862765438be.jpg

В качестве первого шага приложение попытается инициализировать базу данных. Он создаст (или воссоздаст) три таблицы - по одной для каждого демонстрационного действия:

  1. Contacts таблица с колонкамиId, FirstName, LastNameи BirthDate.
  2. DynamicData таблица с колонками Id, 10 integer, 10 string, 10 datetime, and 10 guid.
  3. CsvData таблица, имеющая ту же структуру, что и DynamicData.
Затем приложение выполнит три демонстрационных действия, измеряющих время для каждого действия:

  1. Static Dataset Demoдемонстрирует, ObjectDataReader<T>который позволяет обрабатывать экземпляры любого POCOкласса ( Contactв данном случае класса).
  2. Dynamic Dataset Demoдемонстрирует, DynamicDataReader<T>который также реализует IDataReader, но позволяет пользователю решать, как извлекать данные из базового объекта с Tпомощью определяемого пользователем лямбда-выражения. В этой демонстрации я использую IDictionary<string, object>для представления данных.
  3. CSV Import Demoиспользует CsvParserкласс и вышеупомянутый DynamicDataReader<T>для эффективной загрузки прикрепленного файла " Data \ CsvData.csv " в базу данных.
Данные для первых двух демонстраций генерируются случайным образом на лету с использованием вспомогательного класса RandomDataGenerator. Другой вспомогательный класс TableSchemaProviderиспользуется для извлечения некоторых метаданных из SQL Server и выполнения некоторых служебных команд SQL.

ObjectDataReader​

Как показано ниже, ObjectDataReader<T>принимает IEnumerable<T>в своем конструкторе, который представляет поток фактических данных, которые будут использоваться SqlBulkCopyклассом. Важно отметить , что GetOrdinal()и GetValue()методы не используют Reflection каждый раз , когда они нуждаются в свойствах доступа T. Вместо этого они используют предварительно скомпилированные и кэшированные лямбда-выражения, которые играют роль средств доступа к свойствам и поиска. Эти предварительно скомпилированные лямбда-выражения во много раз быстрее, чем при использовании отражения.

public sealed class ObjectDataReader<TData> : IDataReader
{
private class PropertyAccessor
{
public List<Func<TData, object>> Accessors { get; set; }
public Dictionary<string, int> Lookup { get; set; }
}

private static readonly Lazy<PropertyAccessor> s_propertyAccessorCache =
new Lazy<PropertyAccessor>(() =>
{
var propertyAccessors = typeof(TData)
.GetProperties(BindingFlags.Instance | BindingFlags.Public)
.Where(p => p.CanRead)
.Select((p, i) => new
{
Index = i,
Property = p,
Accessor = CreatePropertyAccessor(p)
})
.ToArray();

return new PropertyAccessor
{
Accessors = propertyAccessors.Select(p => p.Accessor).ToList(),
Lookup = propertyAccessors.ToDictionary(
p => p.Property.Name, p => p.Index, StringComparer.OrdinalIgnoreCase)
};
});

private static Func<TData, object> CreatePropertyAccessor(PropertyInfo p)
{
var parameter = Expression.Parameter(typeof(TData), "input");
var propertyAccess = Expression.Property(parameter, p.GetGetMethod());
var castAsObject = Expression.TypeAs(propertyAccess, typeof(object));
var lamda = Expression.Lambda<Func<TData, object>>(castAsObject, parameter);
return lamda.Compile();
}

private IEnumerator<TData> m_dataEnumerator;

public ObjectDataReader(IEnumerable<TData> data)
{
m_dataEnumerator = data.GetEnumerator();
}

#region IDataReader Members

public void Close()
{
Dispose();
}

public int Depth => 1;

public DataTable GetSchemaTable()
{
return null;
}

public bool IsClosed => m_dataEnumerator == null;

public bool NextResult()
{
return false;
}

public bool Read()
{
if (IsClosed)
throw new ObjectDisposedException(GetType().Name);
return m_dataEnumerator.MoveNext();
}

public int RecordsAffected => -1;

#endregion

// IDisposable Members

#region IDataRecord Members

public int GetOrdinal(string name)
{
int ordinal;
if (!s_propertyAccessorCache.Value.Lookup.TryGetValue(name, out ordinal))
throw new InvalidOperationException("Unknown parameter name: " + name);
return ordinal;
}

public object GetValue(int i)
{
if (m_dataEnumerator == null)
throw new ObjectDisposedException(GetType().Name);
return s_propertyAccessorCache.Value.Accessors(m_dataEnumerator.Current);
}

public int FieldCount => s_propertyAccessorCache.Value.Accessors.Count;

// Not Implemented IDataRecord Members ...

#endregion
}
После того, как мы ObjectDataReader<T>реализовали, мы можем подключить его SqlBulkCopyследующим образом:

private static async Task RunStaticDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "Contacts";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

bulkCopy.ColumnMappings.Add("Id", "Id");
bulkCopy.ColumnMappings.Add("FirstName", "FirstName");
bulkCopy.ColumnMappings.Add("LastName", "LastName");
bulkCopy.ColumnMappings.Add("BirthDate", "BirthDate");

using (var reader = new ObjectDataReader<Contact>(new RandomDataGenerator().GetContacts(count)))
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}

DynamicDataReader​

Вы можете использовать, DynamicDataReader<T>если нет статически определенного класса, представляющего данные. Лучший пример, который иллюстрирует цель, DynamicDataReader<T>- это когда каждая запись вашей таблицы представлена в виде, Dictionary<string, object>где ключи являются именами столбцов. Таким образом, если в словаре нет значения для данного столбца, Nullбудет принято значение. И наоборот, все элементы в словаре, которые не связаны ни с одним столбцом в таблице, будут проигнорированы.

public sealed class DynamicDataReader<T> : IDataReader
{
private readonly IList<SchemaFieldDef> m_schema;
private readonly IDictionary<string, int> m_schemaMapping;
private readonly Func<T, string, object> m_selector;
private IEnumerator<T> m_dataEnumerator;

public DynamicDataReader(IList<SchemaFieldDef> schema, IEnumerable<T> data,
Func<T, string, object> selector)
{
m_schema = schema;
m_schemaMapping = m_schema
.Select((x, i) => new { x.FieldName, Index = i })
.ToDictionary(x => x.FieldName, x => x.Index);
m_selector = selector;
m_dataEnumerator = data.GetEnumerator();
}

#region IDataReader Members

public void Close()
{
Dispose();
}

public int Depth => 1;

public DataTable GetSchemaTable()
{
return null;
}

public bool IsClosed => m_dataEnumerator == null;

public bool NextResult()
{
return false;
}

public bool Read()
{
if (IsClosed)
throw new ObjectDisposedException(GetType().Name);
return m_dataEnumerator.MoveNext();
}

public int RecordsAffected => -1;

#endregion

// IDisposable Members

#region IDataRecord Members

public int FieldCount => m_schema.Count;

public int GetOrdinal(string name)
{
int ordinal;
if (!m_schemaMapping.TryGetValue(name, out ordinal))
throw new InvalidOperationException("Unknown parameter name: " + name);
return ordinal;
}

public object GetValue(int i)
{
if (m_dataEnumerator == null)
throw new ObjectDisposedException(GetType().Name);

var value = m_selector(m_dataEnumerator.Current, m_schema.FieldName);

if (value == null)
return DBNull.Value;

var strValue = value as string;
if (strValue != null)
{
if (strValue.Length > m_schema.Size && m_schema.Size > 0)
strValue = strValue.Substring(0, m_schema.Size);
if (m_schema.DataType == DbType.String)
return strValue;
return SchemaFieldDef.StringToTypedValue(strValue, m_schema.DataType) ?? DBNull.Value;
}

return value;
}

// Not Implemented IDataRecord Members

#endregion
}
DynamicDataReader<T>передает SchemaFieldDefкласс, который описывает имя поля, размер и тип данных БД столбца таблицы. Только те столбцы, которые были переданы через constructor ( IList<SchemaFieldDef> schema), будут участвовать во вставке данных. Два других параметра конструктора представляют сами данные ( IEnumerable<T> data) и определенное пользователем лямбда-выражение ( Func<T, string, object> selector) для доступа к свойствам. Как видите, selectorпринимает экземпляр Tи stringимя поля и возвращает objectобратно, представляющее значение, связанное с этим именем поля. Обратите внимание , что objectтип данных «S может быть либо не- stringтипа С # ( int, decimal, DateTime, Guidи т.д.) , что соответствует фактическому типу в базе данных ( int, numeric, datetime, uniqueidentifierи т.д.), или простоstring. В последнем случае DynamicDataReaderбудет пытаться автоматически преобразовать stringзначение в соответствующий тип данных с помощью SchemaFieldDef.StringToTypedValue()метода. Этот метод поддерживает только несколько типов данных, но при необходимости его можно легко расширить.

Вот пример использования DynamicDataReader<T>вместе с SqlBulkCopy:

private static async Task RunDynamicDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
var fields = await new TableSchemaProvider(connection, "DynamicData").GetFieldsAsync();

using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "DynamicData";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

foreach (var field in fields)
bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);

var data = new RandomDataGenerator().GetDynamicData(count);

using (var reader = new DynamicDataReader<IDictionary<string, object>>
(fields, data, (x, k) => x.GetValueOrDefault(k)))
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
Это очень похоже на ObjectDataReaderиспользование, за исключением того, что поля не связаны статически.

Импорт файла CSV​

Наконец, третье демонстрационное действие включает CsvParser, DynamicDataReaderи SqlBulkCopyклассы, работающие вместе для достижения высокопроизводительного и масштабируемого импорта данных в CSVформате:

private static async Task RunCsvDatasetDemoAsync(SqlConnection connection, int count,
CancellationToken cancellationToken)
{
using (var csvReader = new StreamReader(@"Data\CsvData.csv"))
{
var csvData = CsvParser.ParseHeadAndTail(csvReader, ',', '"');

var csvHeader = csvData.Item1
.Select((x, i) => new {Index = i, Field = x})
.ToDictionary(x => x.Field, x => x.Index);

var csvLines = csvData.Item2;

var fields = await new TableSchemaProvider(connection, "CsvData").GetFieldsAsync();

using (var bulkCopy = new SqlBulkCopy(connection))
{
bulkCopy.DestinationTableName = "CsvData";
bulkCopy.BatchSize = 1000;
bulkCopy.BulkCopyTimeout = (int) TimeSpan.FromMinutes(10).TotalSeconds;

foreach (var field in fields)
bulkCopy.ColumnMappings.Add(field.FieldName, field.FieldName);

using (var reader = new DynamicDataReader<IList<string>>(fields, csvLines.Take(count),
(x, k) => x.GetValueOrDefault(csvHeader.GetValueOrDefault(k, -1))))
{
await bulkCopy.WriteToServerAsync(reader, cancellationToken);
}
}
}
}
В демонстрационных целях только1,000строк есть в файле CsvData.csv.Однако это конкретное решение сможет обрабатывать любое количество строк с относительно стабильной производительностью. Оно будет сопоставлять имена столбцов в CSVфайле с именами столбцов целевой таблицы. Отсутствующие данные будут заполнены буквой Null. Любые дополнительные столбцы, отсутствующие в целевой таблице, будут проигнорированы.

Резюме​

В этой статье я продемонстрировал один из возможных способов обработки высокопроизводительных вставок в базу данных с использованием управляемого кода. Моей целью было создать гибкий и простой в использовании API, чтобы его можно было применять во многих различных сценариях. В частности, используйте ObjectDataReader<T>для загрузки данных, представленных в виде статически определенных POCOклассов, и DynamicDataReader<T>для загрузки данных любой структуры.

 
Сверху