Group管理,查看GroupID下單個消費端堆棧信息,期望只展示與該GroupID相關" />
    • <bdo id="qgeso"></bdo>
        • <strike id="qgeso"></strike>
        • <sup id="qgeso"></sup><center id="qgeso"></center>
        • <input id="qgeso"></input>

          RocketMQ控制臺消費者堆棧信息展示優化分析-當前播報

          首頁 > 探索 > > 正文

          日期:2023-03-28 10:14:01    來源:今日頭條    
          背景介紹

          專有云企業版v_3_12,消息隊列RocketMQ控制臺->Group管理,查看Group ID下單個消費端堆棧信息,期望只展示與該Group ID相關的堆棧信息,在以下場景與期望不符。


          (相關資料圖)

          場景介紹

          在同一個程序中創建兩個不同Group ID的消費端實例,在控制臺中查看一個Group ID下單個消費端堆棧信息,堆棧信息中包含了兩個Group ID消費端的堆棧信息,給排查問題造成了困擾。

          示例代碼pom
            com.aliyun.openservices  ons-client  1.8.8.3.Final
          code
          import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.aliyun.openservices.ons.api.batch.BatchMessageListener;import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;import com.aliyun.openservices.ons.api.bean.Subscription;import java.util.HashMap;import java.util.Map;import java.util.Properties;public class Main {    public static void main(String[] args){        String nameSrvAddr = "xxx";        String accessKey = "xxx";        String secretKey = "xxx";        String groupId1 = "Goup_ID_1";        String topic1 = "xxx_1";        String tag1 = "xxx_1";        BatchMessageListener batchMessageListener1 = (messages, context) -> Action.CommitMessage;        BatchConsumerBean batchConsumerBean1 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,                                                                 groupId1,topic1,tag1,batchMessageListener1);        batchConsumerBean1.start();        String groupId2 = "Goup_ID_2";        String topic2 = "xxx_2";        String tag2 = "xxx_2";        BatchMessageListener batchMessageListener2 = (messages, context) -> Action.CommitMessage;        BatchConsumerBean batchConsumerBean2 = batchConsumerBean(nameSrvAddr,accessKey,secretKey,                                                                 groupId2,topic2,tag2,batchMessageListener2);        batchConsumerBean2.start();    }    private static BatchConsumerBean batchConsumerBean(String nameSrvAddr,String accessKey,String secretKey,String groupId,String topic,String tag,BatchMessageListener batchMessageListener){        BatchConsumerBean batchConsumerBean = new BatchConsumerBean();        Properties properties = new Properties();        properties.put(PropertyKeyConst.NAMESRV_ADDR,nameSrvAddr);        properties.put(PropertyKeyConst.AccessKey,accessKey);        properties.put(PropertyKeyConst.SecretKey,secretKey);        properties.put(PropertyKeyConst.GROUP_ID,groupId);        batchConsumerBean.setProperties(properties);        Subscription subscription = new Subscription();        subscription.setTopic(topic);        subscription.setExpression(tag);        Map subscriptionTable = new HashMap<>();        subscriptionTable.put(subscription,batchMessageListener);        batchConsumerBean.setSubscriptionTable(subscriptionTable);        return batchConsumerBean;    }}
          分析過程

          首先分析示例代碼中與BatchConsumerBean相關聯的對象,然后分析控制臺展示消費端堆棧信息的流程,最后分析下不同版本的RocketMQ Client SDK對消費端消費線程命名方式的變化。

          BatchConsumerBean

          示例代碼中創建了兩個BatchConsumerBean實例,與BatchConsumerBean實例相關聯的對象如下:

          與BatchConsumerBean關聯的對象

          從上圖看,BatchConsumerBean實例是比較重的,所以上面的示例代碼可以優化為只創建一個BatchConsumerBean實例,與該問題不太相關,暫時忽略;上圖中與該問題直接相關的是ClientRemotingProcessor、MQClientInstance、DefaultMQPushConsumerImpl、ConsumerStatsManager,下面繼續分析。

          堆棧信息展示流程

          下面描述的是在瀏覽器請求一個Group ID單個消費端堆棧信息的流程。

          堆棧信息展示流程

          瀏覽器請求控制臺應用

          當在控制臺單機某個消費端堆棧信息的時候,瀏覽器會向控制臺應用發起http請求,主要請求參數是:GroupID,ClientId,其中每個MQClientInstance實例對應一個ClientId。

          控制臺應用請求Broker

          控制臺應用收到瀏覽器請求后,主要進行以下操作:

          String topic = MixAll.RETRY_GROUP_TOPIC_PREFIX + consumerGroup;TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);List brokerDatas = topicRouteData.getBrokerDatas();if (brokerDatas != null) {    for (BrokerData brokerData : brokerDatas) {        String addr = brokerData.selectBrokerAddr();        if (addr != null) {            return this.mqClientInstance.getMQClientAPIImpl().getConsumerRunningInfo(addr, consumerGroup, clientId, jstack,timeoutMillis * 3);        }    }}
          根據%RETRY% + GroupIID查找對應的TopicRouteData從TopicRouteData中選擇一個Broker的地址發送getConsumerRunningInfo請求Broker請求Consumer

          Broker收到請求后,主要進行以下操作:

          ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);newRequest.setExtFields(request.getExtFields());newRequest.setBody(request.getBody());return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);
          AdminBrokerProcessor響應查詢請求根據GroupID和ClientId找到對應Consumer實例的channel socket通過channel socket發送請求到Consumer實例Consumer處理邏輯

          Consumer收到請求后,主要進行以下操作:

          ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());if (requestHeader.isJstackEnable()) {  Map map = Thread.getAllStackTraces();  String jstack = UtilAll.jstack(map);  consumerRunningInfo.setJstack(jstack);}
          通過MQClientInstance實例請求Consumer實例的consumerRunningInfo方法獲取Consumer運行信息,如:pullRT、pullTPS、consumeRT、consumeOKTPS、consumeFailedTPS等信息獲取JVM所有線程棧信息將獲取到的ConsumerRunningInfo返回給Broker。

          其中第2步【獲取JVM所有線程棧信息】就是我們需要查看的堆棧信息,目前控制臺主要展示了以ConsumeMessageThread__開頭的線程和RebalanceService線程,這塊期望只展示與該消費端相關的ConsumeMessageThread__線程和Rebalance線程,不期望將不相關的消費端線程也展示出來。

          ConsumeMessageThread線程的命名

          在當前版本中處理業務的消費者線程名的形式是:ConsumeMessageThread_數字,ConsumeMessageConcurrentlyService類中相關代碼如下:

          //該線程池用于處理業務邏輯this.consumeExecutor = new ThreadPoolExecutor(  this.defaultMQPushConsumer.getConsumeThreadMin(),  this.defaultMQPushConsumer.getConsumeThreadMax(),  1000 * 60,  TimeUnit.MILLISECONDS,  this.consumeRequestQueue,  new ThreadFactoryImpl("ConsumeMessageThread_"));

          新版本中線程的命名中增加了GroupId,相關代碼如下:

          String consumeThreadPrefix = null;if (consumerGroup.length() > 100) {    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup, 0, 100).append("_").toString();} else {    consumeThreadPrefix = new StringBuilder("ConsumeMessageThread_").append(consumerGroup).append("_").toString();}this.consumeExecutor = new ThreadPoolExecutor(    this.defaultMQPushConsumer.getConsumeThreadMin(),    this.defaultMQPushConsumer.getConsumeThreadMax(),    1000 * 60,    TimeUnit.MILLISECONDS,    this.consumeRequestQueue,    new ThreadFactoryImpl(consumeThreadPrefix));

          線程名形式為:ConsumeMessageThread_GroupId__數字,從一定程度對以上問題進行了優化。

          總結ONS SDK對RocketMQ Client進行了封裝,更加方便業務的使用,Consumer對象比較重,需要根據業務采用合理的初始化方式ConsumerStatsManager記錄了消費端的一些統計信息ConsumeMessageConcurrentlyService對消費端線程命名進行了優化?

          關鍵詞:

          下一篇:諸多精進行跡(六)
          上一篇:最后一頁

          科技

           
          国产三级日本三级日产三级66,五月天激情婷婷大综合,996久久国产精品线观看,久久精品人人做人人爽97
          • <bdo id="qgeso"></bdo>
              • <strike id="qgeso"></strike>
              • <sup id="qgeso"></sup><center id="qgeso"></center>
              • <input id="qgeso"></input>
                主站蜘蛛池模板: 最近中文字幕视频高清| 无码喷水一区二区浪潮AV| 9999国产精品欧美久久久久久| 再深点灬舒服灬太大了69| 女人把腿给男人桶视频app| 美女色又黄一级毛片| 久久99精品久久久大学生| 三上悠亚ssni_229在线播放| 自拍偷自拍亚洲精品播放| 无翼乌口工全彩无遮挡里| 国产www视频| 中文字幕人妻色偷偷久久| 要灬要灬再深点受不了好舒服| 中文字幕一区在线观看| 国产欧美日韩精品丝袜高跟鞋| 最新亚洲人成无码网站| 调教办公室在线观看| 丁香伊人五月综合激激激| 催眠体验馆最新章节| 国自产精品手机在线观看视频| 最近中文字幕国语免费完整| 老子影院伦不卡欧美| www.狠狠操| 久久综合九色综合欧美狠狠| 国产真实伦在线视频免费观看| 日本边添边摸边做边爱喷水| 欧美欧洲性色老头老妇| 亚洲国产精品自产在线播放| 国产欧美精品区一区二区三区| 亚洲午夜无码久久久久小说| 亚洲乱码国产乱码精品精| 午夜成人免费视频| 中文字幕人成乱码中国| 粉色视频成年免费人15次| 在线观看的黄网| 亚洲国产小视频| 高清无码一区二区在线观看吞精 | 国产成人综合美国十次| 久久久无码精品午夜| 精品国偷自产在线不卡短视频| 女性特黄一级毛片|