八、代码实现-自动/手动应答( 五 )


消费者2(拒收 , 每10s拒收一条 , 初始取2条)
package com.rabbitmq3;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import com.rabbitmqUtils.RabbitmqUtils;import java.io.IOException;import java.util.concurrent.TimeoutException;/*** 这是一个工作线程** @author 天真热* @create 2022-02-08 16:36* @desc**/public class Consume02 {//接收消息public static void main(String[] args) throws IOException, TimeoutException {System.out.println("这是工作线程2....");Channel channel = RabbitmqUtils.getChannel();//0是轮询、1是不公平分发 , 大于1则是预取值 , 默认为0 。预取后 , 再进行不公平分发 。channel.basicQos(2);//消费者成功消费时候的回调方法DeliverCallback deliverCallback = (consumerTag, message) -> {//睡眠try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("消费者成功消费时候的回调" + new String(message.getBody()));//1.消息否定确认标记 , 2.消息是否重新被放回队列channel.basicReject(message.getEnvelope().getDeliveryTag(), true);};//消费者取消消费的回调方法CancelCallback cancelCallback = consumerTag -> {System.out.println("消费者取消消费的回调方法");};//消费消息//1.队列名//2.消费成功后是否自动应答(是则默认成功 , 否则需要)//3.消费者未成功消费时候的回调//4.消费者取消消费的回调方法channel.basicConsume(RabbitmqUtils.RABBITMQ_QUEUE, false, deliverCallback, cancelCallback);}}
可以去界面查看
结果:
九、代码实现-消息发布确认的三种方式
消息发布确认就是指生产者将消息发送到 后 , 如果收到消息 , 则会给我们生产者一个应答 。生产者进行接收应答 , 用来确定这条消息是否正常的发送到。对于消息发布确认一共有三种方式 。
单个确认为每发送一次消息就进行一次确认 , 优点是准确无误 , 缺点是资源占用较大 , 速度较慢 。1000条数据测试时间为
/*** 单个确认*/public static void publishDg() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//发送消息for (int i = 0; i < 1000; i++) {channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, ("message" + i).getBytes());//发布确认boolean flag = channel.waitForConfirms();if (flag) {System.out.println("发送成功");}}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));}
批量确认为每发送一批消息再进行一次确认 , 优点是比单个确认更快 , 但是无法精确定位到发送失败消息 。1000条数据测试时间为170ms
public static void publishPl() throws IOException, TimeoutException, InterruptedException {//获取信道Channel channel = RabbitmqUtils.getChannel();//开启发布确认channel.confirmSelect();//开始时间long begin = System.currentTimeMillis();//批量确认大小int batchSize = 100;//批量发送消息、确认for (int i = 0; i < 1000; i++) {channel.basicPublish("", RabbitmqUtils.RABBITMQ_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, ("message" + i).getBytes());if (i % batchSize == 0) {channel.waitForConfirms();}}//结束时间long end = System.currentTimeMillis();System.out.println("单独确认花费时间=" + (end - begin));}
异步确认为在发送前创建一个支持高并发的Map , key存储消息tag , 存储 , 并调用监听器进行监听消息发送 。一般在每次发送后用Map记录下发送消息 , 监听器根据结果回调相关函数 , 若发送成功 , 回调成功函数 , 在Map中删去该消息 。发送失败 , 回调失败函数 , 在失败函数中通过Map显示该消息 。异步确认在所有确认中综合性能最佳 。1000条数据测试时间为43ms