Carga de muchos datos hacia SQL Server, desde C#

Este mes me enfrenté a la carga de grandes volúmenes de datos desde C# hacia SQL Server. Millones de registros de un solo envión y en una sola transacción.

Primero intenté usar Bulk Insert, pero resultó muy lento para el requerimiento. Luego intenté con Data Tables y SQL Bulk Copy, pero se reventó por memoria. Encontré la solución en una herramienta que nunca había utilizado: yield.

Esta es la versión simplificada de un método que lee archivos por bloques de 1000 registros y genera Data Tables "bajo demanda" usando yield:

protected IEnumerable<DataTableBuildDataTableByBatches(StreamReader readerDataColumn[] columns)
{
    var dt = new DataTable();
    foreach (var col in columns)
    {
        dt.Columns.Add(col.ColumnNamecol.DataType);
    }

    int n = 0;
    string line;
    while ((line = reader.ReadLine()) != null)
    {
        n++;
        var values = line.Split("|");

        if (values.Length > 0)
        {
            var row = dt.NewRow();

            for (int i = 0i < values.Lengthi++)
            {
                row.SetField(ivalues[i]);
            }

            dt.Rows.Add(row);
        }

        if (n % 1000 == 0)
        {
            var batch = dt;

            dt = new DataTable();
            foreach (var col in columns)
            {
                dt.Columns.Add(col.ColumnNamecol.DataType);
            }

            yield return batch;
        }
    }

    yield return dt;
}

En este caso, se leen varios archivos que se llaman igual a la tabla destino y se insertan todos los datos dentro de una gran transacción. Cada archivo se procesa más o menos así:

using var stream = await _storageManager.GetFileStream(containerNamef.Name);
using var reader = new StreamReader(stream);
reader.ReadLine();

tableName = f.Name.Split(".").First();
var columns = await GetTableColumns(tableNameschema);
var dtColumns = columns.Select(c => GetDataColumnDefinition(c)).ToArray();

foreach (var dataTable in BuildDataTableByBatches(readerdtColumns))
{
    await ExecuteBulkInsert(tableNameschemadataTablebatchSizeconntran);
    dataTable.Dispose();
}

Y finalmente se utiliza SqlBulkCopy:

protected async Task ExecuteBulkInsert(
    string tableNamestring schemaDataTable dataint batchSize,
    SqlConnection connectionSqlTransaction transaction)
{
    using var bulkCopy = new SqlBulkCopy(connectionSqlBulkCopyOptions.Defaulttransaction)
    {
        BatchSize = batchSize,
        DestinationTableName = $"{schema}.{tableName}"
    };

    await bulkCopy.WriteToServerAsync(data);
}

Hay métodos adicionales que vale el esfuerzo revisar. Este obtiene la definición de la tabla y permite generar dinámicamente las Data Tables:

protected async Task<IEnumerable<TableColumnDescription>> GetTableColumns(string tableNamestring schema)
{
    try
    {
        var columns = new List<TableColumnDescription>();

        var query = @"
            SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE
            FROM INFORMATION_SCHEMA.COLUMNS 
            WHERE TABLE_NAME = @tableName
            AND TABLE_SCHEMA = @tableSchema";

        using SqlConnection conn = new SqlConnection(_dbOptions.DATABASE_CONNECTION);
        conn.Open();

        using SqlCommand cmd = new SqlCommand(queryconn);
        cmd.Parameters.AddWithValue("@tableName"tableName);
        cmd.Parameters.AddWithValue("@tableSchema"schema);

        using SqlDataReader reader = await cmd.ExecuteReaderAsync();

        if (reader.HasRows)
        {
            while (reader.Read())
            {
                var col = new TableColumnDescription(
                    reader.GetString(0),
                    reader.GetString(1),
                    reader.GetString(2) == "YES");
                columns.Add(col);
            }
        }

        return columns;
    }
    catch
    {
        return null;
    }
}

Y este genera las Data Columns. Resalto que sólo el tipo de dato uniqueidentifier se debe establecer explícitamente; si la columna destino es de tipo numérico, SQL Server intenta hacer el cast.

protected DataColumn GetDataColumnDefinition(TableColumnDescription col)
{
    // uniqueidentifier requires specific column type definition
    // other types are not handled because the file only has strings
    var column = col.ColumnType == "uniqueidentifier" ?
        new DataColumn(col.ColumnNametypeof(Guid)) :
        new DataColumn(col.ColumnName);
    column.AllowDBNull = col.IsNullable;
    return column;
}

Corrección. El último método tiene un error:

protected DataColumn GetDataColumnDefinition(TableColumnDescription col)
{
    var columnType = col.ColumnType switch
    {
        "uniqueidentifier" => typeof(Guid),
        "decimal" => typeof(decimal),
        _ => null
    };

    var column = columnType != null ?
        new DataColumn(col.ColumnNamecolumnType) :
        new DataColumn(col.ColumnName);

    column.AllowDBNull = col.IsNullable;
    return column;
}

Si no asignas el tipo de dato decimal en la DataColumn, el SqlBulkCopy ignora el punto decimal. Por ejemplo, si tienes el número 34902.45, el valor insertado será 3490245.0000.




Comentarios

Entradas más populares de este blog

Ampliar el sistema de archivos de una máquina virtual Ubuntu

Adviento C# 2020: Diario de ASP.NET

C#: Registrar las dependencias de un módulo usando extensiones