首先我们来学习一个简单的消费生产和消费的 demo
.
# 生产消息
RocketMQ
, 给我们提供了三种简单的消息生产方式, 1.同步发送
, 2.异步发送
, 3.直接发送,不关心发送结果
。
这三种方式,分别对应三种不同的应用场景
# 同步发送
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。这种发送方式,在发送成功之后,才会返回,否则会一直阻塞,直到抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class SyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.1.65:9876"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
|
# 异步发送
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker
的响应。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| public class AsyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.1.65:9876"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount); for (int i = 0; i < messageCount; i++) { final int index = i; Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
|
# 单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public class OnewayProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("192.168.1.65:9876"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); producer.sendOneway(msg);
} producer.shutdown(); } }
|
# 消息消费
# Push 模式 消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
consumer.setNamesrvAddr("192.168.1.65:9876");
consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
|
# Pull 模式消费 - Assign
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import java.util.ArrayList; import java.util.Collection; import java.util.List; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue;
public class LitePullConsumerAssign {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("please_rename_unique_group_name"); litePullConsumer.setAutoCommit(false); litePullConsumer.start(); Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("TopicTest"); List<MessageQueue> list = new ArrayList<>(mqSet); List<MessageQueue> assignList = new ArrayList<>(); for (int i = 0; i < list.size() / 2; i++) { assignList.add(list.get(i)); } litePullConsumer.assign(assignList); litePullConsumer.seek(assignList.get(0), 10); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s %n", messageExts); litePullConsumer.commitSync(); } } finally { litePullConsumer.shutdown(); } } }
|
# Pull 模式消费 - SUBCRIBE
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import java.util.List; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt;
public class LitePullConsumerSubscribe {
public static volatile boolean running = true;
public static void main(String[] args) throws Exception { DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test"); litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); litePullConsumer.subscribe("TopicTest", "*"); litePullConsumer.start(); try { while (running) { List<MessageExt> messageExts = litePullConsumer.poll(); System.out.printf("%s%n", messageExts); } } finally { litePullConsumer.shutdown(); } } }
|
# 最后
期望和你一起遇见更好的自己
扫码或搜索:方家小白
发送 290992
即可立即永久解锁本站全部文章