05_消息发送时或消费时重要方法及属性
消息发送时的重要方法/属性
属性

producerGroup:生产者所属组
defaultTopicQueueNums:默认主题在每一个Broker队列数量
sendMsgTimeout:发送消息默认超时时间,默认3s
compressMsgBodyOverHowmuch:消息体超过该值则启用压缩,默认4k
retryTimesWhenSendFailed:同步方式发送消息重试次数,默认为2,总共执行3次
retryTimesWhenSendAsyncFailed:异步方法发送消息重试次数,默认为2
retryAnotherBrokerWhenNotStoreOK:消息重试时选择另外一个Broker时,是否不等待存储结果就返回,默认为false
maxMessageSize:允许发送的最大消息长度,默认为4M
方法
启动
void start() throws MQClientException;关闭
void shutdown();查找该主题下所有消息队列
List<MessageQueue> fetchPublishMessageQueues(final String topic) throws MQClientException;单向发送
//发送单向消息
void sendOneway(final Message msg) throws MQClientException, RemotingException,
InterruptedException;
//选择指定队列单向发送消息
void sendOneway(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, InterruptedException;同步发送
//同步发送消息
SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
InterruptedException;
//同步超时发送消息
SendResult send(final Message msg, final long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;
//选择指定队列同步发送消息
SendResult send(final Message msg, final MessageQueue mq) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException;异步发送
//异步发送消息
void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
RemotingException, InterruptedException;
//异步超时发送消息
void send(final Message msg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException;
//选择指定队列异步发送消息
void send(final Message msg, final MessageQueue mq, final SendCallback sendCallback)
throws MQClientException, RemotingException, InterruptedException;代码
/**
* @author: gongyuzhuo
* @since: 2024-06-16 15:13
* @description:
*/
public class ProducerDetails {
public static void main(String[] args) throws Exception {
// 生产者所属组(针对事务消息,高可用)
DefaultMQProducer producer = new DefaultMQProducer("producer_details");
// 默认主题在每个Broker上队列数量(对于新建主题有效)
producer.setDefaultTopicQueueNums(8);
// 发送消息默认超时时间,默认3s(3000ms)
producer.setSendMsgTimeout(1000 * 3);
// 消息体超过该值启动压缩,默认4k
producer.setCompressMsgBodyOverHowmuch(1024 * 4);
// 同步发送消息失败时重试次数,默认2(总共发送3次)
producer.setRetryTimesWhenSendFailed(2);
// 消息重试发送到另外一个Broker(消息没有储存成功是否发送到另外一个Broker),默认为false
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
// 单条消息最大4M
producer.setMaxMessageSize(1024 * 1024 * 4);
// 设置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 启动
producer.start();
// 获取主题下的所有消息队列
List<MessageQueue> queueList = producer.fetchPublishMessageQueues("TopicTest");
for (int i = 0; i < queueList.size(); i++) {
System.out.println(queueList.get(i).getQueueId());
}
for (int i = 0; i < 10; i++) {
final int index = i;
Message message = new Message("TopicTest", "TagA", "OrderId888", "Hello World".getBytes(StandardCharsets.UTF_8));
// ============================单向发送============================
// 1.1发送单向消息
producer.sendOneway(message);
// // 1.2指定队列发送单向消息
producer.sendOneway(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
}, null);
// 1.3指定队列发送单向消息(根据之前查找出来的主题)
producer.sendOneway(message, queueList.get(0));
// ============================同步发送============================
// 2.1同步发送消息
SendResult sendResult = producer.send(message);
// 2.1同步超时发送消息(属性设置:sendMsgTimeout 发送消息默认超时时间,默认3s)
SendResult sendResult1 = producer.send(message, 1000 * 3);
// 2.2指定队列发送同步消息(使用select方法)
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
}, null);
// 2.3指定队列同步发送消息(根据之前查找出来的主题队列消息)
SendResult sendResult2 = producer.send(message, queueList.get(0));
// ============================异步发送============================
// 3.1 异步发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 3.2异步超时发送消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
}, 1000 * 3);
// 3.3选择指定队列异步发送消息(根据之前查找出来的主题队列信息)
producer.send(message, queueList.get(0), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
// 3.4选择指定队列异步发送消息(使用select方法)
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return mqs.get(0);
}
}, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
}
});
}
Thread.sleep(10000);
// 如果不再发送消息,关闭Producer实例
producer.shutdown();
}
}消息消费时重要方法/属性
属性

// 消费者组
private String consumerGroup;
// 消息消费模式
private MessageModel messageModel = MessageModel.CLUSTERING;
// 指定消费开始偏移量(最大偏移量、最小偏移量、启动时间戳)开始消费
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
ConsumeFromTimestamp模式下只会在订阅组(消费者群组)第一次启动的时候,过滤掉小于当前系统时间戳的消息,后续如果进程停掉或者崩溃,但是又生产了新消息。下次启动消费者时,会继续消费停掉期间新生产的消息。后续行为和ConsumeFromLastOffset类似
// 消费者最小线程数量
private int consumeThreadMin = 20;
// 消费者最大线程数量
private int consumeThreadMax = 20;
// 推模式下任务间隔时间
private long pullInterval = 0;
// 推模式下任务拉取的条数,默认32条
private int pullBatchSize = 32;
// 消息重试次数,-1代表16次
private int maxReconsumeTimes = -1;
// 消息消费超时时间
private long consumeTimeout = 15;方法
// 订阅消息,并指定队列选择器
void subscribe(final String topic, final MessageSelector selector)
// 取消消息订阅
void unsubscribe(final String topic)
// 获取消费者对主题分配了那些消息队列
Set<MessageQueue> fetchSubscribeMessageQueues(final String topic)
// 注册并发事件监听器
void registerMessageListener(final MessageListenerConcurrently messageListener)
// 注册顺序消息事件监听器
void registerMessageListener(final MessageListenerOrderly messageListener)
消费确认(ACK)
业务实现消费回调的时候,当且仅当此回调函数返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,RocketMQ才会认为这批消息(默认是1条)是消费完成的中途断电,抛出异常等都不会认为成功——即都会重新投递。
返回ConsumeConcurrentlyStatus.RECONSUME_LATER,RocketMQ就会认为这批消息消费失败了。
如果业务的回调没有处理好而抛出异常,会认为是消费失败ConsumeConcurrentlyStatus.RECONSUME_LATER处理。
为了保证消息是肯定被至少消费成功一次,RocketMQ会把这批消息重发回Broker(topic不是原topic而是这个消费组的RETRY topic),在延迟的某个时间点(默认是10秒,业务可设置)后,再次投递到这个ConsumerGroup。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
另外如果使用顺序消费的回调MessageListenerOrderly时,由于顺序消费是要前者消费成功才能继续消费,所以没有RECONSUME_LATER的这个状态,只有SUSPEND_CURRENT_QUEUE_A_MOMENT来暂停队列的其余消费,直到原消息不断重试成功为止才能继续消费