BulkWriters128

First 128 "bulk writers".
git clone https://git.philomathiclife.com/repos/BulkWriters128
Log | Files | Refs | README

IterDataReaderBaseCase.cs (13332B)


      1 using Serde.Bin.Ser;    
      2 using static SQLServer.Helpers;    
      3 using Std;    
      4 using Std.Iter;    
      5 using Std.Maybe;    
      6 using Std.Vec;    
      7 using System;    
      8 using System.Data;    
      9 using System.Data.SqlTypes;    
     10 #region Namespaces    
     11 namespace SQLServer {    
     12     #region Types    
     13     sealed class IterDataReader<TRowIter, TRow, TProd, TErr, T0>: IDataReader where T0: struct, IDataType where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> {    
     14     
     15         #region Type-level Constructors    
     16         #endregion    
     17     
     18         #region Instance Constructors    
     19         // MUST ensure that processName is no more than 128 chars in length before calling!    
     20         internal IterDataReader(Maybe<ErrorTable> error, UserTable table, Prod<ulong, double> maxErrorsAllowed, ulong currentProcessed, ulong currentError, TRowIter iter, string processName, string userName, bool numericRoundAbort, string truncationStackTrace) => (_error, _table, _errTableExists, _maxErrorsAllowed, CurrentProcessedCount, CurrentErrorCount, _iter, _errs, _isClosed, _numericRoundAbort, _current, _processName, _userName, _truncationStackTrace, _ser) = (error.IsSome ? error.Unwrap() : default, table, error.IsSome, maxErrorsAllowed, currentProcessed, currentError, iter, new Vec<Prod<nvarchar, nvarchar, varbinary>>(), false, numericRoundAbort, default!, processName, userName, new(truncationStackTrace), Serializer.New());    
     21         #endregion    
     22     
     23         #region Type-level Fields    
     24         #endregion    
     25     
     26         #region Instance Fields    
     27         readonly ErrorTable _error;    
     28         readonly UserTable _table;    
     29         TProd _current;    
     30         Serializer _ser;    
     31         readonly Prod<ulong, double> _maxErrorsAllowed;    
     32         Vec<Prod<nvarchar, nvarchar, varbinary>> _errs;    
     33         TRowIter _iter;    
     34         readonly nvarchar _truncationStackTrace;    
     35         readonly string _processName;    
     36         readonly string _userName;    
     37         internal ulong CurrentErrorCount;    
     38         internal ulong CurrentProcessedCount;    
     39         readonly bool _errTableExists;    
     40         readonly bool _numericRoundAbort;    
     41         [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0032:Use auto property", Justification = "Want to order the fields in a particular way while still following my convention of properties being separated from fields.")]    
     42         bool _isClosed;    
     43         #endregion    
     44     
     45         #region Type-level Properties    
     46         #endregion    
     47     
     48         #region Instance Properties    
     49         public bool IsClosed => _isClosed;    
     50         double CurrentErrorRatio => (double)CurrentErrorCount / CurrentProcessedCount;    
     51         public int Depth => 0;    
     52         public int FieldCount => _table.ColumnCount;    
     53         public object this[int ordinal] => GetValue(ordinal);    
     54         public object this[string columnName] => GetValue(GetOrdinal(columnName));    
     55         public int RecordsAffected => -1;    
     56         #endregion    
     57     
     58         #region Type-level Functions    
     59         #endregion    
     60     
     61         #region Instance Functions    
     62         public void Close() => Dispose();    
     63         public void Dispose() {    
     64     
     65             if (_isClosed) { return; }    
     66             (_isClosed, _current, _iter, _ser) = (true, default!, default!, default);    
     67             if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); }    
     68             _errs = new Vec<Prod<nvarchar, nvarchar, varbinary>>();    
     69         }    
     70         public sealed override bool Equals(object? _) => false;    
     71         public bool GetBoolean(int ordinal) => (bool)GetValue(ordinal);    
     72         public byte GetByte(int ordinal) => (byte)GetValue(ordinal);    
     73         public long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length) {    
     74     
     75             var val = GetValue(ordinal);    
     76             var bytes = (byte[])val;    
     77             var offset = (int)dataOffset;    
     78             var len = bytes.Length - offset;    
     79             if (len <= 0) { return 0L; }    
     80             var count = Math.Min(len, length);    
     81             var i = 0;    
     82             while (i < count) { buffer![bufferOffset + i] = bytes[offset + i++]; }    
     83             return count;    
     84         }    
     85         public char GetChar(int ordinal) {    
     86     
     87             var val = (string)GetValue(ordinal);    
     88             return val.Length == 1 ? val[0] : throw new InvalidCastException();    
     89         }    
     90         public long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length) {    
     91     
     92             var val = GetValue(ordinal);    
     93             var chars = (string)val;    
     94             var offset = (int)dataOffset;    
     95             var len = chars.Length - offset;    
     96             if (len <= 0) { return 0L; }    
     97             var count = Math.Min(len, length);    
     98             var i = 0;    
     99             while (i < count) { buffer![bufferOffset + i] = chars[offset + i++]; }    
    100             return count;    
    101         }    
    102         public IDataReader GetData(int _) => throw new NotSupportedException();    
    103         public string GetDataTypeName(int ordinal) => GetFieldType(ordinal).Name;    
    104         public DateTime GetDateTime(int ordinal) { var type = GetFieldType(ordinal); return type == typeof(DateTime) ? (DateTime)GetValue(ordinal) : ((SqlDateTime)GetValue(ordinal)).Value; }    
    105         public decimal GetDecimal(int ordinal) { var type = GetFieldType(ordinal); return type == typeof(SqlDecimal) ? ((SqlDecimal)GetValue(ordinal)).Value : ((SqlMoney)GetValue(ordinal)).Value; }    
    106         public double GetDouble(int ordinal) => (double)GetValue(ordinal);    
    107         public Type GetFieldType(int ordinal) => _typeMap[_table[(ushort)ordinal].DataType];    
    108         public float GetFloat(int ordinal) => (float)GetValue(ordinal);    
    109         public Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);    
    110         public sealed override int GetHashCode() => 0;    
    111         public short GetInt16(int ordinal) => (short)GetValue(ordinal);    
    112         public int GetInt32(int ordinal) => (int)GetValue(ordinal);    
    113         public long GetInt64(int ordinal) => (long)GetValue(ordinal);    
    114         public string GetName(int ordinal) => _table[(ushort)ordinal].Name;    
    115         public int GetOrdinal(string name) {    
    116     
    117             for (ushort i = 0; i < _table.ColumnCount; i++) { if (_table.Schema.Name.Culture.CompareInfo.Compare(name, _table[i].Name, _table.Schema.Name.Options) == 0) { return i; } }    
    118             throw new ArgumentException($"The column name, {name}, does not exist in {_table.IntoString()}.");    
    119         }    
    120         public DataTable GetSchemaTable() {    
    121     
    122             DataTable schema = new() { MinimumCapacity = _table.ColumnCount, TableName = $"{_table.Schema.Name.Value}.{_table.Name}", Locale = _table.Schema.Name.Culture };    
    123             _ = schema.Columns.Add("Ordinal", typeof(ushort));    
    124             _ = schema.Columns.Add("ColumnName", typeof(string));    
    125             _ = schema.Columns.Add("DataType", typeof(Type));    
    126     
    127             for (ushort i = 0; i < _table.ColumnCount; i++) {    
    128                 ref readonly var col = ref _table[i];    
    129                 _ = schema.Rows.Add(i, col.Name, GetFieldType(i));    
    130             }    
    131             return schema;    
    132         }    
    133         public string GetString(int ordinal) => (string)GetValue(ordinal);    
    134         public object GetValue(int ordinal) => ordinal switch { 0 => _current.Field0.Val, _ => throw new ArgumentOutOfRangeException($"{ordinal.ToString()} is not 0."), };    
    135         public int GetValues(object[] values) {    
    136     
    137             var count = Math.Min(values.Length, _table.ColumnCount);    
    138             for (var i = 0; i < count; i++) { values[i] = GetValue(i); }    
    139             return count;    
    140         }    
    141         public bool IsDBNull(int ordinal) => ordinal switch { 0 => _current.Field0.IsNULL, _ => throw new ArgumentOutOfRangeException($"{ordinal.ToString()} is not 0."), };    
    142         public bool NextResult() {    
    143     
    144             if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); }    
    145             (_errs, _current, _iter, _ser) = (new Vec<Prod<nvarchar, nvarchar, varbinary>>(), default!, default!, default);    
    146             return false;    
    147         }    
    148         public bool Read() {    
    149             // It would be a lot cleaner to make this function recursive, but that can easily cause a stack overflow.    
    150             while (true) {    
    151                 // Error threshold has been exceeded, so we cease processing.    
    152                 if (CurrentErrorCount > _maxErrorsAllowed.Item0 && CurrentErrorRatio > _maxErrorsAllowed.Item1) {    
    153                     if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); }    
    154                     (_errs, _current, _iter, _ser) = (new Vec<Prod<nvarchar, nvarchar, varbinary>>(), default!, default!, default);    
    155                     return false;    
    156                 }    
    157                 var cur = _iter!.Next();    
    158                 // The iterator has no more rows.    
    159                 if (cur.IsNone) {    
    160                     if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); }    
    161                     (_errs, _current, _iter, _ser) = (new Vec<Prod<nvarchar, nvarchar, varbinary>>(), default!, default!, default);    
    162                     return false;    
    163                 }    
    164                 CurrentProcessedCount++;    
    165                 var res = cur.Unwrap();    
    166                 // The iterator has returned an error, so we log it and continue processing.    
    167                 if (res.Variant == Var2.V1) {    
    168                     CurrentErrorCount++;    
    169                     if (_errTableExists) { var err = res.Variant1; _ = _errs.Push(new(err.Trace, err.Message, err.Data)); _ = WriteErrors(in _error, ref _errs, _processName, _userName, 4095); }    
    170                     continue;    
    171                 }    
    172                 _current = res.Variant0;    
    173                 // We perform truncation/overflow checks on the valid row and log an error if one would occur and continue processing.    
    174                 // If all columns are good to go, then we return true.    
    175                 // We do not want to check if _error is Some for each column for both truncation and NULL in non-NULL checks;    
    176                 // so we instead do the check once, and call the appropriate function.    
    177                 if (_errTableExists ? TruncateOverflowCheckAndLog() : TruncateOverflowCheck()) {    
    178                     continue;    
    179                 } else {    
    180                     return true;    
    181                 }    
    182             }    
    183         }    
    184         public sealed override string ToString() => string.Empty;    
    185         bool TruncateOverflowCheck() {    
    186     
    187             if (_current.Field0.IsNULL) {    
    188     
    189                 if (!_table[ushort.MinValue].IsNullable) {    
    190                     CurrentErrorCount++;    
    191                     return true;    
    192                 } else {    
    193                     return false;    
    194                 }    
    195             } else if (_current.Field0.TruncationWillOccur(_table[ushort.MinValue], _numericRoundAbort)) {    
    196                 CurrentErrorCount++;    
    197                 return true;    
    198             } else {    
    199                 return false;    
    200             }    
    201         }    
    202         // MUST ensure _error is Some before calling!    
    203         bool TruncateOverflowCheckAndLog() {    
    204     
    205             if (_current.Field0.IsNULL) {    
    206     
    207                 if (!_table[ushort.MinValue].IsNullable) {    
    208                     CurrentErrorCount++;    
    209                     _ = _current.Ser(ref _ser);    
    210                     _ = _errs.Push(new(_truncationStackTrace, new($"NULLs are not allowed in {_table.IntoString()}.{_table[ushort.MinValue].Name}."), varbinary.New(_ser.SerializedData)));    
    211                     _ = WriteErrors(in _error, ref _errs, _processName, _userName, 4095);    
    212                     _ = _ser.Reset();    
    213                     return true;    
    214                 } else {    
    215                     return false;    
    216                 }    
    217             } else if (_current.Field0.TruncationWillOccur(_table[ushort.MinValue], _numericRoundAbort)) {    
    218                 CurrentErrorCount++;    
    219                 _ = _current.Ser(ref _ser);    
    220                 _ = _errs.Push(new(_truncationStackTrace, new($"{_current.Field0.Into()} would truncate or overflow in {_table.IntoString()}.{_table[ushort.MinValue].Name}."), varbinary.New(_ser.SerializedData)));    
    221                 _ = WriteErrors(in _error, ref _errs, _processName, _userName, 4095);    
    222                 _ = _ser.Reset();    
    223                 return true;    
    224             } else {    
    225                 return false;    
    226             }    
    227         }    
    228         #endregion    
    229     
    230         #region Operators    
    231         #endregion    
    232     
    233         #region Types    
    234         #endregion    
    235     }    
    236     #endregion    
    237     
    238     #region Namespaces    
    239     #endregion    
    240 }    
    241 #endregion