IterDataReaderBaseCase.cs (13332B)
1 using Serde.Bin.Ser; 2 using static SQLServer.Helpers; 3 using Std; 4 using Std.Iter; 5 using Std.Maybe; 6 using Std.Vec; 7 using System; 8 using System.Data; 9 using System.Data.SqlTypes; 10 #region Namespaces 11 namespace SQLServer { 12 #region Types 13 sealed class IterDataReader<TRowIter, TRow, TProd, TErr, T0>: IDataReader where T0: struct, IDataType where TErr: notnull, IBulkRowError where TProd: notnull, IBinSerializable, IProduct<T0> where TRow: notnull, ISum<TProd, TErr> where TRowIter: notnull, IFusedIterator<TRow> { 14 15 #region Type-level Constructors 16 #endregion 17 18 #region Instance Constructors 19 // MUST ensure that processName is no more than 128 chars in length before calling! 20 internal IterDataReader(Maybe<ErrorTable> error, UserTable table, Prod<ulong, double> maxErrorsAllowed, ulong currentProcessed, ulong currentError, TRowIter iter, string processName, string userName, bool numericRoundAbort, string truncationStackTrace) => (_error, _table, _errTableExists, _maxErrorsAllowed, CurrentProcessedCount, CurrentErrorCount, _iter, _errs, _isClosed, _numericRoundAbort, _current, _processName, _userName, _truncationStackTrace, _ser) = (error.IsSome ? error.Unwrap() : default, table, error.IsSome, maxErrorsAllowed, currentProcessed, currentError, iter, new Vec<Prod<nvarchar, nvarchar, varbinary>>(), false, numericRoundAbort, default!, processName, userName, new(truncationStackTrace), Serializer.New()); 21 #endregion 22 23 #region Type-level Fields 24 #endregion 25 26 #region Instance Fields 27 readonly ErrorTable _error; 28 readonly UserTable _table; 29 TProd _current; 30 Serializer _ser; 31 readonly Prod<ulong, double> _maxErrorsAllowed; 32 Vec<Prod<nvarchar, nvarchar, varbinary>> _errs; 33 TRowIter _iter; 34 readonly nvarchar _truncationStackTrace; 35 readonly string _processName; 36 readonly string _userName; 37 internal ulong CurrentErrorCount; 38 internal ulong CurrentProcessedCount; 39 readonly bool _errTableExists; 40 readonly bool _numericRoundAbort; 41 [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE0032:Use auto property", Justification = "Want to order the fields in a particular way while still following my convention of properties being separated from fields.")] 42 bool _isClosed; 43 #endregion 44 45 #region Type-level Properties 46 #endregion 47 48 #region Instance Properties 49 public bool IsClosed => _isClosed; 50 double CurrentErrorRatio => (double)CurrentErrorCount / CurrentProcessedCount; 51 public int Depth => 0; 52 public int FieldCount => _table.ColumnCount; 53 public object this[int ordinal] => GetValue(ordinal); 54 public object this[string columnName] => GetValue(GetOrdinal(columnName)); 55 public int RecordsAffected => -1; 56 #endregion 57 58 #region Type-level Functions 59 #endregion 60 61 #region Instance Functions 62 public void Close() => Dispose(); 63 public void Dispose() { 64 65 if (_isClosed) { return; } 66 (_isClosed, _current, _iter, _ser) = (true, default!, default!, default); 67 if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); } 68 _errs = new Vec<Prod<nvarchar, nvarchar, varbinary>>(); 69 } 70 public sealed override bool Equals(object? _) => false; 71 public bool GetBoolean(int ordinal) => (bool)GetValue(ordinal); 72 public byte GetByte(int ordinal) => (byte)GetValue(ordinal); 73 public long GetBytes(int ordinal, long dataOffset, byte[]? buffer, int bufferOffset, int length) { 74 75 var val = GetValue(ordinal); 76 var bytes = (byte[])val; 77 var offset = (int)dataOffset; 78 var len = bytes.Length - offset; 79 if (len <= 0) { return 0L; } 80 var count = Math.Min(len, length); 81 var i = 0; 82 while (i < count) { buffer![bufferOffset + i] = bytes[offset + i++]; } 83 return count; 84 } 85 public char GetChar(int ordinal) { 86 87 var val = (string)GetValue(ordinal); 88 return val.Length == 1 ? val[0] : throw new InvalidCastException(); 89 } 90 public long GetChars(int ordinal, long dataOffset, char[]? buffer, int bufferOffset, int length) { 91 92 var val = GetValue(ordinal); 93 var chars = (string)val; 94 var offset = (int)dataOffset; 95 var len = chars.Length - offset; 96 if (len <= 0) { return 0L; } 97 var count = Math.Min(len, length); 98 var i = 0; 99 while (i < count) { buffer![bufferOffset + i] = chars[offset + i++]; } 100 return count; 101 } 102 public IDataReader GetData(int _) => throw new NotSupportedException(); 103 public string GetDataTypeName(int ordinal) => GetFieldType(ordinal).Name; 104 public DateTime GetDateTime(int ordinal) { var type = GetFieldType(ordinal); return type == typeof(DateTime) ? (DateTime)GetValue(ordinal) : ((SqlDateTime)GetValue(ordinal)).Value; } 105 public decimal GetDecimal(int ordinal) { var type = GetFieldType(ordinal); return type == typeof(SqlDecimal) ? ((SqlDecimal)GetValue(ordinal)).Value : ((SqlMoney)GetValue(ordinal)).Value; } 106 public double GetDouble(int ordinal) => (double)GetValue(ordinal); 107 public Type GetFieldType(int ordinal) => _typeMap[_table[(ushort)ordinal].DataType]; 108 public float GetFloat(int ordinal) => (float)GetValue(ordinal); 109 public Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal); 110 public sealed override int GetHashCode() => 0; 111 public short GetInt16(int ordinal) => (short)GetValue(ordinal); 112 public int GetInt32(int ordinal) => (int)GetValue(ordinal); 113 public long GetInt64(int ordinal) => (long)GetValue(ordinal); 114 public string GetName(int ordinal) => _table[(ushort)ordinal].Name; 115 public int GetOrdinal(string name) { 116 117 for (ushort i = 0; i < _table.ColumnCount; i++) { if (_table.Schema.Name.Culture.CompareInfo.Compare(name, _table[i].Name, _table.Schema.Name.Options) == 0) { return i; } } 118 throw new ArgumentException($"The column name, {name}, does not exist in {_table.IntoString()}."); 119 } 120 public DataTable GetSchemaTable() { 121 122 DataTable schema = new() { MinimumCapacity = _table.ColumnCount, TableName = $"{_table.Schema.Name.Value}.{_table.Name}", Locale = _table.Schema.Name.Culture }; 123 _ = schema.Columns.Add("Ordinal", typeof(ushort)); 124 _ = schema.Columns.Add("ColumnName", typeof(string)); 125 _ = schema.Columns.Add("DataType", typeof(Type)); 126 127 for (ushort i = 0; i < _table.ColumnCount; i++) { 128 ref readonly var col = ref _table[i]; 129 _ = schema.Rows.Add(i, col.Name, GetFieldType(i)); 130 } 131 return schema; 132 } 133 public string GetString(int ordinal) => (string)GetValue(ordinal); 134 public object GetValue(int ordinal) => ordinal switch { 0 => _current.Field0.Val, _ => throw new ArgumentOutOfRangeException($"{ordinal.ToString()} is not 0."), }; 135 public int GetValues(object[] values) { 136 137 var count = Math.Min(values.Length, _table.ColumnCount); 138 for (var i = 0; i < count; i++) { values[i] = GetValue(i); } 139 return count; 140 } 141 public bool IsDBNull(int ordinal) => ordinal switch { 0 => _current.Field0.IsNULL, _ => throw new ArgumentOutOfRangeException($"{ordinal.ToString()} is not 0."), }; 142 public bool NextResult() { 143 144 if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); } 145 (_errs, _current, _iter, _ser) = (new Vec<Prod<nvarchar, nvarchar, varbinary>>(), default!, default!, default); 146 return false; 147 } 148 public bool Read() { 149 // It would be a lot cleaner to make this function recursive, but that can easily cause a stack overflow. 150 while (true) { 151 // Error threshold has been exceeded, so we cease processing. 152 if (CurrentErrorCount > _maxErrorsAllowed.Item0 && CurrentErrorRatio > _maxErrorsAllowed.Item1) { 153 if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); } 154 (_errs, _current, _iter, _ser) = (new Vec<Prod<nvarchar, nvarchar, varbinary>>(), default!, default!, default); 155 return false; 156 } 157 var cur = _iter!.Next(); 158 // The iterator has no more rows. 159 if (cur.IsNone) { 160 if (_errTableExists) { _ = WriteErrors(in _error, ref _errs, _processName, _userName, ushort.MinValue); } 161 (_errs, _current, _iter, _ser) = (new Vec<Prod<nvarchar, nvarchar, varbinary>>(), default!, default!, default); 162 return false; 163 } 164 CurrentProcessedCount++; 165 var res = cur.Unwrap(); 166 // The iterator has returned an error, so we log it and continue processing. 167 if (res.Variant == Var2.V1) { 168 CurrentErrorCount++; 169 if (_errTableExists) { var err = res.Variant1; _ = _errs.Push(new(err.Trace, err.Message, err.Data)); _ = WriteErrors(in _error, ref _errs, _processName, _userName, 4095); } 170 continue; 171 } 172 _current = res.Variant0; 173 // We perform truncation/overflow checks on the valid row and log an error if one would occur and continue processing. 174 // If all columns are good to go, then we return true. 175 // We do not want to check if _error is Some for each column for both truncation and NULL in non-NULL checks; 176 // so we instead do the check once, and call the appropriate function. 177 if (_errTableExists ? TruncateOverflowCheckAndLog() : TruncateOverflowCheck()) { 178 continue; 179 } else { 180 return true; 181 } 182 } 183 } 184 public sealed override string ToString() => string.Empty; 185 bool TruncateOverflowCheck() { 186 187 if (_current.Field0.IsNULL) { 188 189 if (!_table[ushort.MinValue].IsNullable) { 190 CurrentErrorCount++; 191 return true; 192 } else { 193 return false; 194 } 195 } else if (_current.Field0.TruncationWillOccur(_table[ushort.MinValue], _numericRoundAbort)) { 196 CurrentErrorCount++; 197 return true; 198 } else { 199 return false; 200 } 201 } 202 // MUST ensure _error is Some before calling! 203 bool TruncateOverflowCheckAndLog() { 204 205 if (_current.Field0.IsNULL) { 206 207 if (!_table[ushort.MinValue].IsNullable) { 208 CurrentErrorCount++; 209 _ = _current.Ser(ref _ser); 210 _ = _errs.Push(new(_truncationStackTrace, new($"NULLs are not allowed in {_table.IntoString()}.{_table[ushort.MinValue].Name}."), varbinary.New(_ser.SerializedData))); 211 _ = WriteErrors(in _error, ref _errs, _processName, _userName, 4095); 212 _ = _ser.Reset(); 213 return true; 214 } else { 215 return false; 216 } 217 } else if (_current.Field0.TruncationWillOccur(_table[ushort.MinValue], _numericRoundAbort)) { 218 CurrentErrorCount++; 219 _ = _current.Ser(ref _ser); 220 _ = _errs.Push(new(_truncationStackTrace, new($"{_current.Field0.Into()} would truncate or overflow in {_table.IntoString()}.{_table[ushort.MinValue].Name}."), varbinary.New(_ser.SerializedData))); 221 _ = WriteErrors(in _error, ref _errs, _processName, _userName, 4095); 222 _ = _ser.Reset(); 223 return true; 224 } else { 225 return false; 226 } 227 } 228 #endregion 229 230 #region Operators 231 #endregion 232 233 #region Types 234 #endregion 235 } 236 #endregion 237 238 #region Namespaces 239 #endregion 240 } 241 #endregion