commit 73ff8d79f70b36483d1d33587cdc9549c8e472bd
parent a82c04910f259fd0296085cc9aa9280df5881a87
Author: Jeremy Lin <jeremy.lin@gmail.com>
Date: Fri, 2 Apr 2021 20:16:49 -0700
Add a generic job scheduler
Also rewrite deletion of old sends using the job scheduler.
Diffstat:
9 files changed, 126 insertions(+), 31 deletions(-)
diff --git a/.env.template b/.env.template
@@ -56,6 +56,19 @@
# WEBSOCKET_ADDRESS=0.0.0.0
# WEBSOCKET_PORT=3012
+## Job scheduler settings
+##
+## Job schedules use a cron-like syntax (as parsed by https://crates.io/crates/cron),
+## and are always in terms of UTC time (regardless of your local time zone settings).
+##
+## How often (in ms) the job scheduler thread checks for jobs that need running.
+## Set to 0 to globally disable scheduled jobs.
+# JOB_POLL_INTERVAL_MS=30000
+##
+## Cron schedule of the job that checks for Sends past their deletion date.
+## Note that, unlike a classic crontab entry, the schedule starts with a seconds field.
+## Defaults to hourly. Set blank to disable this job.
+# SEND_PURGE_SCHEDULE="0 0 * * * *"
+
## Enable extended logging, which shows timestamps and targets in the logs
# EXTENDED_LOGGING=true
diff --git a/Cargo.lock b/Cargo.lock
@@ -161,6 +161,7 @@ dependencies = [
"handlebars",
"html5ever",
"idna 0.2.2",
+ "job_scheduler",
"jsonwebtoken",
"lettre",
"libsqlite3-sys",
@@ -402,6 +403,17 @@ dependencies = [
]
[[package]]
+name = "cron"
+version = "0.9.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e009ed0b762cf7a967a34dfdc67d5967d3f828f12901d37081432c3dd1668f8f"
+dependencies = [
+ "chrono",
+ "nom 4.1.1",
+ "once_cell",
+]
+
+[[package]]
name = "crypto-mac"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1098,6 +1110,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
[[package]]
+name = "job_scheduler"
+version = "1.2.1"
+source = "git+https://github.com/jjlin/job_scheduler?rev=ee023418dbba2bfe1e30a5fd7d937f9e33739806#ee023418dbba2bfe1e30a5fd7d937f9e33739806"
+dependencies = [
+ "chrono",
+ "cron",
+ "uuid",
+]
+
+[[package]]
name = "js-sys"
version = "0.3.49"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1160,7 +1182,7 @@ dependencies = [
"idna 0.2.2",
"mime 0.3.16",
"native-tls",
- "nom",
+ "nom 6.1.2",
"once_cell",
"quoted_printable",
"rand 0.8.3",
@@ -1477,6 +1499,15 @@ checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
[[package]]
name = "nom"
+version = "4.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c349f68f25f596b9f44cf0e7c69752a5c633b0550c3ff849518bfba0233774a"
+dependencies = [
+ "memchr",
+]
+
+[[package]]
+name = "nom"
version = "6.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7413f999671bd4745a7b624bd370a569fb6bc574b23c83a3c5ed2e453f3d5e2"
diff --git a/Cargo.toml b/Cargo.toml
@@ -73,6 +73,9 @@ chrono = { version = "0.4.19", features = ["serde"] }
chrono-tz = "0.5.3"
time = "0.2.26"
+# Job scheduler
+job_scheduler = "1.2.1"
+
# TOTP library
oath = "0.10.2"
@@ -136,3 +139,10 @@ rocket_contrib = { git = 'https://github.com/SergioBenitez/Rocket', rev = '263e3
# For favicon extraction from main website
data-url = { git = 'https://github.com/servo/rust-url', package="data-url", rev = '540ede02d0771824c0c80ff9f57fe8eff38b1291' }
+
+# The maintainer of the `job_scheduler` crate doesn't seem to have responded
+# to any issues or PRs for almost a year (as of April 2021). This hopefully
+# temporary fork updates Cargo.toml to use more up-to-date dependencies.
+# In particular, `cron` has since implemented parsing of some common syntax
+# that wasn't previously supported (https://github.com/zslayton/cron/pull/64).
+job_scheduler = { git = 'https://github.com/jjlin/job_scheduler', rev = 'ee023418dbba2bfe1e30a5fd7d937f9e33739806' }
diff --git a/src/api/core/mod.rs b/src/api/core/mod.rs
@@ -5,7 +5,7 @@ mod organizations;
pub mod two_factor;
mod sends;
-pub use sends::start_send_deletion_scheduler;
+pub use sends::purge_sends;
pub fn routes() -> Vec<Route> {
let mut mod_routes = routes![
diff --git a/src/api/core/sends.rs b/src/api/core/sends.rs
@@ -9,7 +9,7 @@ use serde_json::Value;
use crate::{
api::{ApiResult, EmptyResult, JsonResult, JsonUpcase, Notify, UpdateType},
auth::{Headers, Host},
- db::{models::*, DbConn},
+ db::{models::*, DbConn, DbPool},
CONFIG,
};
@@ -27,21 +27,13 @@ pub fn routes() -> Vec<rocket::Route> {
]
}
-pub fn start_send_deletion_scheduler(pool: crate::db::DbPool) {
- std::thread::spawn(move || {
- loop {
- if let Ok(conn) = pool.get() {
- info!("Initiating send deletion");
- for send in Send::find_all(&conn) {
- if chrono::Utc::now().naive_utc() >= send.deletion_date {
- send.delete(&conn).ok();
- }
- }
- }
-
- std::thread::sleep(std::time::Duration::from_secs(3600));
- }
- });
+/// Job entry point: obtains a connection from `pool` and purges expired sends.
+///
+/// If no connection can be obtained, an error is logged and nothing is purged.
+pub fn purge_sends(pool: DbPool) {
+    debug!("Purging sends");
+    match pool.get() {
+        Ok(conn) => Send::purge(&conn),
+        Err(_) => error!("Failed to get DB connection while purging sends"),
+    }
}
#[derive(Deserialize)]
diff --git a/src/api/mod.rs b/src/api/mod.rs
@@ -10,8 +10,8 @@ use serde_json::Value;
pub use crate::api::{
admin::routes as admin_routes,
+ core::purge_sends,
core::routes as core_routes,
- core::start_send_deletion_scheduler,
icons::routes as icons_routes,
identity::routes as identity_routes,
notifications::routes as notifications_routes,
diff --git a/src/config.rs b/src/config.rs
@@ -316,6 +316,14 @@ make_config! {
/// Websocket port
websocket_port: u16, false, def, 3012;
},
+ jobs {
+ /// Job scheduler poll interval |> How often the job scheduler thread checks for jobs to run.
+ /// Set to 0 to globally disable scheduled jobs.
+ job_poll_interval_ms: u64, false, def, 30_000;
+ /// Send purge schedule |> Cron schedule of the job that checks for Sends past their deletion date.
+ /// Defaults to hourly. Set blank to disable this job.
+ send_purge_schedule: String, false, def, "0 0 * * * *".to_string();
+ },
/// General settings
settings {
diff --git a/src/db/models/send.rs b/src/db/models/send.rs
@@ -205,6 +205,13 @@ impl Send {
}}
}
+ /// Purge all sends that are past their deletion date.
+ ///
+ /// Deletion is best-effort: a failure to delete one send is logged and the
+ /// remaining sends are still processed.
+ pub fn purge(conn: &DbConn) {
+     for send in Self::find_by_past_deletion_date(conn) {
+         // Don't silently discard failures; otherwise a purge job that never
+         // manages to delete anything would go unnoticed.
+         if let Err(e) = send.delete(conn) {
+             error!("Failed to delete expired send: {:?}", e);
+         }
+     }
+ }
+
pub fn update_users_revision(&self, conn: &DbConn) {
match &self.user_uuid {
Some(user_uuid) => {
@@ -223,12 +230,6 @@ impl Send {
Ok(())
}
- pub fn find_all(conn: &DbConn) -> Vec<Self> {
- db_run! {conn: {
- sends::table.load::<SendDb>(conn).expect("Error loading sends").from_db()
- }}
- }
-
pub fn find_by_access_id(access_id: &str, conn: &DbConn) -> Option<Self> {
use data_encoding::BASE64URL_NOPAD;
use uuid::Uuid;
@@ -271,4 +272,13 @@ impl Send {
.load::<SendDb>(conn).expect("Error loading sends").from_db()
}}
}
+
+ /// Returns every send whose `deletion_date` is strictly earlier than the
+ /// current UTC time (sampled once, before the query runs).
+ ///
+ /// Panics with "Error loading sends" if the query fails.
+ pub fn find_by_past_deletion_date(conn: &DbConn) -> Vec<Self> {
+ let now = Utc::now().naive_utc();
+ db_run! {conn: {
+ sends::table
+ .filter(sends::deletion_date.lt(now))
+ .load::<SendDb>(conn).expect("Error loading sends").from_db()
+ }}
+ }
}
diff --git a/src/main.rs b/src/main.rs
@@ -16,6 +16,7 @@ extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
+use job_scheduler::{JobScheduler, Job};
use std::{
fs::create_dir_all,
panic,
@@ -23,6 +24,7 @@ use std::{
process::{exit, Command},
str::FromStr,
thread,
+ time::Duration,
};
#[macro_use]
@@ -56,7 +58,9 @@ fn main() {
create_icon_cache_folder();
- launch_rocket(extra_debug);
+ let pool = create_db_pool();
+ schedule_jobs(pool.clone());
+ launch_rocket(pool, extra_debug); // Blocks until program termination.
}
const HELP: &str = "\
@@ -301,17 +305,17 @@ fn check_web_vault() {
}
}
-fn launch_rocket(extra_debug: bool) {
- let pool = match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()) {
+/// Builds the database connection pool by calling `util::retry_db` with
+/// `db::DbPool::from_config` and the configured `CONFIG.db_connection_retries()`.
+///
+/// On failure, logs the error and terminates the process with exit code 1.
+fn create_db_pool() -> db::DbPool {
+ match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()) {
Ok(p) => p,
Err(e) => {
error!("Error creating database pool: {:?}", e);
exit(1);
}
- };
-
- api::start_send_deletion_scheduler(pool.clone());
+ }
+}
+fn launch_rocket(pool: db::DbPool, extra_debug: bool) {
let basepath = &CONFIG.domain_path();
// If adding more paths here, consider also adding them to
@@ -334,3 +338,30 @@ fn launch_rocket(extra_debug: bool) {
// The launch will restore the original logging level
error!("Launch error {:#?}", result);
}
+
+/// Starts the background thread that runs scheduled jobs.
+///
+/// Returns immediately without spawning anything when
+/// `CONFIG.job_poll_interval_ms()` is 0. Otherwise a thread named
+/// "job-scheduler" is spawned; it registers the enabled jobs and then loops
+/// forever, ticking the scheduler once per poll interval.
+///
+/// # Panics
+///
+/// Panics if the scheduler thread cannot be spawned.
+fn schedule_jobs(pool: db::DbPool) {
+    if CONFIG.job_poll_interval_ms() == 0 {
+        info!("Job scheduler disabled.");
+        return;
+    }
+    thread::Builder::new()
+        .name("job-scheduler".to_string())
+        .spawn(move || {
+            let mut sched = JobScheduler::new();
+
+            // Purge sends that are past their deletion date.
+            let send_purge_schedule = CONFIG.send_purge_schedule();
+            if !send_purge_schedule.is_empty() {
+                // The schedule comes from user configuration, so a malformed
+                // value must not panic: that would kill this thread and, with
+                // it, every scheduled job. Log the problem and skip the job.
+                match send_purge_schedule.parse() {
+                    Ok(schedule) => {
+                        sched.add(Job::new(schedule, || {
+                            api::purge_sends(pool.clone());
+                        }));
+                    }
+                    Err(e) => error!(
+                        "Invalid SEND_PURGE_SCHEDULE {:?}, send purge job disabled: {:?}",
+                        send_purge_schedule, e
+                    ),
+                }
+            }
+
+            // Periodically check for jobs to run. We probably won't need any
+            // jobs that run more often than once a minute, so a default poll
+            // interval of 30 seconds should be sufficient. Users who want to
+            // schedule jobs to run more frequently for some reason can reduce
+            // the poll interval accordingly.
+            loop {
+                sched.tick();
+                thread::sleep(Duration::from_millis(CONFIG.job_poll_interval_ms()));
+            }
+        })
+        .expect("Error spawning job scheduler thread");
+}