数据库无论对于生产管理还是很多的实际应用都非常重要。小编这次聊一下数据库事件触发的应用。示例使用了postgresql和Python。在本文中,事件触发和处理大概地分为两类:
数据库的事件触发和服务器内部处理(1~4)
数据库事件触发后,客户端的程序检测到该事件的触发对应的处理(5~6)
在数据库系统中,事件触发(通常指数据库触发器)以及读取事件触发的信息用于多种场景和需求。请看两组示例(1~4)和(5~6)。
1. 数据一致性和完整性维护
当对数据库表中的数据进行插入、更新或删除操作时,需要自动验证或调整相关数据,以确保它们符合业务规则或约束。例如,在一个订单管理系统中,如果库存数量减少到一定阈值以下,可以触发一个警告或补货请求。
Step 1-1: 创建数据库表
假设我们有一个 inventory 表,它保存产品库存的信息:
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name TEXT NOT NULL, quantity INT NOT NULL );
Step 1-2: 创建触发函数
创建一个 PL/pgSQL 函数,用于检查库存数量并记录警告信息:
CREATE OR REPLACE FUNCTION check_inventory_threshold() RETURNS TRIGGER AS $$ BEGIN IF NEW.quantity < 10 THEN -- 假设 10 是阈值 -- 在此处记录警告或使用某种方式发送通知 RAISE WARNING 'Product % is below threshold with quantity %', NEW.product_name, NEW.quantity; -- 可在此插入补货请求或通知操作 END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
Step 1-3: 创建触发器
设置一个触发器,更新 inventory 表时调用触发函数:
CREATE TRIGGER inventory_check_trigger AFTER INSERT OR UPDATE ON inventory FOR EACH ROW EXECUTE PROCEDURE check_inventory_threshold();Step 1-4: 使用 Python 进行外部操作
一个Python脚本可以用于监控警告并执行更复杂的操作,比如发送电子邮件或自动创建补货单。以下是一个简单的Python示例,假设你将警告日志记录到一个专门的日志表中:
import psycopg2 from smtplib import SMTP def send_notification(product_name, quantity): # 发送邮件通知逻辑(确保你已设置SMTP服务器配置) with SMTP('smtp.example.com') as smtp: smtp.sendmail('from@example.com', 'to@example.com', f'Subject: Inventory Alert
Product {product_name} is below threshold with quantity {quantity}.') def check_and_notify(): try: # Connect to PostgreSQL database connection = psycopg2.connect( host="localhost", database="your_database", user="your_user", password="your_password" ) cursor = connection.cursor() # Query to check logs for low inventory query = """ SELECT product_name, quantity FROM inventory WHERE quantity < 10; """ cursor.execute(query) low_stock_items = cursor.fetchall() for product_name, quantity in low_stock_items: send_notification(product_name, quantity) except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Run the check and notify function check_and_notify()
2. 自动化任务
自动执行某些日常任务,如记录变化、生成日志或进行审计。当某个表中的数据被修改时,触发器可以自动记录修改前后的数据以供审计,当对特定表进行插入、更新或删除操作时,触发器能够捕捉这些事件,并执行相关的处理逻辑。 下面是一个如何使用 PostgreSQL 触发器来记录数据变化的示例。假设我们有一个名为 employee_data 的表,我们希望记录每次数据更新时的操作者信息。
2-1. 创建一个用于日志记录的表
首先,需要新建一个用于存储变更日志的表。假设我们有一个名为employee_data 的表,我们希望记录每次数据更新时的操作者信息。
CREATE TABLE change_log ( id SERIAL PRIMARY KEY, table_name TEXT, operation VARCHAR(10), changed_by TEXT, change_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, old_data JSONB, new_data JSONB );
2-2. 创建一个触发函数
接下来,创建一个触发函数。当 employee_data 表发生变化时,调用该函数来记录变更,检测并获取 old_data 和 new_data,然后通过 row_to_json 函数将其转换为 JSONB 格式存入日志表中。处理中请留意不同的操作对应的日志记录内容的差异。
CREATE OR REPLACE FUNCTION log_employee_data_changes() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'DELETE' THEN INSERT INTO change_log (table_name, operation, changed_by, old_data) VALUES ( TG_TABLE_NAME, TG_OP, SESSION_USER, row_to_json(OLD) ); ELSE INSERT INTO change_log (table_name, operation, changed_by, old_data, new_data) VALUES ( TG_TABLE_NAME, TG_OP, SESSION_USER, row_to_json(OLD), row_to_json(NEW) ); END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;TG_OP 是 PostgreSQL 触发器函数中的一个特殊变量。在触发器函数中,TG_OP 用于表示触发事件的操作类型。它会被设置为以下字符串值之一,以标识触发器是由哪种数据库操作激活的:
'INSERT': 触发器是由插入操作激活的。
'UPDATE': 触发器是由更新操作激活的。
'DELETE': 触发器是由删除操作激活的.
'TRUNCATE': 触发器是由截断操作激活的。
在触发器函数中使用 TG_OP,可以根据不同的操作类型执行不同的逻辑。 2-3. 创建触发器
最后,为 employee_data 表创建一个触发器,当发生 INSERT、UPDATE 或 DELETE 操作时调用触发函数:
CREATE TRIGGER employee_data_changes AFTER INSERT OR UPDATE OR DELETE ON employee_data FOR EACH ROW EXECUTE PROCEDURE log_employee_data_changes();
2-4. 如果没有对应的表employee_data,就建一个来测试
CREATE TABLE employee_data ( employee_id SERIAL PRIMARY KEY, -- 员工唯一标识 first_name VARCHAR(50) NOT NULL, -- 员工的名字 last_name VARCHAR(50) NOT NULL, -- 员工的姓氏 email VARCHAR(100) UNIQUE NOT NULL, -- 员工的电子邮件地址 phone_number VARCHAR(15), -- 员工的联系电话 hire_date DATE NOT NULL, -- 入职日期 job_title VARCHAR(50), -- 职位名称 department VARCHAR(50), -- 所属部门 salary NUMERIC(10, 2), -- 薪水 manager_id INT, -- 上级主管ID,指向另一个员工 CONSTRAINT fk_manager FOREIGN KEY(manager_id) REFERENCES employee_data(employee_id) ON DELETE SET NULL );
2-5. 如果表中没有数据就添加一条来测试
INSERT INTO employee_data ( first_name, last_name, email, phone_number, hire_date, job_title, department, salary, manager_id ) VALUES ( 'ZZZ', -- First name 'AAA', -- Last name 'ZZZ.AAA@example.com', -- Email address '123-456-7890', -- Phone number '2023-11-01', -- Hire date 'Engineer', -- Job title 'Engineering', -- Department 75000, -- Salary NULL -- Manager ID (assuming no manager or manager not yet assigned) );
3. 跨表更新或同步:
当一个表发生变化时时,触发器可以用于自动更新或同步其他表的数据。例如,在一个多表关联的系统中,有一个订单表order和一个库存表inventory,如果订单表中数据有变化,就触发更新库存表中的对应产品的数据。 3.1 建表示例
CREATE TABLE inventory ( product_id SERIAL PRIMARY KEY, product_name VARCHAR(100), stock_quantity INT NOT NULL ); CREATE TABLE orders ( order_id SERIAL PRIMARY KEY, product_id INT REFERENCES inventory(product_id), quantity INT NOT NULL );
3.2 创建触发事件
当order表中已经发生insert,updat或者delete事件时,就触发下面的函数运行。实际数据的加减操作,请根据实际关系进行调整。这里的简单逻辑是:
有新订单添加时,就在库存表中减少产品库存数
订单数据有更新时,就把库存表中减去更新后订单表中对应产品的订单数据,然后加上更新前订单表中对应产品的数据
当订单取消(删除)时,就会在库存数据上增加之订单表中删除的旧数据
CREATE OR REPLACE FUNCTION update_inventory() RETURNS TRIGGER AS $$ BEGIN IF TG_OP = 'INSERT' THEN UPDATE inventory SET stock_quantity = stock_quantity - NEW.quantity WHERE product_id = NEW.product_id; ELSIF TG_OP = 'UPDATE' THEN UPDATE inventory SET stock_quantity = stock_quantity - NEW.quantity + OLD.quantity WHERE product_id = NEW.product_id; ELSIF TG_OP = 'DELETE' THEN UPDATE inventory SET stock_quantity = stock_quantity + OLD.quantity WHERE product_id = OLD.product_id; END IF; RETURN NEW; END; $$ LANGUAGE plpgsql;
3.3 创建事件触发器
CREATE TRIGGER trigger_orders_update AFTER INSERT OR UPDATE OR DELETE ON orders FOR EACH ROW EXECUTE PROCEDURE update_inventory();
(防止出现视觉疲劳)
4. 安全性检查和防护
执行安全性检查,如防止未授权的数据更改或异常数据输入。如果有可疑活动或不当数据修改,触发器可以自动拒绝操作或生成警告。假设你有一个敏感数据的表,如 sensitive_data,需要确保只有授权用户才能更新数据。 4.1 建表sensitive_data示例
CREATE TABLE sensitive_data ( id SERIAL PRIMARY KEY, data TEXT NOT NULL, last_modified TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
4.2 创建触发函数进行安全检查
创建一个触发函数来检查是否是授权用户在做修改。
CREATE OR REPLACE FUNCTION check_user_authorization() RETURNS TRIGGER AS $$ BEGIN -- 简单检查:用户是否在允许的列表中(实际应该更加复杂) IF SESSION_USER <> 'authorized_user' THEN RAISE EXCEPTION 'Unauthorized user. Access denied.'; END IF; -- 更新 last_modified 时间戳 NEW.last_modified := CURRENT_TIMESTAMP; RETURN NEW; END; $$ LANGUAGE plpgsql;
4.3 为表创建触发器
CREATE TRIGGER secure_update_trigger BEFORE UPDATE ON sensitive_data FOR EACH ROW EXECUTE PROCEDURE check_user_authorization();
该事件触发器的功能说明
功能:这个示例功能是,当有人试图更新 sensitive_data 表中的数据时,触发器函数 check_user_authorization() 会自动检查发起更新的数据库用户是否有权限。如果没有权限,抛出异常并阻止操作。
扩展:在实际的生产环境中,这种安全性检查会更复杂,可能包括日志记录、详细的用户权限检查、使用角色来管理权限等。
安全性:使用触发器确保只有合适和经过验证的用户可以进行关键数据修改,这是保护数据完整性的一部分。
审计:这种自动检查可集成到更大的审计框架中,以全面监控和存储所有数据修改尝试记录。
5. 事件通知(客户端程序配合事件触发的同步处理方式)
使用事件触发器和事件通知来实现对特定数据库事件的响应和处理。使用 LISTEN 和 NOTIFY 机制,数据库客户端可以监听特定的通道,并在触发器函数中发送通知。这在需要实时监控数据库事件时非常有用。下面是一个使用 PostgreSQL 实现事件通知的示例。
假设我们希望在 orders 表中插入新订单时发送通知,以便外部系统或服务进行相应处理。
5.1 建一个orders表方便示例
CREATE TABLE orders ( order_id SERIAL PRIMARY KEY, product_id INT NOT NULL, quantity INT NOT NULL, order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); 触发器可以用于事件通知,例如在数据变化时发送电子邮件通知相关人员。这在实时监控和响应系统中非常有用。5.2 建立触发函数
CREATE OR REPLACE FUNCTION notify_new_order() RETURNS TRIGGER AS $$ BEGIN -- 使用 NOTIFY 发送通知,通道名为 'new_order' PERFORM pg_notify('new_order', 'New order placed: ' || NEW.order_id); RETURN NEW; END; $$ LANGUAGE plpgsql;5.3 创建触发器 为 orders 表创建触发器,以在插入新记录时调用触发函数。
CREATE TRIGGER notify_order_insert AFTER INSERT ON orders FOR EACH ROW EXECUTE PROCEDURE notify_new_order();5.4 使用 Python 监听通知 我们可以使用 Python 脚本来监听并处理通知。以下是一个简单的示例,使用 psycopg2 库监听 new_order 通道。
import psycopg2 import select def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while True: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): print("No new notifications.") else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() # Call the function to start listening for notifications if __name__=='__main__': listen_for_new_orders()
6. 事件通知(客户端程序异步多线程的方式进行检测和操作)
示例的数据库表和事件触发的设置或创建,和示例5中相同,不过这里我们要增加一些复杂度,毕竟,程序处理要尽可能避免堵塞的方式进行等待读取。这里设想另外一种使用场景:
一方面客户端要检测数据库的orders表中的数据变化;另一方面,客户端还在继续读取(或者其他操作)这个数据库中的数据。
import threading import psycopg2 import select import time # Global flag to indicate whether the threads should continue running running = True def listen_for_new_orders(): try: # Connect to your PostgreSQL database connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cursor = connection.cursor() # Listen for notifications on the 'new_order' channel cursor.execute("LISTEN new_order;") print("Waiting for notifications on channel 'new_order'...") while running: # Use select() to wait for notification if select.select([connection], [], [], 5) == ([], [], []): continue else: connection.poll() while connection.notifies: notify = connection.notifies.pop(0) print(f"Got NOTIFY: {notify.channel} - {notify.payload}") except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def read_database_records(): while running: try: # Example of reading from PostgreSQL connection = psycopg2.connect( dbname="your_db", user="your_user", password="your_password", host="localhost" ) cursor = connection.cursor() # Example query to periodically read data (replace with actual query) cursor.execute("SELECT * FROM orders;") records = cursor.fetchall() for record in records: print(f"Order Record: {record}") time.sleep(10) # Wait before reading again to simulate periodic check except (Exception, psycopg2.DatabaseError) as error: print(f"Error: {error}") finally: if connection: cursor.close() connection.close() def main(): try: # Create threads for listening and reading listener_thread = threading.Thread(target=listen_for_new_orders) reader_thread = threading.Thread(target=read_database_records) # Start the threads listener_thread.start() reader_thread.start() # Wait for both threads to complete (or terminate on Ctrl+C) listener_thread.join() reader_thread.join() except KeyboardInterrupt: # Set the running flag to False to stop the threads global running running = False print("Exiting...") if __name__ == "__main__": main()请留意上面的示例python代码中,数据库的连接使用了ISOLATION_LEVEL_AUTOCOMMIT,这就意味着每次涉及到数据更改或者增加的操作,数据库将自动提交了。如果要手动方式提交,那就需要配置一个ISOLATION_LEVEL_READ_COMMITTED。 另外需要留意,前面的事件触发示例中,用了:
... FOR EACH ROW EXECUTE PROCEDURE your_trigger_func(); ...这个代码的执行是针对每条记录的发生来触发了。请根据实际应用的操作需要进行调整。
全部0条评论
快来发表一下你的评论吧 !