事务性发件箱模式:解决数据库事务和消息的一致性

参考:https://microservices.io/patterns/data/transactional-outbox.html

这是2022年10月28日创建的草稿,最终的文字内容和代码通过 ChatGPT 生成。

事务性发件箱模式是一种解决数据库事务和消息的一致性问题的方法。它的基本思想是,将消息的发送操作与数据库的更新操作绑定在一起,保证这两个操作是原子性的,要么都成功,要么都失败。

在传统的数据库应用中,数据库事务与消息的发送是独立的两个操作,如果数据库事务因为某种原因失败了,消息可能已经被发送出去了,这就导致了数据的不一致性。事务性发件箱模式的目的就是解决这个问题。

事务性发件箱模式的实现方式有多种,其中一种是将消息的发送操作与数据库事务放在同一个线程中进行。当数据库事务提交之前,将消息放入一个临时的发件箱中,如果数据库事务成功提交,则消息也会被发送出去;如果数据库事务失败,则消息也不会被发送。

另一种实现方式是在数据库事务提交之后,通过发布/订阅模式将消息发送给消息队列,然后由消息队列负责将消息发送出去。这种方式的优点是能够支持分布式系统,数据库事务和消息发送可以在不同的机器上进行。

无论哪种实现方式,事务性发件箱模式都能够保证数据库事务和消息的一致性。这对于需要处理大量数据和消息的应用来说,是非常有用的。它能够防止数据的不一致性,避免出现因消息发送失败而导致的数据错误。

架构图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
       +----------------+
| ORDER SERVICE +-----------INSERT------+
+-----+----------+ |
| |
INSERT |
+------------+----------------------------------+-----------------+
| Transaction| | |
| ORDER|TABLE OUTBOX|TABLE |
| +------>--------+------+ +------+--v------+-----+ |
| | | | | | | | | |
| +------+--------+------+ +------+--^------+-----+ |
| | |
+-----------------------------------------------+-----------------+
|
READ
|
+-------------+---------+
|JOB | |
| +---------+------+ |
| | Send Message | |
| +----------------+ |
| |
+-----------------------+

示例

  1. 在数据库中建立一张临时发件箱表,包含消息的 ID、内容和状态字段。比如:
    1
    2
    3
    4
    5
    create table outbox (
    id int auto_increment primary key,
    message varchar(255),
    status tinyint
    );
  2. 在数据库事务处理类中,在更新数据库之后,将消息写入发件箱表。比如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    public class UserDao {
    private Connection conn;
    private MessageSender sender;

    public UserDao(Connection conn, MessageSender sender) {
    this.conn = conn;
    this.sender = sender;
    }

    public void updateUser(int userId, String username) throws SQLException {
    // 开启事务
    conn.setAutoCommit(false);

    try {
    // 更新数据库中的用户信息
    String sql = "update user set username = ? where id = ?";
    PreparedStatement stmt = conn.prepareStatement(sql);
    stmt.setString(1, username);
    stmt.setInt(2, userId);
    stmt.executeUpdate();

    // 将消息写入发件箱表
    sql = "insert into outbox (message, status) values (?, 0)";
    stmt = conn.prepareStatement(sql);
    stmt.setString(1, "用户 " + userId + " 的用户名已经更新为 " + username);
    stmt.executeUpdate();

    // 提交事务
    conn.commit();
    } catch (SQLException e) {
    // 回滚事务
    conn.rollback();
    throw e;
    } finally {
    // 关闭数据库连接
    conn.close();
    }
    }
    }
  3. 在发送消息的过程中,从发件箱表中取出,发送成功后删除
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    public class OutboxProcessor implements Runnable {
    private Connection conn;
    private MessageSender sender;

    public OutboxProcessor(Connection conn, MessageSender sender) {
    this.conn = conn;
    this.sender = sender;
    }

    public void run() {
    while (true) {
    try {
    // 使用独占锁锁定发件箱表中的消息
    String sql = "select * from outbox where status = 0 for update";
    PreparedStatement stmt = conn.prepareStatement(sql);
    ResultSet rs = stmt.executeQuery();

    // 遍历结果集,发送消息并删除发件箱表中的消息
    while (rs.next()) {
    // 发送消息到消息
    sender.send(rs.getString("message"));
    // 删除发件箱表中的消息
    sql = "delete from outbox where id = ?";
    PreparedStatement stmt2 = conn.prepareStatement(sql);
    stmt2.setInt(1, rs.getInt("id"));
    stmt2.executeUpdate();
    }
    // 休眠一段时间,避免频繁查询发件箱表
    Thread.sleep(1000);
    } catch (SQLException | InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
  4. 在主程序中,创建一个 OutboxProcessor 线程,并启动它。比如:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class Main {
    public static void main(String[] args) {
    // 创建消息发送器
    MessageSender sender = new RabbitMQSender();

    // 创建数据库连接
    Connection conn = DriverManager.getConnection(...);

    // 创建 OutboxProcessor 线程
    OutboxProcessor processor = new OutboxProcessor(conn, sender);

    // 启动 OutboxProcessor 线程
    new Thread(processor).start();
    }
    }

事务性发件箱模式:解决数据库事务和消息的一致性
https://blog.mybatis.io/post/f0148245.html
作者
Liuzh
发布于
2022年12月7日
许可协议