大家好,我是君哥。

曲松网站建设公司创新互联,曲松网站设计制作,有大型网站制作公司丰富经验。已为曲松超过千家提供企业网站建设服务。企业网站搭建\外贸网站建设要多少钱,请找那个售后服务好的曲松做网站的公司定做!
RocketMQ 消息消费有两种模式,PULL 和 PUSH,今天我们来看一下这两种模式有什么区别。
首先看一段 RocketMQ 推模式的一个官方示例:
public static void main(String[] args) throws InterruptedException, MQClientException {
    Tracer tracer = initTracer();
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
    consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageOpenTracingHookImpl(tracer));
    consumer.subscribe("TopicTest", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.setConsumeTimestamp("20181109221800");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
} 消费者会定义一个消息监听器,并且把这个监听器注册到 DefaultMQPushConsumer,同时也会注册到 DefaultMQPushConsumerIm-pl,当拉取到消息时,就会使用这个监听器来处理消息。那这个监听器是什么时候调用呢?看下面的 UML 类图:
消费者真正拉取请求的类是 DefaultMQPush-ConsumerImpl,这个类的 pullMessage 方法调用了 PullAPIWrapper 的 pullKernelImpl 方法,这个方法有一个参数是回调函数 Pull-Callback,当 PULL 状态是 PullStatus.FOU-ND 时,代表拉取消息成功,处理逻辑如下:
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                subscriptionData);
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    //省略部分逻辑
                        DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                            pullResult.getMsgFoundList(),
                            processQueue,
                            pullRequest.getMessageQueue(),
                            dispatchToConsume);
                        //省略部分逻辑
                    break;
                //省略其他case
                default:
                    break;
            }
        }
    }
    @Override
    public void onException(Throwable e) {
        //省略
    }
};这个处理逻辑调用了 ConsumeMessage-Service 类的 submitConsumeRequest 方法,我们看一下并发消费消息的处理逻辑,代码如下:
public void submitConsumeRequest(
final Listmsgs, 
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
//分批处理,跟上面逻辑一致
}
ConsumeRequest 类是一个线程类,run 方法里面调用了消费者定义的消息处理方法,代码如下:
public void run() {
    //省略逻辑
    MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
    //省略逻辑
    try {
        //调用消费方法
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        //省略逻辑
    }
    //省略逻辑
}下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:
下面是来自官方的一段 PULL 模式拉取消息的代码:
DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("lite_pull_consumer_test");
litePullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
litePullConsumer.subscribe("TopicTest", "*");
litePullConsumer.start();
try {
    while (running) {
        List messageExts = litePullConsumer.poll();
        System.out.printf("%s%n", messageExts);
    }
} finally {
    litePullConsumer.shutdown();
} 这里我们看到,PULL 模式需要在处理逻辑里不停的去拉取消息,比如上面代码中写了一个死循环。那 PULL 模式中 poll 函数是怎么实现的呢?我们看下面的 UML 类图:
跟踪源码可以看到,消息拉取最终是从 DefaultLitePullConsumerImpl 类中的一个 LinkedBlockingQueue 上面拉取。那消息是什么时候 put 到 LinkedBlockingQueue 呢?
官方拉取消息的代码中有一个 subscribe 方法订阅了 Topic,这里相关的 UML 类图如下:
这个 subscribe 方法最终调用了 DefaultLite-PullConsumerImpl 类的 subscribe,代码如下:
public synchronized void subscribe(String topic, String subExpression) throws MQClientException {
    try {
        //省略逻辑
        this.defaultLitePullConsumer.setMessageQueueListener(new MessageQueueListenerImpl());
        assignedMessageQueue.setRebalanceImpl(this.rebalanceImpl);
        //省略逻辑
    } catch (Exception e) {
        throw new MQClientException("subscribe exception", e);
    }
}这里给 DefaultLitePullConsumer 类的 messageQueueListener 这个监听器进行了赋值。当监听器监听到 MessageQueue 发送变化时,就会启动消息拉取消息的线程 Pull-TaskImpl,代码如下:
public void run() {
    //省略部分逻辑
 if (!this.isCancelled()) {
  long pullDelayTimeMills = 0;
  try {
   PullResult pullResult = pull(messageQueue, subscriptionData, offset, defaultLitePullConsumer.getPullBatchSize());
   switch (pullResult.getPullStatus()) {
          case FOUND:
            final Object objLock = messageQueueLock.fetchLockObject(messageQueue);
            synchronized (objLock) {
              if (pullResult.getMsgFoundList() != null && !pullResult.getMsgFoundList().isEmpty() && assignedMessageQueue.getSeekOffset(messageQueue) == -1) {
                processQueue.putMessage(pullResult.getMsgFoundList());
                submitConsumeRequest(new ConsumeRequest(pullResult.getMsgFoundList(), messageQueue, processQueue));
              }
            }
            break;
    //省略其他 case
   }
  } 
  //省略 catch
  if (!this.isCancelled()) {
         //启动下一次拉取
      scheduledThreadPoolExecutor.schedule(this, pullDelayTimeMills, TimeUnit.MILLISECONDS);
  } else {
     log.warn("The Pull Task is cancelled after doPullTask, {}", messageQueue);
  }
 }
}拉取消息成功后,调用 submitConsume-Request 方法把拉取到的消息放到 consumeRequestCache,然后启动下一次拉取。
这样就清除了示例代码中 poll 消息的逻辑,那还有一个问题,监听器是什么时候触发监听事件呢?
在消费者启动时,会启动 RebalanceService 这个线程,这个线程的 run 方法如下:
public void run() {
    while (!this.isStopped()) {
        this.waitForRunning(waitInterval);
        this.mqClientFactory.doRebalance();
    }
}下面的 UML 类图显示了 doRebalance 方法的调用关系:
可以看到最终调用了 最终调用了 Rebalance-LitePullImpl 的 messageQueueChanged 方法,代码如下:
public void messageQueueChanged(String topic, SetmqAll, Set mqDivided) { 
MessageQueueListener messageQueueListener = this.litePullConsumerImpl.getDefaultLitePullConsumer().getMessageQueueListener();
if (messageQueueListener != null) {
try {
messageQueueListener.messageQueueChanged(topic, mqAll, mqDivided);
} catch (Throwable e) {
log.error("messageQueueChanged exception", e);
}
}
}
这里最终触发了监听器。
下面以并发消费方式下的同步拉取消息为例总结一下消费者消息处理过程:
通过本文的讲解,可以看到 PUSH 模式和 PULL 模式本质上都是客户端主动拉取,RocketMQ并没有真正实现 Broker 推送消息的 PUSH 模式。RocketMQ 中 PULL 模式和 PUSH 模式的区别如下:
PULL 模式是从 Broker 拉取消息后放入缓存,然后消费端不停地从缓存取出消息来执行客户端定义的处理逻辑,而 PUSH 模式是在死循环中不停的从 Broker 拉取消息,拉取到后调用回调函数进行处理,回调函数中调用客户端定义的处理逻辑。
PUSH 模式拉取消息依赖死循环来不停唤起业务,而 PULL 模式拉取消息是通过 MessageQueue 监听器来触发消息拉取线程,消息拉取线程会在拉取完一次后接着下一次拉取。
                分享名称:面试官:RocketMQ 的推模式和拉模式有什么区别?
                
                浏览地址:http://www.csdahua.cn/qtweb/news33/127283.html
            
网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等
声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网