Scheduler Plus is a cloud-oriented and fault-tolerant workflow engine layer

Motivation

The system had a number of upstream data sources that were often unavailable and regularly slow to respond. These systems returned relatively static responses whose maximum lifetime was supplied in an s-maxage HTTP header, so we knew how long we could keep the data. We could have used our existing cache pattern, which was to insert the data with an expiry and re-request it once the maximum age had passed, but that way data dropped out of our cache and we were often unable to re-request it.

Instead, code was implemented to wrap a request to the upstream source together with a time at which to execute it. We could then keep the response from the upstream source in our cache indefinitely and refresh it when its lifetime elapsed, so we could always return something, with fresh data whenever possible. This approach gave us solid sub-200ms responses from an upstream system that was regularly unavailable or had response times of multiple seconds.

Description

Scheduler Plus is a workflow engine layer built on Akka Scheduler and a Redis database instance. It provides a cloud-oriented and fault-tolerant mechanism for scheduling work that must be repeated at intervals, such as obtaining an object from an upstream data source and adding it to a cache. Your work returns the lifetime of this object (for example the time from a max-age HTTP response header for a service you have called) and the system re-executes the work when that duration has elapsed, i.e. when the item should no longer be cached. In practice the lifetime can be any duration: it is simply the delay until your job should next be scheduled.
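As an illustration of deriving that lifetime, here is a minimal sketch that turns a Cache-Control header value into milliseconds. The object name CacheLifetime, the method name and the fallback parameter are hypothetical and not part of Scheduler Plus; it assumes s-maxage should win over max-age when both are present.

```scala
object CacheLifetime {

  // Matches one directive such as "s-maxage=300" or "max-age=60" (seconds).
  // Values longer than 9 digits are ignored so that toLong cannot overflow.
  private val Directive = """(?i)\s*(s-maxage|max-age)\s*=\s*(\d{1,9})\s*""".r

  /** Lifetime in milliseconds, preferring s-maxage over max-age (an assumption). */
  def lifetimeInMillis(cacheControl: String, defaultMillis: Long): Long = {
    val directives: Map[String, Long] = cacheControl.split(",").toList.collect {
      case Directive(name, seconds) => name.toLowerCase -> seconds.toLong
    }.toMap

    directives.get("s-maxage")
      .orElse(directives.get("max-age"))
      .map(_ * 1000L)             // header values are in seconds
      .getOrElse(defaultMillis)   // no usable directive: fall back
  }
}
```

A callback could pass the result of lifetimeInMillis to CallbackResponse, using the fallback when the upstream response carries no usable directive.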

Using Scheduler Plus

To use Scheduler Plus you need to create your callback for the type of job you are interested in. A job is how Scheduler Plus tracks work and is a response to a job request, which is covered later.

Firstly, to create your callback (the work you want to do) you’ll need to extend the Callbacks trait, like:

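import bbc.schedulerplus.client.Callbacks

object TheCallbacks extends Callbacks {

  // Returns the work to run for a job, chosen by the job's type.
  def callbackFor(job: Job): () => CallbackResponse = {

    job.`type` match {
      case "the_job_type" => () => {

        // do your work here

        // delay, in milliseconds, before this job should run again
        val someTimeInMillis = 10000

        CallbackResponse(lifetimeInMillis = someTimeInMillis)
      }
    }
  }

  // The job types this object handles; each case above needs an entry here.
  def keys: scala.List[String] = List("the_job_type")
}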

You’ll notice there is a job.`type` match… which returns an anonymous function, like:

() => {
    // your stuff
    val someTimeInMillis = 10000
    
    CallbackResponse(lifetimeInMillis = someTimeInMillis)
}

…which forms a closure around the work you want to perform and returns a CallbackResponse. Its lifetimeInMillis is the delay, in milliseconds, after which you’d like this job to run again. In practice the job runs near to the time you ask for, with some entropy to spread system load and avoid ‘lumping’ of requests.
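The exact way Scheduler Plus spreads run times is not described here. Purely as an illustration of the idea, the sketch below adds a bounded random offset to a requested delay; the object Jitter, the method withJitter and the maxFraction parameter are all hypothetical.

```scala
import scala.util.Random

object Jitter {

  /** Shifts a delay by a random amount of at most maxFraction of its value, either way. */
  def withJitter(lifetimeInMillis: Long, maxFraction: Double, random: Random): Long = {
    require(lifetimeInMillis >= 0, "lifetime must not be negative")
    require(maxFraction >= 0.0 && maxFraction <= 1.0, "maxFraction must be in [0, 1]")

    // nextDouble() is in [0, 1), so offset is in [-maxFraction, +maxFraction)
    val offset = (random.nextDouble() * 2.0 - 1.0) * maxFraction
    math.max(0L, math.round(lifetimeInMillis * (1.0 + offset)))
  }
}
```

With maxFraction = 0.1, jobs that all ask for 10000 ms would run somewhere between 9000 and 11000 ms later rather than all at once.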

Strictly speaking you don’t need the match expression: you could return the anonymous function regardless of the job.`type`, but it’s good practice to check the job type before you return a callback for it (note the backticks around type, which is a Scala keyword). You must also add the job type to the list returned by keys so the system knows you will deal with that job type; otherwise Scheduler Plus will never deliver jobs to you, regardless of which callbacks you have in callbackFor. The anonymous function returned by callbackFor is executed when Scheduler Plus finds a matching job request.

Then to start up the system you can use an ask (this needs import akka.pattern.ask and an implicit akka.util.Timeout in scope):

val scheduler = system.actorOf(Props[SchedulerPlusActor], "scheduler-actor")
val response = scheduler ? TheCallbacks

…or, if you don’t want to know the response, using a tell:

system.actorOf(Props[SchedulerPlusActor], "scheduler-actor") ! TheCallbacks

The system will then start up and monitor the cache for any job requests. At this stage the system is not doing any work, other than polling the cache for job requests.

Creating Job Requests

To actually run your task, a job request must be created in the cache that Scheduler Plus is listening to. This can be done, in redis-cli for example, by using:

SET bbc.schedulerplus.request:the_job_type_123456 "id=123456|type=the_job_type|status=live"

…or your code could add this message to the cache to trigger a job.
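A minimal sketch of building that key and value in code, following the format of the redis-cli example above. The object JobRequests and its method names are hypothetical; the resulting strings would be passed to the SET operation of whichever Redis client you use.

```scala
object JobRequests {

  // Key prefix taken from the redis-cli example
  val KeyPrefix = "bbc.schedulerplus.request:"

  /** e.g. bbc.schedulerplus.request:the_job_type_123456 */
  def key(jobType: String, id: String): String =
    s"$KeyPrefix${jobType}_$id"

  /** e.g. id=123456|type=the_job_type|status=live */
  def value(id: String, jobType: String, status: String): String =
    s"id=$id|type=$jobType|status=$status"
}
```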

Soon, Scheduler Plus will find this request and will schedule it. It will create a job in the cache with a similar key, which is used to ensure the job isn’t lost if the system crashes and also that only one Scheduler Plus instance is running this particular job.

When scheduling the job request, Scheduler Plus will look for a callback (in your callbacks) for the the_job_type type and store that in the job to be executed when the job runs.

Job Request Statuses

Requests can have the following statuses: live, paused, and cancelled. Live requests execute the callback and are then re-scheduled. Paused requests are re-scheduled but their callbacks do not execute. Cancelled requests are ignored: they are equivalent to having no request at all, but they are explicit and will remove any matching job in the cache. In practice you’d probably just remove the job request from the cache, but the system will attempt to delete any jobs relating to cancelled job requests, which can help keep the cache tidy.

Job requests can be set to any status at any time, by overwriting the job request in the cache, say with:

SET bbc.schedulerplus.request:the_job_type_123456 "id=123456|type=the_job_type|status=paused"

A request that has been scheduled but not yet executed may have the status live. However, if the status is changed from live to paused before the job callback actually executes, it will act like a paused request and the callback will not run. This means you can pause requests, including ones that are already scheduled and currently in memory.
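The status rules can be sketched as follows. This is not the library's own code: the object RequestStatus, the JobRequest case class and the Action names are hypothetical, and the parser only assumes the id=…|type=…|status=… value format shown in the examples. Checking the status again just before running the callback is what lets a late change from live to paused take effect.

```scala
object RequestStatus {

  final case class JobRequest(id: String, jobType: String, status: String)

  /** Parses a value such as "id=123456|type=the_job_type|status=live". */
  def parse(raw: String): Option[JobRequest] = {
    val fields: Map[String, String] = raw.split('|').toList.flatMap { pair =>
      pair.split("=", 2) match {
        case Array(k, v) => List(k -> v)
        case _           => Nil // ignore fragments without a key=value shape
      }
    }.toMap

    for {
      id      <- fields.get("id")
      jobType <- fields.get("type")
      status  <- fields.get("status")
    } yield JobRequest(id, jobType, status)
  }

  sealed trait Action
  case object ExecuteAndReschedule extends Action // live
  case object RescheduleOnly extends Action       // paused
  case object RemoveJob extends Action            // cancelled

  /** Maps a status to what should happen when the job comes due. */
  def actionFor(status: String): Option[Action] = status match {
    case "live"      => Some(ExecuteAndReschedule)
    case "paused"    => Some(RescheduleOnly)
    case "cancelled" => Some(RemoveJob)
    case _           => None // unknown status: left to the caller
  }
}
```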

Configuration

To adjust the initial delay before Scheduler Plus starts running, the interval at which Scheduler Plus checks for new jobs, or the cache host and port, create a config file containing values like:

 schedulerplus {
   monitor {
     initial_delay_seconds = 15
     interval_seconds = 30
   }
   
   cache {
     host = localhost
     port = 6379
   }
 }

Running

Scheduler Plus is published (currently locally) with:

sbt publish

…which will build a jar to include in your project dependencies at:

{project_directory}/bbc/scheduler-plus_2.11-0.1-SNAPSHOT.jar

Tests are run with:

sbt test

Scalastyle with:

sbt scalastyle

Scoverage test coverage with:

sbt clean coverage test

Generate Scoverage test report with:

sbt coverageReport

About Scheduler Plus

Scheduler Plus came from a subsystem in the Radio and Music Services ‘Blur - Business Layer for Radio’ service called ‘Scheduler’. Originally designed to asynchronously request data and build items from upstream services, it can actually be used to execute any code.

Caveat

Scheduler Plus is a brand new component, so it is under fairly heavy development. Stay tuned!

Lister from Red Dwarf as ‘Man Plus’