
DianWada Distributed Task Scheduling System (DaJob)

Background

With the growth of the Internet, the number of scheduled tasks in application services is increasing day by day. A conventional vertical application architecture can no longer cope, making a distributed service architecture imperative. At the same time, there is an urgent need for a distributed task scheduling system to manage the scheduled tasks running in distributed services.

Single application architecture

When website traffic is small, a single application that bundles all functions together is enough, reducing deployment nodes and cost. This is fine as long as the application has few scheduled tasks; but with many tasks, every change to a task's execution time requires redeploying the entire application, stalling the whole application for a period of time.

Vertical application architecture

As traffic grows, adding machines to the single application yields smaller and smaller gains, so the application is split into several unrelated applications to improve efficiency. The corresponding tasks are split vertically as well, and the impact of each task change shrinks accordingly.

Distributed service architecture

As vertical applications multiply, interactions between them become inevitable. Core business logic should then be extracted into separate services, which gradually form a stable service center that lets front-end applications respond to changing market demands faster. At this stage, a distributed service framework that improves business reuse and integration is key. Because the services are independent, their scheduled tasks can generally be independent as well, so a task change has minimal impact on the overall system.

Distributed task scheduling

On top of a distributed service architecture, the number of independent services can be large. If scheduled tasks are implemented inside each service, they quickly become hard to manage, and service restarts caused by task changes cannot be avoided. An independent distributed task scheduling system is therefore necessary: it coordinates and manages all scheduled tasks globally, and when task configuration is also extracted into the scheduling system, a change to a scheduled task no longer affects any service or the system as a whole.

Architecture design

Design idea

Built around Dubbo, scheduling is abstracted out into a separate dispatch center. The dispatch center implements no business logic itself; it simply initiates scheduling requests according to the scheduling configuration.

Tasks are abstracted into an ExecutorService; each task executor implements the concrete task and is responsible for receiving scheduling requests and executing them. This design completely decouples tasks from the dispatch center, improving the scalability of the whole system and making it easy to integrate new tasks.

Some of the dispatch center's operations on task executors are extracted into a separate management console that can be used to view task execution status. The console is implemented with H5 and exposes a RESTful API externally, for easy extension and integration.

Necessary supporting operations such as alerting, monitoring, and log collection and statistics are implemented through various middleware, safeguarding the security and stability of the whole system.

The dispatch center cluster stores each Scheduler's consistent HashCode in Zookeeper in order to allocate Jobs to Schedulers: for a newly added Job, each Scheduler's consistent HashCode yields that Scheduler's current Job count; a weight is then computed for each Scheduler from its Job count and its relevant properties, and the weight determines which Scheduler the new Job is assigned to.

When a new Scheduler joins, its Job count is normally 0, so under normal conditions the new Scheduler's weight will be relatively large.
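The weight-based assignment described above can be sketched roughly as follows. This is an illustrative model only, not the actual DaJob code: the function names and the exact weight formula (here simply inverse to the Job count, scaled by an assumed capacity) are our assumptions.

```python
# Illustrative sketch of Job-to-Scheduler assignment: a Scheduler's weight
# falls as its Job count rises, so a freshly joined Scheduler (0 Jobs)
# tends to win new Jobs. Formula and names are assumptions, not DaJob's.

def scheduler_weight(job_count: int, capacity: int = 100) -> float:
    """Weight shrinks as the Scheduler accumulates Jobs; a brand-new
    Scheduler (job_count == 0) gets the full capacity as its weight."""
    return capacity / (1 + job_count)

def assign_job(job_counts: dict) -> str:
    """Pick the Scheduler with the highest weight for a newly added Job."""
    return max(job_counts, key=lambda s: scheduler_weight(job_counts[s]))

counts = {"scheduler-a": 40, "scheduler-b": 25, "scheduler-c": 0}
target = assign_job(counts)   # the empty Scheduler has the largest weight
counts[target] += 1
```

In a real deployment the job counts would come from the consistent-hash data stored in Zookeeper rather than an in-memory dict.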

At the same time, the dispatch center uses Zookeeper to implement active/standby switchover of Schedulers, ensuring system stability.

System composition

Scheduler

Implements scheduling based on Quartz and provides an interface for executor operations, used for task scheduling configuration, schedule triggering, and similar operations; it does not participate in implementing task logic and is not constrained by the tasks themselves.

Executor

Responsible for receiving scheduling requests initiated by the dispatch center, implementing the corresponding business logic, and completing task execution; after a task finishes, it records the corresponding logs and sends them to the dispatch center.

Management Console

The management console displays task status, execution status, task execution logs, and other report data. Through it you can also configure new tasks and operate on task state (pause/resume, etc.); in addition, it exposes a RESTful API and an H5 interface for external access.

Dubbo Monitor

Monitors dispatch center interface calls in real time and gathers statistics on scheduling frequency, successes and failures, QPS, and so on; these reports can be used to optimize task scheduling and the system itself.

ELK

ELK (Elasticsearch, Logstash, Kibana) is used to collect the execution logs of the dispatch center and each execution machine and analyze them into statistical reports for convenient observation.

Alarm

Alarm system: alarm rules are configured through the Chronograf console, and Kapacitor sends email and SMS alerts as soon as a problem occurs. This makes error notification far more timely, shortens the time from an error occurring to its resolution, and reduces losses in the production environment. Alarm data is obtained from the business dashboard.

For example: you can monitor the current CPU usage of online machines, set the threshold to 50, and set the policy to send an SMS alert when the threshold is exceeded. Then, whenever the CPU usage of any online machine exceeds 50, an SMS alert is sent.
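The threshold rule above boils down to a simple predicate; in practice it would be expressed as a Kapacitor rule, but as a minimal sketch (function name and defaults are ours, not part of the system):

```python
# Minimal sketch of the CPU-threshold alert rule described above:
# exceeding the configured threshold triggers an SMS alert.

def should_alert(cpu_usage: float, threshold: float = 50.0) -> bool:
    """True when the metric strictly exceeds the configured threshold."""
    return cpu_usage > threshold

assert should_alert(72.5)       # above threshold -> SMS alert fires
assert not should_alert(31.0)   # below threshold -> no alert
```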

Business Dashboard

Interface monitoring data is collected in real time, transmitted to Kafka via Logstash, distributed by Kafka to JStorm for processing, and stored in InfluxDB to form the business dashboard; finally, monitoring reports are generated through the Grafana console.

Configuration Center

All services in the system manage their configurations through the configuration center, forming a distributed configuration management mechanism that keeps the configuration of every service consistent and accurate.

After configuration, a service can implement its own task logic.

Once connected, task execution status can be viewed online in real time through the log management console.

In addition, thanks to the alarm system, when task execution is abnormal, alert emails and SMS messages are generated and sent in real time to notify the task owner and the corresponding R&D staff.

Configuration Center attribute configuration

Features

Supports dynamically pausing/resuming tasks

When a task's status is stopped, the task is no longer triggered. If a pause is issued while the task is executing, the running execution is not interrupted (the task execution results include a timeout-failure status; if the currently running execution were cut off when the pause button is clicked, its status would become timeout, which contradicts the real meaning of the timeout status). Instead, the stop is delayed: the task is actually stopped only after the current execution completes.
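The delayed-stop behaviour can be sketched as a small state machine. This is an illustrative model with assumed class and field names, not DaJob's implementation:

```python
# Sketch of "delayed stop": pausing a task mid-execution does not interrupt
# the run; the task is only marked stopped once the current run finishes.

class Task:
    def __init__(self):
        self.status = "running"      # running | pause_requested | stopped
        self.executing = False

    def pause(self):
        # Defer the stop if an execution is in flight, so the run is not
        # force-failed into a misleading "timeout" state.
        self.status = "pause_requested" if self.executing else "stopped"

    def on_execution_finished(self):
        self.executing = False
        if self.status == "pause_requested":
            self.status = "stopped"  # the deferred stop takes effect now

task = Task()
task.executing = True
task.pause()                  # stop is deferred, the run keeps going
task.on_execution_finished()  # now the task actually stops
```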

The dispatch center is implemented on Quartz and achieves active/standby isolation through Zookeeper, ensuring the HA of the dispatch center

When the active node goes down, the cold-standby node loads all tasks being scheduled on the active node and becomes the new active node, ensuring that task scheduling continues to execute accurately

Execution machines support cluster deployment and distributed task execution, with unified scheduling through the dispatch center

Executor load balancing: by default, an execution machine's scheduling weight is computed from the number of times the task has already been executed on it, and the executor is selected by weight so that load is balanced across executors; the executor selection strategy can also be changed manually
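The default routing policy above can be sketched as "prefer the executor with the fewest past runs of this task". The function names are illustrative assumptions; DaJob's real weight formula is not spelled out in the text:

```python
# Rough sketch of execution-count-based load balancing: route the next
# execution to the executor that has run this task the fewest times,
# so executions spread evenly across the cluster.

from collections import Counter

def pick_executor(executors, history: Counter) -> str:
    """Choose the executor with the fewest recorded past runs."""
    return min(executors, key=lambda e: history[e])

history = Counter()
for _ in range(6):
    chosen = pick_executor(["exec-a", "exec-b", "exec-c"], history)
    history[chosen] += 1
# after 6 schedules, each executor has run the task twice
```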

Executor cluster modes: failover, failfast, failsafe, failback, and forking; the default is failover.

failover (failure switchover): when a call fails, retry on another server; failfast (fast failure): only one call is made, with no retry, and an error is reported immediately on failure; failsafe (failure safety): exceptions are simply ignored; failback (failure recovery): a failed call is resent periodically until it succeeds, though pending retries are lost on restart; forking: multiple execution machines are called in parallel, and the call returns as soon as one succeeds
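The default failover mode can be sketched as follows. This is a hedged illustration of the strategy's semantics, not Dubbo's or DaJob's actual code; the function names are ours:

```python
# Sketch of the failover strategy: try executors in order, falling through
# to the next one on failure; raise only if every executor fails.

def failover(executors, invoke):
    """Return the first successful result; re-raise the last error if all fail."""
    last_error = None
    for executor in executors:
        try:
            return invoke(executor)
        except Exception as exc:
            last_error = exc          # retry on the next executor
    raise last_error

def invoke(executor):
    # Toy call that fails on one machine to show the switchover.
    if executor == "exec-a":
        raise RuntimeError("exec-a is down")
    return f"handled by {executor}"

result = failover(["exec-a", "exec-b"], invoke)   # falls over to exec-b
```

failfast would be the same loop without the `for` (one attempt, re-raise immediately), and failsafe would swallow the exception and return nothing.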

Sharded tasks: task sharding is supported. Shard parameters are sent to the execution machines, which can implement sharded jobs by inspecting the parameters. Dynamic sharding along both the shard-parameter and execution-machine dimensions is supported, as is dynamic scale-out of execution machines and shard parameters

For example, suppose an order table is sharded along the order-ID dimension, modulo 3. You can then configure three shard parameters, (0, 1, 2). After obtaining one shard parameter from the context, an execution machine examines it and performs the corresponding data operation: when the shard parameter is 0, it queries the shard corresponding to 0, and so on

For another example, suppose an order table is sharded two-dimensionally by order ID and status: the ID is sharded modulo 3, and the status has two values, success and failure. Six shard parameters can then be configured: ({'id': 0, 'status': 0}, {'id': 1, 'status': 0}, {'id': 2, 'status': 0}, {'id': 0, 'status': 1}, {'id': 1, 'status': 1}, {'id': 2, 'status': 1}). After obtaining one shard parameter from the context, the executor parses the JSON parameter to carry out its shard's operation
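The executor side of the first (one-dimensional) example can be sketched as a filter over order IDs. The names are illustrative; how DaJob actually delivers the context is not shown in the text:

```python
# Sketch of an executor-side shard job for the order-table example: the
# context delivers one shard parameter, and this executor only touches
# rows whose order ID maps to that shard (ID mod 3).

SHARDS = 3

def select_shard(order_ids, shard_param: int):
    """Keep only the order IDs belonging to this executor's shard."""
    return [oid for oid in order_ids if oid % SHARDS == shard_param]

orders = [100, 101, 102, 103, 104, 105]
assert select_shard(orders, 0) == [102, 105]   # IDs divisible by 3
```

The two-dimensional example works the same way, except the shard parameter is a small JSON object and the filter checks both `id % 3` and `status`.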

Task execution consistency: each task is executed by exactly one executor. For sharded tasks with a cluster of executors, one task schedule broadcasts to the corresponding number of executors in the cluster and triggers them, passing the shard parameters at the same time; the sharded jobs are then implemented based on those parameters

When the number of shard parameters exceeds the number of executors, one or more executors of the current sharded task will execute multiple shards, according to the executor routing policy

For example: if the current sharded task's shard parameters are (a, b, c) and there are 3 execution machines (A, B, C), the shard parameters are distributed evenly and randomly across the execution machines, each machine receiving exactly one shard parameter per schedule and processing the task once, i.e. (a→A, b→B, c→C) | (a→A, b→C, c→B) | (a→B, b→A, c→C) | (a→B, b→C, c→A) | (a→C, b→A, c→B) | (a→C, b→B, c→A);

When there are only two execution machines (A, B), each schedule sends two shard parameters to one of the machines, which processes both, i.e. (a→A, b→B, c→A) | (a→A, b→B, c→B) | (a→B, b→A, c→A) | (a→B, b→A, c→B);

When there are more execution machines than shard parameters, say 4 machines (A, B, C, D): (a→A, b→B, c→C) | (a→A, b→C, c→D) | (a→A, b→D, c→B) | (a→B, b→C, c→D) | (a→B, b→D, c→A) | (a→B, b→A, c→C) | (a→C, b→A, c→B) | (a→C, b→B, c→D) | (a→C, b→D, c→A) | (a→D, b→A, c→B) | (a→D, b→B, c→C) | (a→D, b→C, c→A). Statistically, whether the number of executors is greater than, equal to, or less than the number of shard parameters, the shard parameters are always distributed evenly: the total number of shard parameters each executor receives is uniform
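One simple way to realize the even-but-random distribution illustrated above is to deal the shard parameters round-robin over a shuffled executor list, so each executor gets either floor(n/m) or ceil(n/m) parameters. This is our sketch of the idea, not DaJob's routing code:

```python
# Sketch of spreading n shard parameters over m executors in one broadcast:
# shuffle the executors (randomizing who gets the extras), then deal the
# parameters round-robin, so per-executor counts differ by at most one.

import random

def distribute(params, executors, rng=None):
    rng = rng or random.Random(0)
    order = list(executors)
    rng.shuffle(order)                       # randomize who gets extras
    assignment = {e: [] for e in executors}
    for i, param in enumerate(params):
        assignment[order[i % len(order)]].append(param)
    return assignment

# 3 shard parameters over 2 executors: one executor handles two shards
result = distribute(["a", "b", "c"], ["A", "B"])
sizes = sorted(len(v) for v in result.values())   # [1, 2]
```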

Alarm system: the system is connected to the internal alarm system and supports email, SMS, DingTalk, phone, and other alerts when a task fails

Elastic scaling: the dispatch center detects execution machines in real time, so an execution machine going online or offline is noticed automatically; if the dispatch center misses one, an execution machine can also be detected manually. Once an execution machine is detected, tasks are reassigned at the next schedule

Task dependencies: dependencies between tasks can be configured, and when a parent task completes, execution of its child task is triggered automatically. For example: given two tasks A and B, where B may run only after A has finished executing (otherwise B does not run), B depends on A; after configuring task A, set B as a dependent task with dependency A. For another example: given six tasks A1, A2, A3, A4, A5, and B, where B may run only after all five tasks A1-A5 have executed, B depends on A1-A5; after configuring the five tasks A1-A5, set B as a dependent task with dependencies A1-A5
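The dependency rule above amounts to a readiness check over the task's parent list. A minimal sketch, with names of our own choosing:

```python
# Sketch of dependency-gated triggering: a child task fires only once
# every parent in its configured dependency list has completed.

def ready(task: str, dependencies: dict, completed: set) -> bool:
    """A task may run once all of its configured parent tasks are done."""
    return all(parent in completed for parent in dependencies.get(task, []))

deps = {"B": ["A1", "A2", "A3", "A4", "A5"]}
done = {"A1", "A2", "A3", "A4"}
assert not ready("B", deps, done)   # A5 still pending, B must wait
done.add("A5")
assert ready("B", deps, done)       # all parents finished, B can fire
```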

Supports viewing task execution status at runtime, with statistics such as the number of tasks, the number of calls, and the number of executors.

Abnormal execution recovery mechanism: occasionally an uncontrollable situation arises, e.g. a network outage prevents an executor from sending its execution result back to the dispatch center after a run. The result can then be recorded temporarily by the recovery mechanism, and the next time the execution machine starts normally it retries sending it to the dispatch center.

Manually triggered scheduling: in special cases a schedule may need to be executed by hand; for example, after a scheduled task fails, a manual schedule may be needed as compensation

Parallel/serial strategies: when the scheduling interval is much longer than the task's execution time, use the parallel strategy, in which tasks are invoked and executed asynchronously, improving the punctuality of task scheduling. When a task's execution time may exceed the scheduling interval but the task must still follow a fixed timing rule, use the serial strategy: if the previous trigger of the task has not finished executing, the next scheduled trigger point on that execution machine does not fire until the running execution ends. This prevents the unpredictable duration of consecutive scheduled runs from corrupting data when triggered asynchronously. For example: a task is scheduled every 10 seconds, but a single run may take longer than 10 seconds; if runs from two trigger points executed in parallel, data corruption could occur. The serial strategy guarantees that no new execution is triggered on an execution machine while a run of the task is still in progress
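The serial strategy can be sketched as a trigger that skips firing while the previous run is still in flight. An illustrative model with assumed names, not DaJob's scheduler code:

```python
# Sketch of the serial strategy: a new trigger is skipped while the
# previous run of the same task is still executing, so overlapping runs
# cannot corrupt shared data.

class SerialTrigger:
    def __init__(self):
        self.running = False
        self.fired = 0
        self.skipped = 0

    def trigger(self) -> bool:
        if self.running:
            self.skipped += 1        # previous run not finished: skip
            return False
        self.running = True
        self.fired += 1
        return True

    def finish(self):
        self.running = False         # the run ended; next trigger may fire

t = SerialTrigger()
t.trigger()        # first schedule fires
t.trigger()        # still running: this trigger point is skipped
t.finish()
t.trigger()        # fires again after the previous run ended
```

The parallel strategy is the degenerate case: every trigger fires regardless of `running`.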

Supports scheduling interface data monitoring and generates monitoring reports for easy observation.

Summary

For Internet companies, time is money and efficiency determines everything. In the nearly three months since it went live at the beginning of August, the system has performed exceptionally well, handling roughly one million schedules and making task scheduling convenient for services across the company.
