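1. Overview
1.1 Background
While I was working as an SRE at an internet finance platform, we had a serious production incident. At 3 a.m. the monitoring system fired a flood of alerts: active database connections jumped from the usual 200 to 2,000, and large numbers of requests timed out. Emergency troubleshooting showed that a batch update job had deadlocked with online transactions, and the database connections were exhausted.
The incident lasted 40 minutes and affected the transactions of more than ten thousand users. The postmortem traced the root cause to the development team's limited understanding of MySQL transactions and locking, which had produced deadlock-prone code.
Since then I have spent a lot of time studying MySQL transactions and locks and have put together a set of troubleshooting and prevention methods. This article walks through the ACID properties of MySQL transactions, how locks work, and how to diagnose and resolve deadlocks.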
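1.2 Technical Characteristics
The transaction and locking mechanisms of the MySQL InnoDB storage engine have the following characteristics:
ACID transaction properties
- Atomicity: a transaction is an indivisible unit of work
- Consistency: data is consistent before and after the transaction
- Isolation: concurrent transactions are isolated from one another
- Durability: once a transaction commits, its changes are stored permanently
Multi-granularity locking
- Row lock: locks a single row; high concurrency
- Gap lock: locks a gap between index records to prevent phantom reads
- Table lock: locks the whole table; low overhead but low concurrency
- Intention lock: a table-level lock used to coordinate row locks and table locks
MVCC (multi-version concurrency control)
- Plain reads do not block writes, and writes do not block plain reads
- Consistent reads are implemented with the undo log
- Multiple isolation levels are supported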
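1.3 Applicable Scenarios
| Scenario | Isolation level | Locking strategy | Typical application |
|---|---|---|---|
| High-concurrency reads and writes | READ COMMITTED | Minimize lock scope | E-commerce orders |
| Financial transactions | REPEATABLE READ | Row locks + gap locks | Transfers, payments |
| Reporting and statistics | READ COMMITTED | Snapshot reads | Data analysis |
| Inventory deduction | REPEATABLE READ | SELECT FOR UPDATE | Flash-sale systems |
| Batch updates | READ COMMITTED | Commit in batches | Data migration |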
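1.4 Environment Requirements
| Component | Version | Notes |
|---|---|---|
| MySQL | 8.0.35+ / 8.4 LTS | This article is based on 8.0.35 |
| Operating system | Rocky Linux 9 / Ubuntu 24.04 | Rocky Linux 9 recommended |
| Storage engine | InnoDB | InnoDB is required |
| Memory | 16GB+ | Enough room for the buffer pool |
Key configuration requirements:
-- List InnoDB-related settings
SHOW VARIABLES LIKE 'innodb%';
-- Key settings (my.cnf, [mysqld] section)
innodb_buffer_pool_size = 8G              # buffer pool size
innodb_lock_wait_timeout = 50             # lock wait timeout (seconds)
innodb_deadlock_detect = ON               # enable deadlock detection
innodb_print_all_deadlocks = ON           # log every deadlock
transaction_isolation = REPEATABLE-READ   # default isolation level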
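2. Detailed Steps
2.1 Preparation
2.1.1 Understanding ACID in Depth
Atomicity
All operations in a transaction either succeed together or fail and roll back together. InnoDB implements atomicity with the undo log.
-- Atomicity example: a transfer
START TRANSACTION;
-- Step 1: debit the source account
UPDATE accounts SET balance = balance - 1000 WHERE user_id = 1;
-- Step 2: credit the target account
UPDATE accounts SET balance = balance + 1000 WHERE user_id = 2;
-- If both steps succeed, commit
COMMIT;
-- If either step fails, roll back
-- ROLLBACK;
-- Atomicity guarantees:
-- 1. Either both accounts are updated
-- 2. Or both accounts stay unchanged
-- Money is never debited without being credited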
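The all-or-nothing behaviour described above can be tried without a MySQL server. The sketch below is only an illustration: it swaps in Python's built-in `sqlite3` module (not MySQL), and the `transfer` and `demo` helpers and the overdraft rule are assumptions made for the example.

```python
import sqlite3


def transfer(conn, from_id, to_id, amount):
    """Move `amount` between two accounts; roll back everything on failure."""
    try:
        conn.execute(
            "UPDATE accounts SET balance = balance - ? WHERE user_id = ?",
            (amount, from_id),
        )
        # Hypothetical business rule: reject overdrafts after the debit has run,
        # so the rollback has something to undo.
        (balance,) = conn.execute(
            "SELECT balance FROM accounts WHERE user_id = ?", (from_id,)
        ).fetchone()
        if balance < 0:
            raise ValueError("insufficient balance")
        conn.execute(
            "UPDATE accounts SET balance = balance + ? WHERE user_id = ?",
            (amount, to_id),
        )
        conn.commit()
        return True
    except ValueError:
        conn.rollback()  # undoes the debit as well
        return False


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE accounts (user_id INTEGER PRIMARY KEY, balance INTEGER NOT NULL)"
    )
    conn.executemany("INSERT INTO accounts VALUES (?, ?)", [(1, 1000), (2, 500)])
    conn.commit()
    ok = transfer(conn, 1, 2, 300)       # succeeds
    failed = transfer(conn, 1, 2, 5000)  # debit is rolled back
    balances = dict(conn.execute("SELECT user_id, balance FROM accounts"))
    conn.close()
    return ok, failed, balances
```

After the failed transfer the debit is undone, so the balances reflect only the first, successful transfer.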
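Consistency
A transaction moves the database from one consistent state to another consistent state.
-- Consistency example: the total amount must not change
-- Assume the system has only two accounts and the total should always be 10000
-- Check before the transaction
SELECT SUM(balance) FROM accounts;
-- Result: 10000
START TRANSACTION;
UPDATE accounts SET balance = balance - 1000 WHERE user_id = 1;
UPDATE accounts SET balance = balance + 1000 WHERE user_id = 2;
COMMIT;
-- Check after the transaction
SELECT SUM(balance) FROM accounts;
-- Result is still: 10000
-- Consistency is enforced jointly by the application and by database constraints,
-- such as CHECK constraints, foreign keys, and triggers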
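Isolation
Concurrently running transactions are isolated from each other; the intermediate state of one transaction is not visible to others.
-- Isolation example: concurrent read and write
-- Session 1
START TRANSACTION;
UPDATE products SET stock = stock - 1 WHERE id = 1;
-- Not committed yet
-- Session 2
SELECT stock FROM products WHERE id = 1;
-- Depending on the isolation level, this returns the value before or after the update
-- MySQL defaults to REPEATABLE READ
-- Session 2 reads from a snapshot and sees the last committed value, i.e. the value before the update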
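Durability
Once a transaction commits, its result is permanent and survives a system crash.
-- Durability is guaranteed by the redo log
-- At commit, the redo log is flushed to disk
-- Related setting
SHOW VARIABLES LIKE 'innodb_flush_log_at_trx_commit';
-- innodb_flush_log_at_trx_commit = 1 (default)
--   The redo log is written and flushed to disk at every commit
--   Safest, slightly slower
-- innodb_flush_log_at_trx_commit = 2
--   The redo log is written to the OS cache at every commit and flushed to disk once per second
--   Good performance; an OS crash or power loss can lose about one second of transactions
-- innodb_flush_log_at_trx_commit = 0
--   The redo log is written and flushed to disk once per second
--   Best performance; even a mysqld crash can lose about one second of transactions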
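2.1.2 Transaction Isolation Levels
MySQL supports four isolation levels, each addressing different concurrency problems:
| Isolation level | Dirty read | Non-repeatable read | Phantom read | Performance |
|---|---|---|---|---|
| READ UNCOMMITTED | Possible | Possible | Possible | Highest |
| READ COMMITTED | Not possible | Possible | Possible | High |
| REPEATABLE READ | Not possible | Not possible | Largely prevented by InnoDB | Medium |
| SERIALIZABLE | Not possible | Not possible | Not possible | Lowest |
-- Show the current isolation level
SELECT @@transaction_isolation;
-- or
SHOW VARIABLES LIKE 'transaction_isolation';
-- Set the isolation level for the session
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- Set the global isolation level (applies to connections opened afterwards)
SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- Set it in the configuration file
-- [mysqld]
-- transaction-isolation = READ-COMMITTED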
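Dirty read demo
-- Session 1 (set to READ UNCOMMITTED)
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
START TRANSACTION;
-- Session 2
START TRANSACTION;
UPDATE accounts SET balance = 500 WHERE user_id = 1;
-- Not committed
-- Session 1
SELECT balance FROM accounts WHERE user_id = 1;
-- Result: 500 (uncommitted data was read, i.e. a dirty read)
-- Session 2
ROLLBACK;
-- Session 1 queries again
SELECT balance FROM accounts WHERE user_id = 1;
-- The result is the original value; the 500 read earlier was "dirty" data
Non-repeatable read demo
-- Session 1 (READ COMMITTED)
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
START TRANSACTION;
SELECT balance FROM accounts WHERE user_id = 1;
-- Result: 1000
-- Session 2
UPDATE accounts SET balance = 500 WHERE user_id = 1;
COMMIT;
-- Session 1 queries again
SELECT balance FROM accounts WHERE user_id = 1;
-- Result: 500 (two reads in the same transaction differ, i.e. a non-repeatable read)
COMMIT;
Phantom read demo
-- Session 1 (phantoms can still appear in some cases under REPEATABLE READ)
START TRANSACTION;
SELECT COUNT(*) FROM orders WHERE user_id = 1;
-- Result: 10 (snapshot read, takes no locks)
-- Session 2
INSERT INTO orders (user_id, amount) VALUES (1, 100);
COMMIT;
-- Session 1 uses a current (locking) read
SELECT COUNT(*) FROM orders WHERE user_id = 1 FOR UPDATE;
-- Result: 11 (the newly inserted row is visible, i.e. a phantom read)
-- Note: InnoDB's REPEATABLE READ prevents most phantoms with gap locks,
-- but mixing snapshot reads and current reads in one transaction, as here, can still expose them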
2.1.3 Creating the Test Environment
-- Create the test database
CREATE DATABASE IF NOT EXISTS lock_demo;
USE lock_demo;
-- Create the accounts table
CREATE TABLE accounts (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    user_id BIGINT UNSIGNED NOT NULL,
    balance DECIMAL(15,2) NOT NULL DEFAULT 0.00,
    version INT UNSIGNED NOT NULL DEFAULT 0, -- version number for optimistic locking
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_user_id (user_id)
) ENGINE=InnoDB;
-- Create the orders table
CREATE TABLE orders (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    order_no VARCHAR(32) NOT NULL,
    user_id BIGINT UNSIGNED NOT NULL,
    amount DECIMAL(10,2) NOT NULL,
    status TINYINT DEFAULT 0,
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_order_no (order_no),
    INDEX idx_user_id (user_id),
    INDEX idx_status (status),
    INDEX idx_user_status (user_id, status)
) ENGINE=InnoDB;
-- Create the inventory table
CREATE TABLE inventory (
    id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
    product_id BIGINT UNSIGNED NOT NULL,
    stock INT UNSIGNED NOT NULL DEFAULT 0,
    version INT UNSIGNED NOT NULL DEFAULT 0,
    UNIQUE KEY uk_product_id (product_id)
) ENGINE=InnoDB;
-- Insert test data
INSERT INTO accounts (user_id, balance) VALUES
    (1, 10000.00), (2, 5000.00), (3, 3000.00);
INSERT INTO inventory (product_id, stock) VALUES
    (1001, 100), (1002, 200), (1003, 50);
-- Generate 1000 test orders (three 10-row derived tables cross-joined)
INSERT INTO orders (order_no, user_id, amount, status)
SELECT
    CONCAT('ORD', LPAD(seq, 10, '0')),
    FLOOR(RAND() * 3) + 1,
    ROUND(RAND() * 1000, 2),
    FLOOR(RAND() * 5)
FROM (
    SELECT @row := @row + 1 AS seq FROM
    (SELECT 0 UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
     UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) t1,
    (SELECT 0 UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
     UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) t2,
    (SELECT 0 UNION SELECT 1 UNION SELECT 2 UNION SELECT 3 UNION SELECT 4
     UNION SELECT 5 UNION SELECT 6 UNION SELECT 7 UNION SELECT 8 UNION SELECT 9) t3,
    (SELECT @row := 0) r
) seq_table;
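2.2 Core Configuration
2.2.1 InnoDB Lock Types in Detail
1. Shared locks (S) and exclusive locks (X)
-- Shared lock (S): other transactions may read and take S locks, but may not write
SELECT * FROM accounts WHERE user_id = 1 LOCK IN SHARE MODE;
-- New syntax in MySQL 8.0
SELECT * FROM accounts WHERE user_id = 1 FOR SHARE;
-- Exclusive lock (X): other transactions may not write the row or take any lock on it
-- (plain snapshot reads are not blocked; only locking reads and writes wait)
SELECT * FROM accounts WHERE user_id = 1 FOR UPDATE;
-- Lock compatibility matrix
-- |        | S lock     | X lock   |
-- | S lock | Compatible | Conflict |
-- | X lock | Conflict   | Conflict |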
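2. Intention locks (IS/IX)
-- An intention lock is a table-level lock that announces which kind of row lock a transaction is about to take in the table
-- Intention shared (IS): the transaction intends to take shared locks on rows
-- Intention exclusive (IX): the transaction intends to take exclusive locks on rows
-- Show intention locks
SELECT * FROM performance_schema.data_locks WHERE LOCK_TYPE = 'TABLE';
-- Purpose of intention locks:
-- When a table lock is requested, there is no need to scan every row for row locks;
-- checking the intention locks is enough
-- Compatibility matrix (table-level locks):
-- |    | IS         | IX         | S          | X        |
-- | IS | Compatible | Compatible | Compatible | Conflict |
-- | IX | Compatible | Compatible | Conflict   | Conflict |
-- | S  | Compatible | Conflict   | Compatible | Conflict |
-- | X  | Conflict   | Conflict   | Conflict   | Conflict |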
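The compatibility matrix above is easy to encode as a lookup table. The following is a minimal sketch for reasoning about table-level lock requests; the names `COMPATIBLE`, `is_compatible` and `can_grant` are hypothetical helpers, not part of any MySQL API.

```python
# Table-level lock compatibility, transcribed from the matrix above.
# COMPATIBLE[held] is the set of modes that can be granted alongside `held`.
COMPATIBLE = {
    "IS": {"IS", "IX", "S"},
    "IX": {"IS", "IX"},
    "S": {"IS", "S"},
    "X": set(),
}


def is_compatible(held, requested):
    """Return True if `requested` can coexist with an already granted `held` lock."""
    return requested in COMPATIBLE[held]


def can_grant(requested, held_locks):
    """A request is granted only if it is compatible with every lock already held."""
    return all(is_compatible(held, requested) for held in held_locks)
```

For example, `can_grant("IX", ["IS", "IX"])` is true, which is why many transactions can hold row locks in the same table, while `can_grant("X", ["IS"])` is false, which is how a table-level X request detects row-level activity without scanning rows.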
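3. Record locks
-- A record lock locks an index record
-- If the table has no index, InnoDB creates a hidden clustered index and uses it for record locking
START TRANSACTION;
-- Lock the record with id=1
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- Other transactions can no longer modify the row with id=1
-- Show record locks
SELECT * FROM performance_schema.data_locks
WHERE LOCK_TYPE = 'RECORD' AND LOCK_MODE = 'X,REC_NOT_GAP';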
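4. Gap locks
-- A gap lock locks the gap between index records so that other transactions cannot insert into it
-- Gap locks are used only at REPEATABLE READ and above
-- Assume the accounts table contains id: 1, 5, 10
START TRANSACTION;
SELECT * FROM accounts WHERE id BETWEEN 3 AND 7 FOR UPDATE;
-- This locks the gaps (1,5) and (5,10), plus the record id=5
-- Other transactions cannot insert new records into these gaps
-- INSERT INTO accounts (id, user_id, balance) VALUES (3, 3, 1000); -- waits
-- Show gap locks
SELECT * FROM performance_schema.data_locks
WHERE LOCK_TYPE = 'RECORD' AND LOCK_MODE = 'X,GAP';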
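5. Next-key locks
-- Next-key lock = record lock + gap lock
-- It locks an index record and the gap before it
-- Assume id: 1, 5, 10
START TRANSACTION;
SELECT * FROM accounts WHERE id = 5 FOR UPDATE;
-- id is the primary key and the row exists, so this equality lookup takes
-- only a record lock on id=5 (X,REC_NOT_GAP) and no gap lock
-- A next-key lock is taken for range scans or lookups on a non-unique index, for example:
SELECT * FROM accounts WHERE id > 1 AND id <= 5 FOR UPDATE;
-- Under REPEATABLE READ this locks:
-- 1. The record id=5
-- 2. The gap (1,5)
-- (the exact range can vary by MySQL version)
-- Next-key locking is InnoDB's default for index scans and is used to prevent phantom reads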
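To make the interval notation concrete, the sketch below enumerates the next-key intervals for a set of index values and finds the gap that a missing key falls into. It is a simplified model for illustration only; `next_key_intervals` and `gap_containing` are hypothetical helper names, and real InnoDB locking depends on the index type and the query.

```python
import bisect
import math


def next_key_intervals(keys):
    """Return (lower, upper] intervals, one per index record plus the supremum.

    Each tuple (lo, hi) stands for the half-open interval (lo, hi]: the record
    `hi` together with the gap before it.
    """
    keys = sorted(keys)
    bounds = [-math.inf] + keys
    intervals = [(bounds[i], bounds[i + 1]) for i in range(len(keys))]
    # The last interval ends at the "supremum" pseudo-record.
    intervals.append((bounds[-1], math.inf))
    return intervals


def gap_containing(keys, value):
    """Return the open gap (lo, hi) a non-existent key falls into, or None if it exists."""
    keys = sorted(keys)
    i = bisect.bisect_left(keys, value)
    if i < len(keys) and keys[i] == value:
        return None  # the key exists, so a record lock applies instead of a pure gap
    lower = keys[i - 1] if i > 0 else -math.inf
    upper = keys[i] if i < len(keys) else math.inf
    return (lower, upper)
```

With index values 1, 5, 10, a locking read for a non-existent key such as 7 maps to the gap (5, 10), which is why an insert of id 8 by another transaction would have to wait.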
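6. Insert intention locks
-- An insert intention lock is a special kind of gap lock
-- Several transactions can hold insert intention locks on the same gap at once (as long as they insert at different positions)
-- Session 1
START TRANSACTION;
INSERT INTO accounts (id, user_id, balance) VALUES (3, 3, 1000);
-- Takes an insert intention lock on the gap (1,5) and inserts id=3
-- Session 2
START TRANSACTION;
INSERT INTO accounts (id, user_id, balance) VALUES (4, 4, 2000);
-- Can also take an insert intention lock on the gap (1,5) and insert id=4
-- The two inserts run concurrently because their positions do not conflict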
2.2.2 Lock Monitoring Configuration
-- Enable lock monitoring
SET GLOBAL innodb_status_output = ON;
SET GLOBAL innodb_status_output_locks = ON;
-- Show InnoDB status (includes lock information)
SHOW ENGINE INNODB STATUS\G
-- Monitor locks with performance_schema
-- data_locks: locks currently held or requested
SELECT * FROM performance_schema.data_locks;
-- data_lock_waits: who is waiting for whom
SELECT * FROM performance_schema.data_lock_waits;
-- Show transactions waiting for locks
SELECT
    r.trx_id AS waiting_trx_id,
    r.trx_mysql_thread_id AS waiting_thread,
    r.trx_query AS waiting_query,
    b.trx_id AS blocking_trx_id,
    b.trx_mysql_thread_id AS blocking_thread,
    b.trx_query AS blocking_query
FROM performance_schema.data_lock_waits w
INNER JOIN information_schema.innodb_trx b
    ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
INNER JOIN information_schema.innodb_trx r
    ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID;
2.2.3 Deadlock Detection Configuration
-- Enable deadlock detection (on by default)
SET GLOBAL innodb_deadlock_detect = ON;
-- Set the lock wait timeout
SET GLOBAL innodb_lock_wait_timeout = 50; -- default is 50 seconds
-- Print every deadlock to the error log
SET GLOBAL innodb_print_all_deadlocks = ON;
-- Configuration file settings
-- [mysqld]
-- innodb_deadlock_detect = ON
-- innodb_lock_wait_timeout = 10
-- innodb_print_all_deadlocks = ON
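2.3 Startup and Verification
2.3.1 Verifying the Locking Mechanism
-- Test a record lock
-- Session 1
START TRANSACTION;
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- Do not commit; keep the lock
-- Session 2
START TRANSACTION;
-- Try to update the same row
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- This statement waits because id=1 is locked by session 1
-- While session 2 is waiting, inspect the lock waits (from a third session)
SELECT * FROM performance_schema.data_lock_waits;
-- Session 1
COMMIT;
-- Session 2's update runs only after this commit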
2.3.2 Verifying Deadlock Detection
-- Construct a deadlock
-- Session 1
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
-- Session 2
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE id = 2;
-- Session 1
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
-- Waits for session 2 to release the lock on id=2
-- Session 2
UPDATE accounts SET balance = balance + 100 WHERE id = 1;
-- Waits for session 1 to release the lock on id=1
-- Deadlock!
-- MySQL detects the deadlock and rolls back one of the transactions
-- ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction
-- Show the most recent deadlock
SHOW ENGINE INNODB STATUS\G
-- Look for the "LATEST DETECTED DEADLOCK" section
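Conceptually, a deadlock is a cycle in the "waits-for" graph between transactions. The sketch below illustrates that idea only and is not InnoDB's actual detector; `find_deadlock` is a hypothetical helper, and it assumes each waiting transaction is blocked by a single other transaction.

```python
def find_deadlock(wait_for):
    """Return the list of transactions forming a wait cycle, or None.

    `wait_for` maps a waiting transaction to the transaction that blocks it.
    """
    for start in wait_for:
        path = []
        seen = set()
        node = start
        # Follow the chain of blockers until it ends or repeats.
        while node in wait_for and node not in seen:
            seen.add(node)
            path.append(node)
            node = wait_for[node]
        if node in seen:
            # The chain came back to a visited transaction: that tail is the cycle.
            return path[path.index(node):]
    return None
```

In the two-session scenario above the graph is `{"T1": "T2", "T2": "T1"}`, so the function reports the cycle; a plain lock wait such as `{"T1": "T2"}` has no cycle and only needs the blocker to finish.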
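3. Sample Code and Configuration
3.1 Complete Configuration Examples
3.1.1 Deadlock Case Studies
Case 1: updates in opposite order
-- The most common deadlock: two transactions update rows in opposite order
-- Transaction 1: update A first, then B
START TRANSACTION;
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1; -- locks user_id=1
-- waiting...
UPDATE accounts SET balance = balance + 100 WHERE user_id = 2; -- needs the lock on user_id=2
-- Transaction 2: update B first, then A
START TRANSACTION;
UPDATE accounts SET balance = balance - 50 WHERE user_id = 2; -- locks user_id=2
-- waiting...
UPDATE accounts SET balance = balance + 50 WHERE user_id = 1; -- needs the lock on user_id=1
-- Deadlock! Transaction 1 holds A and waits for B; transaction 2 holds B and waits for A
-- Solution: use a fixed update order
-- Always update in ascending (or always in descending) user_id order
START TRANSACTION;
-- Approach 1: sort in the application layer
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1;
UPDATE accounts SET balance = balance + 100 WHERE user_id = 2;
COMMIT;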
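Case 2: gap lock deadlock
-- A deadlock caused by gap locks
-- The table contains id: 1, 10, 20
-- Transaction 1
START TRANSACTION;
SELECT * FROM accounts WHERE id = 5 FOR UPDATE; -- locks the gap (1,10)
-- waiting...
-- Transaction 2
START TRANSACTION;
SELECT * FROM accounts WHERE id = 15 FOR UPDATE; -- locks the gap (10,20)
INSERT INTO accounts (id, user_id, balance) VALUES (7, 7, 1000); -- waits for transaction 1
-- Transaction 1
INSERT INTO accounts (id, user_id, balance) VALUES (12, 12, 2000); -- waits for transaction 2
-- Deadlock!
-- Solutions:
-- 1. Lower the isolation level to READ COMMITTED (no gap locks for searches and scans)
-- 2. Match existing rows exactly through a unique index to avoid gap locks
-- 3. Reduce the locked range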
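Case 3: unique key conflict deadlock
-- Unique key conflicts can cause deadlocks
-- The table already contains order_no = 'ORD001'
-- Transaction 1
START TRANSACTION;
INSERT INTO orders (order_no, user_id, amount) VALUES ('ORD002', 1, 100);
-- Transaction 2
START TRANSACTION;
INSERT INTO orders (order_no, user_id, amount) VALUES ('ORD002', 2, 200);
-- Unique key conflict; waits for transaction 1
-- Transaction 3
START TRANSACTION;
INSERT INTO orders (order_no, user_id, amount) VALUES ('ORD002', 3, 300);
-- Also waits
-- Transaction 1 rolls back
ROLLBACK;
-- Transactions 2 and 3 may deadlock: both are granted a shared lock on the key
-- and then each needs an exclusive lock that the other one blocks
-- Solutions:
-- 1. Use INSERT ... ON DUPLICATE KEY UPDATE
-- 2. Use INSERT IGNORE
-- 3. Query first, then insert (handled in the application layer)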
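3.1.2 Pessimistic Locking
import java.math.BigDecimal;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

/**
 * Transfer implemented with pessimistic locking
 * Rows are locked with SELECT FOR UPDATE
 */
@Service
@Transactional
public class TransferService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Transfer using pessimistic locks
     * Key point: acquire locks in a fixed order to avoid deadlocks
     */
    public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
        // A self-transfer would read the same balance twice and overwrite it incorrectly
        if (fromUserId.equals(toUserId)) {
            throw new IllegalArgumentException("Source and target accounts must differ");
        }
        // Sort by user_id so locks are always acquired in the same order
        Long firstUserId = Math.min(fromUserId, toUserId);
        Long secondUserId = Math.max(fromUserId, toUserId);
        try {
            // Lock the accounts in order
            BigDecimal firstBalance = lockAndGetBalance(firstUserId);
            BigDecimal secondBalance = lockAndGetBalance(secondUserId);
            // Work out which balance belongs to the source and which to the target
            BigDecimal fromBalance = fromUserId.equals(firstUserId) ? firstBalance : secondBalance;
            BigDecimal toBalance = fromUserId.equals(firstUserId) ? secondBalance : firstBalance;
            // Check the balance
            if (fromBalance.compareTo(amount) < 0) {
                throw new RuntimeException("Insufficient balance");
            }
            // Perform the transfer
            updateBalance(fromUserId, fromBalance.subtract(amount));
            updateBalance(toUserId, toBalance.add(amount));
        } catch (Exception e) {
            // The unchecked exception makes Spring roll the transaction back
            throw new RuntimeException("Transfer failed: " + e.getMessage(), e);
        }
    }

    private BigDecimal lockAndGetBalance(Long userId) {
        // SELECT FOR UPDATE locks the row until the transaction ends
        String sql = "SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE";
        return jdbcTemplate.queryForObject(sql, BigDecimal.class, userId);
    }

    private void updateBalance(Long userId, BigDecimal newBalance) {
        String sql = "UPDATE accounts SET balance = ? WHERE user_id = ?";
        jdbcTemplate.update(sql, newBalance, userId);
    }
}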
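3.1.3 Optimistic Locking
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;

/**
 * Inventory deduction implemented with optimistic locking
 * Uses a version number (compare-and-set)
 */
@Service
public class InventoryService {
    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Deduct stock using optimistic locking
     * @return true on success, false on failure (insufficient stock or version conflict)
     */
    public boolean decreaseStock(Long productId, int quantity) {
        int maxRetries = 3;
        for (int i = 0; i < maxRetries; i++) {
            // Read the current stock and version
            String selectSql = "SELECT stock, version FROM inventory WHERE product_id = ?";
            Map<String, Object> result = jdbcTemplate.queryForMap(selectSql, productId);
            // INT UNSIGNED columns may come back as Long, so go through Number
            int currentStock = ((Number) result.get("stock")).intValue();
            int currentVersion = ((Number) result.get("version")).intValue();
            // Check the stock
            if (currentStock < quantity) {
                return false; // insufficient stock
            }
            // Compare-and-set update guarded by the version number
            String updateSql = """
                UPDATE inventory
                SET stock = stock - ?, version = version + 1
                WHERE product_id = ? AND version = ?
                """;
            int affected = jdbcTemplate.update(updateSql, quantity, productId, currentVersion);
            if (affected > 0) {
                return true; // update succeeded
            }
            // Version conflict: back off briefly, then retry
            try {
                Thread.sleep(10 + (long) (Math.random() * 50)); // random delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false; // stop retrying once interrupted
            }
        }
        return false; // retries exhausted
    }

    /**
     * Deduct stock with a conditional row update (a simpler form of optimistic locking)
     */
    public boolean decreaseStockSimple(Long productId, int quantity) {
        String sql = """
            UPDATE inventory
            SET stock = stock - ?
            WHERE product_id = ? AND stock >= ?
            """;
        int affected = jdbcTemplate.update(sql, quantity, productId, quantity);
        return affected > 0;
    }
}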
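The version check at the heart of the class above can be exercised in isolation. The sketch below is an illustration under stated assumptions: it uses Python's built-in `sqlite3` in place of MySQL so that it runs anywhere, and `cas_decrease` and `demo` are hypothetical helper names.

```python
import sqlite3


def cas_decrease(conn, product_id, quantity, expected_version):
    """Deduct stock only if the row still carries `expected_version`."""
    cur = conn.execute(
        "UPDATE inventory SET stock = stock - ?, version = version + 1 "
        "WHERE product_id = ? AND version = ? AND stock >= ?",
        (quantity, product_id, expected_version, quantity),
    )
    conn.commit()
    return cur.rowcount == 1  # 0 rows means a conflict or insufficient stock


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE inventory (product_id INTEGER PRIMARY KEY, "
        "stock INTEGER NOT NULL, version INTEGER NOT NULL DEFAULT 0)"
    )
    conn.execute("INSERT INTO inventory VALUES (1001, 100, 0)")
    conn.commit()
    # Two writers both read version 0; only the first update can win.
    first = cas_decrease(conn, 1001, 10, expected_version=0)
    second = cas_decrease(conn, 1001, 10, expected_version=0)
    # The loser re-reads the row and retries with the fresh version.
    (version,) = conn.execute(
        "SELECT version FROM inventory WHERE product_id = 1001"
    ).fetchone()
    retry = cas_decrease(conn, 1001, 10, expected_version=version)
    row = conn.execute(
        "SELECT stock, version FROM inventory WHERE product_id = 1001"
    ).fetchone()
    conn.close()
    return first, second, retry, row
```

The second writer's update matches no row because the version has moved on, which is exactly the signal the Java loop uses to re-read and retry.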
3.1.4 Distributed Locking
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;

/**
 * Redis-based distributed lock
 * Handles concurrency across application instances
 */
@Component
public class DistributedLock {
    @Autowired
    private StringRedisTemplate redisTemplate;

    private static final long DEFAULT_EXPIRE_TIME = 30000; // 30 seconds

    /**
     * Acquire the lock
     * @param lockKey lock key
     * @param requestId request identifier (verified when releasing the lock)
     * @param expireTime expiry time in milliseconds
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
            lockKey,
            requestId,
            expireTime,
            TimeUnit.MILLISECONDS
        );
        return Boolean.TRUE.equals(success);
    }

    /**
     * Release the lock
     * A Lua script makes the check-and-delete atomic
     */
    public boolean unlock(String lockKey, String requestId) {
        String script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
            """;
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(lockKey),
            requestId
        );
        return Long.valueOf(1).equals(result);
    }

    /**
     * Lock with automatic renewal
     * Uses a watchdog thread
     */
    public boolean tryLockWithWatchdog(String lockKey, String requestId) {
        boolean locked = tryLock(lockKey, requestId, DEFAULT_EXPIRE_TIME);
        if (locked) {
            // Start a watchdog thread that renews the lock periodically
            startWatchdog(lockKey, requestId);
        }
        return locked;
    }

    private void startWatchdog(String lockKey, String requestId) {
        Thread watchdog = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(DEFAULT_EXPIRE_TIME / 3); // renew every 10 seconds
                    // Renew only if this request still owns the lock
                    String script = """
                        if redis.call('get', KEYS[1]) == ARGV[1] then
                            return redis.call('pexpire', KEYS[1], ARGV[2])
                        else
                            return 0
                        end
                        """;
                    Long result = redisTemplate.execute(
                        new DefaultRedisScript<>(script, Long.class),
                        Collections.singletonList(lockKey),
                        requestId,
                        String.valueOf(DEFAULT_EXPIRE_TIME)
                    );
                    if (!Long.valueOf(1).equals(result)) {
                        break; // the lock was released or taken by another process
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        watchdog.setDaemon(true);
        watchdog.start();
    }
}
/**
 * Example of using the distributed lock
 * (Order and OrderMapper are assumed to be defined elsewhere in the project)
 */
@Service
public class OrderService {
    @Autowired
    private DistributedLock distributedLock;
    @Autowired
    private InventoryService inventoryService;
    @Autowired
    private OrderMapper orderMapper;

    /**
     * Create an order, using the distributed lock for idempotency
     */
    public Order createOrder(String orderNo, Long productId, int quantity) {
        String lockKey = "lock" + orderNo;
        String requestId = UUID.randomUUID().toString();
        // Acquire the distributed lock before entering the guarded section
        if (!distributedLock.tryLock(lockKey, requestId, 30000)) {
            throw new RuntimeException("Failed to acquire lock, please retry later");
        }
        try {
            // Check whether the order already exists (idempotency check)
            Order existingOrder = orderMapper.findByOrderNo(orderNo);
            if (existingOrder != null) {
                return existingOrder; // return the existing order
            }
            // Deduct stock
            if (!inventoryService.decreaseStock(productId, quantity)) {
                throw new RuntimeException("Insufficient stock");
            }
            // Create the order
            Order order = new Order();
            order.setOrderNo(orderNo);
            order.setProductId(productId);
            order.setQuantity(quantity);
            orderMapper.insert(order);
            return order;
        } finally {
            // Release the lock
            distributedLock.unlock(lockKey, requestId);
        }
    }
}
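Why the unlock script compares the stored value with `requestId` is easiest to see with an expired lease. The sketch below is an in-memory model, not a Redis client: `FakeRedisLock` is a hypothetical stand-in for `SET key value NX PX` plus the compare-and-delete script, driven by a manual clock.

```python
import uuid


class FakeRedisLock:
    """In-memory model of a lock with expiry and owner-checked release."""

    def __init__(self):
        self._store = {}  # key -> (owner, expires_at_ms)
        self.now_ms = 0   # manual clock, advanced by the caller

    def _live_owner(self, key):
        entry = self._store.get(key)
        if entry is None or entry[1] <= self.now_ms:
            self._store.pop(key, None)  # an expired lease counts as absent
            return None
        return entry[0]

    def try_lock(self, key, request_id, expire_ms):
        if self._live_owner(key) is not None:
            return False
        self._store[key] = (request_id, self.now_ms + expire_ms)
        return True

    def unlock(self, key, request_id):
        # Delete only if the caller still owns the lock.
        if self._live_owner(key) != request_id:
            return False
        del self._store[key]
        return True


def demo():
    lock = FakeRedisLock()
    a, b = str(uuid.uuid4()), str(uuid.uuid4())
    got_a = lock.try_lock("lock:order", a, 30000)
    blocked_b = lock.try_lock("lock:order", b, 30000)
    lock.now_ms += 30001  # A's lease expires while A is still working
    got_b = lock.try_lock("lock:order", b, 30000)
    stale_unlock = lock.unlock("lock:order", a)  # must not release B's lock
    owner_unlock = lock.unlock("lock:order", b)
    return got_a, blocked_b, got_b, stale_unlock, owner_unlock
```

Without the owner check, A's late `unlock` would delete B's lock and let a third client in, which is the failure mode the watchdog renewal and the `requestId` comparison are meant to reduce.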
3.2 Real-World Use Cases
3.2.1 Preventing Overselling in a Flash-Sale System
/**
 * Approaches for preventing overselling in a flash sale
 * (SeckillResult, OrderMessage, generateOrderNo() and createOrder() are assumed to be defined elsewhere)
 */
@Service
public class SeckillService {
    // Assumes string serializers are configured so DECR/INCR operate on plain integers
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private JdbcTemplate jdbcTemplate;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Approach 1: pre-deduct stock in Redis + persist asynchronously
     */
    public SeckillResult seckillWithRedis(Long userId, Long productId) {
        // The two keys must differ: one holds the stock counter, the other the set of buyers
        String stockKey = "seckill:stock:" + productId;
        String orderKey = "seckill:order:" + productId;
        // 1. Check whether the user has already bought (prevents duplicate purchases)
        Boolean isMember = redisTemplate.opsForSet().isMember(orderKey, userId);
        if (Boolean.TRUE.equals(isMember)) {
            return SeckillResult.fail("You have already taken part in this promotion");
        }
        // 2. Pre-deduct stock (atomic operation)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            // Out of stock: restore the counter
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("Sold out");
        }
        try {
            // 3. Record that the user has bought
            redisTemplate.opsForSet().add(orderKey, userId);
            // 4. Send a message to the MQ to create the order asynchronously
            OrderMessage message = new OrderMessage(userId, productId, 1);
            rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);
            return SeckillResult.success("Purchase succeeded, order is being created");
        } catch (Exception e) {
            // Restore the stock on failure
            redisTemplate.opsForValue().increment(stockKey);
            redisTemplate.opsForSet().remove(orderKey, userId);
            return SeckillResult.fail("System busy, please retry later");
        }
    }

    /**
     * Approach 2: database row lock
     * Suitable when stock is large and concurrency is relatively low
     * (assumes an orders table extended with product_id and quantity columns;
     * the demo table in 2.1.3 does not have them)
     */
    @Transactional
    public SeckillResult seckillWithDbLock(Long userId, Long productId, int quantity) {
        // 1. Read the stock (with a lock)
        String selectSql = """
            SELECT stock FROM inventory WHERE product_id = ? FOR UPDATE
            """;
        Integer stock = jdbcTemplate.queryForObject(selectSql, Integer.class, productId);
        if (stock == null || stock < quantity) {
            return SeckillResult.fail("Insufficient stock");
        }
        // 2. Deduct the stock
        String updateSql = "UPDATE inventory SET stock = stock - ? WHERE product_id = ?";
        jdbcTemplate.update(updateSql, quantity, productId);
        // 3. Create the order
        String insertSql = """
            INSERT INTO orders (order_no, user_id, product_id, quantity, status)
            VALUES (?, ?, ?, ?, 1)
            """;
        String orderNo = generateOrderNo();
        jdbcTemplate.update(insertSql, orderNo, userId, productId, quantity);
        return SeckillResult.success(orderNo);
    }

    /**
     * Approach 3: optimistic locking + limited retries
     */
    public SeckillResult seckillWithOptimisticLock(Long userId, Long productId, int quantity) {
        int maxRetries = 3;
        for (int i = 0; i < maxRetries; i++) {
            // Deduct stock with a conditional update
            String sql = """
                UPDATE inventory
                SET stock = stock - ?, version = version + 1
                WHERE product_id = ? AND stock >= ?
                """;
            int affected = jdbcTemplate.update(sql, quantity, productId, quantity);
            if (affected > 0) {
                // Deduction succeeded: create the order
                String orderNo = createOrder(userId, productId, quantity);
                return SeckillResult.success(orderNo);
            }
            // No row was updated: check whether stock has run out
            Integer stock = jdbcTemplate.queryForObject(
                "SELECT stock FROM inventory WHERE product_id = ?",
                Integer.class, productId
            );
            if (stock == null || stock < quantity) {
                return SeckillResult.fail("Insufficient stock");
            }
            // Otherwise wait briefly and retry
            try {
                Thread.sleep(10 + (long) (Math.random() * 30));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return SeckillResult.fail("System busy, please retry later");
    }
}
3.2.2 Automatic Deadlock Detection and Alerting
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL deadlock monitoring and alerting script
"""
import time
from datetime import datetime

import pymysql
import requests


class DeadlockMonitor:
    """Deadlock monitor"""

    def __init__(self, host, user, password, database, alert_webhook=None):
        self.conn_params = {
            'host': host,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4'
        }
        self.alert_webhook = alert_webhook
        self.last_deadlock_info = None

    def get_innodb_status(self):
        """Fetch the InnoDB status text"""
        conn = pymysql.connect(**self.conn_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute("SHOW ENGINE INNODB STATUS")
                result = cursor.fetchone()
                # Columns are (Type, Name, Status); the report is in Status
                return result[2] if result else None
        finally:
            conn.close()

    def parse_deadlock(self, status):
        """Extract the LATEST DETECTED DEADLOCK section"""
        if not status:
            return None
        lines = status.split('\n')
        in_deadlock_section = False
        deadlock_info = []
        current_section = []
        for line in lines:
            if 'LATEST DETECTED DEADLOCK' in line:
                in_deadlock_section = True
                continue
            if in_deadlock_section:
                # Separator lines close the current block
                if line.startswith('---') and 'TRANSACTION' not in line:
                    if current_section:
                        deadlock_info.append('\n'.join(current_section))
                        current_section = []
                    continue
                # The rollback line is the end of the deadlock report
                if 'WE ROLL BACK' in line:
                    current_section.append(line)
                    deadlock_info.append('\n'.join(current_section))
                    break
                current_section.append(line)
        return '\n'.join(deadlock_info) if deadlock_info else None

    def get_lock_waits(self):
        """Fetch current lock waits"""
        conn = pymysql.connect(**self.conn_params)
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                sql = """
                    SELECT
                        r.trx_id AS waiting_trx_id,
                        r.trx_mysql_thread_id AS waiting_thread,
                        TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) AS wait_seconds,
                        r.trx_query AS waiting_query,
                        b.trx_id AS blocking_trx_id,
                        b.trx_mysql_thread_id AS blocking_thread,
                        b.trx_query AS blocking_query
                    FROM performance_schema.data_lock_waits w
                    INNER JOIN information_schema.innodb_trx b
                        ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
                    INNER JOIN information_schema.innodb_trx r
                        ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
                """
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            conn.close()

    def alert(self, title, content):
        """Send an alert"""
        print(f"[ALERT] {title}")
        print(content)
        if self.alert_webhook:
            try:
                payload = {
                    'msgtype': 'markdown',
                    'markdown': {
                        'title': title,
                        'text': f"## {title}\n{content}"
                    }
                }
                requests.post(self.alert_webhook, json=payload, timeout=5)
            except Exception as e:
                print(f"Failed to send alert: {e}")

    def check_deadlock(self):
        """Check for a new deadlock"""
        status = self.get_innodb_status()
        deadlock_info = self.parse_deadlock(status)
        # Alert only when the report differs from the last one seen
        if deadlock_info and deadlock_info != self.last_deadlock_info:
            self.last_deadlock_info = deadlock_info
            self.alert(
                "MySQL deadlock detected",
                f"**Time**: {datetime.now()}\n**Details**:\n```\n{deadlock_info[:2000]}\n```"
            )
            return True
        return False

    def check_lock_waits(self, threshold_seconds=30):
        """Check for long lock waits"""
        lock_waits = self.get_lock_waits()
        for wait in lock_waits:
            if wait['wait_seconds'] and wait['wait_seconds'] > threshold_seconds:
                self.alert(
                    "MySQL lock wait too long",
                    f"**Wait time**: {wait['wait_seconds']}s\n"
                    f"**Waiting thread**: {wait['waiting_thread']}\n"
                    f"**Waiting SQL**: {wait['waiting_query']}\n"
                    f"**Blocking thread**: {wait['blocking_thread']}\n"
                    f"**Blocking SQL**: {wait['blocking_query']}"
                )

    def run(self, interval=10):
        """Run the monitor loop"""
        print(f"Deadlock monitor started, check interval: {interval}s")
        while True:
            try:
                self.check_deadlock()
                self.check_lock_waits(threshold_seconds=30)
            except Exception as e:
                print(f"Monitor error: {e}")
            time.sleep(interval)


if __name__ == '__main__':
    monitor = DeadlockMonitor(
        host='192.168.1.11',
        user='monitor',
        password='password',
        database='lock_demo',
        alert_webhook='https://your-webhook-url.com'
    )
    monitor.run()
3.2.3 Transaction Timeout and Slow Transaction Monitoring
-- Find transactions that have been running longer than a given number of seconds
SELECT
    trx_id,
    trx_mysql_thread_id AS thread_id,
    trx_state,
    trx_started,
    TIMESTAMPDIFF(SECOND, trx_started, NOW()) AS running_seconds,
    trx_rows_locked,
    trx_rows_modified,
    trx_lock_structs,
    trx_query
FROM information_schema.innodb_trx
WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > 60
ORDER BY running_seconds DESC;
-- Find the transactions holding the most locks
SELECT
    trx_id,
    trx_mysql_thread_id,
    trx_rows_locked,
    trx_lock_structs,
    trx_tables_locked,
    trx_query
FROM information_schema.innodb_trx
ORDER BY trx_rows_locked DESC
LIMIT 10;
-- Find the tables with the most locked rows
SELECT
    object_schema,
    object_name,
    COUNT(*) AS lock_count
FROM performance_schema.data_locks
WHERE lock_type = 'RECORD'
GROUP BY object_schema, object_name
ORDER BY lock_count DESC;
-- Create a stored procedure that logs slow transactions
DELIMITER //
CREATE PROCEDURE check_slow_transactions(IN threshold_seconds INT)
BEGIN
    DECLARE done INT DEFAULT FALSE;
    DECLARE v_trx_id VARCHAR(100);
    DECLARE v_thread_id BIGINT;
    DECLARE v_running_seconds INT;
    DECLARE v_query TEXT;
    DECLARE cur CURSOR FOR
        SELECT trx_id, trx_mysql_thread_id,
               TIMESTAMPDIFF(SECOND, trx_started, NOW()), trx_query
        FROM information_schema.innodb_trx
        WHERE TIMESTAMPDIFF(SECOND, trx_started, NOW()) > threshold_seconds;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
    -- Create the alert log table
    CREATE TABLE IF NOT EXISTS slow_transaction_log (
        id BIGINT AUTO_INCREMENT PRIMARY KEY,
        trx_id VARCHAR(100),
        thread_id BIGINT,
        running_seconds INT,
        query TEXT,
        logged_at DATETIME DEFAULT CURRENT_TIMESTAMP
    );
    OPEN cur;
    read_loop: LOOP
        FETCH cur INTO v_trx_id, v_thread_id, v_running_seconds, v_query;
        IF done THEN
            LEAVE read_loop;
        END IF;
        -- Record the slow transaction
        INSERT INTO slow_transaction_log (trx_id, thread_id, running_seconds, query)
        VALUES (v_trx_id, v_thread_id, v_running_seconds, v_query);
    END LOOP;
    CLOSE cur;
END //
DELIMITER ;
-- Check periodically with an event
CREATE EVENT IF NOT EXISTS check_slow_transactions_event
ON SCHEDULE EVERY 1 MINUTE
DO CALL check_slow_transactions(60);
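4. Best Practices and Caveats
4.1 Best Practices
4.1.1 Transaction Design Principles
-- 1. Keep transactions short
-- Bad: one large transaction
START TRANSACTION;
-- Processes a million rows
UPDATE orders SET status = 1 WHERE created_at < '2024-01-01';
-- Locks a large number of rows
COMMIT;
-- Good: process in batches
DELIMITER //
CREATE PROCEDURE batch_update_orders()
BEGIN
    DECLARE affected_rows INT DEFAULT 1;
    DECLARE batch_size INT DEFAULT 1000;
    WHILE affected_rows > 0 DO
        START TRANSACTION;
        UPDATE orders SET status = 1
        WHERE created_at < '2024-01-01' AND status = 0
        LIMIT batch_size;
        SET affected_rows = ROW_COUNT();
        COMMIT;
        -- Pause briefly so resources are not held continuously
        DO SLEEP(0.1);
    END WHILE;
END //
DELIMITER ;
-- 2. Avoid slow operations inside a transaction
-- Bad: calling an external API inside the transaction
START TRANSACTION;
INSERT INTO orders (...) VALUES (...);
-- Call the payment API (may take several seconds)
-- Locks are held the whole time
COMMIT;
-- Good: prepare the data first, then open the transaction
-- Preparation phase (no transaction)
-- Call the payment API and get the result
START TRANSACTION;
INSERT INTO orders (...) VALUES (...); -- completes quickly
INSERT INTO payments (...) VALUES (...);
COMMIT;
-- 3. Access resources in a fixed order
-- Always access rows in ascending primary key order to avoid deadlocks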
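The stored procedure above batches with `LIMIT`. A common alternative, shown here only as a sketch, is to batch by primary-key range from the application, which keeps every statement on a bounded index range. `batch_ranges` and `batch_statements` are hypothetical helpers, and the `%s` placeholders assume a DB-API driver such as pymysql.

```python
def batch_ranges(min_id, max_id, batch_size):
    """Yield inclusive (start, end) primary-key ranges covering [min_id, max_id]."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    start = min_id
    while start <= max_id:
        end = min(start + batch_size - 1, max_id)
        yield start, end
        start = end + 1


def batch_statements(min_id, max_id, batch_size):
    """Pair the batched UPDATE with the parameters for each range."""
    sql = (
        "UPDATE orders SET status = 1 "
        "WHERE id BETWEEN %s AND %s AND created_at < '2024-01-01' AND status = 0"
    )
    return [(sql, id_range) for id_range in batch_ranges(min_id, max_id, batch_size)]
```

Each `(sql, params)` pair would be executed and committed in its own short transaction, so locks are held only for one range at a time.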
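4.1.2 Lock Optimization Strategies
-- 1. Access data through indexes whenever possible
-- Bad: without an index, every scanned row is locked (effectively the whole table)
UPDATE orders SET status = 1 WHERE order_date = '2024-01-01';
-- If order_date has no index, a large number of rows may be locked
-- Good: with an index, the locked range is precise
CREATE INDEX idx_order_date ON orders(order_date);
UPDATE orders SET status = 1 WHERE order_date = '2024-01-01';
-- 2. Reduce the locked range
-- Bad: locks every matching row
SELECT * FROM orders WHERE user_id = 1 FOR UPDATE;
-- Good: locks only the rows that are needed
SELECT * FROM orders WHERE user_id = 1 AND status = 0 FOR UPDATE;
-- 3. Choose the lock mode deliberately
-- Use a shared lock for read-only cases
SELECT * FROM orders WHERE id = 1 LOCK IN SHARE MODE;
-- Use an exclusive lock only when the row will be modified
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
-- 4. Avoid lock upgrades
-- Bad: upgrading from a shared lock to an exclusive lock can deadlock
SELECT * FROM orders WHERE id = 1 LOCK IN SHARE MODE;
-- An update is needed later...
UPDATE orders SET status = 1 WHERE id = 1; -- may deadlock
-- Good: take the exclusive lock from the start
SELECT * FROM orders WHERE id = 1 FOR UPDATE;
UPDATE orders SET status = 1 WHERE id = 1;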
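4.1.3 Choosing an Isolation Level
-- Recommended isolation levels by scenario
-- 1. High-concurrency reads and writes: READ COMMITTED
-- Pros: small lock scope, no gap locks for searches and scans
-- Cons: non-repeatable reads are possible
SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- 2. Financial transactions: REPEATABLE READ (default)
-- Pros: consistent reads, phantom reads largely prevented
-- Cons: gap locks can cause more deadlocks
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- 3. Reporting queries: use a consistent snapshot
START TRANSACTION WITH CONSISTENT SNAPSHOT;
SELECT * FROM orders WHERE ...;
-- Reads the snapshot taken at transaction start and is unaffected by other transactions
COMMIT;
-- 4. Bulk import: READ UNCOMMITTED can be used temporarily
-- (the isolation level mainly changes read behaviour, so the gain for pure inserts is limited;
-- READ COMMITTED is usually enough to avoid most gap locks)
SET SESSION TRANSACTION ISOLATION LEVEL READ UNCOMMITTED;
-- Restore after the import
SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;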
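4.2 Caveats
4.2.1 Configuration Notes
| Setting | Suggested value | Notes |
|---|---|---|
| innodb_lock_wait_timeout | 10-50 | Lock wait timeout in seconds; tune per workload |
| innodb_deadlock_detect | ON | Enable deadlock detection |
| innodb_print_all_deadlocks | ON | Log every deadlock to the error log |
| transaction_isolation | READ-COMMITTED / REPEATABLE-READ | Choose per scenario |
| innodb_rollback_on_timeout | OFF | On timeout, roll back only the current statement, not the whole transaction |
| autocommit | ON | Autocommit is on by default |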
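4.2.2 Common Errors
| Error type | Error message or symptom | Cause | Fix |
|---|---|---|---|
| Deadlock | ERROR 1213: Deadlock found when trying to get lock | Circular wait | Use a fixed access order; retry the transaction |
| Lock timeout | ERROR 1205: Lock wait timeout exceeded | Locks held too long | Shorten transactions; raise the timeout if needed |
| Transaction too large | For example ERROR 1206: The total number of locks exceeds the lock table size | Too many rows modified or locked | Process in batches |
| Table-wide locking | No dedicated error; shows up as lock waits | Missing index causes every scanned row to be locked | Add a suitable index |
| Gap lock conflict | No dedicated error; shows up as lock waits or deadlocks | Inserts into a gap locked by another transaction | Lower the isolation level |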
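4.2.3 Deadlock Prevention Checklist
Development:
- Fixed access order: sort multi-table operations by table name or primary key
- Small transaction scope: open a transaction only when needed
- Lower isolation level: use REPEATABLE READ only when required
- Necessary indexes: avoid locking caused by full table scans
Deployment:
- Enable deadlock detection: innodb_deadlock_detect=ON
- Set a sensible timeout: innodb_lock_wait_timeout=10
- Log deadlocks: innodb_print_all_deadlocks=ON
- Configure monitoring and alerts: deadlock count, lock wait time
Operations:
- Analyze deadlocks regularly: check SHOW ENGINE INNODB STATUS
- Monitor long transactions: alert on transactions running longer than 60 seconds
- Monitor lock waits: alert on waits longer than 10 seconds
- Analyze slow queries: optimize SQL that holds locks for a long time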
5. Troubleshooting and Monitoring
5.1 Troubleshooting
5.1.1 Deadlock Analysis
-- Show the most recent deadlock
SHOW ENGINE INNODB STATUS\G
-- Key parts of the output:
-- LATEST DETECTED DEADLOCK
-- ------------------------
-- 2024-01-01 1000 0x7f...
-- *** (1) TRANSACTION:
-- TRANSACTION 12345, ACTIVE 1 sec starting index read
-- mysql tables in use 1, locked 1
-- LOCK WAIT 3 lock struct(s), heap size 1136, 2 row lock(s)
-- MySQL thread id 100, OS thread handle 123, query id 456 192.168.1.10 app_user updating
-- UPDATE accounts SET balance = balance - 100 WHERE user_id = 1
--
-- *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
-- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts`
-- trx id 12345 lock_mode X locks rec but not gap waiting
--
-- *** (2) TRANSACTION:
-- TRANSACTION 12346, ACTIVE 1 sec starting index read
-- ...
--
-- *** (2) HOLDS THE LOCK(S):
-- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts`
-- ...
--
-- *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
-- ...
--
-- *** WE ROLL BACK TRANSACTION (1)
-- Analysis steps:
-- 1. Find the SQL of both transactions
-- 2. Work out who is waiting for which lock
-- 3. Determine the cause of the deadlock
-- 4. Decide on a fix
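The first two analysis steps can be partly automated. The sketch below pulls the transaction ids, the locked index and table, and the rollback victim out of a deadlock report shaped like the excerpt above. It is a rough illustration: `summarize_deadlock` is a hypothetical helper, and the regular expressions assume the line formats shown here, which can differ between MySQL versions.

```python
import re


def summarize_deadlock(text):
    """Extract transaction ids, locked (index, table) pairs and the victim."""
    trx_ids = re.findall(r"^TRANSACTION (\d+), ACTIVE", text, flags=re.MULTILINE)
    locked = []
    for index_name, table in re.findall(r"index (\S+) of table (`[^`]+`\.`[^`]+`)", text):
        if (index_name, table) not in locked:
            locked.append((index_name, table))
    victim = re.search(r"WE ROLL BACK TRANSACTION \((\d+)\)", text)
    victim_pos = int(victim.group(1)) if victim else None
    victim_trx = None
    if victim_pos is not None and 1 <= victim_pos <= len(trx_ids):
        victim_trx = trx_ids[victim_pos - 1]  # "(1)" refers to the first listed transaction
    return {"transactions": trx_ids, "locked": locked, "victim_trx": victim_trx}


SAMPLE = """\
*** (1) TRANSACTION:
TRANSACTION 12345, ACTIVE 1 sec starting index read
UPDATE accounts SET balance = balance - 100 WHERE user_id = 1
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts` trx id 12345 lock_mode X locks rec but not gap waiting
*** (2) TRANSACTION:
TRANSACTION 12346, ACTIVE 1 sec starting index read
*** WE ROLL BACK TRANSACTION (1)
"""
```

Calling `summarize_deadlock(SAMPLE)` gives a compact summary that is convenient to attach to an alert alongside the raw report.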
Reading the deadlock log
# Extract deadlock information from the error log
grep -A 100 "LATEST DETECTED DEADLOCK" /var/log/mysql/error.log | head -100
# Record deadlocks with pt-deadlock-logger
pt-deadlock-logger --host=localhost --user=root --password=xxx --dest h=localhost,D=monitor,t=deadlocks --run-time=1h
5.1.2 Lock Wait Analysis
-- Show current lock waits
SELECT
    waiting.trx_id AS waiting_trx_id,
    waiting.trx_mysql_thread_id AS waiting_thread,
    waiting.trx_query AS waiting_query,
    TIMESTAMPDIFF(SECOND, waiting.trx_wait_started, NOW()) AS waiting_seconds,
    blocking.trx_id AS blocking_trx_id,
    blocking.trx_mysql_thread_id AS blocking_thread,
    blocking.trx_query AS blocking_query,
    TIMESTAMPDIFF(SECOND, blocking.trx_started, NOW()) AS blocking_duration
FROM information_schema.innodb_trx waiting
INNER JOIN performance_schema.data_lock_waits dlw
    ON waiting.trx_id = dlw.REQUESTING_ENGINE_TRANSACTION_ID
INNER JOIN information_schema.innodb_trx blocking
    ON blocking.trx_id = dlw.BLOCKING_ENGINE_TRANSACTION_ID;
-- Show lock details
SELECT
    dl.ENGINE_LOCK_ID,
    dl.ENGINE_TRANSACTION_ID,
    dl.OBJECT_SCHEMA,
    dl.OBJECT_NAME,
    dl.INDEX_NAME,
    dl.LOCK_TYPE,
    dl.LOCK_MODE,
    dl.LOCK_STATUS,
    dl.LOCK_DATA
FROM performance_schema.data_locks dl;
-- Kill the blocking transaction (use with care)
-- Confirm the blocking thread id first
KILL 12345;
5.1.3 Long Transaction Analysis
-- Find the longest-running transactions
SELECT
    trx_id,
    trx_mysql_thread_id AS thread_id,
    trx_state,
    trx_started,
    TIMESTAMPDIFF(SECOND, trx_started, NOW()) AS running_seconds,
    trx_rows_locked,
    trx_rows_modified,
    trx_tables_in_use,
    trx_tables_locked,
    trx_query
FROM information_schema.innodb_trx
ORDER BY trx_started ASC;
-- Show the connection behind each transaction
SELECT
    t.trx_id,
    t.trx_mysql_thread_id,
    p.user,
    p.host,
    p.db,
    p.command,
    p.time,
    p.state,
    t.trx_query
FROM information_schema.innodb_trx t
INNER JOIN information_schema.processlist p
    ON t.trx_mysql_thread_id = p.id;
-- Estimate the rollback cost of each transaction
-- (innodb_trx has no undo size column; rows modified and trx_weight serve as proxies)
SELECT
    trx_id,
    trx_rows_modified,
    trx_weight
FROM information_schema.innodb_trx
WHERE trx_rows_modified > 0
ORDER BY trx_rows_modified DESC;
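5.2 Performance Monitoring
5.2.1 Key Metrics
-- InnoDB lock metrics
SHOW GLOBAL STATUS LIKE 'Innodb_row_lock%';
-- Innodb_row_lock_current_waits: number of row locks currently being waited for
-- Innodb_row_lock_time: total lock wait time (ms)
-- Innodb_row_lock_time_avg: average lock wait time (ms)
-- Innodb_row_lock_time_max: maximum lock wait time (ms)
-- Innodb_row_lock_waits: total number of lock waits
-- Deadlock count
-- (MySQL exposes it through INNODB_METRICS; the Innodb_deadlocks status variable
-- exists in Percona Server and MariaDB, not in stock MySQL)
SELECT NAME, `COUNT` FROM information_schema.INNODB_METRICS WHERE NAME = 'lock_deadlocks';
-- Lock memory used by active transactions
SELECT SUM(trx_lock_memory_bytes) AS lock_memory_bytes FROM information_schema.innodb_trx;
-- Compute the lock contention rate
SELECT
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status
     WHERE VARIABLE_NAME = 'Innodb_row_lock_waits') /
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status
     WHERE VARIABLE_NAME = 'Questions') * 100 AS lock_contention_percent;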
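The status counters above are cumulative since server start, so a ratio over the whole uptime can hide a recent spike. One option, sketched below, is to sample the counters twice and work with the differences. `lock_wait_stats` is a hypothetical helper, and the dictionaries stand for values read from `SHOW GLOBAL STATUS` at two points in time.

```python
def lock_wait_stats(prev, curr, interval_seconds):
    """Compute per-interval lock metrics from two samples of cumulative counters."""
    waits = curr["Innodb_row_lock_waits"] - prev["Innodb_row_lock_waits"]
    wait_ms = curr["Innodb_row_lock_time"] - prev["Innodb_row_lock_time"]
    questions = curr["Questions"] - prev["Questions"]
    return {
        # Lock waits per second during the interval
        "waits_per_sec": waits / interval_seconds,
        # Average wait per lock wait in the interval (ms); 0 if nothing waited
        "avg_wait_ms": wait_ms / waits if waits else 0.0,
        # Share of statements in the interval that hit a lock wait
        "contention_percent": 100 * waits / questions if questions else 0.0,
    }
```

Sampling every 60 seconds and alerting on `waits_per_sec` or `avg_wait_ms` lines up with the rate-based thresholds used in the monitoring table and alert rules in this section.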
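5.2.2 Monitoring Metrics Table
| Category | Metric | Meaning | Alert threshold |
|---|---|---|---|
| Deadlocks | Innodb_deadlocks | Cumulative deadlock count | Growth rate > 1/min |
| Lock waits | Innodb_row_lock_waits | Cumulative row lock wait count | Growth rate > 10/sec |
| Lock time | Innodb_row_lock_time_avg | Average row lock wait time (ms) | > 1000 |
| Current waits | Innodb_row_lock_current_waits | Row locks currently being waited for | > 10 |
| Long transactions | Transactions running longer than 60 seconds | Number of long transactions | > 0 |
| Table locks | Table_locks_waited | Cumulative table lock wait count | Increase > 0 |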
5.2.3 Prometheus Alerting Rules
groups:
  - name: mysql-lock-alerts
    rules:
      - alert: MySQLDeadlocks
        expr: increase(mysql_global_status_innodb_deadlocks[5m]) > 0
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "MySQL deadlock detected"
          description: "{{ $labels.instance }} had {{ $value }} deadlocks in the last 5 minutes"
      - alert: MySQLHighLockWaits
        expr: rate(mysql_global_status_innodb_row_lock_waits[5m]) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL lock waits are frequent"
          description: "{{ $labels.instance }} lock wait rate is {{ $value }}/sec"
      - alert: MySQLLongLockWait
        expr: mysql_global_status_innodb_row_lock_time_avg > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "MySQL lock wait time is too long"
          description: "{{ $labels.instance }} average lock wait time is {{ $value }}ms"
      - alert: MySQLLongTransaction
        expr: mysql_info_schema_innodb_trx_running_seconds > 60
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "MySQL has a long-running transaction"
          description: "{{ $labels.instance }} has a transaction running longer than 60 seconds"
5.3 Backup and Recovery
5.3.1 Transaction Log Backup
#!/bin/bash
# MySQL binlog backup script
BACKUP_DIR="/data/backup/binlog"
MYSQL_USER="backup"
MYSQL_PASS="password"
MYSQL_HOST="localhost"
RETENTION_DAYS=7

mkdir -p "$BACKUP_DIR"

# Get the current list of binlog files
mysql -u"$MYSQL_USER" -p"$MYSQL_PASS" -h"$MYSQL_HOST" -e "SHOW BINARY LOGS;" |
tail -n +2 | awk '{print $1}' | while read -r binlog; do
    # Dump the binlog into the backup directory unless it was already backed up
    # Note: the newest binlog is still being written, so its dump is only a partial snapshot
    if [ ! -f "$BACKUP_DIR/$binlog.sql.gz" ]; then
        mysqlbinlog -u"$MYSQL_USER" -p"$MYSQL_PASS" -h"$MYSQL_HOST" \
            --read-from-remote-server "$binlog" > "$BACKUP_DIR/$binlog.sql"
        gzip "$BACKUP_DIR/$binlog.sql"
        echo "Backed up $binlog"
    fi
done

# Remove expired backups
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +"$RETENTION_DAYS" -delete
echo "Binlog backup finished"
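5.3.2 Deadlock Recovery Procedure
-- Recovery steps after a deadlock

-- 1. Confirm the transaction state
SELECT
    trx_id,
    trx_state,
    trx_started,
    trx_mysql_thread_id,
    trx_query
FROM information_schema.innodb_trx;

-- 2. If the transaction was rolled back, the application must retry it
--    Check the application's retry logic

-- 3. If a manual rollback is needed (run it in the session that owns the transaction)
ROLLBACK;

-- 4. Check data consistency
--    Validate the data against the business rules

-- 5. Analyze the cause of the deadlock
SHOW ENGINE INNODB STATUS\G
--    Look for the LATEST DETECTED DEADLOCK section

-- 6. Record and report
--    Log the deadlock details to the monitoring system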
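Step 2 depends on the application retrying the transaction that InnoDB chose as the deadlock victim (MySQL error 1213, ER_LOCK_DEADLOCK). The sketch below shows one common shape for that retry logic: a bounded number of attempts with exponential backoff and jitter. It is an illustration, not the article's production code. The DatabaseError class is a stand-in for whatever exception the real driver raises, and run_with_deadlock_retry and flaky_transfer are hypothetical names.

```python
import random
import time

DEADLOCK_ERRNO = 1213  # MySQL ER_LOCK_DEADLOCK


class DatabaseError(Exception):
    """Stand-in for a driver exception that carries the MySQL error number."""

    def __init__(self, errno, message):
        super().__init__(message)
        self.errno = errno


def run_with_deadlock_retry(transaction, max_attempts=3, base_delay=0.05,
                            sleep=time.sleep):
    """Call transaction() and retry it only when it fails with a deadlock."""
    for attempt in range(1, max_attempts + 1):
        try:
            return transaction()
        except DatabaseError as exc:
            # Re-raise anything that is not a deadlock, or the final failure.
            if exc.errno != DEADLOCK_ERRNO or attempt == max_attempts:
                raise
            # Exponential backoff with jitter so the retries do not collide again.
            delay = base_delay * (2 ** (attempt - 1)) * (1 + random.random())
            sleep(delay)


# Demo: a fake transaction that is chosen as the deadlock victim twice.
attempts = []


def flaky_transfer():
    attempts.append(len(attempts) + 1)
    if len(attempts) < 3:
        raise DatabaseError(
            DEADLOCK_ERRNO,
            "Deadlock found when trying to get lock; try restarting transaction",
        )
    return "committed"


result = run_with_deadlock_retry(flaky_transfer, sleep=lambda _: None)
print(result, "after", len(attempts), "attempts")
```

The whole transaction body belongs inside the retried callable, because InnoDB rolls back the entire victim transaction rather than only the failing statement.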
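6. Summary
6.1 Key Takeaways
The core points of MySQL transactions and locking:
1. ACID properties
Atomicity: implemented through the undo log
Consistency: guaranteed by constraints and application logic
Isolation: implemented through locks and MVCC
Durability: guaranteed by the redo log
2. Lock types
Row locks: record locks, gap locks, next-key locks
Table locks: intention locks, metadata locks (MDL)
Lock modes: shared locks, exclusive locks
3. Deadlock handling
Prevention: access resources in a fixed order, keep transactions small
Detection: innodb_deadlock_detect
Recovery: InnoDB automatically rolls back one of the transactions
4. Best practices
Keep transactions as short as possible
Access resources in a fixed order
Choose an isolation level that fits the workload
Use an appropriate locking strategy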
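To make "access resources in a fixed order" concrete: two transfers between the same pair of accounts in opposite directions can deadlock if each locks its source row first. A minimal sketch of the usual remedy follows, assuming the accounts table with user_id and balance columns from the earlier transfer example. It locks the rows in ascending user_id order regardless of transfer direction. The function names and the %s placeholder style are assumptions for illustration; the statements are only built here, not executed.

```python
def build_transfer_statements(from_id, to_id, amount):
    """Return (sql, params) pairs for a transfer that locks rows in a fixed order."""
    if from_id == to_id:
        raise ValueError("source and destination accounts must differ")
    statements = [("START TRANSACTION", ())]
    # Lock both rows in ascending user_id order, whatever the transfer direction.
    for account_id in sorted((from_id, to_id)):
        statements.append(
            ("SELECT balance FROM accounts WHERE user_id = %s FOR UPDATE",
             (account_id,))
        )
    statements.append(
        ("UPDATE accounts SET balance = balance - %s WHERE user_id = %s",
         (amount, from_id))
    )
    statements.append(
        ("UPDATE accounts SET balance = balance + %s WHERE user_id = %s",
         (amount, to_id))
    )
    statements.append(("COMMIT", ()))
    return statements


def lock_order(statements):
    """Extract the account IDs in the order their rows are locked."""
    return [params[0] for sql, params in statements if sql.endswith("FOR UPDATE")]


forward = build_transfer_statements(1, 2, 1000)
reverse = build_transfer_statements(2, 1, 500)
print(lock_order(forward), lock_order(reverse))
```

Because both directions request the locks in the same order, one transfer simply waits for the other instead of forming a cycle.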
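6.2 Further Study
| Direction | Topics | Recommended resources |
|---|---|---|
| MVCC internals | Version chains, ReadView mechanism | 《MySQL技术内幕:InnoDB存储引擎》 (MySQL Internals: InnoDB Storage Engine) |
| Locking algorithms | B+ tree locking protocols | MySQL source code |
| Distributed transactions | XA, TCC, SAGA | Seata framework documentation |
| Deadlock detection algorithms | Wait-for graphs, timeout-based detection | 《数据库系统概论》 (Introduction to Database Systems) |
| Performance tuning | Lock granularity optimization | Percona Blog |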
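6.3 References
MySQL official documentation: https://dev.mysql.com/doc/
《MySQL技术内幕:InnoDB存储引擎》 (MySQL Internals: InnoDB Storage Engine), 2nd edition
《高性能MySQL》 (High Performance MySQL), 4th edition
Percona Blog: https://www.percona.com/blog/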
Appendix
A. Command Quick Reference
| Command | Description | Example |
|---|---|---|
| START TRANSACTION | Begin a transaction | START TRANSACTION; |
| COMMIT | Commit the transaction | COMMIT; |
| ROLLBACK | Roll back the transaction | ROLLBACK; |
| SELECT ... FOR UPDATE | Query with an exclusive lock | SELECT * FROM t WHERE id=1 FOR UPDATE; |
| SELECT ... FOR SHARE | Query with a shared lock | SELECT * FROM t WHERE id=1 FOR SHARE; |
| SHOW ENGINE INNODB STATUS | Show InnoDB status | SHOW ENGINE INNODB STATUS\G |
| KILL | Terminate a connection | KILL 12345; |
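B. Configuration Parameters
| Parameter | Default | Description | Recommendation |
|---|---|---|---|
| transaction_isolation | REPEATABLE-READ | Default isolation level | Choose per workload |
| innodb_lock_wait_timeout | 50 | Lock wait timeout (seconds) | 10-30 |
| innodb_deadlock_detect | ON | Deadlock detection switch | ON |
| innodb_print_all_deadlocks | OFF | Log every deadlock to the error log | ON |
| innodb_rollback_on_timeout | OFF | Roll back the whole transaction on lock wait timeout | OFF |
| autocommit | ON | Autocommit mode | ON |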
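C. Glossary
| Term (Chinese) | English | Description |
|---|---|---|
| 脏读 | Dirty Read | Reading data that another transaction has not yet committed |
| 不可重复读 | Non-Repeatable Read | Two reads of the same row in one transaction return different results |
| 幻读 | Phantom Read | The set of rows returned by the same query changes within one transaction |
| 死锁 | Deadlock | Transactions wait for each other's locks in a cycle |
| 间隙锁 | Gap Lock | Locks the gap between index records |
| 临键锁 | Next-Key Lock | Record lock plus the gap lock before the record |
| MVCC | Multi-Version Concurrency Control | Multi-version concurrency control |
| Undo Log | - | Rollback log |
| Redo Log | - | Redo log |
| 两阶段锁 | Two-Phase Locking | Locks are acquired in one phase and released in a second phase |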