数据库事件触发的设置和应用

描述

数据库无论对于生产管理还是很多的实际应用都非常重要。小编这次聊一下数据库事件触发的应用。示例使用了postgresql和Python。在本文中,事件触发和处理大概地分为两类:

数据库的事件触发和服务器内部处理(1~4)

数据库事件触发后,客户端的程序检测到该事件的触发对应的处理(5~6)

  在数据库系统中,事件触发(通常指数据库触发器)以及读取事件触发的信息用于多种场景和需求。请看两组示例(1~4)和(5~6)。

1. 数据一致性和完整性维护

当对数据库表中的数据进行插入、更新或删除操作时,需要自动验证或调整相关数据,以确保它们符合业务规则或约束。例如,在一个订单管理系统中,如果库存数量减少到一定阈值以下,可以触发一个警告或补货请求。

Step 1-1: 创建数据库表

假设我们有一个 inventory 表,它保存产品库存的信息:

 

CREATE TABLE inventory (
    product_id SERIAL PRIMARY KEY,
    product_name TEXT NOT NULL,
    quantity INT NOT NULL
);

 

Step 1-2: 创建触发函数

创建一个 PL/pgSQL 函数,用于检查库存数量并记录警告信息:

CREATE OR REPLACE FUNCTION check_inventory_threshold()
RETURNS TRIGGER AS $$
BEGIN
    IF NEW.quantity < 10 THEN  -- 假设 10 是阈值
        -- 在此处记录警告或使用某种方式发送通知
        RAISE WARNING 'Product % is below threshold with quantity %', NEW.product_name, NEW.quantity;
        -- 可在此插入补货请求或通知操作
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

 

Step 1-3: 创建触发器

设置一个触发器,更新 inventory 表时调用触发函数:

CREATE TRIGGER inventory_check_trigger
AFTER INSERT OR UPDATE ON inventory
FOR EACH ROW EXECUTE PROCEDURE check_inventory_threshold();
Step 1-4: 使用 Python 进行外部操作

 

一个Python脚本可以用于监控警告并执行更复杂的操作,比如发送电子邮件或自动创建补货单。以下是一个简单的Python示例,假设你将警告日志记录到一个专门的日志表中:

 

import psycopg2
from smtplib import SMTP


def send_notification(product_name, quantity):
    # 发送邮件通知逻辑(确保你已设置SMTP服务器配置)
    with SMTP('smtp.example.com') as smtp:
        smtp.sendmail('from@example.com', 'to@example.com',
                      f'Subject: Inventory Alert

Product {product_name} is below threshold with quantity {quantity}.') def check_and_notify(): try: # Connect to PostgreSQL database connection = psycopg2.connect( host="localhost", database="your_database", user="your_user", password="your_password" ) cursor = connection.cursor() # Query to check logs for low inventory query = """ SELECT product_name, quantity FROM inventory WHERE quantity < 10; """ cursor.execute(query) low_stock_items = cursor.fetchall() for product_name, quantity in low_stock_items: send_notification(product_name, quantity) except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Run the check and notify function check_and_notify()

 

 

2. 自动化任务

自动执行某些日常任务,如记录变化、生成日志或进行审计。当某个表中的数据被修改时,触发器可以自动记录修改前后的数据以供审计,当对特定表进行插入、更新或删除操作时,触发器能够捕捉这些事件,并执行相关的处理逻辑。 下面是一个如何使用 PostgreSQL 触发器来记录数据变化的示例。假设我们有一个名为 employee_data 的表,我们希望记录每次数据更新时的操作者信息。

2-1. 创建一个用于日志记录的表

首先,需要新建一个用于存储变更日志的表。假设我们有一个名为employee_data 的表,我们希望记录每次数据更新时的操作者信息。

CREATE TABLE change_log (
    id SERIAL PRIMARY KEY,
    table_name TEXT,
    operation VARCHAR(10),
    changed_by TEXT,
    change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    old_data JSONB,
    new_data JSONB
);

 

2-2. 创建一个触发函数

接下来,创建一个触发函数。当 employee_data 表发生变化时,调用该函数来记录变更,检测并获取 old_data 和 new_data,然后通过 row_to_json 函数将其转换为 JSONB 格式存入日志表中。处理中请留意不同的操作对应的日志记录内容的差异。

CREATE OR REPLACE FUNCTION log_employee_data_changes()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'DELETE' THEN
        INSERT INTO change_log (table_name, operation, changed_by, old_data)
        VALUES (
            TG_TABLE_NAME,
            TG_OP,
            SESSION_USER,
            row_to_json(OLD)
        );
    ELSE
        INSERT INTO change_log (table_name, operation, changed_by, old_data, new_data)
        VALUES (
            TG_TABLE_NAME,
            TG_OP,
            SESSION_USER,
            row_to_json(OLD),
            row_to_json(NEW)
        );
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

TG_OP 是 PostgreSQL 触发器函数中的一个特殊变量。在触发器函数中,TG_OP 用于表示触发事件的操作类型。它会被设置为以下字符串值之一,以标识触发器是由哪种数据库操作激活的:

 

'INSERT': 触发器是由插入操作激活的。

'UPDATE': 触发器是由更新操作激活的。

'DELETE': 触发器是由删除操作激活的.

'TRUNCATE': 触发器是由截断操作激活的。

在触发器函数中使用 TG_OP,可以根据不同的操作类型执行不同的逻辑。   2-3. 创建触发器

最后,为 employee_data 表创建一个触发器,当发生 INSERT、UPDATE 或 DELETE 操作时调用触发函数:

 

CREATE TRIGGER employee_data_changes
AFTER INSERT OR UPDATE OR DELETE ON employee_data
FOR EACH ROW EXECUTE PROCEDURE log_employee_data_changes();

 

2-4. 如果没有对应的表employee_data,就建一个来测试

 

CREATE TABLE employee_data (
    employee_id SERIAL PRIMARY KEY,      -- 员工唯一标识
    first_name VARCHAR(50) NOT NULL,     -- 员工的名字
    last_name VARCHAR(50) NOT NULL,      -- 员工的姓氏
    email VARCHAR(100) UNIQUE NOT NULL,  -- 员工的电子邮件地址
    phone_number VARCHAR(15),            -- 员工的联系电话
    hire_date DATE NOT NULL,             -- 入职日期
    job_title VARCHAR(50),               -- 职位名称
    department VARCHAR(50),              -- 所属部门
    salary NUMERIC(10, 2),               -- 薪水
    manager_id INT,                      -- 上级主管ID,指向另一个员工
    CONSTRAINT fk_manager
        FOREIGN KEY(manager_id) 
        REFERENCES employee_data(employee_id) 
        ON DELETE SET NULL
);

 

2-5. 如果表中没有数据就添加一条来测试

 

INSERT INTO employee_data (
    first_name,
    last_name,
    email,
    phone_number,
    hire_date,
    job_title,
    department,
    salary,
    manager_id
) VALUES (
    'ZZZ',                   -- First name
    'AAA',                    -- Last name
    'ZZZ.AAA@example.com',   -- Email address
    '123-456-7890',           -- Phone number
    '2023-11-01',             -- Hire date
    'Engineer',               -- Job title
    'Engineering',            -- Department
    75000,                    -- Salary
    NULL                      -- Manager ID (assuming no manager or manager not yet assigned)
);

 

 

3. 跨表更新或同步:

当一个表发生变化时时,触发器可以用于自动更新或同步其他表的数据。例如,在一个多表关联的系统中,有一个订单表order和一个库存表inventory,如果订单表中数据有变化,就触发更新库存表中的对应产品的数据。 3.1 建表示例

CREATE TABLE inventory (
    product_id SERIAL PRIMARY KEY,
    product_name VARCHAR(100),
    stock_quantity INT NOT NULL
);


CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    product_id INT REFERENCES inventory(product_id),
    quantity INT NOT NULL
);

 

 

3.2 创建触发事件

当order表中已经发生insert,updat或者delete事件时,就触发下面的函数运行。实际数据的加减操作,请根据实际关系进行调整。这里的简单逻辑是:

有新订单添加时,就在库存表中减少产品库存数

订单数据有更新时,就把库存表中减去更新后订单表中对应产品的订单数据,然后加上更新前订单表中对应产品的数据

当订单取消(删除)时,就会在库存数据上增加之订单表中删除的旧数据

 

CREATE OR REPLACE FUNCTION update_inventory()
RETURNS TRIGGER AS $$
BEGIN
    IF TG_OP = 'INSERT' THEN
        UPDATE inventory
        SET stock_quantity = stock_quantity - NEW.quantity
        WHERE product_id = NEW.product_id;
    ELSIF TG_OP = 'UPDATE' THEN
        UPDATE inventory
        SET stock_quantity = stock_quantity - NEW.quantity + OLD.quantity
        WHERE product_id = NEW.product_id;
    ELSIF TG_OP = 'DELETE' THEN
        UPDATE inventory
        SET stock_quantity = stock_quantity + OLD.quantity
        WHERE product_id = OLD.product_id;
    END IF;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

 

3.3 创建事件触发器

 

CREATE TRIGGER trigger_orders_update
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE PROCEDURE update_inventory();

 

 

(防止出现视觉疲劳)

4. 安全性检查和防护

执行安全性检查,如防止未授权的数据更改或异常数据输入。如果有可疑活动或不当数据修改,触发器可以自动拒绝操作或生成警告。假设你有一个敏感数据的表,如 sensitive_data,需要确保只有授权用户才能更新数据。 4.1 建表sensitive_data示例

CREATE TABLE sensitive_data (
    id SERIAL PRIMARY KEY,
    data TEXT NOT NULL,
    last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

 

4.2 创建触发函数进行安全检查

创建一个触发函数来检查是否是授权用户在做修改。

CREATE OR REPLACE FUNCTION check_user_authorization()
RETURNS TRIGGER AS $$
BEGIN
    -- 简单检查:用户是否在允许的列表中(实际应该更加复杂)
    IF SESSION_USER <> 'authorized_user' THEN
        RAISE EXCEPTION 'Unauthorized user. Access denied.';
    END IF;
    -- 更新 last_modified 时间戳
    NEW.last_modified := CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

 

4.3 为表创建触发器

 

CREATE TRIGGER secure_update_trigger
BEFORE UPDATE ON sensitive_data
FOR EACH ROW EXECUTE PROCEDURE check_user_authorization();

 

该事件触发器的功能说明

功能:这个示例功能是,当有人试图更新 sensitive_data 表中的数据时,触发器函数 check_user_authorization() 会自动检查发起更新的数据库用户是否有权限。如果没有权限,抛出异常并阻止操作。

扩展:在实际的生产环境中,这种安全性检查会更复杂,可能包括日志记录、详细的用户权限检查、使用角色来管理权限等。

安全性:使用触发器确保只有合适和经过验证的用户可以进行关键数据修改,这是保护数据完整性的一部分。

审计:这种自动检查可集成到更大的审计框架中,以全面监控和存储所有数据修改尝试记录。

 

5. 事件通知(客户端程序配合事件触发的同步处理方式)

使用事件触发器和事件通知来实现对特定数据库事件的响应和处理。使用 LISTEN 和 NOTIFY 机制,数据库客户端可以监听特定的通道,并在触发器函数中发送通知。这在需要实时监控数据库事件时非常有用。下面是一个使用 PostgreSQL 实现事件通知的示例。

假设我们希望在 orders 表中插入新订单时发送通知,以便外部系统或服务进行相应处理。

5.1 建一个orders表方便示例

CREATE TABLE orders (
    order_id SERIAL PRIMARY KEY,
    product_id INT NOT NULL,
    quantity INT NOT NULL,
    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
触发器可以用于事件通知,例如在数据变化时发送电子邮件通知相关人员。这在实时监控和响应系统中非常有用。
5.2 建立触发函数
CREATE OR REPLACE FUNCTION notify_new_order()
RETURNS TRIGGER AS $$
BEGIN
    -- 使用 NOTIFY 发送通知,通道名为 'new_order'
    PERFORM pg_notify('new_order', 'New order placed: ' || NEW.order_id);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

5.3 创建触发器 为 orders 表创建触发器,以在插入新记录时调用触发函数。
CREATE TRIGGER notify_order_insert
AFTER INSERT ON orders
FOR EACH ROW EXECUTE PROCEDURE notify_new_order();

  5.4 使用 Python 监听通知 我们可以使用 Python 脚本来监听并处理通知。以下是一个简单的示例,使用 psycopg2 库监听 new_order 通道。
import psycopg2
import select


def listen_for_new_orders():
    try:
        # Connect to your PostgreSQL database
        connection = psycopg2.connect(
            dbname="your_db",
            user="your_user",
            password="your_password",
            host="localhost"
        )
        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()


        # Listen for notifications on the 'new_order' channel
        cursor.execute("LISTEN new_order;")
        print("Waiting for notifications on channel 'new_order'...")


        while True:
            # Use select() to wait for notification
            if select.select([connection], [], [], 5) == ([], [], []):
                print("No new notifications.")
            else:
                connection.poll()
                while connection.notifies:
                    notify = connection.notifies.pop(0)
                    print(f"Got NOTIFY: {notify.channel} - {notify.payload}")


    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
    finally:
        if connection:
            cursor.close()
            connection.close()


# Call the function to start listening for notifications
if __name__=='__main__':
    listen_for_new_orders()

 

 

  6. 事件通知(客户端程序异步多线程的方式进行检测和操作)

示例的数据库表和事件触发的设置或创建,和示例5中相同,不过这里我们要增加一些复杂度,毕竟,程序处理要尽可能避免堵塞的方式进行等待读取。这里设想另外一种使用场景:

一方面客户端要检测数据库的orders表中的数据变化;另一方面,客户端还在继续读取(或者其他操作)这个数据库中的数据。

 

import threading
import psycopg2
import select
import time


# Global flag to indicate whether the threads should continue running
running = True


def listen_for_new_orders():
    try:
        # Connect to your PostgreSQL database
        connection = psycopg2.connect(
            dbname="your_db",
            user="your_user",
            password="your_password",
            host="localhost"
        )
        connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        cursor = connection.cursor()


        # Listen for notifications on the 'new_order' channel
        cursor.execute("LISTEN new_order;")
        print("Waiting for notifications on channel 'new_order'...")


        while running:
            # Use select() to wait for notification
            if select.select([connection], [], [], 5) == ([], [], []):
                continue
            else:
                connection.poll()
                while connection.notifies:
                    notify = connection.notifies.pop(0)
                    print(f"Got NOTIFY: {notify.channel} - {notify.payload}")


    except (Exception, psycopg2.DatabaseError) as error:
        print(f"Error: {error}")
    finally:
        if connection:
            cursor.close()
            connection.close()


def read_database_records():
    while running:
        try:
            # Example of reading from PostgreSQL
            connection = psycopg2.connect(
                dbname="your_db",
                user="your_user",
                password="your_password",
                host="localhost"
            )
            cursor = connection.cursor()


            # Example query to periodically read data (replace with actual query)
            cursor.execute("SELECT * FROM orders;")
            records = cursor.fetchall()
            for record in records:
                print(f"Order Record: {record}")


            time.sleep(10)  # Wait before reading again to simulate periodic check


        except (Exception, psycopg2.DatabaseError) as error:
            print(f"Error: {error}")
        finally:
            if connection:
                cursor.close()
                connection.close()


def main():
    try:
        # Create threads for listening and reading
        listener_thread = threading.Thread(target=listen_for_new_orders)
        reader_thread = threading.Thread(target=read_database_records)


        # Start the threads
        listener_thread.start()
        reader_thread.start()


        # Wait for both threads to complete (or terminate on Ctrl+C)
        listener_thread.join()
        reader_thread.join()


    except KeyboardInterrupt:
        # Set the running flag to False to stop the threads
        global running
        running = False
        print("Exiting...")


if __name__ == "__main__":
    main()

  请留意上面的示例python代码中,数据库的连接使用了ISOLATION_LEVEL_AUTOCOMMIT,这就意味着每次涉及到数据更改或者增加的操作,数据库将自动提交了。如果要手动方式提交,那就需要配置一个ISOLATION_LEVEL_READ_COMMITTED。 另外需要留意,前面的事件触发示例中,用了:
...
FOR EACH ROW EXECUTE PROCEDURE your_trigger_func();
...
这个代码的执行是针对每条记录的发生来触发了。请根据实际应用的操作需要进行调整。

 

 

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分