Message Delay and Filtering Design of Message Queue

After receiving a request, the message queue appends the message sequentially to the physical log file. Delayed messages are then asynchronously partitioned by hour according to their delivery time. Each hour's messages are appended in turn to that hour's delay log file, and their indexes are stored in a delay index file, with each index entry recording {offset, size, delivery timestamp}. Because memory is limited, only the delay log files for the coming two hours are read and written through the mmap memory-mapping mechanism; messages delayed by more than two hours are written directly to disk files.
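
A minimal Python sketch of the hourly partitioning, assuming in-memory byte buffers stand in for the delay log and delay index files. `DelayStore`, `HourBucket`, `IndexEntry` and `MMAP_WINDOW_HOURS` are hypothetical names; the sketch only reports whether a bucket falls inside the two-hour mmap window rather than calling `mmap` itself.

```python
from dataclasses import dataclass, field

HOUR = 3600
MMAP_WINDOW_HOURS = 2  # hypothetical: the current and the next hour are memory-mapped


@dataclass
class IndexEntry:
    offset: int      # byte offset of the message in its hour's delay log
    size: int        # message length in bytes
    deliver_ts: int  # delivery time in epoch seconds


@dataclass
class HourBucket:
    log: bytearray = field(default_factory=bytearray)  # stands in for the delay log file
    index: list = field(default_factory=list)          # stands in for the delay index file


class DelayStore:
    def __init__(self):
        self.buckets = {}  # hour number -> HourBucket

    def append(self, now: int, deliver_ts: int, body: bytes):
        """Append a delayed message to the bucket of its delivery hour."""
        hour = deliver_ts // HOUR
        bucket = self.buckets.setdefault(hour, HourBucket())
        entry = IndexEntry(len(bucket.log), len(body), deliver_ts)
        bucket.log.extend(body)     # sequential append to the delay log
        bucket.index.append(entry)  # index entries keep arrival order, not delivery order
        mmapped = hour - now // HOUR < MMAP_WINDOW_HOURS
        return hour, entry, mmapped
```

Note that index entries land in arrival order, which is why the dispatch step has to reorder them.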

When messages are dispatched, however, a whole hour's index file has to be loaded into memory at once. Each hour's message indexes are appended to the delay index in arrival order, while delivery times are random, so the write order does not match the delivery order. After loading, the index therefore has to be sorted by each message's delivery timestamp (at one-second granularity), and the messages are then read from the delay log in sorted order and delivered.
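
The original load-and-sort dispatch can be sketched as follows, assuming index entries are `(offset, size, deliver_ts)` tuples in write order; `dispatch_hour` is a hypothetical helper. The sort costs O(n log n) over the whole hour and the entire index must sit in memory, which is what motivates the redesign.

```python
def dispatch_hour(log: bytes, index: list) -> list:
    """Load one hour's whole index, sort it by delivery second, then read the log.

    index: (offset, size, deliver_ts) tuples in write (arrival) order.
    """
    ordered = sorted(index, key=lambda entry: entry[2])  # O(n log n) over the whole hour
    return [(ts, log[off:off + size]) for off, size, ts in ordered]
```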

This storage scheme has two problems:
1. The whole hour's message index must be loaded into memory at once, so high concurrency puts heavy pressure on memory.
2. Once the index is sorted by delivery second, every newly arriving message has to be inserted into the sorted order in real time, which is slow and adds latency.

To solve these problems, we change the index meta-information in the delay index from {offset, size, delivery timestamp} to {offset, size, localIndex, globalIndex, preGlobalIndex}, where:

globalIndex locates an index unit in the delay index directly, and therefore a single message in the delay log. preGlobalIndex points to the previous message with the same delivery second, so as long as the index ID of the last message of each second is persisted, all messages of that second can be found by walking backwards. An hour has only 3600 seconds, so only 3600 index IDs of 16 bytes each (3600 × 16 B = 57,600 B, about 56 KB) need to be held in memory, and each second's messages can be loaded in real time.
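
A sketch of the redesigned index unit and the per-second reverse walk, assuming fixed-width little-endian fields packed with Python's `struct` module. The field widths, the `-1` sentinel for "no earlier message in this second", and the helper names are assumptions rather than the original layout.

```python
import struct

# Hypothetical layout of one index unit:
# offset (8 B), size (4 B), localIndex (4 B), globalIndex (8 B), preGlobalIndex (8 B)
UNIT = struct.Struct("<qiiqq")  # 32 bytes, no padding with "<"
NONE = -1  # assumed sentinel: no earlier message in the same second


def read_unit(index_file: bytes, global_index: int) -> tuple:
    """globalIndex locates a unit directly: its position is globalIndex * unit size."""
    return UNIT.unpack_from(index_file, global_index * UNIT.size)


def messages_in_second(index_file: bytes, tail: int) -> list:
    """Walk preGlobalIndex back from the last message of a second to its first."""
    found = []
    gi = tail
    while gi != NONE:
        offset, size, _local, _global, pre = read_unit(index_file, gi)
        found.append((offset, size))
        gi = pre
    found.reverse()  # restore arrival order within the second
    return found
```

Only the `tail` of each second (3600 values per hour) has to stay in memory; everything else is reached through the index file.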

To reduce dispatch latency, the message indexes for the next 10 s can be preloaded into memory. For a message received in real time, its globalIndex is appended, its preGlobalIndex is set to the latest message of its delivery second, and it then becomes that second's latest message. No sorting is needed, so inserting and reading a message are both O(1).
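
The O(1) real-time insert and the 10 s preload window can be modeled in memory as below. `RealtimeDelayIndex` is hypothetical: a Python list plays the role of the delay index file (storing only preGlobalIndex per unit), one dict holds the latest globalIndex of each second, and another holds the preloaded seconds.

```python
NONE = -1  # hypothetical sentinel: no earlier message in the same second


class RealtimeDelayIndex:
    """In-memory model of O(1) insertion plus a preloaded window of seconds."""

    def __init__(self, now: int, preload_s: int = 10):
        self.pre = []    # pre[g] = preGlobalIndex of unit g (stands in for the index file)
        self.tails = {}  # delivery second -> globalIndex of its latest message
        self.preload_s = preload_s
        self.cache = {}  # preloaded second -> globalIndexes in arrival order
        self.advance(now)

    def _load(self, second: int) -> list:
        chain, g = [], self.tails.get(second, NONE)
        while g != NONE:  # walk preGlobalIndex backwards
            chain.append(g)
            g = self.pre[g]
        return chain[::-1]

    def insert(self, second: int) -> int:
        g = len(self.pre)                              # append position = globalIndex
        self.pre.append(self.tails.get(second, NONE))  # link to previous tail of that second
        self.tails[second] = g                         # new message becomes the tail: O(1)
        if second in self.cache:                       # second already preloaded
            self.cache[second].append(g)
        return g

    def advance(self, now: int) -> None:
        """Slide the preload window to [now, now + preload_s)."""
        window = range(now, now + self.preload_s)
        self.cache = {s: self.cache[s] if s in self.cache else self._load(s) for s in window}
```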

The multi-level time wheel is built from arrays and linked lists and has two levels: a second-level wheel and an hour-level wheel. Each time the hour-level wheel moves forward one slot, the second-level wheel completes one rotation. The second-level wheel has 3600 slots, each spanning at most 1 s, and it moves forward one slot per second. On the hour-level wheel, each slot spans 1 hour and the wheel moves one slot per hour; at each move, the delay log files for the next two hours are memory-mapped and the mapping of the delay log file from two hours ago is released.
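
A simplified two-level time wheel, assuming one task per delayed message and arrays of `deque`s for the slots (standing in for the arrays of linked lists). The 24-slot hour wheel and the class name are assumptions; the hour tick is where the mmap window would move, which this sketch does not model.

```python
from collections import deque

SECOND_SLOTS = 3600  # one slot per second, one rotation per hour


class TwoLevelTimeWheel:
    """Hour-level wheel cascading into a 3600-slot second-level wheel (sketch)."""

    def __init__(self, hour_slots: int = 24):  # 24 hour slots is an assumption
        self.hour_slots = hour_slots
        self.seconds = [deque() for _ in range(SECOND_SLOTS)]
        self.hours = [deque() for _ in range(hour_slots)]
        self.now = 0  # seconds elapsed since the wheel started

    def add(self, delay_s: int, task) -> None:
        if delay_s < 1:
            raise ValueError("delay must be at least 1 s")
        due = self.now + delay_s
        hours_ahead = due // SECOND_SLOTS - self.now // SECOND_SLOTS
        if hours_ahead == 0:
            self.seconds[due % SECOND_SLOTS].append(task)
        elif hours_ahead < self.hour_slots:
            self.hours[(due // SECOND_SLOTS) % self.hour_slots].append((due, task))
        else:
            raise ValueError("delay exceeds the hour wheel's span")

    def tick(self) -> list:
        """Advance one second and return the tasks that are now due."""
        self.now += 1
        if self.now % SECOND_SLOTS == 0:
            # The second wheel finished a rotation, so the hour wheel moves one slot.
            # This is also where the mmap window would move (not modeled here).
            slot = self.hours[(self.now // SECOND_SLOTS) % self.hour_slots]
            while slot:
                due, task = slot.popleft()
                self.seconds[due % SECOND_SLOTS].append(task)  # demote to second wheel
        current = self.seconds[self.now % SECOND_SLOTS]
        fired = list(current)
        current.clear()
        return fired
```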

For a message due in 2 hours and 5 minutes, the second-level wheel must complete two rotations, that is, the hour-level wheel moves 2 slots; the message is then demoted to the second-level wheel, where it waits the remaining 5 minutes. This demotion from one level to the next is what is meant by cascading time wheels.
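
Worked arithmetic for the 2 h 5 min example, assuming the message is added exactly at the start of an hour-wheel slot:

```latex
\begin{aligned}
d &= 2 \times 3600\,\mathrm{s} + 5 \times 60\,\mathrm{s} = 7500\,\mathrm{s} \\
\text{hour-wheel slots moved} &= \lfloor d / 3600 \rfloor = 2 \quad (7200\,\mathrm{s}) \\
\text{second-wheel slot after demotion} &= d \bmod 3600 = 300 \quad (5\,\mathrm{min})
\end{aligned}
```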

In general, every slot that holds tasks is also put into a DelayQueue, and the time wheel is advanced according to the DelayQueue, which avoids ticking through empty slots. For example, for a task with a 500 s delay, we not only mount it on the time wheel but also put its slot into the DelayQueue, so the head of the DelayQueue expires in 500 s. If no task with a delay shorter than 500 s is added, we simply wait 500 s and advance the time wheel once. If a new task with a delay shorter than 500 s arrives, we wake up the DelayQueue and recompute the waiting time.

In other words, when a timed task is added and its slot is new (the task is the first one in that slot), the slot is added to the DelayQueue and we check whether it has become the head. If it has, the DelayQueue is woken up to recompute the waiting time.
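
A single-threaded sketch of the slot-level DelayQueue, using Python's `heapq` in place of Java's `java.util.concurrent.DelayQueue`. Instead of signalling a sleeping thread, `add_task` returns `True` when the new slot became the head, which is the moment the waiter would be woken. The class and method names are hypothetical.

```python
import heapq


class SlotDelayQueue:
    """Only non-empty slots are queued, so the wheel never ticks through empty slots."""

    def __init__(self):
        self.heap = []   # expiration times of slots that hold tasks
        self.slots = {}  # expiration time -> tasks in that slot

    def add_task(self, expire_s: int, task) -> bool:
        """Return True when the waiting thread should be woken to recompute its wait."""
        tasks = self.slots.get(expire_s)
        if tasks is not None:  # slot already queued: no new DelayQueue entry
            tasks.append(task)
            return False
        self.slots[expire_s] = [task]  # first task of a new slot
        heapq.heappush(self.heap, expire_s)
        return self.heap[0] == expire_s  # new slot became the head

    def wait_time(self, now: int):
        """Seconds until the head slot expires, or None if nothing is queued."""
        return None if not self.heap else max(0, self.heap[0] - now)

    def poll(self, now: int):
        """Pop the head slot's tasks if it has expired, else return None."""
        if self.heap and self.heap[0] <= now:
            return self.slots.pop(heapq.heappop(self.heap))
        return None
```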

When the master node drifts or the network fails, control of time-wheel dispatch must move from the old master to the new one. To keep the dispatch state continuous and consistent, the master synchronizes the current ticks of both time wheels to the slave nodes every 50 ms. A tick identifies which second is being dispatched, but not how many messages of that second have already been dispatched. Therefore a synchronization parameter, localIndex, is added to the second-level time wheel to record how many messages of the current second have been dispatched, and every node periodically persists the dispatch state.
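
The role of localIndex during failover can be sketched with a small state record. `DispatchState`, `dispatch` and `resume_on_new_master` are hypothetical names, and `budget` only models how far the old master got before the last sync.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class DispatchState:
    hour_tick: int    # slot of the hour-level wheel being dispatched
    second_tick: int  # slot of the second-level wheel being dispatched
    local_index: int  # how many messages of that second were already dispatched


def dispatch(messages: list, state: DispatchState, budget: int):
    """Dispatch up to `budget` more messages of the current second.

    Returns the messages sent and the new state to synchronize to slaves.
    """
    start = state.local_index
    sent = messages[start:start + budget]
    return sent, DispatchState(state.hour_tick, state.second_tick, start + len(sent))


def resume_on_new_master(messages: list, synced: DispatchState) -> list:
    """A new master skips the messages the synced localIndex says were dispatched."""
    return messages[synced.local_index:]
```

If the synced localIndex is stale, for example taken up to 50 ms before a passive drift, `resume_on_new_master` returns messages the old master had already sent, which is where duplicate deliveries come from.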

Whenever the master switches, the old master becomes a slave: it immediately stops its current time-wheel dispatch task and clears its dispatch state. The new master initializes the two-level time wheel from the most recently synchronized dispatch state. However, a master switch takes some time and, in extreme cases, clocks on different nodes may drift apart, so after the new master initializes the ticks, the second-level timestamp of the tick may differ from the node's actual time and must be adjusted before dispatch starts. If the tick timestamp is later than the current time, the dispatch task sleeps until the two are aligned. If the tick timestamp is earlier than the current time, some expired messages have not been dispatched yet; the tick is then advanced continuously and the expired messages are delivered directly and asynchronously until the tick's timestamp catches up with the current time.
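
The tick-alignment rule can be sketched as a function with injectable clock, sleep and delivery callbacks, so it can be exercised without real time passing. `start_dispatch` and its parameters are hypothetical; tick timestamps are assumed to be epoch seconds.

```python
import time


def start_dispatch(tick_ts: int, deliver_async, clock=time.time, sleep=time.sleep) -> int:
    """Align the new master's tick with the local clock before normal dispatch.

    Returns the tick timestamp from which normal, per-second dispatch continues.
    """
    now = int(clock())
    if tick_ts > now:
        sleep(tick_ts - now)       # tick is ahead: wait until the clock catches up
        return tick_ts
    while tick_ts < int(clock()):  # tick is behind: these seconds have expired
        deliver_async(tick_ts)     # deliver expired messages directly
        tick_ts += 1               # keep pushing the tick forward
    return tick_ts
```

Re-reading the clock inside the loop lets the catch-up chase a clock that keeps moving while expired seconds are being delivered.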

Master handovers fall into two cases. The first is an active release of mastership, for example when a node is restarted or mastership is moved for load balancing. In this case, before giving up mastership, the node first synchronizes the time-wheel dispatch state to the slave nodes, so dispatch continues seamlessly across the handover. The second is a passive drift of mastership under abnormal conditions. Here the time-wheel dispatch state on the new master may lag by up to 50 ms, so some messages will be dispatched twice. Synchronization can be made real-time by encapsulating the time-wheel dispatch state in an extension field of the protocol used to deliver due messages, so that each Paxos request that replicates such a message also carries the time-wheel state.
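
A sketch of piggybacking the time-wheel state on delivery messages, assuming a message is a dict with a hypothetical `ext` extension map and the state is packed with `struct`. The field name `wheel_state` and the field widths are assumptions, and the Paxos replication itself is not shown.

```python
import struct

STATE = struct.Struct("<iiI")  # hour_tick, second_tick, local_index (assumed widths)


def attach_state(message: dict, hour_tick: int, second_tick: int, local_index: int) -> dict:
    """Return a copy of the delivery message with the wheel state in its extension field."""
    out = dict(message)
    ext = dict(out.get("ext", {}))
    ext["wheel_state"] = STATE.pack(hour_tick, second_tick, local_index)
    out["ext"] = ext
    return out


def apply_on_follower(message: dict):
    """Followers decode the wheel state from each replicated delivery message."""
    raw = message.get("ext", {}).get("wheel_state")
    return None if raw is None else STATE.unpack(raw)
```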

As is well known, RocketMQ supports message filtering: when sending a message you can set a tag on it, and when subscribing to a topic you can specify that only messages with certain tags are consumed.

When a consumer pulls messages, the broker obtains codeSet, the set of hash codes of the subscribed tags, then reads records from the ConsumeQueue one by one and checks whether each record's tag hashCode is in codeSet. This filters messages on the server side and decides whether each message is sent to the consumer.
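
The broker-side tag check can be sketched in Python. In RocketMQ the tag code stored in the ConsumeQueue is the tag's Java `String.hashCode()`; `java_string_hash` reproduces that function for strings of BMP characters. `build_code_set` and `broker_filter` are hypothetical helpers, and ConsumeQueue records are modeled as `(commitlog_offset, size, tags_code)` tuples.

```python
def java_string_hash(s: str) -> int:
    """Java String.hashCode() for strings of BMP characters (signed 32-bit)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def build_code_set(sub_expression: str):
    """Hypothetical helper: 'TagA || TagC' -> set of tag hash codes; '*' -> None (all)."""
    if sub_expression.strip() == "*":
        return None
    tags = [t.strip() for t in sub_expression.split("||") if t.strip()]
    return {java_string_hash(t) for t in tags}


def broker_filter(consume_queue, code_set):
    """Keep ConsumeQueue records whose tag hash code is in code_set (None keeps all)."""
    return [rec for rec in consume_queue if code_set is None or rec[2] in code_set]
```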

Because hash codes can collide, this filtering is not fully precise, so the client filters each message again, exactly, after receiving it.
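
Why the client must re-check: in Java, `"Aa".hashCode()` and `"BB".hashCode()` are both 2112, so a subscription to `Aa` lets `BB` messages past the broker's hash check. The sketch below reuses the same hash function; `client_filter` and the sample messages are hypothetical.

```python
def java_string_hash(s: str) -> int:
    """Java String.hashCode() for strings of BMP characters (signed 32-bit)."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def client_filter(messages, subscribed_tags):
    """Second, exact pass on the client: compare the tag strings themselves."""
    return [(tag, body) for tag, body in messages if tag in subscribed_tags]


subscribed = {"Aa"}
code_set = {java_string_hash(t) for t in subscribed}
stored = [("Aa", b"1"), ("BB", b"2"), ("Cc", b"3")]
# Broker pass: hash-only check, so "BB" (same hash as "Aa") slips through.
arrived = [(t, b) for t, b in stored if java_string_hash(t) in code_set]
delivered = client_filter(arrived, subscribed)
```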

Another filtering method hashes each tag to a long and stores in the index the bitwise OR of the hash values of all of a message's tags. When messages are pulled, the hash value of the subscribed tags is ANDed bitwise with the value in the index. If the result equals the subscription's hash value, the message behind that index entry may match, and it is still filtered exactly a second time on the client. Otherwise it definitely does not match and is filtered out directly.
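
A sketch of the OR-index check. One swap is made deliberately: instead of a full-width 64-bit hash per tag, each tag is mapped to a sparse 64-bit mask with a few bits set (Bloom-filter style), because OR-ing dense hashes would quickly set nearly every bit and let almost everything through. `BITS_PER_TAG`, the use of `hashlib.blake2b`, and the helper names are assumptions.

```python
import hashlib

BITS_PER_TAG = 3  # assumption: each tag sets up to 3 bits of a 64-bit mask


def tag_mask(tag: str) -> int:
    """Map a tag to a sparse 64-bit mask derived from a stable hash."""
    digest = hashlib.blake2b(tag.encode(), digest_size=8).digest()
    mask = 0
    for i in range(BITS_PER_TAG):
        mask |= 1 << (digest[i] % 64)
    return mask


def index_value(tags) -> int:
    """Value stored in the index: bitwise OR of all the message's tag masks."""
    value = 0
    for t in tags:
        value |= tag_mask(t)
    return value


def may_match(index: int, subscribed_tag: str) -> bool:
    """False means definitely no match; True means 'maybe', so the client re-checks."""
    m = tag_mask(subscribed_tag)
    return (index & m) == m
```

A tag the message really carries always passes, so the check has no false negatives; collisions only produce false positives, which the client-side exact filter removes.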