网创优客建站品牌官网
为成都网站建设公司企业提供高品质网站建设
热线:028-86922220
成都专业网站建设公司

定制建站费用3500元

符合中小企业对网站设计、功能常规化式的企业展示型网站建设

成都品牌网站建设

品牌网站建设费用6000元

本套餐主要针对企业品牌型网站、中高端设计、前端互动体验...

成都商城网站建设

商城网站建设费用8000元

商城网站建设因基本功能的需求不同费用上面也有很大的差别...

成都微信网站建设

手机微信网站建站3000元

手机微信网站开发、微信官网、微信商城网站...

建站知识

当前位置:首页 > 建站知识

pushConsumer拉取消息流程是怎样的

这篇文章主要介绍“pushConsumer拉取消息流程是怎样的”,在日常操作中,相信很多人在pushConsumer拉取消息流程是怎样的问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”pushConsumer拉取消息流程是怎样的”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

从事服务器托管雅安,服务器租用,云主机,网站空间,空间域名,CDN,网络代维等服务。

这是一段RocketMq经典的consumer异步获取broker消息的代码:

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");    consumer.setNamesrvAddr(Constants.NameServerAddr);    consumer.subscribe("topic01","*");    consumer.setMessageModel(MessageModel.BROADCASTING);//广播消息,所有相同组,定于topic的消费端都能收到消息    //consumer.setMessageModel(MessageModel.CLUSTERING);//集群消息--默认(相同组内的topic,集群消息只有一端会接收到)    consumer.registerMessageListener(new MessageListenerConcurrently(){      @Override      public ConsumeConcurrentlyStatus consumeMessage(List list,          ConsumeConcurrentlyContext consumeConcurrentlyContext) {        for (MessageExt messageExt:list){          System.out.println(new java.lang.String(messageExt.getBody()));        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;      }    });    consumer.start();  }

consumer start()方法跟踪

  1. this.defaultMQPushConsumerImpl.start();

  2. 刚启动serviceState状态为 CREATE_JUST,进入这个状态的switch处理逻辑

3. 先用checkCoing()检查consumer的各个配置是否配置ok

4. 然后 copySubscription()用于根据subject构建本地的rebalance的conhurrentHashMapInner

5. 接着构建MqClientFactory的一个Instance

6. 构建PullWrapper,用于去Broker注册过滤消息

7. 再根据MessageMode是广播模式还是集群模式获取offset。(广播模式是从consumer本地的store获取,集群模式则是需要去broker去请求获取)

8. 根据监听消息的类型是OrderLy还是Concurrently去构建一个consumeMessageService对象

9.启动刚才创建的consumerMessageService对象,调用其start方法

10. 使用MqClientFactory Instance实例registerConsumer进行注册

11. 把当前的serviceState状态变为Running状态

12.然后就开始从broker获取消息,请看下面的pushConsumer拉取消息流程

pushConsumer拉取消息流程介绍 

consumer  --DefaultMqPushConsumerImpl 使用pullMessage(pullRequest)拉取消息,pullAPIWrapper.pullKernelImpl(传递pullReuest,回调callback等参数)根据是否同步pullMessageSync还是异步pullMessageAsync, 拉取回来的消息PullResult经过解析处理存放到ProcessQueue 队列里的TreeMap(offset,messageExt)

到此,关于“pushConsumer拉取消息流程是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


网站名称:pushConsumer拉取消息流程是怎样的
网页路径:http://bjjierui.cn/article/iggiid.html

其他资讯