Load balancing of RocketMQ consumer message queue

First, here is a brief overview of how message queue load balancing works as a whole.

The rebalance thread rebalances the message queues every 20s by default. It obtains the topic's queue information (mqSet) and the IDs of all current consumers in the consumer group (cidAll), and then allocates the queues according to a load balancing algorithm. The allocation principle is that one consumer can be assigned multiple message queues, while a given message queue is assigned to only one consumer at a time. At this point the consumer can compute the set of message queues allocated to it and compare the previously assigned queues with the new allocation. If the new queue set does not contain a previously assigned queue, consumption of that queue is stopped and the queue is removed; if the previous set does not contain a newly allocated queue, a PullRequest is created for it.

Load balancing runs in the RebalanceService thread. Each MQClientInstance holds a RebalanceService instance, which is started when the MQClientInstance starts.
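The periodic trigger can be pictured with a minimal sketch. This is not RocketMQ's RebalanceService; the class RebalanceLoopSketch and its methods are hypothetical names used only to show a service thread that waits for an interval and then runs a rebalance callback until it is shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a service thread that triggers rebalancing periodically.
final class RebalanceLoopSketch implements Runnable {
    private final long waitIntervalMillis;   // the article mentions 20s as the default
    private final Runnable doRebalance;      // callback that performs one rebalance round
    private volatile boolean stopped = false;

    RebalanceLoopSketch(long waitIntervalMillis, Runnable doRebalance) {
        this.waitIntervalMillis = waitIntervalMillis;
        this.doRebalance = doRebalance;
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                Thread.sleep(waitIntervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // interrupted during shutdown
            }
            if (!stopped) {
                doRebalance.run();
            }
        }
    }

    void shutdown() {
        stopped = true;
    }

    // Runs the loop until at least `rounds` rebalance rounds have happened (or 5s pass),
    // then stops it and returns the number of rounds that were executed.
    static int runRounds(int rounds, long intervalMillis) {
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch latch = new CountDownLatch(rounds);
        RebalanceLoopSketch loop = new RebalanceLoopSketch(intervalMillis, () -> {
            counter.incrementAndGet();
            latch.countDown();
        });
        Thread worker = new Thread(loop, "rebalance-sketch");
        worker.start();
        try {
            latch.await(5, TimeUnit.SECONDS);
            loop.shutdown();
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counter.get();
    }
}
```

In this sketch the client that owns the loop would start the thread when it starts and call shutdown() when it stops, which mirrors the lifecycle described above.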

MQClientInstance traverses the registered consumers and calls the doRebalance method on each of them.

doRebalance traverses the subscription information and rebalances the queues of each topic. For each topic it calls the rebalanceByTopic method, which handles broadcast mode and cluster mode differently. Only cluster mode is explained here.

Get the queue information for this topic and the IDs of all current consumers in the consumer group. Each DefaultMQPushConsumerImpl holds its own RebalanceImpl object.

Sort both the queue information for this topic and the consumer IDs in the consumer group. This ensures that every member of the consumer group sees the same order, which prevents the same message queue from being allocated to more than one consumer.
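To illustrate why sorting matters, here is a minimal sketch of an averaging allocation. It assumes an average-style strategy and uses plain strings for queues and consumer IDs; the class AverageAllocationSketch is hypothetical and is not the allocation code shipped with RocketMQ. Because every consumer sorts the same two lists the same way, each one computes a disjoint slice without any coordination.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: every consumer runs this locally with the same inputs.
final class AverageAllocationSketch {

    // Returns the queues that `currentCid` should consume.
    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        List<String> queues = new ArrayList<>(mqAll);
        List<String> consumers = new ArrayList<>(cidAll);
        // Sorting gives all group members an identical view of both lists.
        Collections.sort(queues);
        Collections.sort(consumers);

        List<String> result = new ArrayList<>();
        int index = consumers.indexOf(currentCid);
        if (index < 0 || queues.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int base = queues.size() / consumers.size();
        int remainder = queues.size() % consumers.size();
        // The first `remainder` consumers take one extra queue each.
        int count = base + (index < remainder ? 1 : 0);
        int start = index * base + Math.min(index, remainder);
        for (int i = 0; i < count; i++) {
            result.add(queues.get(start + i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q3", "q0", "q4", "q1", "q2");
        List<String> consumers = Arrays.asList("c2", "c1");
        System.out.println("c1 -> " + allocate("c1", queues, consumers));
        System.out.println("c2 -> " + allocate("c2", queues, consumers));
    }
}
```

With five queues and two consumers, the first consumer in sorted order receives three queues and the second receives the remaining two, so no queue is shared.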

allocateResult records the message queues allocated to the current consumer.

Call updateProcessQueueTableInRebalance to check whether the set of assigned message queues has changed.

processQueueTable is the cache table of message queues currently assigned to this consumer, and mqSet in this method is the set of message queues assigned to the consumer after the latest allocation. If a message queue in processQueueTable does not exist in mqSet, it has been reassigned to another consumer, so consumption of that queue must be suspended. The statement pq.setDropped(true); does this.

Then the removeUnnecessaryMessageQueue method is called to determine whether mq should be removed from the cache.

After that, the method traverses mqSet, the message queues allocated to the consumer in this round. If a message queue is not contained in processQueueTable, it is a newly added message queue.

For a new queue, first delete its consumption progress from memory, then call computePullFromWhere to read the queue's consumption progress from disk, and create a PullRequest object.
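The table update described above can be sketched as follows. This is a simplified illustration, not the RocketMQ implementation: ProcessQueueTableSketch and QueueState are hypothetical names, queues are represented as strings, and the returned set stands in for the queues that would each get a new PullRequest.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of reconciling the cached queue table with a new allocation.
final class ProcessQueueTableSketch {

    // Simplified stand-in for the per-queue consumption state.
    static final class QueueState {
        volatile boolean dropped = false;
    }

    final Map<String, QueueState> processQueueTable = new ConcurrentHashMap<>();

    // Applies the new allocation `mqSet` and returns the newly added queues.
    Set<String> update(Set<String> mqSet) {
        // Step 1: queues no longer allocated to this consumer are marked dropped and removed.
        Iterator<Map.Entry<String, QueueState>> it = processQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, QueueState> entry = it.next();
            if (!mqSet.contains(entry.getKey())) {
                entry.getValue().dropped = true; // signals in-flight consumption to stop
                it.remove();
            }
        }
        // Step 2: queues not yet in the table are new; a pull request would be created for each.
        Set<String> added = new TreeSet<>();
        for (String mq : mqSet) {
            if (processQueueTable.putIfAbsent(mq, new QueueState()) == null) {
                added.add(mq);
            }
        }
        return added;
    }
}
```

Marking the state as dropped before removing it lets any thread that still holds a reference to the old state notice that the queue now belongs to another consumer.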

There are three main strategies for computing the starting consumption offset (CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET and CONSUME_FROM_TIMESTAMP), and much of their logic is similar.

Taking the timestamp-based strategy as an example: first, read the consumption progress of the message queue from disk. If it is 0 or greater, the message queue has been consumed before, and the next consumption continues from this position. If it is equal to -1, no progress has been recorded, so try to look up the offset by message store timestamp, using the consumer's configured start timestamp; return the offset if one is found, and return 0 if not. If it is less than -1, a wrong offset is stored in the progress file, and -1 is returned.
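The branching just described can be written as a small decision function. This is a sketch under stated assumptions: PullOffsetSketch and the searchByTimestamp lookup are hypothetical, a stored offset of 0 is treated as valid, and a negative lookup result is taken to mean "not found".

```java
import java.util.function.LongUnaryOperator;

// Hypothetical sketch of choosing the offset at which a new queue starts pulling.
final class PullOffsetSketch {

    // storedOffset:      progress read from the offset store; -1 means nothing recorded yet
    // consumeTimestamp:  the consumer's start timestamp
    // searchByTimestamp: assumed lookup from timestamp to offset; negative means not found
    static long computePullFromWhere(long storedOffset,
                                     long consumeTimestamp,
                                     LongUnaryOperator searchByTimestamp) {
        if (storedOffset >= 0) {
            // The queue was consumed before: continue from the recorded position.
            return storedOffset;
        }
        if (storedOffset == -1) {
            // No recorded progress: fall back to a timestamp lookup, or 0 if nothing is found.
            long found = searchByTimestamp.applyAsLong(consumeTimestamp);
            return found >= 0 ? found : 0;
        }
        // Anything below -1 is an invalid stored value.
        return -1;
    }
}
```

A caller would treat a return value of -1 as a signal not to create a pull request for that queue in this round.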

At the end of updateProcessQueueTableInRebalance, the dispatchPullRequest method is called. It adds each PullRequest to the PullMessageService, which wakes up the PullMessageService thread so that it starts pulling messages.

This completes the consumer-side load balancing process.