作为技术人,很忌讳对某个问题浅尝辄止,一知半解,特别是经常使用的技术。Spring 作为经久不衰的 Java 框架,自然少不了对
事务
的支持。那么,关于Spring 的事务
,你了解多少呢?如果只是知道在方法上加一个@Transactional
注解就可以支持事务,或者说只是简单地知道Spring 声明式事务
的背后原理是AOP
,恐怕还不够。今天我们就一起深入了解下 Spring 的事务。
Spring 中开启事务有两种方式,一种是 声明式事务,另一种是编程式事务。
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void test() {
// 业务逻辑
}
使用 TransactionTemplate
等类和 API 手动管理事务,控制事务的新建、提交、回滚等过程。
@Resource
private TransactionTemplate transactionTemplate;
@Transactional
public void test() {
transactionTemplate.executeWithoutResult(status -> {
// 业务逻辑
if (something not right) {
// 回滚
status.setRollbackOnly();
}
});
}
@Resource
private PlatformTransactionManager transactionManager;
@Transactional
publicvoidtest() {
// 定义事务
TransactionDefinitiontransactionDefinition=newDefaultTransactionDefinition();
// 获取事务状态
TransactionStatustransactionStatus= transactionManager.getTransaction(transactionDefinition);
try {
// 业务操作
// 提交事务
transactionManager.commit(transactionStatus);
} catch (Exception e) {
// 异常回滚事务
transactionManager.rollback(transactionStatus);
throw e;
}
}
上面的两种事务实现方式,明显声明式事务
更简单
,更简洁
,对业务也没有侵入性
。但为什么说不建议
用声明式事务呢?
当然这个也不是笔者建议的,而是阿里巴巴《Java开发手册 v1.5.0 华山版》 建议的。
为什么它要这么建议呢? 答案肯定是声明式事务
存在着让人难以忽略的缺点。
首先最容易想到的应该是事务的粒度问题。
因为声明式事务
能控制的最小粒度
是方法
,整个方法都包含在事务内,但有时这并不是我们想要的,我们希望事务粒度能更小一点,比如说只有某几行数据库操作才需要事务。
比如,如果我们将 RPC调用
放入事务方法中,如果事务提交失败
,数据库最后回滚,但是RPC调用
无法回滚,这就导致了严重的数据不一致
问题。
又比如,我们在事务方法中加入太多耗时操作
,例如文件操作,更新缓存,发送消息等,就会让事务变成一个长事务
,数据库连接
会长时间地被占用,就可能导致数据库连接池耗尽
,也更容易产生死锁
问题。
更为关键的是,由于这种事务注解很容易被人忽略,并且还存在方法嵌套
,所以上面的问题很容易“防不慎防”
。相反,使用编程式事务
,能让开发者很清楚地明确事务的边界
。
另外一个比较重要的问题就是,由于开发者的疏忽
或者技术水平的原因,可能会导致声明式事务失效
。
在讲 事务失效 之前,我们需要简单了解一下 Spring 声明式事务
的原理是什么,这里后面会深入探究,这里先简单概述一下:
Spring 声明式事务通过
AOP
实现,基于@Transactional
注解和事务管理器
。Spring使用代理模式(JDK动态代理
或CGLIB
)拦截带有@Transactional
的方法调用,在方法执行前获取事务配置,启动事务;若方法成功执行,提交事务;若发生异常,根据配置回滚事务。这些流程由TransactionInterceptor
调用PlatformTransactionManager
完成,从而实现自动管理事务边界
。
既然 Spring声明式事务
的实现依赖于 AOP
,那么按照道理说,所有能到导致 AOP 失效
的情况也都会导致声明式事务事务实效
。
那么又是哪些情况会导致 AOP 失效
呢?
这就要继续追问,AOP
的原理是什么?
Spring
中AOP
的原理就是 JDK动态代理
和 CGLIB 代理
,有接口的使用 JDK动态代理
,没接口的使用 CGLIB 代理
。
所以,以下情况不走代理
或者无法代理
的情况会导致 AOP 失效
,从而导致 事务失效
。
类内部方法调用
直接调用原始对象
,根本不涉及代理对象
,所以事务必然失效,但是也可以通过依赖注入自己来规避。非public方法
、final方法
、静态方法
,所以这些方法上的事务也会失效。除了上面讲到的那些AOP
的原因,还有一些跟 Spring 事务属性配置
相关的。
其实上面讲的都是可能因为开发人员
人为疏忽
导致的事务问题,但是正是因为谁也不能保证开发人员不会犯错,水平极高,所以只能通过一些开发规范来尽量规避这些问题,使用编程式事务
,就能够大大减少上述问题的发生。当然,这也不是说声明式事务完全不能用,只是说不能滥用
。
其实刚才还漏讲了一个可能导致事务失效
的原因:多线程
,跨线程的事务管理
Spring 的声明式事务
也是不支持的。
这里,我们就要开始从声明式事务的原理,也就是它是怎么实现的开始讲起了。
我们在上文中简单提到了 AOP
和 事务管理器 - TransactionManager
。
我们可以从源码获取更多细节。
public abstractclassTransactionAspectSupport {
// 核心方法: 在事务环境中执行目标方法
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, InvocationCallback invocation)throws Throwable {
// 1. 获取事务属性
TransactionAttributeSourcetas= getTransactionAttributeSource();
TransactionAttributetxAttr= (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 2. 确定事务管理器
TransactionManagertm= determineTransactionManager(txAttr, targetClass);
// 3. 处理响应式事务
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager rtm) {
// 处理响应式事务逻辑...
return handleReactiveTransaction(/*...*/);
}
// 4. 处理普通事务 使用 PlatformTransactionManager
PlatformTransactionManagerptm= asPlatformTransactionManager(tm);
StringjoinpointIdentification= methodIdentification(method, targetClass, txAttr);
// 5. 执行事务处理
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 标准事务处理流程:
TransactionInfotxInfo= createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
ObjectretVal=null;
try {
// 执行目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// 异常回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
}
}
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
// 回调式事务处理...
}
}
}
从上面我们可以很清晰地看到整体事务处理流程。
我们先从 "5. 执行事务处理"
中的很重要的一个类 TransactionInfo
说起,看下它里面存的是什么?
protected staticfinalclassTransactionInfo {
// 事务管理器
privatefinal PlatformTransactionManager transactionManager;
// 事务属性(传播行为、隔离级别等配置)
privatefinal TransactionAttribute transactionAttribute;
// 方法标识(用于日志)
privatefinal String joinpointIdentification;
// 当前事务状态
private TransactionStatus transactionStatus;
// 父事务信息
private TransactionInfo oldTransactionInfo;
}
TransactionInfo
存储的就是事务信息
,它的主要作用是:
事务管理器
、事务属性配置
、事务状态
等不过,这个事务信息 TransactionInfo
保存在哪里呢?这是一个很重要的问题。
我们来梳理下 TransactionInfo
的需求:
事务上下文
,事务跟当前调用线程息息相关。传播属性
(REQUIRED、REQUIRES_NEW等),也就是同一线程内的方法调用应该可以很方便地访问当前事务信息,从而决定是否新建事务,换句话说,也就是 事务可以嵌套传播
。在方法调用链中显式传递事务信息
。从上面的需求,我们自然而然地想到了Java中的 ThreadLocal
,它用来保存事务信息再合适不过,事实也的确如此。
我们看下 prepareTransactionInfo() -> bindToThread()
,其实就是将当前事务信息 TransactionInfo
保存到 ThreadLocal
中。
// 父事务信息
private TransactionInfo oldTransactionInfo;
// 保存当前调用线程的事务信息
privatestaticfinal ThreadLocal<TransactionInfo> transactionInfoHolder =
newNamedThreadLocal<>("Current aspect-driven transaction");
// 将当前事务,放入事务上下文 ThreadLocal
privatevoidbindToThread() {
// 暂存父事务到 oldTransactionInfo
this.oldTransactionInfo = transactionInfoHolder.get();
// 保存当前事务到 ThreadLocal
transactionInfoHolder.set(this);
}
这里我们也就明白了为什么一开始说,多线程也会导致声明式事务失效
,因为 ThreadLocal
保存的内容不能跨线程
。
接着我们继续回到 TransactionInfo
,它里面其它几个属性非常好理解,但是其中 oldTransactionInfo
让人觉得有点奇怪,它到底有什么用?其实它非常重要
,具有特殊作用。
我们先看下面这个场景:
@Transactional
publicvoidouter() {
// 创建 TransactionInfo1
inner(); // 调用内层事务方法
// 恢复到 TransactionInfo1
}
@Transactional
publicvoidinner() {
// 创建 TransactionInfo2
// oldTransactionInfo 指向 TransactionInfo1
// 方法结束时恢复到 TransactionInfo1
}
看到这里是不是恍然大悟,在这种方法嵌套中,要怎么处理父事务
和子事务
?答案就在 oldTransactionInfo
属性。
private void bindToThread() {
// 取出 transactionInfoHolder(ThreadLocal) 中的父事务 TransactionInfo1 到 oldTransactionInfo
this.oldTransactionInfo = transactionInfoHolder.get();
// 将新事务 TransactionInfo2 保存到 transactionInfoHolder
transactionInfoHolder.set(this);
}
private void restoreThreadLocalStatus() {
// 处理完了 TransactionInfo2,恢复之前保存的父事务 TransactionInfo1
transactionInfoHolder.set(this.oldTransactionInfo);
}
嵌套事务
场景中,保存外层事务的信息。这样确保内层事务执行完成后,可以恢复到外层事务的上下文。ThreadLocal
中事务信息 TransactionInfo
的完整性链式结构
,支持多层事务嵌套
。接着我们再回到入口处,看下 createTransactionIfNecessary
方法,这个方法也比较关键,从中可以看出我们的事务是如何创建的,以及在注解中设置的 propagation
属性在这里会起什么作用。
protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm,
TransactionAttribute txAttr, final String joinpointIdentification) {
// 如果事务没有指定名称,使用方法名称作为事务名称
if (txAttr != null && txAttr.getName() == null) {
txAttr = newDelegatingTransactionAttribute(txAttr)....
}
// 从事务管理器获取事务状态
TransactionStatusstatus=null;
// 事务属性不为空
if (txAttr != null) {
//存在事务管理器
if (tm != null) {
// 根据指定的传播行为,返回当前活跃的事务或者新建一个事务
status = tm.getTransaction(txAttr);
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
我们先跳过
PlatformTransactionManager#getTransaction
先看下
TransactionAspectSupport#prepareTransactionInfo
protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, String joinpointIdentification,
@Nullable TransactionStatus status) {
// 1. 创建事务信息对象
TransactionInfotxInfo=newTransactionInfo(tm, txAttr, joinpointIdentification);
if (txAttr != null) {
// 2. 如果有事务属性配置,说明需要事务
// 3. 设置事务状态
txInfo.newTransactionStatus(status);
}
else {
// 4. 没有事务属性,说明不需要事务
}
// 5. 重要:总是绑定到ThreadLocal
txInfo.bindToThread();
return txInfo;
}
prepareTransactionInfo
主要就是创建 TransactionInfo
对象(包含:事务管理器
、事务属性
、方法标识符
等),维护事务状态
,并且绑定到 ThreadLocal
。
现在继续深入到
PlatformTransactionManager#getTransaction(TransactionDefinition)
PlatformTransactionManager
其实是个接口,所以要看继承它的抽象类:
AbstractPlatformTransactionManager#getTransaction
public final TransactionStatus getTransaction(TransactionDefinition definition)
throws TransactionException {
// 如果没有事务定义使用默认值
TransactionDefinitiondef= (definition != null ? definition : TransactionDefinition.withDefaults());
// 获取事务,不同的ORM框架(JDBC、JPA、Hibernate等)获取事务的方式可能不同,交由子类去实现
Objecttransaction= doGetTransaction();
// 如果当前已经存在事务,并且该事务处于活跃状态
if (isExistingTransaction(transaction)) {
// 根据设置的传播行为决定如何处理
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 为新事务检查超时时间设置
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
thrownewInvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 当前不存在事务,根据设置的传播行为决定如何处理,如果设定为 PROPAGATION_MANDATORY,抛出异常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
thrownewIllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 如果是其它的,开启一个新事务
elseif (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHoldersuspendedResources= suspend(null);
try {
// 开启新事务
return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
// 恢复挂起资源
resume(null, suspendedResources);
throw ex;
}
}
else {
// 创建“空”事务:没有实际事务,但可能会有事务同步器。
booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
接着看看 handleExistingTransaction
,看下如果当前事务上下文
存在活跃事务
会如何处理。
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 如果传播行为是 PROPAGATION_NEVER,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
thrownewIllegalTransactionStateException...
}
// 如果传播行为是 PROPAGATION_NOT_SUPPORTED,直接在非事务中运行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 挂起当前事务
ObjectsuspendedResources= suspend(transaction);
booleannewSynchronization= (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 第二个参数传递null,表示不需要事务
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 如果是 PROPAGATION_REQUIRES_NEW,
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 挂起当前事务
SuspendedResourcesHoldersuspendedResources= suspend(transaction);
try {
// 开启新事务
return startTransaction(definition, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
// 出现异常,恢复挂起事务
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 省略
}
// PROPAGATION_REQUIRED, PROPAGATION_SUPPORTS, PROPAGATION_MANDATORY:
// 原事务是否有效
if (isValidateExistingTransaction()) {
// 新加入的事务必须与原事务隔离级别相同 否则报错
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
IntegercurrentIsolationLevel= TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
thrownewIllegalTransactionStateException...
}
}
// 当前事务非只读事务 但是已经存在的事务是只读 报错
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
thrownewIllegalTransactionStateException...
}
}
}
// 事务同步器:提供了事务生命周期的钩子方法 用于资源管理、状态清理、监控等场景
booleannewSynchronization= (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 根据给定参数创建新的事务状态
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
可以看出
AbstractPlatformTransactionManager#getTransaction
会根据不同的事务传播行为
做出不同行为,创建新事务,或者使用现有事务,或者挂起当前事务,或者抛出异常等。
就比如PROPAGATION_REQUIRED
和 PROPAGATION_REQUIRES_NEW
的在处理上区别如下:
不挂起
当前事务不创建
新事务TransactionStatus
中 transaction
为 现有事务
挂起
当前事务创建
新事务TransactionStatus
中 transaction
为 新事务
这里的返回值 TransactionStatus
其实就是 事务状态
,会作为参数传给上面的 prepareTransactionInfo()
方法。
public classDefaultTransactionStatusextendsAbstractTransactionStatus {
// 事务名称
privatefinal String transactionName;
// 事务
privatefinal Object transaction;
// 是否是新事务
privatefinalboolean newTransaction;
// 是否是新的事务同步器
privatefinalboolean newSynchronization;
// 是否嵌套
privatefinalboolean nested;
// 是否只读
privatefinalboolean readOnly;
// debug标记
privatefinalboolean debug;
// 事务挂起暂存的资源
privatefinal Object suspendedResources;
}
TransactionAspectSupport
:事务切面支持类PlatformTransactionManager
:事务管理器TransactionInfo
:事务信息载体ThreadLocal
:事务上下文存储器整体流程图:
流程说明:
AOP
拦截带有 @Transactional
注解的方法
,调用 TransactionAspectSupport.invokeWithinTransaction()
。TransactionAttribute
)PlatformTransactionManager
)TransactionInfo
并保存事务状态传播行为
决定是否创建新事务或者挂起当前事务ThreadLocal
存储事务信息事务嵌套
(通过 oldTransactionInfo
)try {
// 执行业务方法
retVal = invocation.proceedWithInvocation();
// 提交事务
commitTransactionAfterReturning();
} catch (Exception ex) {
// 回滚事务
completeTransactionAfterThrowing();
throw ex;
} finally {
// 清理事务信息
cleanupTransactionInfo();
}
最后,我们出一个场景题来看下你对刚才的事务的传播行为
的理解。
@Service
publicclassDemoServiceA {
@Resource
private DemoServiceB demoServiceB;
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
publicvoida() {
// 步骤A:写数据库1
// 步骤B:写数据库2
// 步骤C:调用 DemoServiceB.b() 方法读数据
vardata= demoServiceB.b();
}
}
@Service
publicclassDemoServiceB {
@Transactional(rollbackFor = Exception.class, propagation = Propagation.?)
public Object b() {
// read data from db
}
}
在上面的场景中,demoServiceB
的 b()
方法的传播行为-propagation
应该设置为什么呢?
答案是应该设置为:
Propagation.NOT_SUPPORTED
因为这样的话,如果步骤C
读取数据
失败,不会导致步骤A
和 步骤B
中数据修改回滚
。
那如果是下面这样呢?
调用 demoServiceB.b()
步骤在中间。
@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void a() {
// 步骤A:写数据库1
// 步骤B:调用 DemoServiceB.b() 方法读数据
var data = demoServiceB.b();
// 步骤C:写数据库2
}
答案是应该设置为:
Propagation.REQUIRED
因为这样的话,如果步骤B
读取数据失败,步骤C
还没开始,步骤A
修改了的数据应该回滚的。
希望到这里,你对 Spring 的事务原理和事务的使用有更多的认识。
欢迎关注我的公众号“子安聊代码”,一起探讨技术。