BulkWriterBaseCase.cs (14553B)
using Microsoft.Data.SqlClient;
using Serde.Bin.Ser;
using Std;
using Std.Iter;
using Std.Maybe;
using Std.Num;
using Std.Result;
using System;
using System.Data;
using System.Diagnostics;
using static SQLServer.Helpers;
using System.Runtime.InteropServices;
#region Namespaces
namespace SQLServer {
    #region Types
    /// <summary>
    /// Writes rows described by <typeparamref name="T0"/> into <see cref="Destination"/> through the bulk-copy
    /// object produced by <c>CreateBulkCopy</c>, keeping running totals of processed and failed rows across calls.
    /// Instances are obtained from <see cref="New"/>; the parameterless constructor always throws.
    /// </summary>
    [StructLayout(LayoutKind.Sequential, CharSet = CharSet.Unicode, Pack = 0)]
    public struct BulkWriter<T0>: IBulkWriter where T0: struct, IDataType {

        #region Type-level Constructors
        #endregion

        #region Instance Constructors
        /// <summary>Always throws; use <see cref="New"/> to obtain a validated instance.</summary>
        /// <exception cref="InvalidOperationException">Thrown unconditionally.</exception>
        public BulkWriter() => throw new InvalidOperationException("Parameterless constructor is not allowed to be called!");
        // Stores the already-validated arguments and starts both counters at zero.
        BulkWriter(UserTable table, Maybe<ErrorTable> errTable, Prod<ulong, double> maxErrorsAllowed, string processName, string userName, bool encrypted, bool encryptedMod) {
            Destination = table;
            ErrTable = errTable;
            MaxErrorsAllowed = maxErrorsAllowed;
            _currentErrorCount = ulong.MinValue;
            _currentProcessedCount = ulong.MinValue;
            _processName = processName;
            _userName = userName;
            _containsEncryptedColumn = encrypted;
            _mustBeAllowEncryptedValueModifications = encryptedMod;
        }
        #endregion

        #region Type-level Fields
        #endregion

        #region Instance Fields
        /// <summary>Optional error table handed to the row reader on every write.</summary>
        public readonly Maybe<ErrorTable> ErrTable;
        /// <summary>Table the rows are written to.</summary>
        public readonly UserTable Destination;
        /// <summary>
        /// Error thresholds: <c>Item0</c> is an absolute error count and <c>Item1</c> an error ratio
        /// (validated by <see cref="New"/> to lie in [0, 1]). See <see cref="IsInError"/> for how they combine.
        /// </summary>
        public readonly Prod<ulong, double> MaxErrorsAllowed;
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0032:Use auto property", Justification = "Prefer fields.")]
        ulong _currentErrorCount;
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0032:Use auto property", Justification = "Prefer fields.")]
        ulong _currentProcessedCount;
        readonly string _processName;
        readonly string _userName;
        readonly bool _containsEncryptedColumn;
        readonly bool _mustBeAllowEncryptedValueModifications;
        #endregion

        #region Type-level Properties
        #endregion

        #region Instance Properties
        readonly Maybe<ErrorTable> IBulkWriter.ErrTable => ErrTable;
        readonly UserTable IBulkWriter.Destination => Destination;
        readonly Prod<ulong, double> IBulkWriter.MaxErrorsAllowed => MaxErrorsAllowed;
        /// <summary>Number of failed rows as last reported by the row reader.</summary>
        public readonly ulong CurrentErrorCount => _currentErrorCount;
        /// <summary>Number of processed rows as last reported by the row reader.</summary>
        public readonly ulong CurrentProcessedCount => _currentProcessedCount;
        /// <summary>Processed rows minus failed rows.</summary>
        public readonly ulong CurrentSuccessfullyProcessedCount => _currentProcessedCount - _currentErrorCount;
        /// <summary>
        /// Failed rows divided by processed rows. Evaluates to <see cref="double.NaN"/> while both counters are zero,
        /// which compares as "not greater than" any threshold.
        /// </summary>
        public readonly double CurrentErrorRatio => (double)_currentErrorCount / _currentProcessedCount;
        /// <summary>
        /// <see langword="true"/> only when BOTH the absolute error count and the error ratio exceed their
        /// respective limits in <see cref="MaxErrorsAllowed"/>.
        /// </summary>
        public readonly bool IsInError => _currentErrorCount > MaxErrorsAllowed.Item0 && CurrentErrorRatio > MaxErrorsAllowed.Item1;
        #endregion

        #region Type-level Functions
        /// <summary>Validates the arguments and creates a writer for <paramref name="table"/>.</summary>
        /// <param name="table">Destination table; its database must not be read-only.</param>
        /// <param name="maxErrorsAllowed">Error thresholds; <c>Item1</c> must be a number in [0, 1].</param>
        /// <param name="errTable">Optional error table.</param>
        /// <param name="processName">Process name, at most 128 characters.</param>
        /// <param name="userName">User name, at most 128 characters.</param>
        /// <returns>
        /// The writer, or a <c>BulkWriterCreateError</c> tagged with the first failed check: read-only database,
        /// invalid error ratio, over-long process name, over-long user name, or type mismatch.
        /// </returns>
        public static Result<BulkWriter<T0>, BulkWriterCreateError> New(in UserTable table, Prod<ulong, double> maxErrorsAllowed, in Maybe<ErrorTable> errTable, string processName, string userName) {

            if (table.Schema.Database.IsReadOnly) {
                return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.DatabaseIsReadOnly, new StackTrace(1, true)));
            }

            if (double.IsNaN(maxErrorsAllowed.Item1) || double.IsNegative(maxErrorsAllowed.Item1) || maxErrorsAllowed.Item1 > 1.0d) {
                return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.InvalidErrorRatio, new StackTrace(1, true)));
            }

            if (processName.Length > 128) {
                return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.ProcessNameLengthExceeds128, new StackTrace(1, true)));
            }

            if (userName.Length > 128) {
                return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.UserNameLengthExceeds128, new StackTrace(1, true)));
            }
            // One-shot iterator: yields typeof(T0) on the first call and None afterwards.
            var counter = ushort.MinValue;
            var match = TypeMatch(Std.Iter.Functions.FromFn(() => counter++ switch { ushort.MinValue => new(typeof(T0)), _ => Maybe<Type>.None() }), in table);

            if (!match.IsSome) {
                // TypeMatch gave no verdict: the writer is created with both encryption flags set, so every
                // Write call will insist on WriteOptions.AllowEncryptedValueModifications.
                return new(new BulkWriter<T0>(table, errTable, maxErrorsAllowed, processName, userName, true, true));
            }

            if (match.Unwrap()) {
                return new(new BulkWriter<T0>(table, errTable, maxErrorsAllowed, processName, userName, table.ContainsEncryptedColumn(), false));
            }
            return new(new BulkWriterCreateError(BulkWriterCreateError.Tag.TypeMismatch, new StackTrace(1, true)));
        }
        // Reports whether the connection string of con enables column encryption. The string is parsed rather than
        // substring-matched because connection-string keywords and values are case-insensitive (the builder itself
        // emits "Enabled"), so an ordinal search for "...=enabled" rejects valid connections.
        static bool ConnectionHandlesEncryptedColumns(SqlConnection con) => new SqlConnectionStringBuilder(con.ConnectionString).ColumnEncryptionSetting == SqlConnectionColumnEncryptionSetting.Enabled;
        #endregion

        #region Instance Functions
        /// <summary>Always <see langword="false"/>; instances never compare equal.</summary>
        public override readonly bool Equals(object? _) => false;
        /// <summary>Always <c>0</c>.</summary>
        public override readonly int GetHashCode() => 0;
        readonly void IBulkWriter.Sealed(){}
        /// <summary>Always the empty string.</summary>
        public override readonly string ToString() => string.Empty;
        /// <summary>
        /// Opens a new connection to the destination database and writes the rows produced by
        /// <paramref name="iter"/>.
        /// </summary>
        /// <param name="iter">Source of rows.</param>
        /// <param name="writeOptions">
        /// Write flags. <c>AllowEncryptedValueModifications</c> is rejected when the table has no encrypted column,
        /// and is mandatory when the writer was created without a type-match verdict. <c>KeepIdentity</c> turns
        /// IDENTITY_INSERT on for the destination when it has an IDENTITY column.
        /// </param>
        /// <param name="batchSize">Forwarded to the bulk-copy factory.</param>
        /// <param name="timeout">Forwarded to the bulk-copy factory.</param>
        /// <param name="enableStreaming">Forwarded to the bulk-copy factory.</param>
        /// <param name="isSortedAccordingToClusteredIndex">Forwarded to the bulk-copy factory.</param>
        /// <param name="options">Session options used when opening the connection.</param>
        /// <returns><c>Unit</c> on success; otherwise a <c>WriteError</c>.</returns>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA2100:Review SQL queries for security vulnerabilities", Justification = "Text is generated internally.")]
        public Result<Unit, WriteError> Write<TRowIter, TRow, TProd, TErr>(ref TRowIter iter, WriteOptions writeOptions, Maybe<NonZeroUshort> batchSize, Maybe<NonZeroUshort> timeout, bool enableStreaming, bool isSortedAccordingToClusteredIndex, SessionOptions options) where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {

            var allowEncryptMod = false;

            if ((writeOptions & WriteOptions.AllowEncryptedValueModifications) == WriteOptions.AllowEncryptedValueModifications) {

                if (!_containsEncryptedColumn) {
                    return new(new WriteError(WriteError.Tag.TableExpectedToContainEncryptedDataButDoesNot, new StackTrace(1, true)));
                }
                allowEncryptMod = true;
            } else if (_mustBeAllowEncryptedValueModifications) {
                return new(new WriteError(WriteError.Tag.TableColumnMismatchOrWriteOptionsMustContainAllowEncryptedValueModifications, new StackTrace(1, true)));
            }
            using var con = Functions.CreateOpenedConnection(in Destination.Schema.Database, options, !allowEncryptMod && _containsEncryptedColumn, Maybe<Uri>.None());

            if ((writeOptions & WriteOptions.KeepIdentity) == WriteOptions.KeepIdentity && Destination.ContainsIDENTITYColumn()) {
                // The SET statement runs in its own short transaction that is committed before the bulk write.
                using var txn = con.BeginTransaction(IsolationLevel.Serializable);

                using (SqlCommand identityOn = new($"SET IDENTITY_INSERT [{Destination.Schema.Name.Value}].[{Destination.Name}] ON;", con, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = 60, CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) {
                    _ = identityOn.ExecuteNonQuery();
                }
                txn.Commit();
            }
            var numericRoundAbort = (options & SessionOptions.NUMERIC_ROUNDABORT_OFF) != SessionOptions.NUMERIC_ROUNDABORT_OFF;
            return WriteInternal<TRowIter, TRow, TProd, TErr>(ref iter, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, con, null, numericRoundAbort, new StackTrace(1, true).ToString()).MapErr(_writeTxnErrToWriteErr);
        }
        /// <summary>
        /// Writes the rows produced by <paramref name="iter"/> on the connection of an existing transaction.
        /// </summary>
        /// <param name="iter">Source of rows.</param>
        /// <param name="writeOptions">Write flags; see the connection-creating overload.</param>
        /// <param name="batchSize">Forwarded to the bulk-copy factory.</param>
        /// <param name="timeout">Forwarded to the bulk-copy factory; also the command timeout (0 when absent) of the helper statements issued here.</param>
        /// <param name="enableStreaming">Forwarded to the bulk-copy factory.</param>
        /// <param name="isSortedAccordingToClusteredIndex">Forwarded to the bulk-copy factory.</param>
        /// <param name="txn">
        /// Transaction whose connection must be open and whose data source must equal <c>tcp:</c> followed by the
        /// destination server.
        /// </param>
        /// <returns><c>Unit</c> on success; otherwise a <c>WriteErrorOrTransactionError</c>.</returns>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Security", "CA2100:Review SQL queries for security vulnerabilities", Justification = "Text is generated internally.")]
        public Result<Unit, WriteErrorOrTransactionError> Write<TRowIter, TRow, TProd, TErr>(ref TRowIter iter, WriteOptions writeOptions, Maybe<NonZeroUshort> batchSize, Maybe<NonZeroUshort> timeout, bool enableStreaming, bool isSortedAccordingToClusteredIndex, SqlTransaction txn) where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {
            // SqlTransaction.Connection is null once the transaction is no longer valid; report that as
            // "not open" instead of failing with a NullReferenceException.
            var con = txn.Connection;

            if (con is null || con.State != ConnectionState.Open) {
                return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.SqlConnectionIsNotOpen, new StackTrace(1, true)));
            }

            if (!string.Equals(con.DataSource, $"tcp:{Destination.Schema.Database.Server.IntoString()}", StringComparison.Ordinal)) {
                return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.SqlConnectionServerMismatch, new StackTrace(1, true)));
            }

            if ((writeOptions & WriteOptions.AllowEncryptedValueModifications) == WriteOptions.AllowEncryptedValueModifications) {

                if (!_containsEncryptedColumn) {
                    return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.TableExpectedToContainEncryptedDataButDoesNot, new StackTrace(1, true)));
                }
                // NOTE(review): this branch requires column encryption to be ENABLED on the connection, while the
                // connection-creating overload passes `!allowEncryptMod && _containsEncryptedColumn` to
                // CreateOpenedConnection for the same flag combination. Confirm the two overloads are meant to differ.
                if (!ConnectionHandlesEncryptedColumns(con)) {
                    return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.SqlConnectionViolatesAllowEncryptedValueModifications, new StackTrace(1, true)));
                }
            } else if (_mustBeAllowEncryptedValueModifications) {
                return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.TableColumnMismatchOrWriteOptionsMustContainAllowEncryptedValueModifications, new StackTrace(1, true)));
            } else if (_containsEncryptedColumn && !ConnectionHandlesEncryptedColumns(con)) {
                return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.ContainsEncryptedColumnsButConnectionDoesNotHandleEncryptedColumns, new StackTrace(1, true)));
            }
            var commandTimeout = timeout.MapOr(0, _nzUshortToInt);
            bool numericRoundAbort;

            if ((writeOptions & WriteOptions.KeepIdentity) == WriteOptions.KeepIdentity && Destination.ContainsIDENTITYColumn()) {
                // Turn IDENTITY_INSERT on and read the session's NUMERIC_ROUNDABORT state (bit 8192 of @@OPTIONS)
                // in a single round trip.
                using (SqlCommand identityOn = new($@"SET IDENTITY_INSERT [{Destination.Schema.Database.Name.Value}].[{Destination.Schema.Name.Value}].[{Destination.Name}] ON;
SELECT CASE (@@OPTIONS & 8192)
    WHEN 8192 THEN CONVERT(bit, 1)
    ELSE CONVERT(bit, 0)
END AS fblnNumericRoundabort;", con, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = commandTimeout, CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) {
                    numericRoundAbort = (bool)identityOn.ExecuteScalar();
                }
                var result = WriteInternal<TRowIter, TRow, TProd, TErr>(ref iter, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, con, txn, numericRoundAbort, new StackTrace(1, true).ToString());
                // IDENTITY_INSERT is switched back off whether the write reported success or an error; the
                // write's own result is returned unchanged.
                using (SqlCommand identityOff = new($"SET IDENTITY_INSERT [{Destination.Schema.Database.Name.Value}].[{Destination.Schema.Name.Value}].[{Destination.Name}] OFF;", con, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = commandTimeout, CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) {
                    _ = identityOff.ExecuteNonQuery();
                }
                return result;
            }
            // No identity handling needed: only read the session's NUMERIC_ROUNDABORT state.
            using (SqlCommand readOptions = new($@"SELECT CASE (@@OPTIONS & 8192)
    WHEN 8192 THEN CONVERT(bit, 1)
    ELSE CONVERT(bit, 0)
END AS fblnNumericRoundabort;", con, txn, SqlCommandColumnEncryptionSetting.Disabled) { CommandTimeout = commandTimeout, CommandType = CommandType.Text, EnableOptimizedParameterBinding = true }) {
                numericRoundAbort = (bool)readOptions.ExecuteScalar();
            }
            return WriteInternal<TRowIter, TRow, TProd, TErr>(ref iter, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, con, txn, numericRoundAbort, new StackTrace(1, true).ToString());
        }
        // MUST ensure that txn is either null or uses con as its SqlConnection!
        // Shared tail of both Write overloads: refuses to run when the error thresholds are already exceeded,
        // streams the rows to the server, then adopts the reader's counters and re-checks the thresholds.
        Result<Unit, WriteErrorOrTransactionError> WriteInternal<TRowIter, TRow, TProd, TErr>(ref TRowIter iter, WriteOptions writeOptions, Maybe<NonZeroUshort> batchSize, Maybe<NonZeroUshort> timeout, bool enableStreaming, bool isSortedAccordingToClusteredIndex, SqlConnection con, SqlTransaction? txn, bool numericRoundAbort, string stackTrace) where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {

            if (IsInError) {
                return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.MaxErrorsExceeded, new StackTrace(1, true)));
            }
            using var blk = CreateBulkCopy(in Destination, writeOptions, batchSize, timeout, enableStreaming, isSortedAccordingToClusteredIndex, con, txn);
            using IterDataReader<TRowIter, TRow, TProd, TErr, T0> rdr = new(ErrTable, Destination, MaxErrorsAllowed, _currentProcessedCount, _currentErrorCount, iter, _processName, _userName, numericRoundAbort, stackTrace);
            blk.WriteToServer(rdr);
            // The reader was seeded with the current totals, so its counters replace ours.
            _currentProcessedCount = rdr.CurrentProcessedCount;
            _currentErrorCount = rdr.CurrentErrorCount;

            if (IsInError) {
                return new(new WriteErrorOrTransactionError(WriteErrorOrTransactionError.Tag.MaxErrorsExceeded, new StackTrace(1, true)));
            }
            return new(new Unit());
        }
        #endregion

        #region Operators
        #endregion

        #region Types
        #endregion
    }
    #endregion

    #region Namespaces
    #endregion
}
#endregion