03_普通消息
三种消息发送方式
发送同步消息
同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求。这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

代码演示
// org.apache.rocketmq.example.simple.SyncProducer
public class SyncProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
//1.创建生产者
DefaultMQProducer producer = new DefaultMQProducer("group_test");
//设置NameServer地址
producer.setNamesrvAddr("127.0.0.1:9876");
//2.启动实例
producer.start();
//3.构造消息(Topic,tag,消息体)、发送消息
for (int i = 0; i < 10; i++) {
Message message = new Message("TopicTest", "mobile", ("Hello" + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
System.out.println("%s%n" + sendResult);
}
//5.关闭生产者Producer
producer.shutdown();
}
}发送结果分析
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33680000, offsetMsgId=C0A8010500002A9F000000000000078A, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33700001, offsetMsgId=C0A8010500002A9F000000000000084B, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33720002, offsetMsgId=C0A8010500002A9F000000000000090C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33770003, offsetMsgId=C0A8010500002A9F00000000000009CD, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B337A0004, offsetMsgId=C0A8010500002A9F0000000000000A8E, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B337C0005, offsetMsgId=C0A8010500002A9F0000000000000B4F, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B337F0006, offsetMsgId=C0A8010500002A9F0000000000000C10, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33810007, offsetMsgId=C0A8010500002A9F0000000000000CD1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33830008, offsetMsgId=C0A8010500002A9F0000000000000D92, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=2], queueOffset=4]
SendResult [sendStatus=SEND_OK, msgId=7F0000010FE418B4AAC22B7B33850009, offsetMsgId=C0A8010500002A9F0000000000000E53, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=3], queueOffset=4]msgId
消息的全局唯一标识(RocketMQ的ID生成是使用机器IP和消息偏移量的组成),由消息队列 MQ 系统自动生成,唯一标识某条消息。
sendStatus
发送的标识:成功,失败等
queueId
queueId是Topic的分区;Producer发送具体一条消息的时,对应选择的该Topic下的某一个Queue的标识ID。
queueOffset
Message queue是无限长的数组。一条消息进来下标就会涨1,而这个数组的下标就是queueOffset,queueOffset是从0开始递增。
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

代码演示
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group_test");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
final int index = i;
//keys:通常用于唯一标识一条消息或者作为消息的业务键,比如订单ID。消息的键可以帮助实现消息的去重、查询等功能。
Message message = new Message("ToicTest", "TagA","OrderID888", ("Hello" + i).getBytes(StandardCharsets.UTF_8));
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
producer.shutdown();
}
}发送结果:
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0008, offsetMsgId=C0A8010500002A9F0000000000000FD2, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0001, offsetMsgId=C0A8010500002A9F00000000000010A0, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0005, offsetMsgId=C0A8010500002A9F000000000000116E, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0000, offsetMsgId=C0A8010500002A9F000000000000123C, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0006, offsetMsgId=C0A8010500002A9F000000000000130A, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0007, offsetMsgId=C0A8010500002A9F00000000000013D8, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0004, offsetMsgId=C0A8010500002A9F00000000000014A6, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0002, offsetMsgId=C0A8010500002A9F0000000000001574, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0009, offsetMsgId=C0A8010500002A9F0000000000001642, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=7F0000014D0418B4AAC230F9745D0003, offsetMsgId=C0A8010500002A9F0000000000001710, messageQueue=MessageQueue [topic=ToicTest, brokerName=broker-a, queueId=2], queueOffset=2]单向发送
这种方式主要用在不特别关心发送结果的场景,例如日志发送。单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

代码演示
public class OneWayProducer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("group_tets");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
Message message = new Message("Topic-OneWay", "TagA", ("OneWay" + i).getBytes(StandardCharsets.UTF_8));
producer.sendOneway(message);
}
producer.shutdown();
}
}消息发送的权衡

两种消息消费方式
负载均衡模式(集群消费)
消费者采用负载均衡方式消费消息,一个分组(Group)下的多个消费者共同消费队列消息,每个消费者处理的消息不同。一个Consumer Group中的各个Consumer实例分摊去消费消息,即一条消息只会投递到一个Consumer Group下面的一个实例。例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

代码演示
public class BalanceConsumer {
public static void main(String[] args) throws MQClientException {
//实例化消息推送消费者,并指定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
//设置NameServer地址,用于消费者查找生产者和主题的路由信息。
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅指定主题的所有消息,使用通配符"*"表示订阅所有标签。
consumer.subscribe("TopicTest", "*");
//默认集群消费
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册消息监听器,用于处理消费逻辑。
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
// 提取消息的主题、标签和内容
String topic = msg.getTopic();
String tags = msg.getTags();
String body = new String(msg.getBody());
System.out.println("收到消息:" + "topic:" + topic + ",tag:" + tags + ",body:" + body);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 消息处理成功,返回成功标志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Start.%n");
}
}广播消费

广播消费模式中消息将对一个Consumer Group下的各个Consumer实例都投递一遍。即使这些 Consumer属于同一个Consumer Group,消息也会被Consumer Group 中的每个Consumer都消费一次。实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。
代码演示
public class BroadConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
//订阅主题
consumer.subscribe("TopicTest", "*");
//广播模式消费
consumer.setMessageModel(MessageModel.BROADCASTING);
//注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
// 提取消息的主题、标签和内容
String topic = msg.getTopic();
String tags = msg.getTags();
String body = new String(msg.getBody());
System.out.println("收到消息:" + "topic:" + topic + ",tag:" + tags + ",body:" + body);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
// 消息处理成功,返回成功标志
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Start.%n");
}
}消息消费时的权衡
负载均衡模式:适用场景&注意事项
- 消费端集群化部署,每条消息只需要被处理一次。
- 由于消费进度在服务端维护,可靠性更高。
- 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。
- 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上,因此处理消息时不应该做任何确定性假设。
广播模式:适用场景&注意事项
- 每条消息都需要被相同逻辑的多台机器处理。
- 消费进度在客户端维护,出现重复的概率稍大于集群模式。
- 广播模式下,消息队列 RocketMQ 保证每条消息至少被每台客户端消费一次,但是并不会对消费失败的消息进行失败重投,因此业务方需要关注消费失败的情况。
- 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。
- 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。
- 目前仅 Java 客户端支持广播模式。
- 广播消费模式下不支持顺序消息。
- 广播消费模式下不支持重置消费位点。
- 广播模式下服务端不维护消费进度,所以消息队列 RocketMQ 控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。