首页 > MQ

若想实现RabbitMQ的消息回调,需要在配置文件下的rabbit选项下添加一行设置:

 

然后在发送者发送消息之前,需要先自行绑定好一个回调方法。

下面是创建回调方法与绑定的全部代码:

 

这样就已经成功实现回调了。但是发送者发送消息使用的RabbitTemplate是Spring自动创建的,因为Spring默认的Bean是单例的,所以针对不同的确认方案需要配置不同的bean.

比如我上边的代码,用的是默认的,那么我在这设置一次后,其他使用默认RabbitTemplate的发送者发送消息都会触发这个回调。这个看业务场景具体设置吧。

 

关于确认,看到回调方法里边的那个ack参数了吗?这个参数是代表消息已经投递了,消费端那边也拿到了,并且处理了(即发送ack了)。至于到底有没有消费,就看返回值到底是true还是false了。

不过不管他有没有消费,这条消息肯定已经不再消息队列中了。

如果消费者发送了ack,RabbitMq将会把这条消息从待确认中删除。如果是nack并且指明不要重新入队列,那么该消息也会删除。但是如果是nack且指明了重新入队列那么这条消息将会进入队列,然后重新发送给消费者。被重新投递的消息消息头amqp_redelivered属性会被设置成true,客户端可以依靠这点来判断消息是否被确认,可以好好利用这一点,如果不做判定每次失败都无脑重新回队列有可能导致同一消息不停的被发送和拒绝。消费者在确认消息之前和RabbitMq失去了连接那么消息也会被重新投递。

 

关于消费端的手动确认,我也写了一份代码

 

如果不想手动确认,在配置文件里把 acknowledge-mode: manual 改为auto,SpringBoot就会帮你自动确认。这个的机制是如果方法成功执行完毕无事发生,那就ack,要是途中报错了,就nack。

我个人觉得还是手动确认比较好,比较可控。

如果要使用SpringBoot帮你处理消息重试机制的话可以直接在yml中配

 

关于发送消息的回调,和接收消息后的重试,这两个东西不要随便乱配。

如果不是必须保证消息的投靠特别稳定、数据不能出现一点丢失。那么完全可以不用配这几个东西。嘛,具体还是看业务啦。

阅读全文

自然,依赖是少不了的。除了spring-boot-starter-web依赖外。

就这个是最主要的依赖了,其他的看着办就是了。我用的是gradle,用maven的看着弄也一样的。无非就是包+包名+版本

这里有一个坑。导致我后来发送消息时一直连不上去。报错: java.net.SocketException: socket closed。

我去网上寻找了许多方案。大致都是一个意思。没有设置远程连接权限。让我添加一个用户,并且设置最大权限。

 

下面是添加rabbitmq用户的命令

我用完之后去管控台(http://ip:15672)看了一下用户列表。确实已经添加上去了,也是最大权限。

然鹅并没有什么卵用

后来强行摸索出来了,原来是版本差异的原因。我SpringBoot本来是使用的是2.0.3版本,然后AMQP我使用的是2.0.4。可能有什么不兼容的地方。

把Springboot和AMQP的版本给同步成一个就好了。别的版本差一点根本没啥问题,就AMQP特殊,也是醉了。

 

 

使用SpriongBoot的yml配置:重点是rabbitmq那一栏

设置好登录用户、密码、地址端口、虚拟地址、超时时间就可以了

这里又有个小坑,这个rabbitmq的超时时间(connection-timeout)配的我真的是醉了,我看的教程里写的是15000,表示15秒,我一输之后IDEA直接报红线啊。

网上一找,全特么用毫秒值配的,行吧,应该我们用的不是一个版本。

点开看下这参数接受一个java.time.Duration对象,百思不得其解。这玩意咋配?我不会啊。找了二十分钟的攻略才知道是这样子配的,使用数字+时间标志。比如1h、1M、1m、1d、1s、1ms这种格式就行了。


咳咳,配置文件弄好后也就差不多可以使用rabbitmq发消息了。

生产端发消息。只需要使用 RabbitTemplate 类就够了,看到这个名字,有没有一种很熟悉的感觉?

Redis也有个这玩意 叫 RedisTemplate,关于SpringBoot操作Redis的可转看:SpringBoot下对于Redis的操作——RedisTemplate

 

关于发消息,在这儿最好还是先指定好exchange和routingKey,即交换机和路由键。

这样发过去的消息才能被发到指定的交换机上,然后交换机在通过你的routingKey来发送给绑定了该routingKey的所有队列。

所以首先登陆管控台(http://ip:15672),到ExchangesQueues菜单下,创建好交换机和队列,还有他们之间的routingKey。这个步骤我就不详细描述了。单靠语言不怎么能够描述清楚。估计得配很多图,有需要的自行google把。

 

万事俱备。正式开始发送消息。

先准备一个要发的玩意。根据业务需求自己创个model就行。我这随便写一个。

关于这个messageId,及消息唯一ID。他的作用是将该条消息数据和RabbitMQ发送的消息绑定起来。不要也不是不行。只是最好还是设置一个这个参数。

 

要发送的数据模型已经准备好,接下来这个类是一个重点。即发送消息的类。

注入RabbbitTemplate,然后就可以通过他的 convertSendAndReceive() 方法进行消息的发送。

他有很多种重载,最好是选用我这种,比较可控。交换机、路由键、消息唯一ID全部指定好。

emmmm,是不是感觉还是挺简单的。一个方法调用,消息就过去了。就发送到指定的交换机了。交换机再通过你的routingKey转发给绑定在上边的队列。生产端这边就完事了。

 

写个测试类测试一下。

 

运行完毕后。登陆管控台(http://ip:15672),进入Queues菜单。即可发现消息队列中已接收到一条消息,会是一个等待消费的状态。

至于到底是哪个消息队列来处理嘛,那就得看你的exchange通过你的routingKey具体把消息转发到哪儿了。这个都是在管控台里边配置的。

 

生产端准备完毕。接下来是消费端。消费端也很简单,yml需要添加消费端的配置。签收模式最好选择手动签收。可控。

 

 

具体的消费者,具体解释都写在注释中了。

 

关于@Exchange注解中设置的交换机的type属性,主要是用这些值:

  • fanout:会把所有发到Exchange的消息路由到所有和它绑定的Queue
  • direct:会把消息路由到routing key和binding key完全相同的Queue,不相同的丢弃
  • topic:direct是严格匹配,那么topic就算模糊匹配,routing key和binding key都用.来区分单词串,比如A.B.C,*匹配任意单词,#匹配任意多个或0个单词,比如。A.B.*可以匹配到A.B.C
  • headers:不依赖routing key和binding key,通过对比消息属性中的headers属性,对比Exchange和Queue绑定时指定的键值对,相同就路由过来

basicAck()方法可以确认消息消费。执行后,消息队列中这条消息就没了。multiple参数表示是否批量消费,一般都选false。

把消费端的服务打开后,就已经在监听了。若监听的队列中已有消息,则会立即处理。直到队列中没消息为止。

若队列为空,他就不会动,这个时候我启动一下生产者那边的测试,消息一发出去,立马就被消费。非常完美。就是这个效果。

 

呼,偶尔也不想咸鱼了啊,今天一天大概把RabbitMQ搞明白一些了,配置也会配了,消息也会发了。踩了一万个坑,有不少是那种比较SB的采坑方式,一般人应该踩不到,我就不打出来了。

还是感觉有很多收获的。就是累成麻瓜了。

 

阅读全文

AMQP核心概念:

Server:又称Broker,接收客户端的连接,实现AMQP实体服务

Connection:连接,应用程序与Broker的网络连接。

Channel:网络信道,几乎所有的操作都在Channel中进行,每个Channel代表一个会话任务。

Message:消息,服务器和应用程序之间传送的数据,由Properties和Body组成。Properties可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body则就是消息体内容。

Virtual host:虚拟地址,用于进行逻辑隔离。最上层的消息路由,一般用于在同一台服务器上隔离不同的项目,一个Virtual host里面可以有若干个Exchange和Queue,同一个Virtual host里面不能够有相同名称的Exchange和Queue。

Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列(Queue)上。

Binding: Exchange和Queue之间的虚拟连接,binding中可以包含Routing key

Routing key:一个路由规则,虚拟机可用它来确定如何路由一个特定消息。

Queue:也成为Message Queue,消息队列,保存消息并将它们转发给消费者

 

 

RabbitMQ消息是如何流转的?

我说下我的理解,我看的是视频教程。网上搜了一下,关于RabbitMQ的消息流转流程这一块的讲解略少。故自己整理一下。

 

Server应该可以理解为服务器(我也不确定这么说准不准确)。服务器通过tcp接收到客户端的连接(Connection)。三次握手之后,开启一个信道(Channel)(网络通道)。话说这一段感觉似曾相识…用啥中间件都有这步骤额,就是连数据库也是这个流程啊。

生产者这边发送一个消息(Message)到指定的虚拟地址 (Virtual host)上,需要指定交换机。对应的交换机(Exchange)即开始对该条消息的路由规则(Routing key)进行解析,如果该条路由规则绑定了一个或多个队列(Queue)则会将消息转发到绑定的队列上,若该路由规则在此交换机上并没有绑定任何队列则该条消息丢失。然后队列接收到消息时,监听该队列的消费者就会得到通知,来对队列中的消息进行处理。这应该就是一个大概的消息流转流程了。

 

 

阅读全文

过不久就需要亲身参与/负责一个分布式架构。得用到消息进行服务器之间的通信。技术选型为RabbitMQ,赶紧自学一下。

今天太晚了之后的就不弄了。也就安装完了进了下管控台。特么主要是我想用虚拟机里边的mysql,偏偏这mysql密码我忘了,搞了好久。

 

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言来编写的,并且RabbitMQ是基于AMQP协议的

 

优点是:开源、性能优秀、稳定

 

可以和SpringAMQP( AMQP:高级消息队列协议 )完美的整合,API丰富

集群模式丰富、表达式配置、HA模式、镜像队列模型

保证数据不丢失的前提做到高可靠性、可用性

 

 

关于我的rabbitmq的安装:

首先下载必要的包(我的系统是CentOS7)

Rpm包下载地址,我用 wget 命令下挺慢,我是直接用浏览器下载然后用FileZilla传到服务器上边去的。

Erlang:

http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm

Rabbitmq:

http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.6/rabbitmq-server-3.6.6-1.el7.noarch.rpm

 

下完后用rpm -ivh来安装,先安装erlang在安装rabbimq。

安装rabbitmq前先使用 yum install socat 命令安装rabbitmq的依赖

 

安装完毕后可以用rabbitmq-server restart试一下,然后要进管控台的话先输入命令: /sbin/rabbitmq-plugins enable rabbitmq_management进行管控台插件的安装

 

然后将ebin目录下rabbit.app中loopback_users里的<<“guest”>>删除并重新启动rabbitmq。具体路径的话: /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.6/ebin/rabbit.app

 

它没有重启rabbitmq服务这个命令,想重启只能通过两个命令来实现:

rabbitmqctl stop :停止rabbitmq

rabbitmq-server restart : 开启rabbitmq

 

然后就可以进管控台了。

输入http://ip:15672/ 即可进入管控台,账号密码都是guest。只要做了将ebin目录下rabbit.app中loopback_users里的<<“guest”>>删除这个操作就能登陆成功

 

进不去的可能是开了防火墙,端口没开

关于防火墙:

 

 

 

阅读全文
EA PLAYER &

历史记录 [ 注意:部分数据仅限于当前浏览器 ]清空

      00:00/00:00