缓存一致性 每次逢年过节的时候抢票非常艰难,放票的时候那么多人同时去抢票,如果所有人查询、购票等都去访问数据库,那数据库的压力得有多大,这时候很多都会引入缓存, 把车票信息放入缓存,这样可以减少数据库压力。当乘客购买成功之后,数据库发生了变化,需要及时更新缓存中的数据,以便于其他乘客能从缓存中及时获取最新车票信息。这就是缓存一致性。
1、同步双写:也就是修改db的时候同时修改一下缓存,这种模式下会出现无法保证数据库与缓存的原子性。
如果出现多线程同时修改db的情况,网络延迟导致数据库修改顺序与请求顺序错位
例如:A 先操作数据库修改 x=1
B也修改数据库 x=2
但是网络延迟导致
B先修改缓存 x=2
A再修改缓存 x=1
这样就导致了数据库中x=2,而缓存中则是 x=1,导致数据库与缓存不一致。
2、设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
优势:简单、方便
缺点:时效性差,缓存过期之前可能不一致
场景:更新频率较低,时效性要求低的业务
那我们有没有什么更加好的解决方案呢?
阿里云的canal就为我们很好的解决了这一问题:
canal: 是Alibaba旗下的一款开源项目,纯Java开发.它是基于数据库增量日志解析,提供 增量数据订阅&消费 ,目前主要支持mysql。
mysql的主从复制原理:
MySQL master 将数据变更写入二进制日志( binary log , 其中记录叫做二进制日志事件 binary log events ,可以通过 show binlog events 进行查看)
MySQL slave 将 master 的 binary log events 拷贝到它的中继日志( relay log )
MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
canal工作原理
canal模拟mysql salve的交互协议,伪装自己为mysql slave,向mysql master发送dump协议;
mysql master收到dump请求,开始推送binary log给slave(也就是canal);
canal解析binary log对象(原始byte流).
一、登进Mysql后,使用show variables like'log_bin';查询是否开启binlog,如果开启(ON),进行下一步,如果没开启(OFF),在数据库的my.ini配置文件添加配置
[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server_id=1
二、binlog开启后,创建一个canal用户并授权,官网配置是@%,表示所有服务器,所以改为localhost就可以,在mysql中,运行如下代码,设置完成之后重启:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost' identified by 'canal';
FLUSH PRIVILEGES;
三、安装canal
在conf文件夹里找到confcanal.properties
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.instance.mysql.slaveId=20 #只要和mysql的master的不一样即可
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
四、spring boot中整合canal maven依赖
< dependency >
< groupId >com.alibaba.otter< /groupId >
< artifactId >canal.client< /artifactId >
< version >1.1.4< /version >
< /dependency >
public class CanalService {
public static void main(String[] args) throws Exception{
//1.获取 canal 连接对象,我在本机上部署的,所以是127.0.0.1
CanalConnector canalConnector =
CanalConnectors.newSingleConnector(new
InetSocketAddress("127.0.0.1", 11111), "example", "", "");
System.out.println("canal启动并开始监听数据 ...... ");
while (true){
canalConnector.connect();
//订阅表 test数据库下的所有表
canalConnector.subscribe("test.*");
//获取数据
Message message = canalConnector.get(100);
//解析message
List< CanalEntry.Entry > entries = message.getEntries();
if(entries.size() <=0){
System.out.println("未检测到数据");
Thread.sleep(1000);
}
for(CanalEntry.Entry entry : entries){
//1、获取表名
String tableName = entry.getHeader().getTableName();
//2、获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//3、获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//判断是否rowdata类型数据
if(CanalEntry.EntryType.ROWDATA.equals(entryType)){
//对第三步中的数据进行解析
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//获取数据集
List< CanalEntry.RowData > rowDatasList = rowChange.getRowDatasList();
//便利数据
for(CanalEntry.RowData rowData : rowDatasList){
//数据变更之前的内容
JSONObject beforeData = new JSONObject();
List< CanalEntry.Column > beforeColumnsList = rowData.getAfterColumnsList();
for(CanalEntry.Column column : beforeColumnsList){
beforeData.put(column.getName(),column.getValue());
}
//数据变更之后的内容
List< CanalEntry.Column > afterColumnsList = rowData.getAfterColumnsList();
JSONObject afterData = new JSONObject();
for(CanalEntry.Column column : afterColumnsList){
afterData.put(column.getName(),column.getValue());
}
System.out.println("Table :" + tableName +
",eventType :" + eventType +
",beforeData :" + beforeData +
",afterData : " + afterData);
//操作缓存
}
}else {
System.out.println("当前操作类型为:" + entryType);
}
}
}
}
}
我手动在book表中操作数据,可以看到程序监控输出结果
五、 最后我们拿到数据之后可以放入消息队列,这样可以加入重试机制,还可以防止幂等问题,最后再写入缓存。
全部0条评论
快来发表一下你的评论吧 !