04_其它消息
顺序消息
概述
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。区别如下:
生产消息时在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
全局有序

分区有序

全局有序
全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。
分区有序
在电商业务场景中,一个订单的流程是:创建、付款、推送、完成。在加入RocketMQ后,一个订单会分别产生对于这个订单的创建、付款、推送、完成等消息,如果我们把所有消息全部送入到RocketMQ中的一个主题中,这里该如何实现针对一个订单的消息顺序性呢!如下图:

要完成分区有序性,在生产者环节使用自定义的消息队列选择策略,确保订单号尾数相同的消息会被先后发送到同一个队列中(案例中主题有3个队列,生产环境中可设定成10个满足全部尾数的需求),然后再消费端开启负载均衡模式,最终确保一个消费者拿到的消息对于一个订单来说是有序的。
代码案例
生产者代码
/**
* @author: gongyuzhuo
* @since: 2024-06-12 06:47
* @description:
*/
public class ProducerInOrder {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer orderProducer = new DefaultMQProducer("OrderProducer");
orderProducer.setNamesrvAddr("127.0.0.1:9876");
orderProducer.start();
List<Order> orderList = new ProducerInOrder().buildOrders();
for (int i = 0; i < orderList.size(); i++) {
String body = orderList.get(i).toString();
Message message = new Message("PartOrder", null, "KEY:" + i, body.getBytes(StandardCharsets.UTF_8));
SendResult sendResult = orderProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
//订单id
int id = (int) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderList.get(i).getOrderId());
System.out.println(String.format("sendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body)
);
}
}
private List<Order> buildOrders() {
List<Order> orderList = new ArrayList<>();
Order orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("创建");
orderList.add(orderDemo);//001:付款
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("创建");
orderList.add(orderDemo);//002:创建
orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("付款");
orderList.add(orderDemo);//001:付款
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("创建");
orderList.add(orderDemo);//003:创建
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("付款");
orderList.add(orderDemo);//002:付款
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("付款");
orderList.add(orderDemo);//003:付款
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("推送");
orderList.add(orderDemo);//002:推送
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("推送");
orderList.add(orderDemo);//003:推送
orderDemo = new Order();
orderDemo.setOrderId(002);
orderDemo.setDesc("完成");
orderList.add(orderDemo);//002:完成
orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("推送");
orderList.add(orderDemo);//001:推送
orderDemo = new Order();
orderDemo.setOrderId(003);
orderDemo.setDesc("完成");
orderList.add(orderDemo);//003:完成
orderDemo = new Order();
orderDemo.setOrderId(001);
orderDemo.setDesc("完成");
orderList.add(orderDemo);//001:完成
return orderList;
}
private static class Order {
private int orderId;
private String desc;
public int getOrderId() {
return orderId;
}
public void setOrderId(int orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "Order{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
}发送日志
sendResult status:SEND_OK, queueId:1, body:Order{orderId=1, desc='创建'}
sendResult status:SEND_OK, queueId:2, body:Order{orderId=2, desc='创建'}
sendResult status:SEND_OK, queueId:1, body:Order{orderId=1, desc='付款'}
sendResult status:SEND_OK, queueId:3, body:Order{orderId=3, desc='创建'}
sendResult status:SEND_OK, queueId:2, body:Order{orderId=2, desc='付款'}
sendResult status:SEND_OK, queueId:3, body:Order{orderId=3, desc='付款'}
sendResult status:SEND_OK, queueId:2, body:Order{orderId=2, desc='推送'}
sendResult status:SEND_OK, queueId:3, body:Order{orderId=3, desc='推送'}
sendResult status:SEND_OK, queueId:2, body:Order{orderId=2, desc='完成'}
sendResult status:SEND_OK, queueId:1, body:Order{orderId=1, desc='推送'}
sendResult status:SEND_OK, queueId:3, body:Order{orderId=3, desc='完成'}
sendResult status:SEND_OK, queueId:1, body:Order{orderId=1, desc='完成'}消费者代码
消费时,同一个OrderId获取到的肯定是同一个队列。从而确保一个订单中处理的顺序。
/**
* @author: gongyuzhuo
* @since: 2024-06-12 07:33
* @description:创建一个顺序消费者,订阅特定主题的消息,并设置消息监听器以处理接收到的消息。
*/
public class ConsumerInOrder {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer2");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置消费者从最新的消息偏移量开始消费。
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 订阅主题"PartOrder",使用通配符"*"表示订阅所有标签。
consumer.subscribe("PartOrder", "*");
// 注册一个顺序消息监听器,用于处理接收到的消息。
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
Random random = new Random();
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
//可以看到每个queue有唯一的consume线程来消费,订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + ",queueId=" + msg.getQueueId() + ",content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e) {
e.printStackTrace();
//这个点要注意:意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started...");
}
}消费消息:
22:06:27.172 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started...
consumeThread=ConsumeMessageThread_2,queueId=1,content:Order{orderId=1, desc='创建'}
consumeThread=ConsumeMessageThread_3,queueId=3,content:Order{orderId=3, desc='创建'}
consumeThread=ConsumeMessageThread_1,queueId=2,content:Order{orderId=2, desc='创建'}
consumeThread=ConsumeMessageThread_1,queueId=2,content:Order{orderId=2, desc='付款'}
consumeThread=ConsumeMessageThread_3,queueId=3,content:Order{orderId=3, desc='付款'}
consumeThread=ConsumeMessageThread_1,queueId=2,content:Order{orderId=2, desc='推送'}
consumeThread=ConsumeMessageThread_3,queueId=3,content:Order{orderId=3, desc='推送'}
consumeThread=ConsumeMessageThread_2,queueId=1,content:Order{orderId=1, desc='付款'}
consumeThread=ConsumeMessageThread_3,queueId=3,content:Order{orderId=3, desc='完成'}
consumeThread=ConsumeMessageThread_2,queueId=1,content:Order{orderId=1, desc='推送'}
consumeThread=ConsumeMessageThread_2,queueId=1,content:Order{orderId=1, desc='完成'}
consumeThread=ConsumeMessageThread_1,queueId=2,content:Order{orderId=2, desc='完成'}注意事项
用顺序消息:首先要保证消息是有序进入MQ的,消息放入MQ之前,对id等关键字进行取模,放入指定messageQueue,同时consume消费消息失败时,不能返回reconsume——later,这样会导致乱序,所以应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
延时消息
概念
延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
适用场景
消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时向RocketMQ发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
使用案例
Apache RocketMQ目前只支持固定精度的定时消息,因为如果要支持任意的时间精度,在 Broker 层面,必须要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的产生巨大性能开销。(RocketMQ的商业版本Aliware MQ提供了任意时刻的定时消息功能,Apache的RocketMQ并没有,阿里并没有开源)
Apache RocketMQ发送延时消息是设置在每一个消息体上的,在创建消息时设定一个延时时间长度,消息将从当前发送时间点开始延迟固定时间之后才开始投递。
延迟消息的level,区分18个等级:level为1,表示延迟1秒后消费;level为2表示延迟5秒后消费;level为3表示延迟10秒后消费;以此类推;最大level为18表示延迟2个小时消费。具体标识如下:
| level | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
|---|---|---|---|---|---|---|---|---|---|
| 延迟 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m |
| level | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
| 延迟 | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
是这生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。
生产者
/**
* @author: gongyuzhuo
* @since: 2024-06-13 22:51
* @description:
*/
public class ScheduledMessageProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 定义需要发送的消息总数。
int totalMessagesToSend = 10;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("ScheduledTopic", ("Hello scheduled message" + i).getBytes(StandardCharsets.UTF_8));
// 设置消息的延迟时间级别,这里设置为3,表示延迟10秒后发送。
message.setDelayTimeLevel(3);
producer.send(message);
}
producer.shutdown();
}
}消费者
/**
* @author: gongyuzhuo
* @since: 2024-06-13 23:05
* @description:
*/
public class ScheduledMessageConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ScheduledConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("ScheduledTopic", "*");
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
// 打印消息ID和消息生成到消费的时间间隔。
System.out.println("Receive message[msgId=" + msg.getMsgId() + "]" + (msg.getStoreTimestamp() - msg.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}查看消费者消息信息,打印消费延迟与生产时设定符合。
Receive message[msgId=7F0000013DFC18B4AAC242CFE7460000]10005ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE74E0001]10000ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE7500002]10001ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE7540003]10015ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE7550004]10014ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE7570005]10013ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE7580006]10012ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE7590007]10011ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE75B0008]10009ms later
Receive message[msgId=7F0000013DFC18B4AAC242CFE75C0009]10008ms later批量消息
在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。
在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。
使用案例
一般批量发送(不考虑消息分割)
因为批量消息是一个Collection,所以送入消息可以是List,也可以使Set,这里为方便起见,使用List进行批量组装发送。
public class SimpleBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
producer.shutdown();
e.printStackTrace();
}
// 关闭Producer实例
producer.shutdown();
}
}批量切分发送
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。
我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.example.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
public class SplitBatchProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
//large batch
String topic = "BatchTest";
List<Message> messages = new ArrayList<>(100 * 1000);
for (int i = 0; i < 100 * 1000; i++) {
messages.add(new Message(topic, "Tag", "OrderID" + i, ("Hello world " + i).getBytes()));
}
//split the large batch into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
List<Message> listItem = splitter.next();
producer.send(listItem);
Thread.sleep(100);
}
producer.shutdown();
System.out.println("Consumer Started .%n");
}
}
class ListSplitter implements Iterator<List<Message>> {
/**
* 1M
*/
private int sizeLimit = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
/**
* 获取下一个消息列表,该列表的总大小不超过指定的限制。
*
* @return 消息列表,其总大小不超过指定的限制。
*/
@Override
public List<Message> next() {
// 初始化下一个索引为当前索引
int nextIndex = currIndex;
// 初始化总大小为0
int totalSize = 0;
// 遍历消息列表,直到找到满足条件的分片或遍历结束
for (; nextIndex < messages.size(); nextIndex++) {
// 获取当前消息
Message message = messages.get(nextIndex);
// 计算当前消息的大小,包括主题和正文的长度
int tmpSize = message.getTopic().length() + message.getBody().length;
// 获取当前消息的属性
Map<String, String> properties = message.getProperties();
// 遍历属性,累加属性键值对的长度到当前消息的大小
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
// 添加额外的20字节开销到当前消息的大小
// 增加日志的开销 20字节
tmpSize = tmpSize + 20;
// 如果当前消息的大小超过了限制,则根据当前是否在首个消息进行处理
if (tmpSize > sizeLimit) {
// 如果是首个消息超过限制,移动下一个索引并退出当前循环
// 单个消息超过了最大限制 1M,否则会阻塞进程
if (nextIndex - currIndex == 0) {
// 假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则退出循环
nextIndex++;
}
break;
}
// 如果当前消息加上已选中的消息总大小超过了限制,则退出循环
if (tmpSize + totalSize > sizeLimit) {
break;
} else {
// 否则,累加当前消息大小到总大小
totalSize += tmpSize;
}
}
// 从当前索引到下一个索引的范围构成子列表,即为本次返回的消息列表
List<Message> subList = messages.subList(currIndex, nextIndex);
// 更新当前索引为下一个索引,为下一次调用做准备
currIndex = nextIndex;
// 返回子列表
return subList;
}
}消息过滤
Tag过滤
使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。
使用案例
生产者发送60条消息,分别打上三种tag标签。
/**
* @author: gongyuzhuo
* @since: 2024-06-16 13:46
* @description:
*/
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 60; i++) {
Message message = new Message("TagFilterTopic", tags[i % tags.length], ("Hello" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}消费者消费时只选择TagA和TagB的消息。
/**
* @author: gongyuzhuo
* @since: 2024-06-16 13:46
* @description:
*/
public class TagsFilterConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("FilterConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TagFilterTopic", "TagA||TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String body = new String(msg.getBody());
String tas = msg.getTags();
String property = msg.getProperty("a");
System.out.println("收到消息:" + "topic :" + topic + ",tag :" + tas + ",body :" + body + ",a :" + property);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started .%n");
}
}注意事项
Tag过滤的形式非常简单,||代表或、*代表所有,所以使用Tag过滤这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。
Sql过滤
SQL特性可以通过发送消息时的属性来进行消息的过滤计算。具体的操作是使用SQL92标准的sql语句,前提是只有使用push模式的消费者才能用(消费的模式就是push)
SQL基本语法
数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:
- 比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
注意事项
Sql过滤需要Broker开启这项功能(如果消费时使用SQL过滤抛出异常错误,说明Sql92功能没有开启),需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。
使用案例
消息生产者,发送消息时加入消息属性,你能通过putUserProperty来设置消息的属性,以下案例中生产者发送10条消息,除了设置Tag之外,另外设置属性a的值。
/**
* @author: gongyuzhuo
* @since: 2024-06-16 14:24
* @description:
*/
public class SqlFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("SqlFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC"};
for (int i = 0; i < 10; i++) {
Message message = new Message("SqlFilterTopic", tags[i % tags.length], ("Hello World" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
message.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(message);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}
}用MessageSelector.bySql来使用sql筛选消息
/**
* @author: gongyuzhuo
* @since: 2024-06-16 14:26
* @description:
*/
public class SqlFilterConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("SqlFilterConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("SqlFilterTopic",
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA','TagB')) " +
"And (a is not null and a between 0 and 3)"));
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String tags = msg.getTags();
String msgBody = new String(msg.getBody(), "utf-8");
String msgPro = msg.getProperty("a");
System.out.println("收到消息:" + ",topic:" + topic + ",tag:" + tags + ",body:" + msgBody + ",a:" + msgPro);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started .%n");
}
}消费结果:按照Tag和SQL过滤消费3条消息。
