diff --git a/src/director.rs b/src/director.rs index 590441c3023de1c6f838aaec72015a3bbdca724c..04378b05569fc7d0388c9bd7ac37009d37550b82 100644 --- a/src/director.rs +++ b/src/director.rs @@ -30,13 +30,14 @@ use std::path::{Path, PathBuf}; /// Dispatch jobs to registered handlers. /// -/// Jobs are sorted into `accept` and `reject` directories based on whether they were accepted by -/// th relevant handler. Once handled, a `.stamp` file containing the timestamp of when the job was -/// completed is created beside the final location. In addition, rejected jobs have a `.reason` -/// file describing why it was rejected. +/// Jobs are sorted into `accept`, `reject`, and `fail` directories based on whether they were +/// accepted by the relevant handler. Once handled, a `.stamp` file containing the timestamp of +/// when the job was completed is created beside the final location. In addition, rejected and +/// failed jobs have a `.reason` file describing what happened. pub struct Director<'a> { accept: PathBuf, reject: PathBuf, + fail: PathBuf, handlers: HashMap<String, &'a Handler>, } @@ -44,9 +45,10 @@ pub struct Director<'a> { impl<'a> Debug for Director<'a> { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, - "Director {{ accept: {:?}, reject: {:?}, handlers: {:?} }}", + "Director {{ accept: {:?}, reject: {:?}, fail: {:?}, handlers: {:?} }}", self.accept, self.reject, + self.fail, self.handlers.keys().collect::<Vec<_>>()) } } @@ -93,12 +95,15 @@ impl<'a> Director<'a> { let accept_dir = root.join("accept"); let reject_dir = root.join("reject"); + let fail_dir = root.join("fail"); try!(fs::create_dir_all(&accept_dir)); try!(fs::create_dir_all(&reject_dir)); + try!(fs::create_dir_all(&fail_dir)); Ok(Director { accept: accept_dir, reject: reject_dir, + fail: fail_dir, handlers: HashMap::new(), }) @@ -120,19 +125,14 @@ impl<'a> Director<'a> { } // Move the job file into the appropriate directory. - fn tag(&self, accept: bool, file: &Path) -> Result<PathBuf, Error> { - let mut target_path = if accept { - self.accept.clone() - } else { - self.reject.clone() - }; + fn tag(&self, target_dir: &Path, file: &Path) -> Result<PathBuf, Error> { + let mut target_path = target_dir.to_path_buf(); target_path.push(file.file_name().unwrap()); // Write the stamp file. let mut stamp_file = try!(File::create(target_path.with_extension("stamp"))); let time = UTC::now(); - try!(stamp_file.write_all(time.to_string().as_bytes())); - try!(stamp_file.write_all("\n".as_bytes())); + try!(write!(stamp_file, "{}\n", time.to_string())); // Rename the file into the target path. try!(fs::rename(file, &target_path)); @@ -239,7 +239,7 @@ impl<'a> Director<'a> { let ret = match try!(self.handle(&payload)) { HandlerResult::Accept => { debug!("accepted '{:?}'", file); - try!(self.tag(true, file)); + try!(self.tag(&self.accept, file)); RunResult::Continue }, HandlerResult::Defer(ref reason) => { @@ -257,30 +257,35 @@ impl<'a> Director<'a> { .map_err(|err| { io::Error::new(io::ErrorKind::Other, err) })); - let target_file = try!(self.tag(false, file)); + let target_file = try!(self.tag(&self.reject, file)); let mut reason_file = try!(File::create(target_file.with_extension("reason"))); - try!(reason_file.write_all(reason.as_bytes())); - try!(reason_file.write_all("\n".as_bytes())); + try!(write!(reason_file, "{}\n", reason)); RunResult::Continue }, HandlerResult::Reject(ref reason) => { debug!("rejecting '{:?}': {}", file, reason); - let target_file = try!(self.tag(false, file)); + let target_file = try!(self.tag(&self.reject, file)); + let mut reason_file = try!(File::create(target_file.with_extension("reason"))); + try!(write!(reason_file, "{}\n", reason)); + RunResult::Continue + }, + HandlerResult::Fail(ref reason) => { + debug!("failed '{:?}': {:?}", file, reason); + let target_file = try!(self.tag(&self.fail, file)); let mut reason_file = try!(File::create(target_file.with_extension("reason"))); - try!(reason_file.write_all(reason.as_bytes())); - try!(reason_file.write_all("\n".as_bytes())); + try!(write!(reason_file, "{:?}\n", reason)); RunResult::Continue }, HandlerResult::Restart => { info!(target: "director", "restarting via '{:?}'", file); - try!(self.tag(true, file)); + try!(self.tag(&self.accept, file)); RunResult::Restart }, HandlerResult::Done => { info!(target: "director", "completed via '{:?}'", file); - try!(self.tag(true, file)); + try!(self.tag(&self.accept, file)); RunResult::Done }, }; diff --git a/src/handler.rs b/src/handler.rs index 50cd214ac1f4d87e31d33380ac1e84d9f5a0f3f5..a58e15edbab0022fde500c9c6ac47a746a17032d 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -12,6 +12,8 @@ use self::serde_json::Value; use super::director::Director; use super::error::Error; +use std::error; + #[derive(Debug)] /// Results from an event. pub enum HandlerResult { @@ -21,6 +23,8 @@ pub enum HandlerResult { Defer(String), /// The event was rejected for the given reason. Reject(String), + /// The event failed with the given error. + Fail(Box<error::Error>), /// The director should be restarted. Restart, /// The event was the last one which should be processed. @@ -46,6 +50,9 @@ impl HandlerResult { }, (defer @ HandlerResult::Defer(_), _) | (_, defer @ HandlerResult::Defer(_)) => defer, + // Failures are handled next. + (fail @ HandlerResult::Fail(_), _) | + (_, fail @ HandlerResult::Fail(_)) => fail, // All we have left are rejections; combine their messages. (HandlerResult::Reject(left), HandlerResult::Reject(right)) => { HandlerResult::Reject(format!("{}\n{}", left, right)) diff --git a/src/test.rs b/src/test.rs index 48240013da8203cffd092ee374af0fa96949bd23..5ac19188a5bc1e678619129ffbb5c1fe9ede43e6 100644 --- a/src/test.rs +++ b/src/test.rs @@ -78,13 +78,15 @@ fn drop_job(path: &Path, kind: &str) { } fn check_queues(path: &Path, accept: &[&str], reject: &[&str], reject_reasons: &[&str], - ignored: &[&str]) { + fail: &[&str], fail_reasons: &[&str], ignored: &[&str]) { let accept_files = files_in_path(&path.join("accept")); let reject_files = files_in_path(&path.join("reject")); + let fail_files = files_in_path(&path.join("fail")); let ignored_files = files_in_path(path); assert_eq!(accept_files, accept); assert_eq!(reject_files, reject); + assert_eq!(fail_files, fail); assert_eq!(ignored_files, ignored); let reasons = reject_files.into_iter() @@ -98,6 +100,18 @@ fn check_queues(path: &Path, accept: &[&str], reject: &[&str], reject_reasons: & }) .collect::<Vec<_>>(); assert_eq!(reasons, reject_reasons); + + let reasons = fail_files.into_iter() + .filter(|fail| fail.ends_with(".reason")) + .map(|file_name| { + let reason_fname = path.join("fail").join(file_name); + let mut reason_file = File::open(&reason_fname).unwrap(); + let mut reason = String::new(); + reason_file.read_to_string(&mut reason).unwrap(); + reason + }) + .collect::<Vec<_>>(); + assert_eq!(reasons, fail_reasons); } #[derive(Default)] @@ -116,6 +130,7 @@ impl Handler for TestHandler { try!(director.add_handler("accept", self)); try!(director.add_handler("defer", self)); try!(director.add_handler("reject", self)); + try!(director.add_handler("fail", self)); try!(director.add_handler("restart", self)); try!(director.add_handler("done", self)); try!(director.add_handler("error", self)); @@ -131,6 +146,10 @@ impl Handler for TestHandler { "defer" => Ok(HandlerResult::Defer("deferring".to_string())), "reject" => Ok(HandlerResult::Reject("rejecting".to_string())), "restart" => Ok(HandlerResult::Restart), + "fail" => { + let err = io::Error::new(io::ErrorKind::Other, "fail"); + Ok(HandlerResult::Fail(Box::new(err))) + }, "done" => Ok(HandlerResult::Done), "error" => Err(Error::Io(io::Error::new(io::ErrorKind::Other, "error"))), _ => unreachable!(), @@ -226,6 +245,8 @@ fn test_ignore_directories() { &["0-done.json", "0-done.stamp"], &[], &[], + &[], + &[], &[]); } @@ -250,6 +271,8 @@ fn test_ignore_wrong_extension() { &["1-done.json", "1-done.stamp"], &[], &[], + &[], + &[], &["0-ignored.txt"]); } @@ -277,6 +300,8 @@ fn test_ignore_invalid_json() { &["1-done.json", "1-done.stamp"], &[], &[], + &[], + &[], &["0-invalid.json"]); } @@ -312,6 +337,8 @@ fn test_ignore_wrong_json_type() { &["5-done.json", "5-done.stamp"], &[], &[], + &[], + &[], &["0-array.json", "1-bool.json", "2-null.json", "3-number.json", "4-string.json"]); } @@ -339,6 +366,8 @@ fn test_reject_missing_kind() { &["1-done.json", "1-done.stamp"], &["0-missing-kind.json", "0-missing-kind.reason", "0-missing-kind.stamp"], &["no 'kind'\n"], + &[], + &[], &[]); } @@ -394,6 +423,8 @@ fn test_reject_invalid_kind() { "'kind' is not a string\n", "'kind' is not a string\n", "'kind' is not a string\n"], + &[], + &[], &[]); } @@ -421,6 +452,8 @@ fn test_reject_missing_data() { &["1-done.json", "1-done.stamp"], &["0-missing-data.json", "0-missing-data.reason", "0-missing-data.stamp"], &["no 'data'\n"], + &[], + &[], &[]); } @@ -477,6 +510,8 @@ fn test_reject_invalid_retry() { "'retry' is not an object\n", "'retry' is not an object\n", "'retry' is not an object\n"], + &[], + &[], &[]); } @@ -540,6 +575,8 @@ fn test_reject_invalid_retry_object() { "retry reason is not a string\n", "retry reason is not a string\n", "retry reason is not a string\n"], + &[], + &[], &[]); } @@ -564,6 +601,8 @@ fn test_reject_no_handler() { &["1-done.json", "1-done.stamp"], &["0-no-handler.json", "0-no-handler.reason", "0-no-handler.stamp"], &["no handler for kind 'no-handler'\n"], + &[], + &[], &[]); } @@ -583,7 +622,7 @@ fn test_receive_handler_error() { } assert_eq!(handler.jobs(), ["error"]); - check_queues(tempdir.path(), &[], &[], &[], &["0-error.json"]); + check_queues(tempdir.path(), &[], &[], &[], &[], &[], &["0-error.json"]); } #[test] @@ -607,6 +646,8 @@ fn test_accept() { &["0-accept.json", "0-accept.stamp", "1-done.json", "1-done.stamp"], &[], &[], + &[], + &[], &[]); } @@ -635,6 +676,8 @@ fn test_defer() { &accept_files_ref, &["0-defer.json", "0-defer.reason", "0-defer.stamp"], &["deferring\n"], + &[], + &[], &[]); } @@ -659,6 +702,35 @@ fn test_reject() { &["1-done.json", "1-done.stamp"], &["0-reject.json", "0-reject.reason", "0-reject.stamp"], &["rejecting\n"], + &[], + &[], + &[]); +} + +#[test] +fn test_fail() { + let tempdir = test_workspace_dir("test_fail"); + let handler = TestHandler::default(); + + { + let mut director = Director::new(tempdir.path()).unwrap(); + + handler.add_to_director(&mut director).unwrap(); + drop_job(&tempdir.path(), "fail"); + drop_job(&tempdir.path(), "done"); + + let res = director.watch_directory(tempdir.path()).unwrap(); + assert_eq!(res, RunResult::Done); + } + + assert_eq!(handler.jobs(), ["fail", "done"]); + check_queues(tempdir.path(), + &["1-done.json", "1-done.stamp"], + &[], + &[], + &["0-fail.json", "0-fail.reason", "0-fail.stamp"], + &["Error { repr: Custom(Custom { kind: Other, error: StringError(\"fail\") }) \ + }\n"], &[]); } @@ -682,6 +754,8 @@ fn test_restart() { &["0-restart.json", "0-restart.stamp"], &[], &[], + &[], + &[], &[]); } @@ -706,5 +780,7 @@ fn test_done() { &["0-done.json", "0-done.stamp"], &[], &[], + &[], + &[], &["1-after.json"]); }