Este mes me enfrenté a la carga de grandes volúmenes de datos desde C# hacia SQL Server. Millones de registros de un solo envión y en una sola transacción.
Primero intenté usar Bulk Insert, pero resultó muy lento para el requerimiento. Luego intenté con Data Tables y SQL Bulk Copy, pero se reventó por memoria. Encontré la solución en una herramienta que nunca había utilizado: yield.
Esta es la versión simplificada de un método que lee archivos por bloques de 1000 registros y genera Data Tables "bajo demanda" usando yield:
protected IEnumerable<DataTable> BuildDataTableByBatches(StreamReader reader, DataColumn[] columns)
{
var dt = new DataTable();
foreach (var col in columns)
{
dt.Columns.Add(col.ColumnName, col.DataType);
}
int n = 0;
string line;
while ((line = reader.ReadLine()) != null)
{
n++;
var values = line.Split("|");
if (values.Length > 0)
{
var row = dt.NewRow();
for (int i = 0; i < values.Length; i++)
{
row.SetField(i, values[i]);
}
dt.Rows.Add(row);
}
if (n % 1000 == 0)
{
var batch = dt;
dt = new DataTable();
foreach (var col in columns)
{
dt.Columns.Add(col.ColumnName, col.DataType);
}
yield return batch;
}
}
yield return dt;
}
En este caso, se leen varios archivos que se llaman igual a la tabla destino y se insertan todos los datos dentro de una gran transacción. Cada archivo se procesa más o menos así:
using var stream = await _storageManager.GetFileStream(containerName, f.Name);
using var reader = new StreamReader(stream);
reader.ReadLine();
tableName = f.Name.Split(".").First();
var columns = await GetTableColumns(tableName, schema);
var dtColumns = columns.Select(c => GetDataColumnDefinition(c)).ToArray();
foreach (var dataTable in BuildDataTableByBatches(reader, dtColumns))
{
await ExecuteBulkInsert(tableName, schema, dataTable, batchSize, conn, tran);
dataTable.Dispose();
}
Y finalmente se utiliza SqlBulkCopy:
protected async Task ExecuteBulkInsert(
string tableName, string schema, DataTable data, int batchSize,
SqlConnection connection, SqlTransaction transaction)
{
using var bulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.Default, transaction)
{
BatchSize = batchSize,
DestinationTableName = $"{schema}.{tableName}"
};
await bulkCopy.WriteToServerAsync(data);
}
Hay métodos adicionales que vale el esfuerzo revisar. Este obtiene la definición de la tabla y permite generar dinámicamente las Data Tables:
protected async Task<IEnumerable<TableColumnDescription>> GetTableColumns(string tableName, string schema)
{
try
{
var columns = new List<TableColumnDescription>();
var query = @"
SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = @tableName
AND TABLE_SCHEMA = @tableSchema";
using SqlConnection conn = new SqlConnection(_dbOptions.DATABASE_CONNECTION);
conn.Open();
using SqlCommand cmd = new SqlCommand(query, conn);
cmd.Parameters.AddWithValue("@tableName", tableName);
cmd.Parameters.AddWithValue("@tableSchema", schema);
using SqlDataReader reader = await cmd.ExecuteReaderAsync();
if (reader.HasRows)
{
while (reader.Read())
{
var col = new TableColumnDescription(
reader.GetString(0),
reader.GetString(1),
reader.GetString(2) == "YES");
columns.Add(col);
}
}
return columns;
}
catch
{
return null;
}
}
Y este genera las Data Columns. Resalto que sólo el tipo de dato uniqueidentifier se debe establecer explícitamente; si la columna destino es de tipo numérico, SQL Server intenta hacer el cast.
protected DataColumn GetDataColumnDefinition(TableColumnDescription col)
{
// uniqueidentifier requires specific column type definition
// other types are not handled because the file only has strings
var column = col.ColumnType == "uniqueidentifier" ?
new DataColumn(col.ColumnName, typeof(Guid)) :
new DataColumn(col.ColumnName);
column.AllowDBNull = col.IsNullable;
return column;
}
Corrección. El último método tiene un error:
protected DataColumn GetDataColumnDefinition(TableColumnDescription col)
{
var columnType = col.ColumnType switch
{
"uniqueidentifier" => typeof(Guid),
"decimal" => typeof(decimal),
_ => null
};
var column = columnType != null ?
new DataColumn(col.ColumnName, columnType) :
new DataColumn(col.ColumnName);
column.AllowDBNull = col.IsNullable;
return column;
}
Si no asignas el tipo de dato decimal en la DataColumn, el SqlBulkCopy ignora el punto decimal. Por ejemplo, si tienes el número 34902.45, el valor insertado será 3490245.0000.
Comentarios
Publicar un comentario