使用canal和RabbitMQ实现缓存数据同步
1. 缓存更新策略
1.1 需求分析
当tb_ad(广告)表的数据发生变化时,更新Redis中的广告数据。
1.2 实现思路
(1)修改数据监控微服务,监控tb_ad表,当发生增删改操作时,提取position值(广告位置key),发送到RabbitMQ
(2)从RabbitMQ中提取消息,通过OkHttpClient调用ad_update来实现对广告缓存数据的更新。
1.3 代码实现
1.3.1 发送消息到mq
(1)在RabbitMQ管理后台创建队列 ad_update_queue ,用于接收广告更新通知
(2)引入RabbitMQ起步依赖
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
(3)配置文件application.properties 添加内容
spring.RabbitMQ.host=192.168.200.128
(4)修改BusinessListener类
@CanalEventListener
public class BusinessListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@ListenPoint(schema = "changgou_business", table = {"tb_ad"})
public void adUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("广告数据发生变化");
//修改前数据
for(CanalEntry.Column column: rowData.getBeforeColumnsList()) {
if(column.getName().equals("position")){
System.out.println("发送消息到mq ad_update_queue:"+column.getValue());
rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //发送消息到mq
break;
}
}
//修改后数据
for(CanalEntry.Column column: rowData.getAfterColumnsList()) {
if(column.getName().equals("position")){
System.out.println("发送消息到mq ad_update_queue:"+column.getValue());
rabbitTemplate.convertAndSend("","ad_update_queue",column.getValue()); //发送消息到mq
break;
}
}
}
}
(5)测试,运行数据监控微服务canal,新增、修改或删除tb_ad表数据,修改后观察控制台输出和RabbitMQ管理界面中ad_update_queue是否接收到消息
1.3.2 从mq中提取消息执行更新
(1)changgou_service_business工程pom.xml引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.9.0</version>
</dependency>
(2)在spring节点下添加RabbitMQ配置
spring:
RabbitMQ:
host: 192.168.200.128
(3)com.changgou.business包下创建listener包,包下创建类
@Component
@RabbitListener(queues = "ad_update_queue")
public class AdListener {
/**
* 获取更新广告通知
* @param message
*/
@RabbitHandler
public void updateAd(String message){
System.out.println("接收到消息:"+message);
String url = "http://192.168.200.128/ad_update?position="+message;
OkHttpClient okHttpClient = new OkHttpClient();
final Request request = new Request.Builder()
.url(url)
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
e.printStackTrace();//显示错误信息
}
@Override
public void onResponse(Call call, Response response) throws IOException {
System.out.println("调用成功"+response.message());
}
});
}
}
(4)测试,启动eureka和business微服务,观察控制台输出和数据同步效果。
2. 商品上架索引库导入数据
2.1 需求分析
商品上架将商品的sku列表导入或更新索引库。
2.2 实现思路
(1)当调用商品微服务中商品上架方法时候, 则商品微服务连接mysql数据库根据SPU的主键ID更改SPU表中is_marketable状态字段的值为1(已上架).
(2)在RabbitMQ管理后台创建商品上架交换器(Exchanges)。使用分列模式(fanout)的交换器是考虑商品上架会有很多种逻辑需要处理,导入索引库只是其中一项,另外还有商品详细页静态化等操作。这样我们可以创建导入索引库的队列和商品详细页静态化队列并与商品上架交换器进行绑定。
(3)商品微服务将需要上架的SPU的主键ID发送给RabbitMQ的商品上架交换器, 交换器则将数据根据路由规则发给对应的索引库上架队列和静态页上架队列.
(4) 搜索微服务从RabbitMQ的索引库上架队列中提取spu的id,通过feign调用商品微服务得到sku的列表,并且通过调用elasticsearch的高级restAPI 将sku列表导入到索引库。
(5) 静态页微服务从RabbitMQ的静态页上架队列中提取SPU的id, 通过feign调用商品微服务得到sku, 分类, 品牌, spu等各种数据信息, 然后根据模板生成静态化页面.
2.3 代码实现
2.3.1 配置RabbitMQ
在RabbitMQ后台创建交换器goods_up_exchange(类型为fanout),创建队列search_add_queue绑定交换器goods_up_exchange
2.3.2 发送上架商品id到Mq
(1) changgou_service_goods工程引入依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2) changgou_service_goods工程的application.yml中添加RabbitMQ配置
spring:
RabbitMQ:
host: 192.168.200.128
(3) changgou_service_goods工程的SpuServiceImpl中更改方法
@Autowired
private RabbitMessagingTemplate rabbitTemplate;
@Override
public void put(String id) {
/**
* 1. 更改数据库中的上架状态
*/
Spu spu = spuMapper.selectByPrimaryKey(id);
if(!spu.getStatus().equals("1")){
throw new RuntimeException("未通过审核的商品不能上架!");
}
spu.setIsMarketable("1");//上架状态
spuMapper.updateByPrimaryKeySelective(spu);
/**
* 2. 将数据发送到RabbitMQ中
*/
rabbitTemplate.convertAndSend("goods_up_exchange","",id);
}
2.3.3 索引库环境准备
(1)elasticsearch 6.5.2安装
(2)ik中文分词器安装
(3)kibana-6.5.2 安装
2.3.4 搜索微服务搭建
(1)创建changgou_service_search模块,pom.xml引入依赖
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_goods_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_service_search_api</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)changgou_service_search的application.yml
server:
port: 9009
spring:
application:
name: search
rabbitmq:
host: 192.168.200.128
Redis:
host: 192.168.200.128
main:
allow-bean-definition-overriding: true #当遇到同样名字的时候,是否允许覆盖注册
data:
elasticsearch:
cluster-name: elasticsearch
cluster-nodes: 192.168.200.128:9300
eureka:
client:
service-url:
defaultZone: http://127.0.0.1:6868/eureka
instance:
prefer-ip-address: true
feign:
hystrix:
enabled: true
client:
config:
default: #配置全局的feign的调用超时时间 如果 有指定的服务配置 默认的配置不会生效
connectTimeout: 600000 # 指定的是 消费者 连接服务提供者的连接超时时间 是否能连接 单位是毫秒
readTimeout: 600000 # 指定的是调用服务提供者的 服务 的超时时间() 单位是毫秒
#hystrix 配置
hystrix:
command:
default:
execution:
timeout:
#如果enabled设置为false,则请求超时交给ribbon控制
enabled: false
isolation:
strategy: SEMAPHORE
elasticSearch的配置是我们自己定义的,后边的连接工厂类会用到
(3)创建com.changgou包,包下创建SearchApplication
@SpringBootApplication
@EnableEurekaClient
@EnableDiscoveryClient
@EnableFeignClients(basePackages = "com.itheima.feign")
public class SearchApplication {
public static void main(String[] args) {
SpringApplication.run(SearchApplication.class);
}
}
2.3.5 创建索引库结构
- 在changgou_service_api项目下创建changgou_service_search_api项目
- pom.xml文件引入依赖
<dependency>
<groupId>com.changgou</groupId>
<artifactId>changgou_common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
- 创建包com.changgou.pojo
- 创建和ES索引库映射实体类SkuInfo.java
@Document(indexName = "skuinfo", type = "docs")
public class SkuInfo implements Serializable {
//商品id,同时也是商品编号
@Id
@Field(index = true, store = true, type = FieldType.Keyword)
private Long id;
//SKU名称
@Field(index = true, store = true, type = FieldType.Text, analyzer = "ik_smart")
private String name;
//商品价格,单位为:元
@Field(index = true, store = true, type = FieldType.Double)
private Long price;
//库存数量
@Field(index = true, store = true, type = FieldType.Integer)
private Integer num;
//商品图片
@Field(index = false, store = true, type = FieldType.Text)
private String image;
//商品状态,1-正常,2-下架,3-删除
@Field(index = true, store = true, type = FieldType.Keyword)
private String status;
//创建时间
private Date createTime;
//更新时间
private Date updateTime;
//是否默认
@Field(index = true, store = true, type = FieldType.Keyword)
private String isDefault;
//SPUID
@Field(index = true, store = true, type = FieldType.Long)
private Long spuId;
//类目ID
@Field(index = true, store = true, type = FieldType.Long)
private Long categoryId;
//类目名称
@Field(index = true, store = true,type = FieldType.Keyword)
private String categoryName;
//品牌名称
@Field(index = true, store = true,type = FieldType.Keyword)
private String brandName;
//规格
private String spec;
//规格参数
private Map<String, Object> specMap;
......get和set方法......
2.3.6 创建ES操作的Dao接口
public interface SearchMapper extends ElasticsearchRepository<SkuInfo,Long> {
}
2.3.7 搜索微服务批量导入数据逻辑
(1) changgou_service_goods_api创建com.changgou.feign 包,包下创建接口
@FeignClient(name="goods")
@RequestMapping("/sku")
public interface SkuFeign {
/***
* 多条件搜索品牌数据
* @param searchMap
* @return
*/
@GetMapping(value = "/search" )
public Result findList(@RequestParam Map searchMap);
}
(2)changgou_service_search项目下创建 com.changgou.search.service包包下创建接口EsManagerService
public interface EsManagerService {
/**
* 创建索引库结构
*/
public void createIndexAndMapping();
/**
* 根据spuid导入数据到ES索引库
* @param spuId 商品id
*/
public void importDataToESBySpuId(String spuId);
/**
* 导入全部数据到ES索引库
*/
public void importAll();
}
(2)创建com.changgou.search.service包,包下创建服务实现类
@Service
public class EsManagerServiceImpl implements EsManagerService {
@Autowired
private SearchMapper searchMapper;
@Autowired
private SkuFeign skuFeign;
@Autowired
private ElasticsearchTemplate esTemplate;
/**
* 创建索引库结构
*/
@Override
public void createIndexAndMapping() {
//创建索引
esTemplate.createIndex(SkuInfo.class);
//创建映射
esTemplate.putMapping(SkuInfo.class);
}
/**
* 根据spuid导入数据到ES索引库
* @param spuId 商品id
*/
@Override
public void importDataToESBySpuId(String spuId) {
List<Sku> skuList = skuFeign.findSkuListBySpuId(spuId);
List<SkuInfo> skuInfos = JSON.parseArray(JSON.toJSONString(skuList), SkuInfo.class);
for (SkuInfo skuInfo : skuInfos) {
skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class));
}
searchMapper.saveAll(skuInfos);
}
/**
* 导入全部数据到ES索引库
*/
@Override
public void importAll() {
Map paramMap = new HashMap();
paramMap.put("status", "1");
Result result = skuFeign.findList(paramMap);
List<SkuInfo> skuInfos = JSON.parseArray(JSON.toJSONString(result.getData()), SkuInfo.class);
for (SkuInfo skuInfo : skuInfos) {
skuInfo.setPrice(skuInfo.getPrice());
skuInfo.setSpecMap(JSON.parseObject(skuInfo.getSpec(), Map.class));
}
searchMapper.saveAll(skuInfos);
}
}
2.3.8 根据spuId导入索引库数据
(1)changgou_service_search项目下, 创建com.changgou.controller包, 包下创建SearchController
@RestController
@RequestMapping("/sku_search")
public class SearchController {
@Autowired
private EsManagerService esManagerService;
@Autowired
private SearchService searchService;
@GetMapping("/createIndexAndMapping")
public Result createIndexAndMapping() {
esManagerService.createIndexAndMapping();
return new Result(true, StatusCode.OK, "创建成功");
}
/**
* 导入所有审核通过的库存数据到ES索引库
* @return
*/
@GetMapping("/importAll")
public Result importAllDataToES() {
esManagerService.importAll();
return new Result(true, StatusCode.OK, "导入数据成功!");
}
/**
* 全文检索
* @return
*/
@GetMapping
public Map search(@RequestParam Map<String, String> paramMap) throws Exception {
Map resultMap = searchService.search(paramMap);
return resultMap;
}
}
2.3.9 接收mq消息执行导入
(1)changgou_service_search工程的pom.xml文件中引入依赖包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
(2)changgou_service_search工程创建com.changgou.listener包,包下创建类
@Component
@RabbitListener(queues = "search_add_queue")
public class SpuAddListener {
@Autowired
private EsManagerService esManagerService;
@RabbitHandler
public void addDataToES(String spuId) {
System.out.println("===接收到需要商品上架的spuId为======" + spuId);
esManagerService.importDataToESBySpuId(spuId);
}
}
测试:
注意: 测试前将ES中现有的SkuInfo索引库删除干净
(1)启动环境 eureka 、elasticsearch 、canal服务端、canal数据监控微服务、RabbitMQ
(2)启动商品微服务、搜索微服务
(3)先访问 http://localhost:9009/sku_search/createIndexAndMapping 创建索引库结构
( 4 ) 修改tb_spu某记录的is_marketable值为1,观察控制台输出,启动kibana查询记录是否导入成功
3. 商品下架索引库删除数据
3.1 需求分析
商品下架后将商品从索引库中移除。
3.2 实现思路
与商品上架的实现思路非常类似。
(1)当管理员操作商品微服务, 调用下架操作时, 首先更新mysql的SPU表中的is_marketable状态为0(下架)。
(2)在RabbitMQ管理后台创建商品下架交换器(Exchanges)。使用分列模式(Fanout)的交换器是考虑商品下架会有很多种逻辑需要处理,索引库删除数据只是其中一项,另外还有删除商品详细页等操作。
(3)搜索微服务从RabbitMQ的的队列中提取spu的id,通过调用elasticsearch的高级restAPI 将相关的sku列表从索引库删除。