利用RabbitMQ实现mysql与ElasticSearch的数据同步
生产者消费者导入MQ的依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
添加配置
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: 123456
virtual-host: myHost
RabbitMQ的配置信息
两个消息队列 分别是数据增改 和 删除的队列
创建主题(topic)类型的交换机,并绑定刚刚创建的两个消息队列,并分别设置相应的key,消费者可以通过不同的key来判断该消费那一条消息队列的数据。
生产者的配置
所谓的生产者就是我们数据库的服务方,当我们对数据库的数据进行增删改的时候,我们应该像消息队列发送消息来通知ES我们进行了增删改操作,以便ES进行数据的同步。
/** * RabbitMQ的配置 */ @Configuration public class RabbitMQConfig { public static final String QUEUE_COURSE_SAVE = "queue.course.save"; public static final String QUEUE_COURSE_REMOVE = "queue.course.remove"; public static final String KEY_COURSE_SAVE = "key.course.save"; public static final String KEY_COURSE_REMOVE = "key.course.remove"; public static final String COURSE_EXCHANGE = "edu.course.exchange"; @Bean public Queue queueCourseSave() { return new Queue(QUEUE_COURSE_SAVE); } @Bean public Queue queueCourseRemove() { return new Queue(QUEUE_COURSE_REMOVE); } @Bean public TopicExchange topicExchange() { return new TopicExchange(COURSE_EXCHANGE); } @Bean public Binding bindCourseSave() { return BindingBuilder.bind(queueCourseSave()).to(topicExchange()).with(KEY_COURSE_SAVE); } @Bean public Binding bindCourseRemove() { return BindingBuilder.bind(queueCourseRemove()).to(topicExchange()).with(KEY_COURSE_REMOVE);
生产者控制层
生产者发送消息的主要方法
@Autowired RabbitTemplate rabbitTemplate; rabbitTemplate.convertAndSend(交换机的名称,消息的key,消息内容);
@Slf4j @RestController public class CourseController { @Autowired private ICourseService courseService; @Autowired RabbitTemplate rabbitTemplate; @PostMapping("/course-upload") public ResponseEntity<String> upload(MultipartFile file) throws IOException { //创建文件输入流 InputStream inputStream = file.getInputStream(); //获得文件名 String filename = file.getOriginalFilename(); //调用文件上传方法 OSSUtil.upload(inputStream,filename); //回调上传的文件 String url = OSSUtil.getURL(filename); //返回前端 return ResponseEntity.ok(url); } @GetMapping("/courses") public ResponseEntity<Page<Course>> findAllPage(@RequestParam("current") Integer current, @RequestParam("PAGE_SIZE") Integer PAGE_SIZE){ Page<Course> page = new Page<>(current,PAGE_SIZE); return ResponseEntity.ok(courseService.page(page)); } @GetMapping("/course") public ResponseEntity<Course> findOne(@RequestParam("id") Integer id){ return ResponseEntity.ok(courseService.findOne(id)); } @PostMapping("/course") public ResponseEntity<String> add(@RequestBody Course course){ courseService.save(course); rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE, JSON.toJSONString(course)); return ResponseEntity.ok("ok"); } @PutMapping("/course") public ResponseEntity<String> modify(@RequestBody Course course){ courseService.updateById(course); rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_SAVE,JSON.toJSONString(course)); return ResponseEntity.ok("ok"); } @DeleteMapping("/course/{id}") public ResponseEntity<String> deleteProductHandovers(@PathVariable("id") Integer id){ courseService.removeById(id); rabbitTemplate.convertAndSend(RabbitMQConfig.COURSE_EXCHANGE,RabbitMQConfig.KEY_COURSE_REMOVE,id); return ResponseEntity.ok("ok"); } }
消费者对消息队列进行监听
所谓的消费者就是ES服务的操作方,通过实时的对消息队列的监听,通过消息队列对应的key值来进行选择服务的调用,不同的key调用不同的服务,获取服务方传输的数据,然后进行数据的同步。
@Slf4j @Component public class CourseMQListener { public static final String QUEUE_COURSE_SAVE = "queue.course.save"; public static final String QUEUE_COURSE_REMOVE = "queue.course.remove"; public static final String KEY_COURSE_SAVE = "key.course.save"; public static final String KEY_COURSE_REMOVE = "key.course.remove"; public static final String COURSE_EXCHANGE = "course.exchange"; @Autowired ICourseService courseService; /** * 监听课程添加操作 */ @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = QUEUE_COURSE_SAVE, durable = "true"), exchange = @Exchange(value = COURSE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true") , key = KEY_COURSE_SAVE)}) public void receiveCourseSaveMessage(String message) { try { log.info("课程添加:{}",message); Course course = JSON.parseObject(message,Course.class); //将消息转为课程,保存到es中 courseService.saveOrUpdate(course); log.info("添加完成:{}",course); } catch (Exception ex) { ex.printStackTrace(); } } /** * 监听课程删除操作 */ @RabbitListener(bindings = { @QueueBinding(value = @Queue(value = QUEUE_COURSE_REMOVE, durable = "true"), exchange = @Exchange(value = COURSE_EXCHANGE, type = ExchangeTypes.TOPIC, ignoreDeclarationExceptions = "true") , key = KEY_COURSE_REMOVE)}) public void receiveCourseDeleteMessage(Long id) { try { courseService.removeById(id); log.info("课程删除完成:{}",id); } catch (Exception ex) { ex.printStackTrace(); } } }
本期内容就到这里啦~以上内容均可在 方包博客「http://fang1688.cn」 网站直接搜索名称访问哦。欢迎感兴趣的小伙伴试试,如果本文对您有帮助,也请帮忙点个 赞 + 在看 啦!❤️
欢迎大家加入方包的「优派编程」学习圈子,和多名小伙伴们一起交流学习,向方包 1 对 1 提问、跟着方包做项目、领取大量编程资源等。Q群「763256989」欢迎想一起学习进步的小伙伴~
另外方包最近开发了一款工具类的小程序「方包工具箱」,功能包括:抖音、小红书、快手去水印,天气预报,小说在线免费阅读(内含上万部热门小说),历史今天,生成图片二维码,图片识别文字,ai伪原创文章,数字摇号抽奖,文字转语音MP3功能...
送福利!关注下方的公众号:「优派编程」回复「资料」,即可获得软件app下载资源和python、java等编程学习资料! 点击卡片关注「优派编程」
定期分享 it编程干货
学习了