八、代码实现-自动/手动应答( 四 )


启动消费者1 , 再启动消费者2.然后启动生产者 , 可以看到 , 消息是轮询发送给两个消费者 。
八、代码实现-自动/手动应答 概念
1)自动应答:消息从队列发送给消费者时 , 就已经默认消费成功 。
优点:效率高 。
缺点:一方面如果消费者在消费消息时候如果断开了 , 则消费者没有成功处理消息 , 而队列默认消费成功 , 就会造成数据丢失 。另一方面 , 如果消费者系统性能交叉 , 没法及时处理消息 , 就会造成消息积压 , 内存耗尽而崩溃 。
2)手动应答:消息从队列发送给消费者时 , 消费者需要手动确认消息 , 队列才会认为消费成功 。
优点:数据传输较为安全 , 而且可操作性较高 。
缺点:效率低手动应答环境下的可操作性
1)根据不同消费者应答消息的效率 , 队列可以动态分配消息给消费者
2)对于一些特殊的队列信息 , 可以选择拒收 , 重新放回队列代码实现
生产者
package com.rabbitmq3;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** @author 天真热* @create 2022-02-08 14:52* @desc**/public class Product {//发消息public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//生成队列,//1.名称//2.队列消息是否持久化(否:存内存 , 是:存磁盘 。默认否)//3.队列是否只供一个消费者消费 , 默认否//4.最后一个消费者断开连接后 , 是否自动删除 。//5.其他参数channel.queueDeclare(RabbitmqUtils.RABBITMQ_QUEUE, true, false, false, null);//持续发送消息for (int i = 0; i < 10; i++) {//发消息String message = "this is Product"+i;//1.交换机 , 简单版本不考虑 , 直接空字符串即可(默认/无名交换机)//2.路由key , 直接写队列名即可//3.参数 , (消息持久化 , 需要队列开启持久化才有效)//4.消息体channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}System.out.println("消息发送成功");}}
【八、代码实现-自动/手动应答】消费者1(接受 , 每秒接受1条 , 初始取5条)
package com.rabbitmq3;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/public class Consume01 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程1....");Channel channel = RabbitmqUtils.getChannel();//0是轮询,默认值//1是不公平分发 , 即哪个消费者效率高 , 哪边分配的多//大于1则是预取值 , 即消费者一定会消费的消息数量 。预取后 , 再进行不公平分发 。channel.basicQos(5);//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {//睡眠try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));//手动应答//1.消息确认标记 , 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(), false);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, false, deliverCallback, cancelCallback);}}