//
// Copyright (c) 2014, Novant LLC
// Licensed under the MIT License
//
// History:
// 10 Sep 2014 Andy Frank Creation
//
using concurrent
**
** CronService
**
const class CronService : Service
{
//////////////////////////////////////////////////////////////////////////
// Constructor
//////////////////////////////////////////////////////////////////////////
** Constructor.
new make(|This|? f := null)
{
if (f != null) f(this)
}
** Directory for job config and logs.
const File dir := Env.cur.workDir + `cron/`
** Number of logs to keep per job.
const Int jobLogLimit := 30
** Start service.
override Void onStart() { actor.send(CronMsg("init")) }
** Stop service will block until all jobs are complete.
override Void onStop()
{
actor.pool.stop.join
jobPool.stop.join
}
//////////////////////////////////////////////////////////////////////////
// Jobs
//////////////////////////////////////////////////////////////////////////
** List current jobs.
[Str:Obj?][] jobs()
{
actor.send(CronMsg("list")).get(5sec)
}
** Add a CronJob to this service.
This addJob(Str name, Method method, Obj schedule)
{
// allow "unwrapped" schedule string
CronSchedule? s
try { s = schedule as CronSchedule ?: CronSchedule(schedule) }
catch (Err err) throw ArgErr("Invalid scheulde arg '$schedule'")
// queue add job message
actor.send(CronMsg("add", CronJob(name, method, s))).get(5sec)
return this
}
** Remove given job from service.
This removeJob(Str name)
{
actor.send(CronMsg("remove", name)).get(5sec)
return this
}
//////////////////////////////////////////////////////////////////////////
// Actor
//////////////////////////////////////////////////////////////////////////
private const Actor actor := Actor(ActorPool { name="CronService" })
|msg| { actorReceive(msg) }
private const ActorPool jobPool := ActorPool { name="CronService-Jobs" }
private Obj? actorReceive(CronMsg msg)
{
if (msg.op == "init") return onInit
cx := Actor.locals["cx"] as CronCx
if (cx == null) throw IOErr("CronService not started -- call start() first")
switch (msg.op)
{
case "list": return onList(cx)
case "add": return onAdd(cx, msg.a)
case "remove": return onRemove(cx, msg.a)
case "check": return onCheck(cx)
case "clean": return onClean(cx)
default: throw ArgErr("Unknown op: $msg.op")
}
}
** Init service.
private Obj? onInit()
{
Actor.locals["cx"] = CronCx()
actor.sendLater(checkFreq, checkMsg)
actor.sendLater(cleanFreq, cleanMsg)
log.info("CronService started")
return null
}
** List jobs and state.
private Obj? onList(CronCx cx)
{
list := Obj[,]
cx.jobs.each |job|
{
map := Str:Obj?[:] { ordered=true }
map["name"] = job.name
map["method"] = job.method
map["schedule"] = job.schedule
map["lastRun"] = cx.lastRun[job]
list.add(map.toImmutable)
}
return list.toImmutable
}
** Add a new job.
private Obj? onAdd(CronCx cx, CronJob job)
{
if (cx.jobs.contains(job)) throw Err("Job already exists: $job.name")
// add job
cx.jobs.add(job)
// look up job.props
props := dir + `$job.name/${job.name}.props`
if (props.exists)
{
map := props.readProps
ts := DateTime.fromStr(map["lastRun"] ?: "", false)
cx.lastRun[job] = ts
}
log.info("job added: $job")
return null
}
** Remove a job.
private Obj? onRemove(CronCx cx, Str name)
{
job := cx.jobs.find |j| { j.name == name }
if (job != null)
{
cx.jobs.remove(job)
log.info("job removed: $job")
}
return null
}
** Check if any jobs need to run.
private Obj? onCheck(CronCx cx)
{
try
{
now := DateTime.now
cx.jobs.each |job|
{
if (job.schedule.trigger(now, cx.lastRun[job]))
{
cx.lastRun[job] = now
CronJobActor(jobPool, this, job, now).send(null)
}
}
}
catch (Err err) { log.err("Check failed", err) }
finally { actor.sendLater(checkFreq, checkMsg) }
return null
}
private Obj? onClean(CronCx cx)
{
try
{
cx.jobs.each |job|
{
logs := (dir + `$job.name/`).listFiles.findAll |f| { f.ext == "log" }
if (logs.size > jobLogLimit)
{
logs.sort |a,b| { a.modified <=> b.modified }
logs.eachRange(0..<(logs.size-jobLogLimit)) |f| { f.delete }
}
}
}
catch (Err err) { log.err("Clean failed", err) }
finally { actor.sendLater(cleanFreq, cleanMsg) }
return null
}
private const Log log := Log.get("cron")
private const Duration checkFreq := 1sec
private const Duration cleanFreq := 1hr
private const CronMsg checkMsg := CronMsg("check")
private const CronMsg cleanMsg := CronMsg("clean")
}
**************************************************************************
** CronCx
**************************************************************************
** CronCx manages the runtime state of cron jobs inside CronService.
internal class CronCx
{
** Job list.
CronJob[] jobs := [,]
** Last run map.
CronJob:DateTime lastRun := [:]
}
**************************************************************************
** CronMsg
**************************************************************************
internal const class CronMsg
{
new make(Str op, Obj? a := null, Obj? b := null)
{
this.op = op
this.a = a
this.b = b
}
const Str op
const Obj? a
const Obj? b
}