BulkWriters128

First 128 "bulk writers".
git clone https://git.philomathiclife.com/repos/BulkWriters128
Log | Files | Refs | README

BulkWriterBaseCase.cs (14553B)


      1 using Microsoft.Data.SqlClient;
      2 using Serde.Bin.Ser;
      3 using Std;
      4 using Std.Iter;
      5 using Std.Maybe;
      6 using Std.Num;
      7 using Std.Result;
      8 using System;
      9 using System.Data;
     10 using System.Diagnostics;
     11 using static SQLServer.Helpers;
     12 using System.Runtime.InteropServices;
     13 #region Namespaces
     14 namespace SQLServer {
     15     #region Types
     16     [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Unicode, Pack = 0)]
     17     public struct BulkWriter<T0>: IBulkWriter where T0: struct, IDataType {
     18 
     19         #region Type-level Constructors
     20         #endregion
     21 
     22         #region Instance Constructors
     23         public BulkWriter() => throw new InvalidOperationException("Parameterless constructor is not allowed to be called!");
     24         BulkWriter(UserTable table, Maybe<ErrorTable> errTable, Prod<ulong, double> maxErrorsAllowed, string processName, string userName, bool encrypted, bool encryptedMod) => (Destination, ErrTable, MaxErrorsAllowed, _currentErrorCount, _currentProcessedCount, _processName, _userName, _containsEncryptedColumn, _mustBeAllowEncryptedValueModifications) = (table, errTable, maxErrorsAllowed, ulong.MinValue, ulong.MinValue, processName, userName, encrypted, encryptedMod);
     25         #endregion
     26 
     27         #region Type-level Fields
     28         #endregion
     29 
     30         #region Instance Fields
     31         public readonly Maybe<ErrorTable> ErrTable;
     32         public readonly UserTable Destination;
     33         public readonly Prod<ulong, double> MaxErrorsAllowed;
     34         [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0032:Use auto property", Justification = "Prefer fields.")]
     35         ulong _currentErrorCount;
     36         [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0032:Use auto property", Justification = "Prefer fields.")]
     37         ulong _currentProcessedCount;
     38         readonly string _processName;
     39         readonly string _userName;
     40         readonly bool _containsEncryptedColumn;
     41         readonly bool _mustBeAllowEncryptedValueModifications;
     42         #endregion
     43 
     44         #region Type-level Properties
     45         #endregion
     46 
     47         #region Instance Properties
     48         readonly Maybe<ErrorTable> IBulkWriter.ErrTable => ErrTable;
     49         readonly UserTable IBulkWriter.Destination => Destination;
     50         readonly Prod<ulong, double> IBulkWriter.MaxErrorsAllowed => MaxErrorsAllowed;
     51         public readonly ulong CurrentErrorCount => _currentErrorCount;
     52         public readonly ulong CurrentProcessedCount => _currentProcessedCount;
     53         public readonly ulong CurrentSuccessfullyProcessedCount => _currentProcessedCount - _currentErrorCount;
     54         public readonly double CurrentErrorRatio => (double)_currentErrorCount / _currentProcessedCount;
     55         public readonly bool IsInError => _currentErrorCount > MaxErrorsAllowed.Item0 && CurrentErrorRatio > MaxErrorsAllowed.Item1;
     56         #endregion
     57 
     58         #region Type-level Functions
     59         public static Result<BulkWriter<T0>, BulkWriterCreateError> New(in UserTable table, Prod<ulong, double> maxErrorsAllowed, in Maybe<ErrorTable> errTable, string processName, string userName) {
     60 
     61             if (table.Schema.Database.IsReadOnly) {
     62                 return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.DatabaseIsReadOnly, new StackTrace(1, true)));
     63             } else if (double.IsNaN(maxErrorsAllowed.Item1) || double.IsNegative(maxErrorsAllowed.Item1) || maxErrorsAllowed.Item1 > 1.0d) {
     64                 return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.InvalidErrorRatio, new StackTrace(1, true)));
     65             } else if (processName.Length > 128) {
     66                 return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.ProcessNameLengthExceeds128, new StackTrace(1, true)));
     67             } else if (userName.Length > 128) {
     68                 return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.UserNameLengthExceeds128, new StackTrace(1, true)));
     69             } else {
     70                 var counter = ushort.MinValue;
     71                 var match = TypeMatch(Std.Iter.Functions.FromFn(() => counter++ switch { ushort.MinValue => new(typeof(T0)), _ => Maybe<Type>.None() }), in table);
     72                 return match.IsSome ? match.Unwrap() ? new(new BulkWriter<T0>(table, errTable, maxErrorsAllowed, processName, userName, table.ContainsEncryptedColumn(), false)) : new(new BulkWriterCreateError(BulkWriterCreateError.Tag.TypeMismatch, new StackTrace(1, true))) : new(new BulkWriter<T0>(table, errTable, maxErrorsAllowed, processName, userName, true, true));
     73             }
     74         }
     75         #endregion
     76 
     77         #region Instance Functions
     78         public override readonly bool Equals(object? _) => false;
     79         public override readonly int GetHashCode() => 0;
     80         readonly void IBulkWriter.Sealed(){}
     81         public override readonly string ToString() => string.Empty;
     82         [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA2100:Review SQL queries for security vulnerabilities", Justification = "Text is generated internally.")]
     83         public Result<Unit, WriteError> Write<TRowIter, TRow, TProd, TErr>(ref TRowIter iter, WriteOptions writeOptions, Maybe<NonZeroUshort> batchSize, Maybe<NonZeroUshort> timeout, bool enableStreaming, bool isSortedAccordingToClusteredIndex, SessionOptions options) where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {
     84 
     85             var allowEncryptMod = false;
     86 
     87             if ((writeOptions & WriteOptions.AllowEncryptedValueModifications) == WriteOptions.AllowEncryptedValueModifications) {
     88 
     89                 if (_containsEncryptedColumn) {
     90                     allowEncryptMod = true;
     91                 } else {
     92                     return new(new WriteError(WriteError.Tag.TableExpectedToContainEncryptedDataButDoesNot, new StackTrace(1, true)));
     93                 }
     94             } else if(_mustBeAllowEncryptedValueModifications) {
     95                 return new(new WriteError(WriteError.Tag.TableColumnMismatchOrWriteOptionsMustContainAllowEncryptedValueModifications, new StackTrace(1, true)));
     96             }
     97             using var con = Functions.CreateOpenedConnection(in Destination.Schema.Database, options, !allowEncryptMod && _containsEncryptedColumn, Maybe<Uri>.None());
     98             
     99             if ((writeOptions & WriteOptions.KeepIdentity) == WriteOptions.KeepIdentity && Destination.ContainsIDENTITYColumn()) {
    100                 using var txn = con.BeginTransaction(IsolationLevel.Serializable);
    101                 using (SqlCommand qry = new($"SET IDENTITY_INSERT [{Destination.Schema.Name.Value}].[{Destination.Name}] ON;", con, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = 60, CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) { _ = qry.ExecuteNonQuery(); }
    102                 txn.Commit();
    103             }
    104             return WriteInternal<TRowIter, TRow, TProd, TErr>(ref iter, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, con, null, (options & SessionOptions.NUMERIC_ROUNDABORT_OFF) != SessionOptions.NUMERIC_ROUNDABORT_OFF, new StackTrace(1, true).ToString()).MapErr(_writeTxnErrToWriteErr);
    105         }
    106         [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA2100:Review SQL queries for security vulnerabilities", Justification = "Text is generated internally.")]
    107         public Result<Unit, WriteErrorOrTransactionError> Write<TRowIter, TRow, TProd, TErr>(ref TRowIter iter, WriteOptions writeOptions, Maybe<NonZeroUshort> batchSize, Maybe<NonZeroUshort> timeout, bool enableStreaming, bool isSortedAccordingToClusteredIndex, SqlTransaction txn) where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {
    108 
    109             if (txn.Connection.State != ConnectionState.Open) {
    110                 return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.SqlConnectionIsNotOpen, new StackTrace(1, true)));
    111             } else if (!string.Equals(txn.Connection.DataSource, $"tcp:{Destination.Schema.Database.Server.IntoString()}", StringComparison.Ordinal)) {
    112                 return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.SqlConnectionServerMismatch, new StackTrace(1, true)));
    113             } else {
    114 
    115                 if ((writeOptions & WriteOptions.AllowEncryptedValueModifications) == WriteOptions.AllowEncryptedValueModifications) {
    116 
    117                     if (_containsEncryptedColumn) {
    118 
    119                         if (!txn.Connection.ConnectionString.Contains("Column Encryption Setting=enabled", StringComparison.Ordinal)) {
    120                             return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.SqlConnectionViolatesAllowEncryptedValueModifications, new StackTrace(1, true)));
    121                         }
    122                     } else {
    123                         return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.TableExpectedToContainEncryptedDataButDoesNot, new StackTrace(1, true)));
    124                     }
    125                 } else if (_mustBeAllowEncryptedValueModifications) {
    126                     return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.TableColumnMismatchOrWriteOptionsMustContainAllowEncryptedValueModifications, new StackTrace(1, true)));
    127                 } else if (_containsEncryptedColumn && !txn.Connection.ConnectionString.Contains("Column Encryption Setting=enabled", StringComparison.Ordinal)) {
    128                     return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.ContainsEncryptedColumnsButConnectionDoesNotHandleEncryptedColumns, new StackTrace(1, true)));
    129                 }
    130 
    131                 if ((writeOptions & WriteOptions.KeepIdentity) == WriteOptions.KeepIdentity && Destination.ContainsIDENTITYColumn()) {
    132                     bool numericRoundAbort;
    133 
    134                     using (SqlCommand qry = new($@"SET IDENTITY_INSERT [{Destination.Schema.Database.Name.Value}].[{Destination.Schema.Name.Value}].[{Destination.Name}] ON;
    135 SELECT CASE (@@OPTIONS & 8192)
    136            WHEN 8192 THEN CONVERT(bit, 1)
    137            ELSE CONVERT(bit, 0)
    138        END AS fblnNumericRoundabort;", txn.Connection, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = timeout.MapOr(0, _nzUshortToInt), CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) {
    139                         numericRoundAbort = (bool)qry.ExecuteScalar();
    140                     }
    141                     var tableCopy = Destination;
    142                     return WriteInternal<TRowIter, TRow, TProd, TErr>(ref iter, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, txn.Connection, txn, numericRoundAbort, new StackTrace(1, true).ToString()).MapOrElse(
    143                         (err) => {
    144                             using SqlCommand qry = new($"SET IDENTITY_INSERT [{tableCopy.Schema.Database.Name.Value}].[{tableCopy.Schema.Name.Value}].[{tableCopy.Name}] OFF;", txn.Connection, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = timeout.MapOr(0, _nzUshortToInt), CommandType = CommandType.Text, EnableOptimizedParameterBinding = true };
    145                             _ = qry.ExecuteNonQuery();
    146                             return new Result<Unit, WriteErrorOrTransactionError>(err);
    147                         },
    148                         (x) => {
    149                             using SqlCommand qry = new($"SET IDENTITY_INSERT [{tableCopy.Schema.Database.Name.Value}].[{tableCopy.Schema.Name.Value}].[{tableCopy.Name}] OFF;", txn.Connection, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = timeout.MapOr(0, _nzUshortToInt), CommandType = CommandType.Text, EnableOptimizedParameterBinding = true };
    150                             var unused = qry.ExecuteNonQuery();
    151                             return new(x);
    152                         }
    153                     );
    154                 } else {
    155                     bool numericRoundAbort;
    156 
    157                     using (SqlCommand qry = new($@"SELECT CASE (@@OPTIONS & 8192)
    158 WHEN 8192 THEN CONVERT(bit, 1)
    159 ELSE CONVERT(bit, 0)
    160 END AS fblnNumericRoundabort;", txn.Connection, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = timeout.MapOr(0, _nzUshortToInt), CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) {
    161                         numericRoundAbort = (bool)qry.ExecuteScalar();
    162                     }
    163                     return WriteInternal<TRowIter, TRow, TProd, TErr>(ref iter, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, txn.Connection, txn, numericRoundAbort, new StackTrace(1, true).ToString());
    164                 }
    165             }
    166         }
    167         // MUST ensure that txn is either null or uses con as its SqlConnection!
    168         Result<Unit, WriteErrorOrTransactionError> WriteInternal<TRowIter, TRow, TProd, TErr>(ref TRowIter iter, WriteOptions writeOptions, Maybe<NonZeroUshort> batchSize, Maybe<NonZeroUshort> timeout, bool enableStreaming, bool isSortedAccordingToClusteredIndex, SqlConnection con, SqlTransaction? txn, bool numericRoundAbort, string stackTrace) where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {
    169 
    170             if (_currentErrorCount > MaxErrorsAllowed.Item0 && CurrentErrorRatio > MaxErrorsAllowed.Item1) { return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.MaxErrorsExceeded, new StackTrace(1, true))); }
    171             using var blk = CreateBulkCopy(in Destination, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, con, txn);
    172             using IterDataReader<TRowIter, TRow, TProd, TErr, T0> rdr = new(ErrTable, Destination, MaxErrorsAllowed, _currentProcessedCount, _currentErrorCount, iter, _processName, _userName, numericRoundAbort, stackTrace);
    173             blk.WriteToServer(rdr);
    174             _currentProcessedCount = rdr.CurrentProcessedCount;
    175             return (_currentErrorCount = rdr.CurrentErrorCount) > MaxErrorsAllowed.Item0 && CurrentErrorRatio > MaxErrorsAllowed.Item1 ? new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.MaxErrorsExceeded, new StackTrace(1, true))) : new(new Unit());
    176         }
    177         #endregion
    178 
    179         #region Operators
    180         #endregion
    181 
    182         #region Types
    183         #endregion
    184     }
    185     #endregion
    186 
    187     #region Namespaces
    188     #endregion
    189 }
    190 #endregion