Innovation Series, Tech/Engineering

How Druva Leverages Stateless Schedulers with Throttling Support

Introduction

Schedulers help us run jobs and tasks at specified intervals. At Druva, we use schedulers for automated backups, to trigger user-defined backup schedules, and to track several background activities. Schedulers enable us to separate our business logic from time tracking.

However, our existing scheduler implementation was causing CPU spikes and hurting performance. We wanted something that could trigger schedules in a staggered manner. This blog explains the problem we had, our scheduler requirements, how we designed the new scheduler, and finally its performance.

Challenges with existing schedulers

In the initial phases of the product, different teams used different scheduler implementations to meet their specific needs. But with Druva’s fast-growing customer base, the existing schedulers started hitting their limits: inability to handle spikes, triggers lagging behind their desired time, high CPU/memory usage during load/reload activity, and so on. Thus, the product team decided to adopt a common solution that could be used across all Druva products.

During requirement gathering, we observed that in addition to simple cron schedules (schedules that are triggered at a particular point in time) there are many scenarios where we need scheduled batch processing. Scheduled batch processing means that we have a batch to process within a defined time window. Although we don’t need to start processing every item at the start of the window, we should complete processing the whole batch within that window. 

In our existing implementation, we created schedules for every item in a batch with the window start time as the trigger time for all those schedules. This resulted in a CPU spike on the consumer at the start of the window. This was impacting the overall performance of the consumer service. It would have been great if the scheduler could send triggers in a staggered manner. 

Such a scheduler could serve cron schedules as well; a cron schedule is simply the special case where staggering (a throttling policy) is not applied.

Traditionally, managing the load that triggered schedules put on the consumer is not considered the scheduler’s responsibility. We could not find any off-the-shelf scheduler that caters to this specific requirement of scheduled batch processing, so we decided to build one in-house.

Our scheduler requirements

  • The scheduler must be stateless, scalable, and a consumer-independent service.
  • The scheduler should support schedule triggers via SQS events or HTTP callback.
  • The consumer must be able to create a schedule based on a cron-like expression that can be associated with a particular timezone.
  • The consumer must receive triggers as per the timings identified in its cron expression.
  • The consumer should be able to define an event (schedule trigger) count n for a schedule. The scheduler should trigger n unique events on the consumer whenever the schedule is triggered. For cron schedules, it will be 1, but for schedules created for batch processing, the number can be set to the batch size.
  • Finally, the consumer should be able to define the throttling policy. The policy will define the maximum number of events that can be triggered on the consumer at any given point in time. This limit will be applicable to events across all schedules where this policy is applied.

Components of the scheduler

Schedule Master

This component manages the entities defined by consumer services. The Schedule Master exposes a REST API to manipulate these entities. The primary entity is ScheduleJob, described below:

ScheduleJob

Represents schedules based on cron-like expressions. For example:

{
    "Month": "*",
    "DayOfMonth": "*",
    "DayOfWeek": "*",
    "Hour": "3,6,9",
    "Minute": "*"
}

This expression represents 3 triggers every day at 03:00, 06:00, and 09:00.
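
A minimal sketch of how such an expression can be evaluated against a timestamp (an illustration, not Druva’s actual code; it assumes “*” in Minute means the top of the hour, consistent with the trigger times above):

```python
from datetime import datetime

def matches(expr: dict, ts: datetime) -> bool:
    """Sketch: check whether a timestamp matches a cron-like expression.
    '*' in Minute is taken to mean the top of the hour, since the
    example expression fires at 03:00, 06:00, and 09:00."""
    def field_ok(field: str, actual) -> bool:
        return field == "*" or str(actual) in field.split(",")
    minute_ok = (ts.minute == 0 if expr["Minute"] == "*"
                 else field_ok(expr["Minute"], ts.minute))
    return (field_ok(expr["Month"], ts.month)
            and field_ok(expr["DayOfMonth"], ts.day)
            and field_ok(expr["DayOfWeek"], ts.strftime("%w"))  # 0=Sunday
            and field_ok(expr["Hour"], ts.hour)
            and minute_ok)

expr = {"Month": "*", "DayOfMonth": "*", "DayOfWeek": "*",
        "Hour": "3,6,9", "Minute": "*"}
```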

Whenever a consumer defines a ScheduleJob, the scheduler also creates instances of another backend entity called ScheduleChild. Every child entity has two attributes, LaunchExpression and Minute. Combined, these two attributes identify one of the trigger times represented by the cron expression. Thus, for the above schedule, three children are created, for the daily triggers at 03:00, 06:00, and 09:00.
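
The decomposition into children can be sketched as follows, assuming the field names from the ScheduleJob example above (for simplicity, the per-child LaunchExpression is represented here as a copy of the expression with a single hour value; this is an illustration, not Druva’s actual code):

```python
def expand_children(job: dict) -> list:
    """Sketch: derive ScheduleChild entities from a ScheduleJob,
    one per distinct trigger hour."""
    hours = job["Hour"].split(",") if job["Hour"] != "*" else ["*"]
    return [{"LaunchExpression": {**job, "Hour": hour},
             "Minute": job["Minute"]}
            for hour in hours]

job = {"Month": "*", "DayOfMonth": "*", "DayOfWeek": "*",
       "Hour": "3,6,9", "Minute": "*"}
```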

Schedule Event Emitter

This is the backend component that acts as a schedule scanner. It runs periodically, scans upcoming schedules, and creates Schedule-Events for all of them. Every event has a LaunchTS (launch timestamp) associated with it and contains all the info required to trigger the schedule. These events are added to the event queue, which is then processed by the next component.

EventHandler

This is also a backend component. It runs periodically and picks the Schedule-Events, emitted by the emitter, that are due for trigger in the next few minutes.

Design of the scheduler

At its core, the scheduler can be thought of as a collaboration of two scanners running periodically: the schedule scanner (the emitter component) and the event scanner (the event-handler component).

The schedule scanner runs periodically for a particular scan window, the time interval between two consecutive runs of the scanner. It picks the schedule children having their trigger time within its scan window. For every such ScheduleChild, n events with unique IDs are created and pushed to the event queue (a priority queue).

The event scanner runs every m minutes (our duration was 5 minutes). It picks events having a LaunchTS within the next m minutes. Every event that is picked is handed over to a worker. Events carry all the details required for processing, including the schedule ID and the SQS details provided at the time of schedule creation; in other words, event processing is independent of the original schedule for which the event was created. On receiving an event, the worker sleeps until the event’s exact trigger time. At the trigger time, the worker wakes up and sends the event to the appropriate SQS queue or HTTP endpoint. Once the event is triggered successfully, it is moved to a special event queue named D (Done queue).
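
One pass of the event scanner over the priority queue can be sketched like this (an illustration only; a real worker would sleep until each event’s exact LaunchTS before sending the trigger to SQS or the HTTP endpoint):

```python
import heapq

def scan_due_events(event_queue: list, now: float, window: float) -> list:
    """Sketch of one event-scanner pass: pop events whose LaunchTS falls
    within the next `window` seconds and hand them to workers (here,
    simply collect them in LaunchTS order)."""
    due = []
    while event_queue and event_queue[0][0] <= now + window:
        due.append(heapq.heappop(event_queue))
    return due

# Build a small event queue keyed by LaunchTS (seconds from "now").
queue = []
for launch_ts, event_id in [(300, "e1"), (90, "e2"), (600, "e3")]:
    heapq.heappush(queue, (launch_ts, event_id))
```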

Efficient and sharded schedule scanning

As mentioned above, the scheduler service runs a scanner periodically to create schedule events for the scan window. Not all schedule children will have their trigger time in each scan window, so we need not read all the schedules in each scan. To reduce the data read per scan, schedule children are organized so that the scanner can deduce which children have trigger times within its scan window.

As mentioned above, a schedule child represents one of the distinct trigger times of a schedule; its two attributes, LaunchExpression and Minute, store the trigger-time information. The LaunchExpression is derived from the most significant non-“*” field of the cron expression. That way, schedule children are classified into the following classes:

  • classMonthDOM (“MM”)
    • This represents the class of triggers that occur at a specific hour (e.g. “3”, “18”) or every hour (“*”) on a specific day of a specific month (e.g. every 3rd of August).
  • classMonthDOW (“MW”)
    • This represents the class of triggers that occur at a specific hour (e.g. “3”, “18”) or every hour (“*”) on a specific weekday of a specific month (e.g. every Monday of August, or the 3rd Sunday of January).
  • classDOM (“M”)
    • This represents the class of triggers that occur at a specific hour (e.g. “3”, “18”) or every hour (“*”) on a specific day of the month (e.g. the 5th of every month).
  • classDOW (“W”)
    • This represents the class of triggers that occur at a specific hour (e.g. “3”, “18”) or every hour (“*”) on a specific day of the week (e.g. every Thursday).
  • classHour (“H”)
    • This represents the class of triggers that occur at a specific hour (e.g. “3”, “18”) or every hour (“*”), with no day or month restriction.

With such LaunchExpressions, we can deduce whether any particular child will trigger within a given scan window. Conversely, we can deduce which LaunchExpressions should be scanned for a given scan window.
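
Deriving the class key from the most significant non-“*” field can be sketched as follows (an illustration of the classification above, not Druva’s actual code):

```python
def launch_class(expr: dict) -> str:
    """Sketch: derive the LaunchExpression class key from the most
    significant non-'*' field of a cron-like expression."""
    month = expr.get("Month", "*") != "*"
    dom = expr.get("DayOfMonth", "*") != "*"
    dow = expr.get("DayOfWeek", "*") != "*"
    if month and dom:
        return "MM"  # classMonthDOM: specific day of a specific month
    if month and dow:
        return "MW"  # classMonthDOW: specific weekday of a specific month
    if dom:
        return "M"   # classDOM: specific day of every month
    if dow:
        return "W"   # classDOW: specific day of every week
    return "H"       # classHour: only hour/minute restrictions
```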

To support schedules in different time zones, the scheduler stores a list of all the time zones in which consumers have created schedules. During the scan, the scheduler translates the scan window into all those time zones, and only the eligible LaunchExpressions are scanned. This way, the scanner doesn’t have to scan all the schedules in every run; scanning only the eligible LaunchExpressions is enough. Since this arrangement (index) is based on static information of the schedule, it does not need any rearrangement (hashing) after every scan.
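
Translating the scan window into every stored timezone can be sketched with Python’s zoneinfo module (an illustration only):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

def window_in_zones(start_utc: datetime, end_utc: datetime, zones):
    """Sketch: translate a UTC scan window into each timezone in which
    consumers have created schedules, so only the eligible
    LaunchExpressions need scanning."""
    return {zone: (start_utc.astimezone(ZoneInfo(zone)),
                   end_utc.astimezone(ZoneInfo(zone)))
            for zone in zones}

start = datetime(2024, 6, 1, 2, 55, tzinfo=timezone.utc)
end = datetime(2024, 6, 1, 3, 0, tzinfo=timezone.utc)
windows = window_in_zones(start, end, ["UTC", "Asia/Kolkata"])
```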

The scanning process of each LaunchExpression is independent of the others, so each LaunchExpression can be thought of as a shard. Shard processing is considered complete once events have been created for all trigger times of all the schedule children in that shard. With such sharding and a shard-locking mechanism (to avoid races in processing the same shard), we are able to scale schedule scanning horizontally.

Event handling can also be sharded by creating multiple event queues. The schedule scanner creates events and enqueues them in a randomly chosen queue. Whenever the event scanner runs, the multiple queues are processed in parallel.

Schedule event throttling

Now we have a scalable scheduler that caters to all of our requirements and performs cron scheduling efficiently. For throttling support, we used the token-bucket approach.

As mentioned above, the throttling policy defines the limit of the maximum number of scheduled events that can be triggered at any particular point in time.

We create a token bucket for every policy. At the start, every bucket has tokens equal to the limit (say m) defined by the policy. Any schedule where this policy is applied has to acquire a token from that bucket in order to trigger a schedule event on the consumer. If no token is available in the bucket, the maximum number of allowed jobs has already been triggered and is running on the consumer. This guarantees that at any point in time, the consumer will not be loaded with more than m schedules to process.

On successful acquisition of a token, the event is handed over to the worker for the actual trigger. On reception, the consumer executes the job associated with the schedule. Whenever this job’s execution completes, the consumer calls the scheduler to release the token acquired for this particular scheduled event. After that, events waiting for a token from the same bucket can be triggered. In this fashion, all pending events under a particular policy are triggered gradually.
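
A minimal sketch of a per-policy token bucket, here using a bounded semaphore (an illustration, not Druva’s implementation):

```python
import threading

class PolicyTokenBucket:
    """Sketch of a throttling-policy token bucket: at most `limit`
    schedule events sharing the policy may be in flight at once."""

    def __init__(self, limit: int):
        self._sem = threading.BoundedSemaphore(limit)

    def try_acquire(self) -> bool:
        # Non-blocking: if no token is free, the event stays queued
        # until a consumer releases a token.
        return self._sem.acquire(blocking=False)

    def release(self) -> None:
        # Called by the consumer once the triggered job completes.
        self._sem.release()

bucket = PolicyTokenBucket(2)
```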

In the release workflow, instead of releasing the token right away, the completed events are put back into the same queue with the highest priority, and the token-queue scanner is notified. The scanner first releases the tokens acquired by these completed events and then processes the waiting events. This approach not only serializes operations on the token queue, but also avoids wasting any of the batch-processing window.

Conclusion

We have this scheduler implementation running in production. It serves end-user-defined backup schedules (cron schedules) as well as scheduled batch processing with optimum load on the consumer services. There are no more CPU spikes, and things are much more stable.

This scheduler handles approximately 5 million schedules a day with a trigger-time adherence SLA of 1 minute. Of these, more than 80% of the schedules have throttling applied. We no longer observe issues like schedule misses or performance degradation of batch-processing consumers.