在大多数情况下, TAG 是一个简单而有用的设计,其可以来选择您想要的消息。例如:

1
2
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含 TAGATAGBTAGC 的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用 SQL 表达式筛选消息。SQL 特性可以通过发送消息时的属性来进行计算。在 RocketMQ 定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------

# 基本语法

RocketMQ 只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如: >>=<<=BETWEEN=
  • 字符比较,比如: =<>IN
  • IS NULL 或者 IS NOT NUL L;
  • 逻辑符号 ANDORNOT
  • 常量支持类型为:
  • 数值,比如: 1233.1415
  • 字符,比如: 'abc' ,必须用单引号包裹起来;
  • NULL ,特殊的常量
  • 布尔值, TRUEFALSE
  • 只有使用 push 模式的消费者才能用使用 SQL92 标准的 sql 语句,接口如下:
    1
    public void subscribe(finalString topic, final MessageSelector messageSelector)

# 使用样例

# 生产者样例

发送消息时,你能通过 putUserProperty 来设置消息的属性

1
2
3
4
5
6
7
8
9
10
11
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// 设置一些属性
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);

producer.shutdown();

# 消费者样例

MessageSelector.bySql 来使用 sql 筛选消息

1
2
3
4
5
6
7
8
9
10
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// 只有订阅的消息有这个属性a, a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();

# 最后

期望和你一起遇见更好的自己