public class TransactionBehavior<TDbContext, TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse> where TDbContext : EFContext
{
ILogger _logger;
TDbContext _dbContext;
ICapPublisher _capBus;
public TransactionBehavior(TDbContext dbContext, ICapPublisher capBus, ILogger logger)
{
_dbContext = dbContext ?? throw new ArgumentNullException(nameof(dbContext));
_capBus = capBus ?? throw new ArgumentNullException(nameof(capBus));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
}
public async Task<TResponse> Handle(TRequest request, CancellationToken cancellationToken, RequestHandlerDelegate<TResponse> next)
{
var response = default(TResponse);
var typeName = request.GetGenericTypeName();
try
{
// 首先判断当前是否有开启事务
if (_dbContext.HasActiveTransaction)
{
return await next();
}
// 定义了一个数据库操作执行的策略,比如说可以在里面嵌入一些重试的逻辑,这里创建了一个默认的策略
var strategy = _dbContext.Database.CreateExecutionStrategy();
await strategy.ExecuteAsync(async () =>
{
Guid transactionId;
using (var transaction = await _dbContext.BeginTransactionAsync())
using (_logger.BeginScope("TransactionContext:{TransactionId}", transaction.TransactionId))
{
_logger.LogInformation("----- 开始事务 {TransactionId} ({@Command})", transaction.TransactionId, typeName, request);
response = await next();// next 实际上是指我们的后续操作,这里的模式有点像之前讲的中间件模式
_logger.LogInformation("----- 提交事务 {TransactionId} {CommandName}", transaction.TransactionId, typeName);
await _dbContext.CommitTransactionAsync(transaction);
transactionId = transaction.TransactionId;
}
});
return response;
}
catch (Exception ex)
{
_logger.LogError(ex, "处理事务出错 {CommandName} ({@Command})", typeName, request);
throw;
}
}
}