Когда мы помещаем много сообщений на шину, и они вызывают процесс в нашей бизнес-логике, возникает эта ошибка:
Время ожидания истекло до получения соединения из пула. Это могло произойти из-за того, что все соединения в пуле использовались и был достигнут максимальный размер пула.
Этого не происходит, когда, скажем, 15 сообщений вызывают наш процесс. Однако это происходит, когда вызывается 80 или 130 процессов.
Мы используем шаблон единицы работы, и соединение закрывается после использования. Поэтому я изо всех сил пытаюсь понять, почему он не будет доступен в пуле для следующего процесса.
Вот как единица работы используется в нашем приложении:
using (var uow = _uowFactory.Create(true))
{
await uow.AccrualRepo.AddAccrualHistoriesAsync(histories);
await uow.CommitAsync();
}
И вот как фабрика возвращает uow:
public class UnitOfWorkFactory : IUnitOfWorkFactory
{
private readonly IConfiguration _configuration;
private readonly IMediator _mediator;
private readonly IStateAccessor _stateAccessor;
private readonly ITimeProvider _timeProvider;
private readonly IDbConnection _connection;
private readonly IAccrualMapper _accrualMapper;
private readonly ILogger<RepoBase> _logger;
public UnitOfWorkFactory(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper, ILogger<RepoBase> logger)
{
_configuration = configuration;
_mediator = mediator;
_stateAccessor = stateAccessor;
_timeProvider = timeProvider;
_connection = sqlConnection;
_accrualMapper = accrualMapper;
_logger = logger;
}
public IUnitOfWork Create(bool useTransaction)
{
return new UnitOfWork(_configuration, _connection, _mediator, _stateAccessor, _timeProvider, _accrualMapper, _logger, useTransaction);
}
Наш файл Startup.cs
устанавливает их для внедрения зависимостей следующим образом:
services.AddTransient<IUnitOfWorkFactory, UnitOfWorkFactory>();
services.AddTransient<IDbConnection, SqlConnection>();
Теперь это много кода, но наш uow выглядит так. Обратите внимание, что соединение закрывается после вызова CommitAsync()
и при удалении.
public class UnitOfWork : IUnitOfWork, IDisposable
{
private readonly IConfiguration _configuration;
private readonly IMediator _mediator;
private readonly IStateAccessor _stateAccessor;
private readonly ITimeProvider _timeProvider;
private readonly IAccrualMapper _accrualMapper;
private readonly ILogger<RepoBase> _logger;
private IDbConnection _connection;
private IDbTransaction _transaction;
private IAccrualRepo _accrualRepo;
private bool _disposed;
private bool _commitOccurred;
private bool _useTransaction;
public UnitOfWork(IConfiguration configuration, IDbConnection sqlConnection, IMediator mediator,
IStateAccessor stateAccessor, ITimeProvider timeProvider, IAccrualMapper accrualMapper,
ILogger<RepoBase> logger, bool useTransaction = true)
{
_configuration = configuration;
_mediator = mediator;
_stateAccessor = stateAccessor;
_timeProvider = timeProvider;
_useTransaction = useTransaction;
_accrualMapper = accrualMapper;
_logger = logger;
_connection = sqlConnection;
_connection.ConnectionString = _configuration["ConnectionString"];
_connection.Open();
if (useTransaction)
{
_transaction = _connection.BeginTransaction();
}
}
public IAccrualRepo AccrualRepo
{
get => _accrualRepo ?? (_accrualRepo = new AccrualRepo(_configuration, _connection,
_transaction, _stateAccessor, _timeProvider, _mediator, _logger));
set => _accrualRepo = value;
}
public async Task CommitAsync()
{
if (!_useTransaction)
{
throw new InvalidOperationException("Attempting to call commit on a unit of work that isn't using a transaction");
}
try
{
_transaction.Commit();
_commitOccurred = true;
await InvokePostCommitOnReposAsync();
}
catch
{
_transaction.Rollback();
throw;
}
finally
{
_connection.Close();
_transaction.Dispose();
ResetRepositories();
}
}
private async Task InvokePostCommitOnReposAsync()
{
var repos = new List<RepoBase>();
if (_accrualRepo != null) { repos.Add((RepoBase)_accrualRepo); }
try
{
foreach (var repo in repos)
{
await repo.PostCommitAsync();
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Exception occurred while invoking post commit on a repo.");
}
}
private void ResetRepositories()
{
_accrualRepo = null; // Note: there are more repos here, but removed for clarity.
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this); // Already disposed; no need for the GC to finalize.
}
protected virtual void Dispose(bool calledFromDisposeAndNotFromFinalizer)
{
if (_disposed) { return; }
if (calledFromDisposeAndNotFromFinalizer)
{
// If the user never called commit, but we are using a transaction, then roll back.
if (!_commitOccurred && _useTransaction && _transaction != null) { _transaction.Rollback(); }
if (_transaction != null) { _transaction.Dispose(); _transaction = null; }
if (_connection != null) { _connection.Dispose(); _connection = null; }
}
_disposed = true;
}
}
Так почему же у нас возникла эта проблема с пулом соединений? Здесь что-то делается неправильно? Может быть, нам нужно увеличить размер пула соединений?