MySQL事务与锁机制详解

描述

一、概述

1.1 背景介绍

在我担任某互联网金融平台SRE期间,曾遇到过一次严重的线上事故:凌晨3点,监控系统疯狂告警,数据库活跃连接数从平时的200飙升到2000,大量请求超时。紧急排查后发现,一个批量更新任务与在线交易产生了死锁,导致数据库连接被占满。

这次事故持续了40分钟,影响了上万名用户的交易。事后复盘发现,问题根源是开发团队对MySQL事务和锁机制理解不足,写出了容易产生死锁的代码。

从那以后,我花了大量时间研究MySQL的事务和锁机制,并总结出一套完整的排查和预防方法。本文将系统性地讲解MySQL事务的ACID特性、锁的工作原理,以及死锁的排查和解决方案。

1.2 技术特点

MySQL InnoDB存储引擎的事务和锁机制具有以下特点:

ACID事务特性

Atomicity(原子性):事务是不可分割的工作单位

Consistency(一致性):事务执行前后数据保持一致

Isolation(隔离性):并发事务之间相互隔离

Durability(持久性):事务提交后数据永久保存

多粒度锁机制

行锁:锁定单行记录,并发度高

间隙锁:锁定索引间隙,防止幻读

表锁:锁定整张表,开销小但并发度低

意向锁:表级锁,用于协调行锁和表锁

MVCC多版本并发控制

读不阻塞写,写不阻塞读

通过undo log实现一致性读

支持多种隔离级别

1.3 适用场景

场景类型 隔离级别 锁策略 典型应用
高并发读写 READ COMMITTED 最小化锁范围 电商订单
金融交易 REPEATABLE READ 行锁+间隙锁 转账、支付
报表统计 READ COMMITTED 快照读 数据分析
库存扣减 REPEATABLE READ SELECT FOR UPDATE 秒杀系统
批量更新 READ COMMITTED 分批提交 数据迁移

1.4 环境要求

组件 版本要求 说明
MySQL 8.0.35+ / 8.4 LTS 本文基于8.0.35版本
操作系统 Rocky Linux 9 / Ubuntu 24.04 推荐Rocky Linux 9
存储引擎 InnoDB 必须使用InnoDB
内存 16GB+ 足够的缓冲池空间

关键配置要求:

 

-- 查看InnoDB相关配置
SHOW VARIABLES LIKE 'innodb%';

-- 关键配置
innodb_buffer_pool_size = 8G  -- 缓冲池大小
innodb_lock_wait_timeout = 50  -- 锁等待超时(秒)
innodb_deadlock_detect = ON  -- 开启死锁检测
innodb_print_all_deadlocks = ON  -- 打印所有死锁信息
transaction_isolation = REPEATABLE-READ  -- 默认隔离级别

 

二、详细步骤

2.1 准备工作

2.1.1 ACID特性深入理解

原子性(Atomicity)

事务中的所有操作要么全部成功,要么全部失败回滚。MySQL通过undo log实现原子性。

 

-- 原子性示例:转账操作
STARTTRANSACTION;

-- 操作1:扣减转出账户余额
UPDATE accounts SET balance = balance - 1000WHERE user_id = 1;

-- 操作2:增加转入账户余额
UPDATE accounts SET balance = balance + 1000WHERE user_id = 2;

-- 如果两个操作都成功,提交事务
COMMIT;

-- 如果任一操作失败,回滚事务
-- ROLLBACK;

-- 原子性保证:
-- 1. 要么两个账户都更新成功
-- 2. 要么两个账户都保持原状
-- 不会出现钱扣了但没有到账的情况

 

一致性(Consistency)

事务执行前后,数据库从一个一致状态转换到另一个一致状态。

 

-- 一致性示例:确保总金额不变
-- 假设系统中只有两个账户,总金额应该始终为10000

-- 事务前检查
SELECTSUM(balance) FROM accounts;  -- 结果:10000

STARTTRANSACTION;
UPDATE accounts SET balance = balance - 1000WHERE user_id = 1;
UPDATE accounts SET balance = balance + 1000WHERE user_id = 2;
COMMIT;

-- 事务后检查
SELECTSUM(balance) FROM accounts;  -- 结果仍然:10000

-- 一致性由应用程序和数据库约束共同保证
-- 比如:CHECK约束、外键约束、触发器等

 

隔离性(Isolation)

并发执行的事务之间相互隔离,一个事务的中间状态对其他事务不可见。

 

-- 隔离性示例:并发读写

-- 会话1
STARTTRANSACTION;
UPDATE products SET stock = stock - 1WHEREid = 1;
-- 此时还未提交

-- 会话2
SELECT stock FROM products WHEREid = 1;
-- 根据隔离级别,可能看到更新前或更新后的值

-- MySQL默认使用REPEATABLE READ隔离级别
-- 会话2看到的是事务开始时的快照,即更新前的值

 

持久性(Durability)

事务一旦提交,其结果就是永久性的,即使系统崩溃也不会丢失。

 

-- 持久性由redo log保证
-- 事务提交时,redo log会刷入磁盘

-- 相关配置
SHOWVARIABLESLIKE'innodb_flush_log_at_trx_commit';

-- innodb_flush_log_at_trx_commit = 1(默认)
-- 每次事务提交都将redo log刷入磁盘
-- 最安全但性能略低

-- innodb_flush_log_at_trx_commit = 2
-- 每次提交写入OS缓存,每秒刷盘
-- 性能好,但断电可能丢失1秒数据

-- innodb_flush_log_at_trx_commit = 0
-- 每秒写入OS缓存并刷盘
-- 性能最好,但可能丢失1秒数据

 

2.1.2 事务隔离级别

MySQL支持四种隔离级别,解决不同的并发问题:

隔离级别 脏读 不可重复读 幻读 性能
READ UNCOMMITTED 可能 可能 可能 最高
READ COMMITTED 不可能 可能 可能
REPEATABLE READ 不可能 不可能 InnoDB防止
SERIALIZABLE 不可能 不可能 不可能 最低

 

-- 查看当前隔离级别
SELECT @@transaction_isolation;
-- 或
SHOWVARIABLESLIKE'transaction_isolation';

-- 设置会话隔离级别
SETSESSIONTRANSACTIONISOLATIONLEVELREAD COMMITTED;

-- 设置全局隔离级别(需要重连生效)
SETGLOBALTRANSACTIONISOLATIONLEVELREAD COMMITTED;

-- 在配置文件中设置
-- [mysqld]
-- transaction-isolation = READ-COMMITTED

 

脏读演示

 

-- 会话1(设置为READ UNCOMMITTED)
SETSESSIONTRANSACTIONISOLATIONLEVELREAD UNCOMMITTED;
STARTTRANSACTION;

-- 会话2
STARTTRANSACTION;
UPDATE accounts SET balance = 500WHERE user_id = 1;
-- 未提交

-- 会话1
SELECT balance FROM accounts WHERE user_id = 1;
-- 结果:500(读到了未提交的数据,即脏读)

-- 会话2
ROLLBACK;  -- 回滚

-- 会话1再次查询
SELECT balance FROM accounts WHERE user_id = 1;
-- 结果可能是原来的值,之前读到的500是"脏数据"

 

不可重复读演示

 

-- 会话1(READ COMMITTED级别)
SETSESSIONTRANSACTIONISOLATIONLEVELREAD COMMITTED;
STARTTRANSACTION;

SELECT balance FROM accounts WHERE user_id = 1;
-- 结果:1000

-- 会话2
UPDATE accounts SET balance = 500WHERE user_id = 1;
COMMIT;

-- 会话1再次查询
SELECT balance FROM accounts WHERE user_id = 1;
-- 结果:500(同一事务内两次读取结果不同,即不可重复读)

COMMIT;

 

幻读演示

 

-- 会话1(即使REPEATABLE READ也可能有幻读场景)
STARTTRANSACTION;

SELECTCOUNT(*) FROM orders WHERE user_id = 1;
-- 结果:10

-- 会话2
INSERTINTO orders (user_id, amount) VALUES (1, 100);
COMMIT;

-- 会话1使用当前读
SELECTCOUNT(*) FROM orders WHERE user_id = 1FORUPDATE;
-- 结果:11(看到了新插入的行,即幻读)

-- 注意:InnoDB的REPEATABLE READ通过间隙锁很大程度上防止了幻读
-- 但在某些边界情况下仍可能发生

 

2.1.3 创建测试环境

 

-- 创建测试数据库
CREATEDATABASEIFNOTEXISTS lock_demo;
USE lock_demo;

-- 创建账户表
CREATETABLE accounts (
    idBIGINTUNSIGNED AUTO_INCREMENT PRIMARY KEY,
    user_id BIGINTUNSIGNEDNOTNULL,
    balance DECIMAL(15,2) NOTNULLDEFAULT0.00,
    versionINTUNSIGNEDNOTNULLDEFAULT0,  -- 乐观锁版本号
    created_at DATETIME DEFAULTCURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULTCURRENT_TIMESTAMPONUPDATECURRENT_TIMESTAMP,
    UNIQUEKEY uk_user_id (user_id)
) ENGINE=InnoDB;

-- 创建订单表
CREATETABLE orders (
    idBIGINTUNSIGNED AUTO_INCREMENT PRIMARY KEY,
    order_no VARCHAR(32) NOTNULL,
    user_id BIGINTUNSIGNEDNOTNULL,
    amount DECIMAL(10,2) NOTNULL,
    statusTINYINTDEFAULT0,
    created_at DATETIME DEFAULTCURRENT_TIMESTAMP,
    UNIQUEKEY uk_order_no (order_no),
    INDEX idx_user_id (user_id),
    INDEX idx_status (status),
    INDEX idx_user_status (user_id, status)
) ENGINE=InnoDB;

-- 创建库存表
CREATETABLE inventory (
    idBIGINTUNSIGNED AUTO_INCREMENT PRIMARY KEY,
    product_id BIGINTUNSIGNEDNOTNULL,
    stock INTUNSIGNEDNOTNULLDEFAULT0,
    versionINTUNSIGNEDNOTNULLDEFAULT0,
    UNIQUEKEY uk_product_id (product_id)
) ENGINE=InnoDB;

-- 插入测试数据
INSERTINTO accounts (user_id, balance) VALUES
(1, 10000.00), (2, 5000.00), (3, 3000.00);

INSERTINTO inventory (product_id, stock) VALUES
(1001, 100), (1002, 200), (1003, 50);

-- 生成订单测试数据
INSERTINTO orders (order_no, user_id, amount, status)
SELECT
    CONCAT('ORD', LPAD(seq, 10, '0')),
    FLOOR(RAND() * 3) + 1,
    ROUND(RAND() * 1000, 2),
    FLOOR(RAND() * 5)
FROM (
    SELECT @row := @row + 1as seq FROM
    (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4
     UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t1,
    (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4
     UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t2,
    (SELECT0UNIONSELECT1UNIONSELECT2UNIONSELECT3UNIONSELECT4
     UNIONSELECT5UNIONSELECT6UNIONSELECT7UNIONSELECT8UNIONSELECT9) t3,
    (SELECT @row := 0) r
) seq_table;

 

2.2 核心配置

2.2.1 InnoDB锁类型详解

1. 共享锁(S锁)和排他锁(X锁)

 

-- 共享锁(S锁):允许其他事务读,但不允许写
SELECT * FROM accounts WHERE user_id = 1LOCKINSHAREMODE;
-- MySQL 8.0 新语法
SELECT * FROM accounts WHERE user_id = 1FORSHARE;

-- 排他锁(X锁):不允许其他事务读写(当前读除外)
SELECT * FROM accounts WHERE user_id = 1FORUPDATE;

-- 锁兼容性矩阵
-- |     | S锁 | X锁 |
-- | S锁 | 兼容 | 冲突 |
-- | X锁 | 冲突 | 冲突 |

 

2. 意向锁(IS/IX锁)

 

-- 意向锁是表级锁,用于表明事务稍后会在表中的行上加什么类型的锁
-- 意向共享锁(IS):事务准备给数据行加共享锁
-- 意向排他锁(IX):事务准备给数据行加排他锁

-- 查看意向锁
SELECT * FROM performance_schema.data_locks WHERE LOCK_TYPE = 'TABLE';

-- 意向锁的作用:
-- 加表锁时,不需要遍历每一行来检查是否有行锁
-- 只需检查意向锁即可

-- 兼容性矩阵:
-- |     | IS | IX | S  | X  |
-- | IS  | 兼容 | 兼容 | 兼容 | 冲突 |
-- | IX  | 兼容 | 兼容 | 冲突 | 冲突 |
-- | S   | 兼容 | 冲突 | 兼容 | 冲突 |
-- | X   | 冲突 | 冲突 | 冲突 | 冲突 |

 

3. 记录锁(Record Lock)

 

-- 记录锁锁定索引记录
-- 如果表没有索引,InnoDB会创建隐藏的聚簇索引,并使用该索引进行记录锁定

STARTTRANSACTION;
-- 锁定id=1的记录
SELECT * FROM accounts WHEREid = 1FORUPDATE;
-- 此时其他事务无法修改id=1的行

-- 查看记录锁
SELECT * FROM performance_schema.data_locks
WHERE LOCK_TYPE = 'RECORD'AND LOCK_MODE = 'X,REC_NOT_GAP';

 

4. 间隙锁(Gap Lock)

 

-- 间隙锁锁定索引记录之间的间隙,防止其他事务插入
-- 只在REPEATABLE READ及以上隔离级别生效

-- 假设accounts表中有id: 1, 5, 10
STARTTRANSACTION;
SELECT * FROM accounts WHEREidBETWEEN3AND7FORUPDATE;
-- 这会锁定(1,5)和(5,10)的间隙

-- 其他事务无法在这些间隙中插入新记录
-- INSERT INTO accounts (id, user_id, balance) VALUES (3, 3, 1000);  -- 会等待

-- 查看间隙锁
SELECT * FROM performance_schema.data_locks
WHERE LOCK_TYPE = 'RECORD'AND LOCK_MODE = 'X,GAP';

 

5. 临键锁(Next-Key Lock)

 

-- 临键锁 = 记录锁 + 间隙锁
-- 锁定一个索引记录及其前面的间隙

-- 假设有id: 1, 5, 10
START TRANSACTION;
SELECT * FROM accounts WHERE id = 5 FOR UPDATE;
-- 在REPEATABLE READ级别,这会锁定:
-- 1. 记录id=5
-- 2. 间隙(1,5)

-- 临键锁是InnoDB默认的锁类型,用于防止幻读

 

6. 插入意向锁(Insert Intention Lock)

 

-- 插入意向锁是一种特殊的间隙锁
-- 多个事务可以同时获取同一间隙的插入意向锁(只要插入位置不同)

-- 会话1
STARTTRANSACTION;
INSERTINTO accounts (id, user_id, balance) VALUES (3, 3, 1000);
-- 获取(1,5)间隙的插入意向锁,插入id=3

-- 会话2
STARTTRANSACTION;
INSERTINTO accounts (id, user_id, balance) VALUES (4, 4, 2000);
-- 也可以获取(1,5)间隙的插入意向锁,插入id=4
-- 两个插入可以并发执行,因为插入位置不冲突

 

2.2.2 锁监控配置

 

-- 开启锁监控
SETGLOBAL innodb_status_output = ON;
SETGLOBAL innodb_status_output_locks = ON;

-- 查看InnoDB状态(包含锁信息)
SHOWENGINEINNODBSTATUSG

-- 使用performance_schema监控锁
-- data_locks:当前持有的锁
SELECT * FROM performance_schema.data_locks;

-- data_lock_waits:锁等待关系
SELECT * FROM performance_schema.data_lock_waits;

-- 查看等待锁的事务
SELECT
    r.trx_id AS waiting_trx_id,
    r.trx_mysql_thread_id AS waiting_thread,
    r.trx_query AS waiting_query,
    b.trx_id AS blocking_trx_id,
    b.trx_mysql_thread_id AS blocking_thread,
    b.trx_query AS blocking_query
FROM performance_schema.data_lock_waits w
INNERJOIN information_schema.innodb_trx b ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
INNERJOIN information_schema.innodb_trx r ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID;

 

2.2.3 死锁检测配置

 

-- 开启死锁检测(默认开启)
SETGLOBAL innodb_deadlock_detect = ON;

-- 设置锁等待超时时间
SETGLOBAL innodb_lock_wait_timeout = 50;  -- 默认50秒

-- 打印所有死锁信息到错误日志
SETGLOBAL innodb_print_all_deadlocks = ON;

-- 配置文件设置
-- [mysqld]
-- innodb_deadlock_detect = ON
-- innodb_lock_wait_timeout = 10
-- innodb_print_all_deadlocks = ON

 

2.3 启动和验证

2.3.1 验证锁机制

 

-- 测试记录锁
-- 会话1
STARTTRANSACTION;
SELECT * FROM accounts WHEREid = 1FORUPDATE;
-- 不提交,保持锁定

-- 会话2
STARTTRANSACTION;
-- 尝试更新同一行
UPDATE accounts SET balance = balance + 100WHEREid = 1;
-- 此语句会等待,因为id=1被会话1锁定

-- 会话1
COMMIT;  -- 提交后会话2的更新才会执行

-- 查看锁等待情况
SELECT * FROM performance_schema.data_lock_waits;

 

2.3.2 验证死锁检测

 

-- 构造死锁场景
-- 会话1
STARTTRANSACTION;
UPDATE accounts SET balance = balance - 100WHEREid = 1;

-- 会话2
STARTTRANSACTION;
UPDATE accounts SET balance = balance - 100WHEREid = 2;

-- 会话1
UPDATE accounts SET balance = balance + 100WHEREid = 2;
-- 等待会话2释放id=2的锁

-- 会话2
UPDATE accounts SET balance = balance + 100WHEREid = 1;
-- 等待会话1释放id=1的锁
-- 此时发生死锁!

-- MySQL会检测到死锁,回滚其中一个事务
-- ERROR 1213 (40001): Deadlock found when trying to get lock; try restarting transaction

-- 查看最近的死锁信息
SHOWENGINEINNODBSTATUSG
-- 找到"LATEST DETECTED DEADLOCK"部分

 

三、示例代码和配置

3.1 完整配置示例

3.1.1 死锁案例分析

案例1:相反顺序更新

 

-- 最常见的死锁场景:两个事务以相反顺序更新行

-- 事务1:先更新A,再更新B
STARTTRANSACTION;
UPDATE accounts SET balance = balance - 100WHERE user_id = 1;  -- 锁定user_id=1
-- 等待...
UPDATE accounts SET balance = balance + 100WHERE user_id = 2;  -- 需要锁定user_id=2

-- 事务2:先更新B,再更新A
STARTTRANSACTION;
UPDATE accounts SET balance = balance - 50WHERE user_id = 2;  -- 锁定user_id=2
-- 等待...
UPDATE accounts SET balance = balance + 50WHERE user_id = 1;  -- 需要锁定user_id=1

-- 死锁!事务1持有A等待B,事务2持有B等待A

-- 解决方案:固定更新顺序
-- 始终按user_id升序或降序更新
STARTTRANSACTION;
-- 方法1:应用层排序
UPDATE accounts SET balance = balance - 100WHERE user_id = 1;
UPDATE accounts SET balance = balance + 100WHERE user_id = 2;
COMMIT;

 

案例2:间隙锁死锁

 

-- 间隙锁导致的死锁

-- 表中有id: 1, 10, 20
-- 事务1
STARTTRANSACTION;
SELECT * FROM accounts WHEREid = 5FORUPDATE;  -- 锁定间隙(1,10)
-- 等待...

-- 事务2
STARTTRANSACTION;
SELECT * FROM accounts WHEREid = 15FORUPDATE;  -- 锁定间隙(10,20)
INSERTINTO accounts (id, user_id, balance) VALUES (7, 7, 1000);  -- 等待事务1

-- 事务1
INSERTINTO accounts (id, user_id, balance) VALUES (12, 12, 2000);  -- 等待事务2
-- 死锁!

-- 解决方案:
-- 1. 降低隔离级别到READ COMMITTED(不使用间隙锁)
-- 2. 使用唯一索引精确匹配,避免间隙锁
-- 3. 减少锁定范围

 

案例3:唯一键冲突死锁

 

-- 唯一键冲突可能导致死锁

-- 表中已有 order_no = 'ORD001'
-- 事务1
STARTTRANSACTION;
INSERTINTO orders (order_no, user_id, amount) VALUES ('ORD002', 1, 100);

-- 事务2
STARTTRANSACTION;
INSERTINTO orders (order_no, user_id, amount) VALUES ('ORD002', 2, 200);
-- 唯一键冲突,等待事务1

-- 事务3
STARTTRANSACTION;
INSERTINTO orders (order_no, user_id, amount) VALUES ('ORD002', 3, 300);
-- 也等待

-- 事务1回滚
ROLLBACK;
-- 事务2和事务3可能死锁,因为它们都在等待锁

-- 解决方案:
-- 1. 使用INSERT ... ON DUPLICATE KEY UPDATE
-- 2. 使用INSERT IGNORE
-- 3. 先查询再插入(在应用层处理)

 

3.1.2 悲观锁实现

 

/**
 * 悲观锁实现转账功能
 * 使用SELECT FOR UPDATE锁定记录
 */
@Service
@Transactional
publicclass TransferService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * 转账 - 悲观锁实现
     * 关键:按固定顺序获取锁,避免死锁
     */
    public void transfer(Long fromUserId, Long toUserId, BigDecimal amount) {
        // 按user_id排序,确保获取锁的顺序一致
        Long firstUserId = Math.min(fromUserId, toUserId);
        Long secondUserId = Math.max(fromUserId, toUserId);

        try {
            // 按顺序锁定账户
            BigDecimal firstBalance = lockAndGetBalance(firstUserId);
            BigDecimal secondBalance = lockAndGetBalance(secondUserId);

            // 确定转出和转入账户的余额
            BigDecimal fromBalance = fromUserId.equals(firstUserId) ? firstBalance : secondBalance;
            BigDecimal toBalance = fromUserId.equals(firstUserId) ? secondBalance : firstBalance;

            // 检查余额
            if (fromBalance.compareTo(amount) < 0) {
                thrownew RuntimeException("余额不足");
            }

            // 执行转账
            updateBalance(fromUserId, fromBalance.subtract(amount));
            updateBalance(toUserId, toBalance.add(amount));

        } catch (Exception e) {
            // 异常时事务自动回滚
            thrownew RuntimeException("转账失败: " + e.getMessage(), e);
        }
    }

    private BigDecimal lockAndGetBalance(Long userId) {
        // SELECT FOR UPDATE 锁定记录
        String sql = "SELECT balance FROM accounts WHERE user_id = ? FOR UPDATE";
        return jdbcTemplate.queryForObject(sql, BigDecimal.class, userId);
    }

    private void updateBalance(Long userId, BigDecimal newBalance) {
        String sql = "UPDATE accounts SET balance = ? WHERE user_id = ?";
        jdbcTemplate.update(sql, newBalance, userId);
    }
}

 

3.1.3 乐观锁实现

 

/**
 * 乐观锁实现库存扣减
 * 使用版本号或CAS机制
 */
@Service
publicclass InventoryService {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * 扣减库存 - 乐观锁实现
     * @return true 成功,false 失败(库存不足或版本冲突)
     */
    public boolean decreaseStock(Long productId, int quantity) {
        int maxRetries = 3;

        for (int i = 0; i < maxRetries; i++) {
            // 查询当前库存和版本号
            String selectSql = "SELECT stock, version FROM inventory WHERE product_id = ?";
            Map result = jdbcTemplate.queryForMap(selectSql, productId);

            int currentStock = (Integer) result.get("stock");
            int currentVersion = (Integer) result.get("version");

            // 检查库存
            if (currentStock < quantity) {
                returnfalse;  // 库存不足
            }

            // 使用版本号进行CAS更新
            String updateSql = """
                UPDATE inventory
                SET stock = stock - ?, version = version + 1
                WHERE product_id = ? AND version = ?
            """;

            int affected = jdbcTemplate.update(updateSql, quantity, productId, currentVersion);

            if (affected > 0) {
                returntrue;  // 更新成功
            }

            // 版本冲突,重试
            try {
                Thread.sleep(10 + (long)(Math.random() * 50));  // 随机延迟
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        returnfalse;  // 重试次数用尽
    }

    /**
     * 扣减库存 - 使用行级条件更新(更简洁的乐观锁)
     */
    public boolean decreaseStockSimple(Long productId, int quantity) {
        String sql = """
            UPDATE inventory
            SET stock = stock - ?
            WHERE product_id = ? AND stock >= ?
        """;

        int affected = jdbcTemplate.update(sql, quantity, productId, quantity);
        return affected > 0;
    }
}

 

3.1.4 分布式锁实现

 

/**
 * 基于Redis的分布式锁实现
 * 解决跨实例的并发问题
 */
@Component
publicclass DistributedLock {

    @Autowired
    private StringRedisTemplate redisTemplate;

    privatestaticfinallong DEFAULT_EXPIRE_TIME = 30000;  // 30秒

    /**
     * 获取锁
     * @param lockKey 锁的key
     * @param requestId 请求标识(用于释放锁时验证)
     * @param expireTime 过期时间(毫秒)
     */
    public boolean tryLock(String lockKey, String requestId, long expireTime) {
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
            lockKey,
            requestId,
            expireTime,
            TimeUnit.MILLISECONDS
        );
        return Boolean.TRUE.equals(success);
    }

    /**
     * 释放锁
     * 使用Lua脚本保证原子性
     */
    public boolean unlock(String lockKey, String requestId) {
        String script = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
        """;

        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList(lockKey),
            requestId
        );

        return Long.valueOf(1).equals(result);
    }

    /**
     * 带自动续期的锁
     * 使用watchdog机制
     */
    public boolean tryLockWithWatchdog(String lockKey, String requestId) {
        boolean locked = tryLock(lockKey, requestId, DEFAULT_EXPIRE_TIME);

        if (locked) {
            // 启动watchdog线程,定期续期
            startWatchdog(lockKey, requestId);
        }

        return locked;
    }

    private void startWatchdog(String lockKey, String requestId) {
        Thread watchdog = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(DEFAULT_EXPIRE_TIME / 3);  // 每10秒续期一次

                    // 续期
                    String script = """
                        if redis.call('get', KEYS[1]) == ARGV[1] then
                            return redis.call('pexpire', KEYS[1], ARGV[2])
                        else
                            return 0
                        end
                    """;

                    Long result = redisTemplate.execute(
                        new DefaultRedisScript<>(script, Long.class),
                        Collections.singletonList(lockKey),
                        requestId,
                        String.valueOf(DEFAULT_EXPIRE_TIME)
                    );

                    if (!Long.valueOf(1).equals(result)) {
                        break;  // 锁已被释放或被其他进程获取
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });

        watchdog.setDaemon(true);
        watchdog.start();
    }
}

/**
 * 使用分布式锁的示例
 */
@Service
publicclass OrderService {

    @Autowired
    private DistributedLock distributedLock;

    @Autowired
    private InventoryService inventoryService;

    /**
     * 创建订单 - 使用分布式锁保证幂等性
     */
    public Order createOrder(String orderNo, Long productId, int quantity) {
        String lockKey = "lock" + orderNo;
        String requestId = UUID.randomUUID().toString();

        try {
            // 获取分布式锁
            if (!distributedLock.tryLock(lockKey, requestId, 30000)) {
                thrownew RuntimeException("获取锁失败,请稍后重试");
            }

            // 检查订单是否已存在(幂等性检查)
            Order existingOrder = orderMapper.findByOrderNo(orderNo);
            if (existingOrder != null) {
                return existingOrder;  // 返回已存在的订单
            }

            // 扣减库存
            if (!inventoryService.decreaseStock(productId, quantity)) {
                thrownew RuntimeException("库存不足");
            }

            // 创建订单
            Order order = new Order();
            order.setOrderNo(orderNo);
            order.setProductId(productId);
            order.setQuantity(quantity);
            orderMapper.insert(order);

            return order;

        } finally {
            // 释放锁
            distributedLock.unlock(lockKey, requestId);
        }
    }
}

 

3.2 实际应用案例

3.2.1 秒杀系统防超卖

 

/**
 * 秒杀系统防超卖方案
 */
@Service
publicclass SeckillService {

    @Autowired
    private RedisTemplate redisTemplate;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * 方案1:Redis预扣库存 + 异步入库
     */
    public SeckillResult seckillWithRedis(Long userId, Long productId) {
        String stockKey = "seckill" + productId;
        String orderKey = "seckill" + productId;

        // 1. 检查是否已购买(防止重复购买)
        Boolean isMember = redisTemplate.opsForSet().isMember(orderKey, userId);
        if (Boolean.TRUE.equals(isMember)) {
            return SeckillResult.fail("您已参与过此活动");
        }

        // 2. 预扣库存(原子操作)
        Long stock = redisTemplate.opsForValue().decrement(stockKey);
        if (stock == null || stock < 0) {
            // 库存不足,恢复
            redisTemplate.opsForValue().increment(stockKey);
            return SeckillResult.fail("商品已售罄");
        }

        try {
            // 3. 记录用户已购买
            redisTemplate.opsForSet().add(orderKey, userId);

            // 4. 发送消息到MQ,异步创建订单
            OrderMessage message = new OrderMessage(userId, productId, 1);
            rabbitTemplate.convertAndSend("seckill.exchange", "seckill.order", message);

            return SeckillResult.success("秒杀成功,订单创建中");

        } catch (Exception e) {
            // 异常时恢复库存
            redisTemplate.opsForValue().increment(stockKey);
            redisTemplate.opsForSet().remove(orderKey, userId);
            return SeckillResult.fail("系统繁忙,请稍后重试");
        }
    }

    /**
     * 方案2:数据库行级锁
     * 适用于库存量大、并发相对较低的场景
     */
    @Transactional
    public SeckillResult seckillWithDbLock(Long userId, Long productId, int quantity) {
        // 1. 查询库存(加锁)
        String selectSql = """
            SELECT stock FROM inventory WHERE product_id = ? FOR UPDATE
        """;
        Integer stock = jdbcTemplate.queryForObject(selectSql, Integer.class, productId);

        if (stock == null || stock < quantity) {
            return SeckillResult.fail("库存不足");
        }

        // 2. 扣减库存
        String updateSql = "UPDATE inventory SET stock = stock - ? WHERE product_id = ?";
        jdbcTemplate.update(updateSql, quantity, productId);

        // 3. 创建订单
        String insertSql = """
            INSERT INTO orders (order_no, user_id, product_id, quantity, status)
            VALUES (?, ?, ?, ?, 1)
        """;
        String orderNo = generateOrderNo();
        jdbcTemplate.update(insertSql, orderNo, userId, productId, quantity);

        return SeckillResult.success(orderNo);
    }

    /**
     * 方案3:乐观锁 + 限制重试次数
     */
    public SeckillResult seckillWithOptimisticLock(Long userId, Long productId, int quantity) {
        int maxRetries = 3;

        for (int i = 0; i < maxRetries; i++) {
            // 使用乐观锁扣减库存
            String sql = """
                UPDATE inventory
                SET stock = stock - ?, version = version + 1
                WHERE product_id = ? AND stock >= ?
            """;

            int affected = jdbcTemplate.update(sql, quantity, productId, quantity);

            if (affected > 0) {
                // 扣减成功,创建订单
                String orderNo = createOrder(userId, productId, quantity);
                return SeckillResult.success(orderNo);
            }

            // 可能是库存不足或版本冲突,检查库存
            Integer stock = jdbcTemplate.queryForObject(
                "SELECT stock FROM inventory WHERE product_id = ?",
                Integer.class, productId
            );

            if (stock == null || stock < quantity) {
                return SeckillResult.fail("库存不足");
            }

            // 版本冲突,短暂等待后重试
            try {
                Thread.sleep(10 + (long)(Math.random() * 30));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }

        return SeckillResult.fail("系统繁忙,请稍后重试");
    }
}

 

3.2.2 死锁自动检测和告警

 

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL死锁监控和告警脚本
"""

import pymysql
import time
import json
import requests
from datetime import datetime


class DeadlockMonitor:
    """死锁监控器"""

    def __init__(self, host, user, password, database, alert_webhook=None):
        self.conn_params = {
            'host': host,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4'
        }
        self.alert_webhook = alert_webhook
        self.last_deadlock_info = None

    def get_innodb_status(self):
        """获取InnoDB状态"""
        conn = pymysql.connect(**self.conn_params)
        try:
            with conn.cursor() as cursor:
                cursor.execute("SHOW ENGINE INNODB STATUS")
                result = cursor.fetchone()
                return result[2] if result elseNone
        finally:
            conn.close()

    def parse_deadlock(self, status):
        """解析死锁信息"""
        ifnot status:
            returnNone

        lines = status.split('
')
        in_deadlock_section = False
        deadlock_info = []
        current_section = []

        for line in lines:
            if'LATEST DETECTED DEADLOCK'in line:
                in_deadlock_section = True
                continue

            if in_deadlock_section:
                if line.startswith('---') and'TRANSACTION'notin line:
                    if current_section:
                        deadlock_info.append('
'.join(current_section))
                        current_section = []
                    continue

                if'WE ROLL BACK'in line:
                    current_section.append(line)
                    deadlock_info.append('
'.join(current_section))
                    break

                current_section.append(line)

        return'

'.join(deadlock_info) if deadlock_info elseNone

    def get_lock_waits(self):
        """获取当前锁等待情况"""
        conn = pymysql.connect(**self.conn_params)
        try:
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                sql = """
                    SELECT
                        r.trx_id AS waiting_trx_id,
                        r.trx_mysql_thread_id AS waiting_thread,
                        TIMESTAMPDIFF(SECOND, r.trx_wait_started, NOW()) AS wait_seconds,
                        r.trx_query AS waiting_query,
                        b.trx_id AS blocking_trx_id,
                        b.trx_mysql_thread_id AS blocking_thread,
                        b.trx_query AS blocking_query
                    FROM performance_schema.data_lock_waits w
                    INNER JOIN information_schema.innodb_trx b
                        ON b.trx_id = w.BLOCKING_ENGINE_TRANSACTION_ID
                    INNER JOIN information_schema.innodb_trx r
                        ON r.trx_id = w.REQUESTING_ENGINE_TRANSACTION_ID
                """
                cursor.execute(sql)
                return cursor.fetchall()
        finally:
            conn.close()

    def alert(self, title, content):
        """发送告警"""
        print(f"[ALERT] {title}")
        print(content)

        if self.alert_webhook:
            try:
                payload = {
                    'msgtype': 'markdown',
                    'markdown': {
                        'title': title,
                        'text': f"## {title}

{content}"
                    }
                }
                requests.post(self.alert_webhook, json=payload, timeout=5)
            except Exception as e:
                print(f"发送告警失败: {e}")

    def check_deadlock(self):
        """检查死锁"""
        status = self.get_innodb_status()
        deadlock_info = self.parse_deadlock(status)

        if deadlock_info and deadlock_info != self.last_deadlock_info:
            self.last_deadlock_info = deadlock_info
            self.alert(
                "MySQL检测到死锁",
                f"**时间**: {datetime.now()}

**详情**:
```
{deadlock_info[:2000]}
```"
            )
            returnTrue
        returnFalse

    def check_lock_waits(self, threshold_seconds=30):
        """检查长时间锁等待"""
        lock_waits = self.get_lock_waits()

        for wait in lock_waits:
            if wait['wait_seconds'] and wait['wait_seconds'] > threshold_seconds:
                self.alert(
                    "MySQL锁等待超时",
                    f"**等待时间**: {wait['wait_seconds']}秒
"
                    f"**等待线程**: {wait['waiting_thread']}
"
                    f"**等待SQL**: {wait['waiting_query']}
"
                    f"**阻塞线程**: {wait['blocking_thread']}
"
                    f"**阻塞SQL**: {wait['blocking_query']}"
                )

    def run(self, interval=10):
        """运行监控"""
        print(f"死锁监控已启动,检查间隔: {interval}秒")

        whileTrue:
            try:
                self.check_deadlock()
                self.check_lock_waits(threshold_seconds=30)
            except Exception as e:
                print(f"监控异常: {e}")

            time.sleep(interval)


if __name__ == '__main__':
    monitor = DeadlockMonitor(
        host='192.168.1.11',
        user='monitor',
        password='password',
        database='lock_demo',
        alert_webhook='https://your-webhook-url.com'
    )
    monitor.run()

 

3.2.3 事务超时和慢事务监控

 

-- 查询运行时间超过指定秒数的事务
SELECT
    trx_id,
    trx_mysql_thread_id AS thread_id,
    trx_state,
    trx_started,
    TIMESTAMPDIFF(SECOND, trx_started, NOW()) AS running_seconds,
    trx_rows_locked,
    trx_rows_modified,
    trx_lock_structs,
    trx_query
FROM information_schema.innodb_trx
WHERETIMESTAMPDIFF(SECOND, trx_started, NOW()) > 60
ORDERBY running_seconds DESC;

-- 查询持有锁最多的事务
SELECT
    trx_id,
    trx_mysql_thread_id,
    trx_rows_locked,
    trx_lock_structs,
    trx_tables_locked,
    trx_query
FROM information_schema.innodb_trx
ORDERBY trx_rows_locked DESC
LIMIT10;

-- 查询锁定行数最多的表
SELECT
    object_schema,
    object_name,
    COUNT(*) as lock_count
FROM performance_schema.data_locks
WHERE lock_type = 'RECORD'
GROUPBY object_schema, object_name
ORDERBY lock_count DESC;

-- 创建慢事务告警存储过程
DELIMITER //

CREATEPROCEDURE check_slow_transactions(IN threshold_seconds INT)
BEGIN
    DECLARE done INTDEFAULTFALSE;
    DECLARE v_trx_id VARCHAR(100);
    DECLARE v_thread_id BIGINT;
    DECLARE v_running_seconds INT;
    DECLARE v_query TEXT;

    DECLARE cur CURSORFOR
        SELECT
            trx_id,
            trx_mysql_thread_id,
            TIMESTAMPDIFF(SECOND, trx_started, NOW()),
            trx_query
        FROM information_schema.innodb_trx
        WHERETIMESTAMPDIFF(SECOND, trx_started, NOW()) > threshold_seconds;

    DECLARE CONTINUE HANDLERFORNOTFOUNDSET done = TRUE;

    -- 创建告警日志表
    CREATETABLEIFNOTEXISTS slow_transaction_log (
        idBIGINT AUTO_INCREMENT PRIMARY KEY,
        trx_id VARCHAR(100),
        thread_id BIGINT,
        running_seconds INT,
        queryTEXT,
        logged_at DATETIME DEFAULTCURRENT_TIMESTAMP
    );

    OPEN cur;

    read_loop: LOOP
        FETCH cur INTO v_trx_id, v_thread_id, v_running_seconds, v_query;
        IF done THEN
            LEAVE read_loop;
        ENDIF;

        -- 记录慢事务
        INSERTINTO slow_transaction_log (trx_id, thread_id, running_seconds, query)
        VALUES (v_trx_id, v_thread_id, v_running_seconds, v_query);

    ENDLOOP;

    CLOSE cur;
END //

DELIMITER ;

-- 使用Event定期检查
CREATEEVENTIFNOTEXISTS check_slow_transactions_event
ON SCHEDULE EVERY 1MINUTE
DOCALL check_slow_transactions(60);

 

四、最佳实践和注意事项

4.1 最佳实践

4.1.1 事务设计原则

 

-- 1. 事务尽量短小
-- 差:大事务
STARTTRANSACTION;
-- 处理100万条记录
UPDATE orders SETstatus = 1WHERE created_at < '2024-01-01';  -- 锁定大量行
COMMIT;

-- 好:分批处理
DELIMITER //
CREATEPROCEDURE batch_update_orders()
BEGIN
    DECLARE affected_rows INTDEFAULT1;
    DECLARE batch_size INTDEFAULT1000;

    WHILE affected_rows > 0 DO
        STARTTRANSACTION;

        UPDATE orders
        SETstatus = 1
        WHERE created_at < '2024-01-01'ANDstatus = 0
        LIMIT batch_size;

        SET affected_rows = ROW_COUNT();
        COMMIT;

        -- 短暂暂停,避免长时间占用资源
        DOSLEEP(0.1);
    ENDWHILE;
END //
DELIMITER ;

-- 2. 避免在事务中进行耗时操作
-- 差:事务中调用外部接口
STARTTRANSACTION;
INSERTINTO orders (...) VALUES (...);
-- 调用支付接口(可能需要几秒)
-- 长时间持有锁
COMMIT;

-- 好:先准备数据,再开启事务
-- 准备阶段(无事务)
-- 调用支付接口,获取结果

STARTTRANSACTION;
INSERTINTO orders (...) VALUES (...);  -- 快速完成
INSERTINTO payments (...) VALUES (...);
COMMIT;

-- 3. 按固定顺序访问资源
-- 统一按主键升序访问,避免死锁

 

4.1.2 锁优化策略

 

-- 1. 尽量使用索引访问数据
-- 差:无索引导致锁表
UPDATE orders SETstatus = 1WHERE order_date = '2024-01-01';
-- 如果order_date没有索引,可能锁定大量行

-- 好:有索引时锁定范围精确
CREATEINDEX idx_order_date ON orders(order_date);
UPDATE orders SETstatus = 1WHERE order_date = '2024-01-01';

-- 2. 减少锁定范围
-- 差:锁定所有匹配的行
SELECT * FROM orders WHERE user_id = 1FORUPDATE;

-- 好:只锁定需要的行
SELECT * FROM orders WHERE user_id = 1ANDstatus = 0FORUPDATE;

-- 3. 合理使用锁模式
-- 只读场景使用共享锁
SELECT * FROM orders WHEREid = 1LOCKINSHAREMODE;

-- 需要修改时才使用排他锁
SELECT * FROM orders WHEREid = 1FORUPDATE;

-- 4. 避免锁升级
-- 差:从共享锁升级到排他锁可能导致死锁
SELECT * FROM orders WHEREid = 1LOCKINSHAREMODE;
-- 后续需要更新...
UPDATE orders SETstatus = 1WHEREid = 1;  -- 可能死锁

-- 好:直接使用排他锁
SELECT * FROM orders WHEREid = 1FORUPDATE;
UPDATE orders SETstatus = 1WHEREid = 1;

 

4.1.3 隔离级别选择

 

-- 不同场景的隔离级别推荐

-- 1. 高并发读写场景:READ COMMITTED
-- 优点:锁范围小,不使用间隙锁
-- 缺点:可能出现不可重复读
SETSESSIONTRANSACTIONISOLATIONLEVELREAD COMMITTED;

-- 2. 金融交易场景:REPEATABLE READ(默认)
-- 优点:一致性读,防止幻读
-- 缺点:间隙锁可能导致更多死锁
SETSESSIONTRANSACTIONISOLATIONLEVEL REPEATABLE READ;

-- 3. 报表查询场景:使用一致性快照
STARTTRANSACTIONWITHCONSISTENTSNAPSHOT;
SELECT * FROM orders WHERE ...;
-- 读取的是事务开始时的快照,不会被其他事务影响
COMMIT;

-- 4. 批量导入场景:可以临时使用READ UNCOMMITTED
SETSESSIONTRANSACTIONISOLATIONLEVELREAD UNCOMMITTED;
-- 导入完成后恢复
SETSESSIONTRANSACTIONISOLATIONLEVEL REPEATABLE READ;

 

4.2 注意事项

4.2.1 配置注意

配置项 建议值 说明
innodb_lock_wait_timeout 10-50 锁等待超时,根据业务调整
innodb_deadlock_detect ON 开启死锁检测
innodb_print_all_deadlocks ON 记录所有死锁到错误日志
transaction_isolation READ-COMMITTED / REPEATABLE-READ 根据场景选择
innodb_rollback_on_timeout OFF 超时时只回滚当前语句,不回滚整个事务
autocommit ON 默认开启自动提交

4.2.2 常见错误

错误类型 错误信息 原因分析 解决方案
死锁 Deadlock found 循环等待 固定访问顺序
锁超时 Lock wait timeout exceeded 持锁时间过长 减小事务,增加超时
事务太大 Transaction too large 修改行数过多 分批处理
表锁 Table lock wait 无索引导致表锁 添加适当索引
间隙锁冲突 Conflict on gap lock 并发插入同一间隙 降低隔离级别

4.2.3 死锁预防清单

 

开发阶段:
  -固定访问顺序:多表操作按表名或主键排序
-减小事务范围:只在必要时开启事务
-使用低隔离级别:非必要不用REPEATABLEREAD
-添加必要索引:避免全表扫描锁定

部署阶段:
-开启死锁检测:innodb_deadlock_detect=ON
-设置合理超时:innodb_lock_wait_timeout=10
-记录死锁日志:innodb_print_all_deadlocks=ON
-配置监控告警:死锁次数、锁等待时间

运维阶段:
-定期分析死锁:查看SHOWENGINEINNODBSTATUS
-监控长事务:超过60秒的事务告警
-监控锁等待:等待超过10秒告警
-分析慢查询:优化持锁时间长的SQL

 

五、故障排查和监控

5.1 故障排查

5.1.1 死锁分析

 

-- 查看最近的死锁信息
SHOWENGINEINNODBSTATUSG

-- 输出中的关键部分:
-- LATEST DETECTED DEADLOCK
-- ------------------------
-- 2024-01-01 1000 0x7f...
-- *** (1) TRANSACTION:
-- TRANSACTION 12345, ACTIVE 1 sec starting index read
-- mysql tables in use 1, locked 1
-- LOCK WAIT 3 lock struct(s), heap size 1136, 2 row lock(s)
-- MySQL thread id 100, OS thread handle 123, query id 456 192.168.1.10 app_user updating
-- UPDATE accounts SET balance = balance - 100 WHERE user_id = 1
--
-- *** (1) WAITING FOR THIS LOCK TO BE GRANTED:
-- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts`
-- trx id 12345 lock_mode X locks rec but not gap waiting
--
-- *** (2) TRANSACTION:
-- TRANSACTION 12346, ACTIVE 1 sec starting index read
-- ...
--
-- *** (2) HOLDS THE LOCK(S):
-- RECORD LOCKS space id 123 page no 3 n bits 72 index uk_user_id of table `db`.`accounts`
-- ...
--
-- *** (2) WAITING FOR THIS LOCK TO BE GRANTED:
-- ...
--
-- *** WE ROLL BACK TRANSACTION (1)

-- 分析步骤:
-- 1. 找到两个事务的SQL
-- 2. 分析锁等待关系
-- 3. 确定死锁原因
-- 4. 制定解决方案

 

死锁日志解读

 

# 从错误日志中提取死锁信息
grep -A 100 "LATEST DETECTED DEADLOCK" /var/log/mysql/error.log | head -100

# 使用pt-deadlock-logger记录死锁
pt-deadlock-logger --host=localhost --user=root --password=xxx 
    --dest h=localhost,D=monitor,t=deadlocks 
    --run-time=1h

 

5.1.2 锁等待分析

 

-- 查看当前锁等待
SELECT
    waiting.trx_id AS waiting_trx_id,
    waiting.trx_mysql_thread_id AS waiting_thread,
    waiting.trx_query AS waiting_query,
    TIMESTAMPDIFF(SECOND, waiting.trx_wait_started, NOW()) AS waiting_seconds,
    blocking.trx_id AS blocking_trx_id,
    blocking.trx_mysql_thread_id AS blocking_thread,
    blocking.trx_query AS blocking_query,
    TIMESTAMPDIFF(SECOND, blocking.trx_started, NOW()) AS blocking_duration
FROM information_schema.innodb_trx waiting
INNERJOIN performance_schema.data_lock_waits dlw
    ON waiting.trx_id = dlw.REQUESTING_ENGINE_TRANSACTION_ID
INNERJOIN information_schema.innodb_trx blocking
    ON blocking.trx_id = dlw.BLOCKING_ENGINE_TRANSACTION_ID;

-- 查看锁的详细信息
SELECT
    dl.ENGINE_LOCK_ID,
    dl.ENGINE_TRANSACTION_ID,
    dl.OBJECT_SCHEMA,
    dl.OBJECT_NAME,
    dl.INDEX_NAME,
    dl.LOCK_TYPE,
    dl.LOCK_MODE,
    dl.LOCK_STATUS,
    dl.LOCK_DATA
FROM performance_schema.data_locks dl;

-- 终止阻塞事务(谨慎使用)
-- 先确认阻塞线程ID
KILL12345;

 

5.1.3 长事务分析

 

-- 查找运行时间最长的事务
SELECT
    trx_id,
    trx_mysql_thread_id AS thread_id,
    trx_state,
    trx_started,
    NOW() - trx_started AS running_time,
    trx_rows_locked,
    trx_rows_modified,
    trx_tables_in_use,
    trx_tables_locked,
    trx_query
FROM information_schema.innodb_trx
ORDERBY trx_started ASC;

-- 查看事务对应的连接信息
SELECT
    t.trx_id,
    t.trx_mysql_thread_id,
    p.user,
    p.host,
    p.db,
    p.command,
    p.time,
    p.state,
    t.trx_query
FROM information_schema.innodb_trx t
INNERJOIN information_schema.processlist p
    ON t.trx_mysql_thread_id = p.id;

-- 查看事务的undo日志量(判断回滚代价)
SELECT
    trx_id,
    trx_undo_record_size,
    trx_undo_record_size / 1024 / 1024AS undo_mb
FROM information_schema.innodb_trx
WHERE trx_undo_record_size > 0;

 

5.2 性能监控

5.2.1 关键指标

 

-- InnoDB锁相关指标
SHOWGLOBALSTATUSLIKE'Innodb_row_lock%';
-- Innodb_row_lock_current_waits: 当前等待锁的数量
-- Innodb_row_lock_time: 总锁等待时间(毫秒)
-- Innodb_row_lock_time_avg: 平均锁等待时间
-- Innodb_row_lock_time_max: 最大锁等待时间
-- Innodb_row_lock_waits: 总锁等待次数

-- 死锁次数
SHOWGLOBALSTATUSLIKE'Innodb_deadlocks';

-- 锁内存使用
SHOWGLOBALSTATUSLIKE'Innodb_row_lock_memory';

-- 计算锁争用率
SELECT
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_row_lock_waits') /
    (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Questions') * 100
AS lock_contention_percent;

 

5.2.2 监控指标表

指标类别 指标名称 含义 告警阈值
死锁 Innodb_deadlocks 死锁累计次数 增长率 > 1/min
锁等待 Innodb_row_lock_waits 锁等待累计次数 增长率 > 10/sec
锁时间 Innodb_row_lock_time_avg 平均锁等待时间(ms) > 1000
当前等待 Innodb_row_lock_current_waits 当前等待锁数量 > 10
长事务 运行超过60秒的事务 长事务数量 > 0
锁表 Tables_locks_waited 表锁等待次数 > 0

5.2.3 Prometheus告警规则

 

groups:
  -name:mysql-lock-alerts
    rules:
      -alert:MySQLDeadlocks
        expr:increase(mysql_global_status_innodb_deadlocks[5m])>0
        for:1m
        labels:
          severity:warning
        annotations:
          summary:"MySQL发生死锁"
          description:"{{ $labels.instance }} 在过去5分钟内发生 {{ $value }} 次死锁"

      -alert:MySQLHighLockWaits
        expr:rate(mysql_global_status_innodb_row_lock_waits[5m])>10
        for:5m
        labels:
          severity:warning
        annotations:
          summary:"MySQL锁等待频繁"
          description:"{{ $labels.instance }} 锁等待率为 {{ $value }}/秒"

      -alert:MySQLLongLockWait
        expr:mysql_global_status_innodb_row_lock_time_avg>1000
        for:5m
        labels:
          severity:warning
        annotations:
          summary:"MySQL锁等待时间过长"
          description:"{{ $labels.instance }} 平均锁等待时间 {{ $value }}ms"

      -alert:MySQLLongTransaction
        expr:mysql_info_schema_innodb_trx_running_seconds>60
        for:1m
        labels:
          severity:critical
        annotations:
          summary:"MySQL存在长事务"
          description:"{{ $labels.instance }} 存在运行超过60秒的事务"

 

5.3 备份与恢复

5.3.1 事务日志备份

 

#!/bin/bash
# MySQL binlog备份脚本

BACKUP_DIR="/data/backup/binlog"
MYSQL_USER="backup"
MYSQL_PASS="password"
MYSQL_HOST="localhost"
RETENTION_DAYS=7

mkdir -p $BACKUP_DIR

# 获取当前binlog文件列表
mysql -u$MYSQL_USER -p$MYSQL_PASS -h$MYSQL_HOST -e "SHOW BINARY LOGS;" | 
    tail -n +2 | awk '{print $1}' | whileread binlog; do

    # 复制binlog到备份目录
    if [ ! -f "$BACKUP_DIR/$binlog" ]; then
        mysqlbinlog -u$MYSQL_USER -p$MYSQL_PASS -h$MYSQL_HOST 
            --read-from-remote-server $binlog > $BACKUP_DIR/$binlog.sql
        gzip $BACKUP_DIR/$binlog.sql
        echo"备份 $binlog 完成"
    fi
done

# 清理过期备份
find $BACKUP_DIR -name "*.sql.gz" -mtime +$RETENTION_DAYS -delete

echo"Binlog备份完成"

 

5.3.2 死锁恢复流程

 

-- 死锁后的恢复步骤

-- 1. 确认事务状态
SELECT
    trx_id,
    trx_state,
    trx_started,
    trx_mysql_thread_id,
    trx_query
FROM information_schema.innodb_trx;

-- 2. 如果事务被回滚,应用程序需要重试
-- 检查应用程序的重试逻辑

-- 3. 如果需要手动回滚
ROLLBACK;

-- 4. 检查数据一致性
-- 根据业务逻辑验证数据

-- 5. 分析死锁原因
SHOWENGINEINNODBSTATUSG
-- 找到LATEST DETECTED DEADLOCK部分

-- 6. 记录和上报
-- 将死锁信息记录到监控系统

 

六、总结

6.1 技术要点回顾

MySQL事务与锁机制的核心要点:

1. ACID特性

原子性:通过undo log实现

一致性:通过约束和应用逻辑保证

隔离性:通过锁和MVCC实现

持久性:通过redo log保证

2. 锁类型

行锁:记录锁、间隙锁、临键锁

表锁:意向锁、MDL锁

锁模式:共享锁、排他锁

3. 死锁处理

预防:固定访问顺序、减小事务

检测:innodb_deadlock_detect

恢复:自动回滚一个事务

4. 最佳实践

事务尽量短小

按固定顺序访问资源

合理选择隔离级别

使用合适的锁策略

6.2 进阶学习方向

方向 内容 推荐资源
MVCC原理 版本链、ReadView机制 《MySQL技术内幕:InnoDB存储引擎》
锁算法 B+树锁定协议 MySQL源码
分布式事务 XA、TCC、SAGA Seata框架文档
死锁检测算法 等待图、超时检测 数据库系统概论
性能调优 锁粒度优化 Percona博客

6.3 参考资料

MySQL官方文档:https://dev.mysql.com/doc/

《MySQL技术内幕:InnoDB存储引擎》第2版

《高性能MySQL》第4版

Percona Blog:https://www.percona.com/blog/

附录

A. 命令速查表

命令 说明 示例
START TRANSACTION 开始事务 START TRANSACTION;
COMMIT 提交事务 COMMIT;
ROLLBACK 回滚事务 ROLLBACK;
SELECT ... FOR UPDATE 排他锁查询 SELECT * FROM t WHERE id=1 FOR UPDATE;
SELECT ... FOR SHARE 共享锁查询 SELECT * FROM t WHERE id=1 FOR SHARE;
SHOW ENGINE INNODB STATUS 查看InnoDB状态 SHOW ENGINE INNODB STATUSG
KILL 终止连接 KILL 12345;

B. 配置参数详解

参数 默认值 说明 建议
transaction_isolation REPEATABLE-READ 默认隔离级别 根据场景选择
innodb_lock_wait_timeout 50 锁等待超时(秒) 10-30
innodb_deadlock_detect ON 死锁检测开关 ON
innodb_print_all_deadlocks OFF 记录所有死锁 ON
innodb_rollback_on_timeout OFF 超时回滚整个事务 OFF
autocommit ON 自动提交 ON

C. 术语表

术语 英文 说明
脏读 Dirty Read 读取未提交的数据
不可重复读 Non-Repeatable Read 同一事务两次读取结果不同
幻读 Phantom Read 查询结果集行数变化
死锁 Deadlock 循环等待锁
间隙锁 Gap Lock 锁定索引间隙
临键锁 Next-Key Lock 记录锁+间隙锁
MVCC Multi-Version Concurrency Control 多版本并发控制
Undo Log - 回滚日志
Redo Log - 重做日志
两阶段锁 Two-Phase Locking 加锁和解锁分两阶段

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分