DianWada distributed task scheduling system-DaJob
Background
With the development of the Internet, the number of scheduled tasks in application services is increasing day by day. Conventional vertical application architecture can no longer cope with it, and distributed service architecture is imperative. At the same time, there is an urgent need for a distributed task scheduling system to manage scheduled tasks in distributed services.
Single application architecture
When website traffic is small, a single application that deploys all functions together is enough, which reduces deployment nodes and costs. This works as long as the application has few scheduled tasks. With many, every change to a task's execution time requires redeploying the entire application, which stalls the whole application for a period of time.
Vertical application architecture
As the number of visits grows, the speedup gained by adding machines to a single application becomes smaller and smaller. The application is therefore split into several unrelated applications to improve efficiency. The scheduled tasks are split vertically along with them, so the impact of each task change is reduced accordingly.
Distributed service architecture
As the number of vertical applications grows, interactions between applications become inevitable. At this point the core business should be extracted into separate services, which gradually form a stable service center and let front-end applications respond faster to changing market demands. A distributed service framework that improves business reuse and integration is the key here. Because the services are independent, their scheduled tasks can generally be independent as well, so the impact of task changes on the overall system is minimal.
Distributed task scheduling
In a distributed service architecture the number of independent businesses may be large. If scheduled tasks are implemented separately inside each service, they are likely to become difficult to manage, and business restarts caused by changes to scheduled tasks cannot be avoided. An independent distributed task scheduling system is therefore necessary: it coordinates and manages all scheduled tasks globally and separates out task configuration. Once task configuration is extracted as a function of the distributed task scheduling system, changes to scheduled tasks no longer affect any business or the system as a whole.
Architecture design
Design idea
With Dubbo at the core, scheduling is abstracted out into a separate dispatch center. The dispatch center itself implements no business logic; it simply initiates scheduling requests based on the scheduling configuration
Tasks are abstracted as an ExecutorService. The task executor implements the specific task and is responsible for receiving and executing scheduling requests. This design fully decouples tasks from the dispatch center, which improves the scalability of the whole system and makes access easier
Some of the dispatch center's calls to the task executor are extracted into a separate management console that can be used to view task execution status. The management console is implemented in H5 and exposes a RESTful API to facilitate extension and access
Necessary operations such as alarms, monitoring, log collection and statistics are implemented through various middleware to safeguard the security and stability of the whole system
The dispatch center cluster stores the consistent hash code of each Scheduler in Zookeeper and uses it to allocate Jobs to Schedulers. For a new Job, the system uses each Scheduler's consistent hash code to obtain the number of Jobs that Scheduler already holds, calculates a weight for each Scheduler from its Job count and its relevant properties, and assigns the new Job to a Scheduler based on that weight.
When a new Scheduler is added, its Job count is normally 0, so under normal conditions its weight is relatively large.
The dispatch center also implements active/standby switching of Schedulers through Zookeeper to ensure system stability
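The weight-based assignment of a new Job can be sketched roughly as follows. The article does not give the weight formula, so the formula here (capacity divided by Job count plus one) is an assumption, as are the names `Scheduler`, `capacity`, `weight` and `assign_job`; Zookeeper and the consistent hash lookup are left out and the Job counts are held in memory.

```python
from dataclasses import dataclass


@dataclass
class Scheduler:
    name: str
    capacity: float = 1.0  # hypothetical stand-in for "relevant properties"
    job_count: int = 0     # number of Jobs currently held


def weight(scheduler):
    # Assumed formula: fewer Jobs and more capacity give a larger weight.
    return scheduler.capacity / (scheduler.job_count + 1)


def assign_job(schedulers):
    """Assign one new Job to the Scheduler with the largest weight."""
    chosen = max(schedulers, key=weight)
    chosen.job_count += 1
    return chosen


schedulers = [Scheduler("s1", job_count=4), Scheduler("s2", job_count=2)]
schedulers.append(Scheduler("s3"))  # newly added Scheduler with 0 Jobs
first = assign_job(schedulers)      # the new Scheduler has the largest weight
```

Under this assumed formula a freshly added Scheduler keeps receiving new Jobs until its count catches up with the others, which matches the behaviour described for a Scheduler that starts with 0 Jobs.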
System composition
Scheduler
Implements scheduling based on Quartz and provides an interface for executor operations, used to manage task scheduling configuration, trigger scheduling, and so on. It does not take part in implementing task logic and is not constrained by tasks
Executor
Receives scheduling requests initiated by the dispatch center, implements the corresponding business logic, and executes the task. After the task completes, it records the corresponding logs and sends them to the dispatch center
Management Console
The management console displays report data such as task status, execution status and task execution logs. Through the console you can also configure new tasks and operate on task status (pause/resume, and so on). In addition, it provides access through an external RESTful API and H5
Dubbo Monitor
Monitors dispatch center interface calls in real time and collects statistics such as dispatch frequency, successes and failures, and QPS. This report data can be used to optimize task scheduling and the system
ELK
Uses ELK (Elasticsearch, Logstash, Kibana) to collect the execution logs of the dispatch center and each execution machine, and analyzes the statistics into reports for convenient observation
Alarm
Alarm rules are configured through the Chronograf console, and email and SMS alarms are sent through Kapacitor as soon as a problem occurs. This improves the timeliness of error notifications, shortens the time from error occurrence to resolution, and reduces losses in the production environment. Alarm data is obtained from the business dashboard.
For example, you can configure a rule on the current CPU usage of online machines, set the threshold to 50, and choose a policy that sends an SMS alarm when the threshold is exceeded. When the CPU usage of any online machine then exceeds 50, an SMS alarm is sent
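The CPU rule above amounts to a simple threshold check. The sketch below only illustrates that logic in plain Python; it is not Kapacitor's or Chronograf's rule syntax, and `AlarmRule`, `evaluate` and the host names are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class AlarmRule:
    metric: str
    threshold: float
    channel: str  # for example "sms" or "email"


def evaluate(rule, samples):
    """Return one alarm message per host whose value exceeds the threshold."""
    return [
        f"[{rule.channel}] {host}: {rule.metric}={value} exceeds threshold {rule.threshold}"
        for host, value in samples.items()
        if value > rule.threshold
    ]


rule = AlarmRule(metric="cpu_usage", threshold=50, channel="sms")
alarms = evaluate(rule, {"host-a": 42.0, "host-b": 73.5})  # only host-b is over 50
```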
Business Dashboard
Interface monitoring data is collected in real time through management, shipped to Kafka by Logstash, and then distributed by Kafka to JStorm for processing. The processed data is stored in InfluxDB to form the business dashboard, and monitoring reports are finally generated through the Grafana console
Configuration Center
All services in the system manage their configuration through the configuration center. This forms a distributed configuration management mechanism that helps keep the configuration of each service consistent and accurate
After configuration, you can implement your own task logic
After access, you can view task execution status online in real time through the log management console
In addition, thanks to the alarm system, when a task execution is abnormal, alarm emails and text messages are generated and sent in real time to notify the person who connected the task and the corresponding R&D personnel
Configuration Center attribute configuration
Features
Support dynamic suspension/resumption of tasks
When a task's status is stopped, the task is no longer triggered. If a task is paused while it is executing, the running execution is not interrupted. The execution result statuses include a timeout failure status; if the running execution were interrupted when the pause button is clicked, its status would become timeout, which does not match the real meaning of that status. Instead the stop is deferred: the task actually stops only after the current execution completes
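The deferred stop can be pictured as a small state machine. This is a minimal sketch, assuming three hypothetical status names (`NORMAL`, `PAUSING`, `PAUSED`); the article does not describe DaJob's actual status model.

```python
class Task:
    def __init__(self, name):
        self.name = name
        self.status = "NORMAL"  # NORMAL, PAUSING or PAUSED (hypothetical names)
        self.running = False

    def pause(self):
        # A running execution is never interrupted; the stop is deferred.
        self.status = "PAUSING" if self.running else "PAUSED"

    def resume(self):
        self.status = "NORMAL"

    def trigger(self):
        """Start one execution; return False if the task may not fire."""
        if self.status != "NORMAL":
            return False
        self.running = True
        return True

    def finish(self):
        """Mark the current execution as done and apply a deferred pause."""
        self.running = False
        if self.status == "PAUSING":
            self.status = "PAUSED"


task = Task("daily-report")
task.trigger()            # an execution starts
task.pause()              # pause requested mid-run: status becomes PAUSING
blocked = task.trigger()  # no new trigger while pausing
task.finish()             # the run completes normally, then the task is PAUSED
```

Because the running execution ends on its own, its result status is never forced to timeout by the pause.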
The dispatch center is implemented on Quartz and achieves active/standby isolation through Zookeeper to ensure HA of the dispatch center
When the active node goes down, the cold standby node will load all the tasks that are being scheduled in the active node and become the new active node to ensure the accurate execution of task scheduling
Execution machines support cluster deployment: tasks are executed in a distributed manner and scheduled uniformly through the dispatch center
Executor load balancing: by default, the scheduling weight of an execution machine is calculated from the number of times the task has been executed on that machine, and the executor for each schedule is selected by weight so that load is balanced across executors. The executor selection strategy can also be changed manually
Executor cluster modes: failover, failfast, failsafe, failback and forking; the default is failover. failover: when a call fails, retry on other servers. failfast (fast failure): make only one call without retrying, and report an error immediately on failure. failsafe (failure safety): when an exception occurs, ignore it. failback (failure recovery): when a call fails, resend it periodically until it succeeds; pending resends are lost on restart. forking: call multiple execution machines in parallel and return as soon as one succeeds
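The difference between these modes is easiest to see side by side. The sketch below is plain Python rather than Dubbo's cluster API; each executor is modelled as a callable that takes a job and may raise, and the function names simply mirror the mode names. failback is omitted because it needs a background timer.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def failover(executors, job):
    """Try each executor in turn and return the first successful result."""
    last_error = None
    for call in executors:
        try:
            return call(job)
        except Exception as exc:
            last_error = exc
    raise RuntimeError("all executors failed") from last_error


def failfast(executors, job):
    """Call one executor once; any error propagates immediately."""
    return executors[0](job)


def failsafe(executors, job):
    """Call one executor once; ignore any error and return None."""
    try:
        return executors[0](job)
    except Exception:
        return None


def forking(executors, job):
    """Call all executors in parallel and return the first success."""
    with ThreadPoolExecutor(max_workers=len(executors)) as pool:
        futures = [pool.submit(call, job) for call in executors]
        for future in as_completed(futures):
            try:
                return future.result()
            except Exception:
                continue  # wait for another executor to succeed
    raise RuntimeError("all executors failed")


def broken(job):
    raise ConnectionError("executor down")


def healthy(job):
    return f"done:{job}"


via_failover = failover([broken, healthy], "job-1")  # retried on the second executor
via_failsafe = failsafe([broken], "job-1")           # error ignored
via_forking = forking([broken, healthy], "job-1")    # first success wins
```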
Sharded tasks: task sharding is supported. Sharding parameters are sent to the task execution machine, which can implement sharded jobs by inspecting the parameters. Dynamic sharding is supported: sharding is done along the dimensions of sharding parameters and execution machines, and both execution machines and sharding parameters can be expanded dynamically
For example, an order table is sharded in one dimension by order ID modulo 3. You can then set three sharding parameters (0, 1, 2). After obtaining one of the sharding parameters from the context, the execution machine checks it to decide which data to operate on: when the parameter obtained is 0, it queries the data on the shard corresponding to 0, and so on
As another example, an order table is sharded in two dimensions by order ID and status. The ID is sharded modulo 3, and the status has two values, success and failure. In this case 6 sharding parameters can be set: ({'id': 0, 'status': 0}, {'id': 1, 'status': 0}, {'id': 2, 'status': 0}, {'id': 0, 'status': 1}, {'id': 1, 'status': 1}, {'id': 2, 'status': 1}). After the executor obtains one of the sharding parameters from the context, it inspects the JSON parameter to carry out the sharded operation
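A sharded job for the two order-table examples above might look roughly like this. The sketch assumes the sharding parameter arrives as JSON text (a bare number for the one-dimensional case, an object for the two-dimensional case); `ORDERS` and `select_shard` are hypothetical, and an in-memory list stands in for the order table.

```python
import json

# Hypothetical in-memory stand-in for the order table.
ORDERS = [{"id": i, "status": i % 2} for i in range(1, 13)]


def select_shard(shard_param, orders, modulus=3):
    """Return the ids of the orders that belong to one shard.

    shard_param is the JSON text taken from the scheduling context:
    either a bare number ("0") or an object ('{"id": 0, "status": 1}').
    """
    param = json.loads(shard_param)
    if isinstance(param, int):
        param = {"id": param}  # one-dimensional case: ID modulo only
    return [
        order["id"]
        for order in orders
        if order["id"] % modulus == param["id"]
        and order["status"] == param.get("status", order["status"])
    ]


one_dim = select_shard("0", ORDERS)                       # ids with id % 3 == 0
two_dim = select_shard('{"id": 0, "status": 1}', ORDERS)  # same, restricted to status 1
```

Each executor only touches the rows selected by the parameter it received, so the shards can be processed on different execution machines without overlapping.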
Task execution consistency: each task is executed by only one executor. For sharded tasks, when executors are deployed as a cluster, one schedule broadcasts to the cluster and triggers the corresponding number of executors to run the task once each, passing the sharding parameters along so that sharded tasks can be developed against them.
When the number of sharding parameters exceeds the number of executors, one or more executors of the sharded task execute multiple shards according to the executor routing policy
For example, suppose the sharding parameters of the current sharded task are (a, b, c) and there are 3 task execution machines (A, B, C). The sharding parameters are sent to the execution machines evenly and at random, and each of the three machines receives exactly one sharding parameter and processes the task once, that is (a->A, b->B, c->C) | (a->A, b->C, c->B) | (a->B, b->A, c->C) | (a->B, b->C, c->A) | (a->C, b->A, c->B) | (a->C, b->B, c->A);
When there are only two task execution machines (A, B), on each schedule one execution machine receives two sharding parameters and processes them separately, that is (a->A, b->B, c->A) | (a->A, b->B, c->B) | (a->B, b->A, c->A) | (a->B, b->A, c->B);
When there are more task execution machines than sharding parameters, for example 4 (A, B, C, D), the outcomes are (a->A, b->B, c->C) | (a->A, b->C, c->D) | (a->A, b->D, c->B) | (a->B, b->C, c->D) | (a->B, b->D, c->A) | (a->B, b->A, c->C) | (a->C, b->A, c->B) | (a->C, b->B, c->D) | (a->C, b->D, c->A) | (a->D, b->A, c->B) | (a->D, b->B, c->C) | (a->D, b->C, c->A). Statistically, whether the number of executors is greater than, equal to, or less than the number of sharding parameters, the distribution of sharding parameters across executors stays uniform (the total number of sharding parameters received by each executor is even)
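One way to obtain an even, random distribution of this kind is to deal parameters out of a shuffled pool of executors and reshuffle whenever the pool runs dry. This is only a sketch of such a routing policy, not DaJob's documented algorithm; `distribute` is a hypothetical name.

```python
import random
from collections import Counter


def distribute(params, executors, rng=random):
    """Map each sharding parameter to an executor, evenly and at random."""
    assignment = {}
    pool = []
    for param in params:
        if not pool:
            # Start a new round: every executor is available once, in random order.
            pool = list(executors)
            rng.shuffle(pool)
        assignment[param] = pool.pop()
    return assignment


rng = random.Random(2024)  # seeded only to make the demo repeatable
three = distribute(["a", "b", "c"], ["A", "B", "C"], rng)     # one parameter each
two = distribute(["a", "b", "c"], ["A", "B"], rng)            # one executor gets two
four = distribute(["a", "b", "c"], ["A", "B", "C", "D"], rng)  # one executor idle

# Over many schedules the per-executor totals even out.
totals = Counter()
for _ in range(3000):
    totals.update(distribute(["a", "b", "c"], ["A", "B"], rng).values())
```

Within a single schedule no executor receives a second parameter before every executor has received one, and across schedules the shuffle spreads the extra or missing parameter evenly.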
Alarm system: the system is connected to the internal alarm system and supports email, SMS, DingTalk, phone and other alarms when a task fails
Flexible scaling: the dispatch center detects task execution machines in real time, so an execution machine going online or offline is detected. If the dispatch center misses a change, you can trigger detection of the execution machine manually. Once the execution machine is detected, the next schedule reassigns tasks
Task dependency: dependencies between tasks can be configured. When the parent task completes, execution of the child task is triggered automatically. For example, given two tasks A and B where B may run only after A has finished (otherwise B is not executed), B depends on A: after configuring A, set B as a dependent task with dependency A. As another example, given six tasks A1, A2, A3, A4, A5 and B, where B may run only after all five tasks A1-5 have finished, B depends on A1-5: after configuring the five tasks A1-5, set B as a dependent task with dependency A1-5
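The A1-5 example can be sketched as a small tracker that fires a child task once all of its parents have completed. `DependencyTracker` and `on_success` are hypothetical names, and the sketch covers a single scheduling round only (completion state is not reset between rounds).

```python
class DependencyTracker:
    def __init__(self, dependencies):
        # dependencies maps each child task to the set of its parent tasks.
        self.dependencies = {child: set(parents) for child, parents in dependencies.items()}
        self.completed = set()
        self.triggered = []

    def on_success(self, task):
        """Record a finished task and return the children that became ready."""
        self.completed.add(task)
        ready = [
            child
            for child, parents in self.dependencies.items()
            if task in parents
            and parents <= self.completed
            and child not in self.triggered
        ]
        self.triggered.extend(ready)
        return ready


tracker = DependencyTracker({"B": {"A1", "A2", "A3", "A4", "A5"}})
early = [tracker.on_success(name) for name in ("A1", "A2", "A3", "A4")]
last = tracker.on_success("A5")  # B becomes ready only after the fifth parent
```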
Runtime visibility: task execution status can be viewed at runtime, along with statistics such as the number of tasks, the number of calls, and the number of executors
Abnormal execution recovery mechanism: uncontrollable situations sometimes occur, for example an executor's result may fail to reach the dispatch center after execution because of factors such as a network disconnection. In that case the result is recorded temporarily by the abnormal execution recovery mechanism and, the next time the execution machine starts normally, sending it to the dispatch center is retried
Manual triggering: under special needs a schedule may have to be executed manually, for example a manual schedule to compensate after a scheduled task fails
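The recovery mechanism for unreported results can be sketched as a buffer that is flushed once the dispatch center is reachable again. `ResultReporter` and `send` are hypothetical, and the buffer here lives in memory; surviving an executor restart, as the text describes, would require persisting it (for example to local disk).

```python
class ResultReporter:
    def __init__(self, send):
        self.send = send   # callable that delivers one result; may raise ConnectionError
        self.pending = []  # results not yet delivered to the dispatch center

    def report(self, result):
        """Queue a result and try to deliver everything that is pending."""
        self.pending.append(result)
        return self.flush()

    def flush(self):
        """Resend buffered results in order; stop at the first failure."""
        while self.pending:
            try:
                self.send(self.pending[0])
            except ConnectionError:
                return False
            self.pending.pop(0)
        return True


network = {"up": False}  # simulated link to the dispatch center
delivered = []


def send(result):
    if not network["up"]:
        raise ConnectionError("dispatch center unreachable")
    delivered.append(result)


reporter = ResultReporter(send)
first_try = reporter.report({"task": "t1", "status": "SUCCESS"})  # buffered, link is down
network["up"] = True
second_try = reporter.flush()  # retried after the link recovers
```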
Parallel/serial strategy: when the scheduling interval is much longer than the task execution time, you can use the parallel strategy, where the task is invoked asynchronously to improve scheduling accuracy. When the task execution time may exceed the scheduling interval but the task must still follow a fixed timing rule, you can use the serial strategy: if the previous trigger of the task has not finished executing on the current execution machine, the next scheduled trigger does not fire, and a new trigger fires only once the running execution ends. This prevents continuous scheduled tasks with uncertain run times from corrupting data when triggered asynchronously. For example, a task is scheduled every 10 seconds but may itself run for more than 10 seconds. If runs from two trigger points executed in parallel, data would be corrupted. The serial strategy ensures that no new execution is triggered while a run of the task on the current execution machine is unfinished
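The serial strategy boils down to a non-blocking lock around each run: a trigger that arrives while the previous run is still in progress is skipped rather than queued. This is a minimal sketch with a hypothetical `SerialTask` class; the overlapping trigger is simulated by having the job call `trigger` on its own task while it runs.

```python
import threading


class SerialTask:
    def __init__(self, job):
        self.job = job
        self._lock = threading.Lock()

    def trigger(self):
        """Run the job unless the previous run is still in progress."""
        if not self._lock.acquire(blocking=False):
            return False  # previous run unfinished: skip this trigger
        try:
            self.job()
            return True
        finally:
            self._lock.release()


results = []


def slow_job():
    # Simulate the next scheduled trigger firing while this run is still going.
    results.append(task.trigger())


task = SerialTask(slow_job)
results.append(task.trigger())  # the nested trigger is skipped, the outer run completes
```

A parallel strategy would simply omit the lock and hand each trigger to its own worker.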
Supports scheduling interface data monitoring and generates monitoring reports for easy observation.
Summary
For Internet companies, time is money and efficiency determines everything. In the nearly three months since it went into service at the beginning of August, the system has performed very well and has been scheduled approximately 1 million times, making task scheduling more convenient for services across the company.