run.rs
Raw
extern crate toml;
extern crate walkdir;
use chrono;
use job;
use config;
use std::error::Error;
use std::fmt;
use std::fs;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::process;
#[derive(Debug,Clone)]
pub struct Run {
work_base_dir: PathBuf,
work_dir: Option<PathBuf>,
archive_base_dir: PathBuf,
archive_dir: Option<PathBuf>,
run_start: Option<chrono::DateTime<chrono::offset::Local>>,
run_end: Option<chrono::DateTime<chrono::offset::Local>>,
job: job::Job,
}
#[derive(Debug,Clone)]
pub struct Output {
pub archive_dir: PathBuf,
pub work_dir: PathBuf,
pub run_start: chrono::DateTime<chrono::offset::Local>,
pub run_end: chrono::DateTime<chrono::offset::Local>,
}
#[derive(Debug)]
pub enum RunError {
Io(io::Error),
SetupIncomplete,
RunIncomplete,
OldRun,
}
impl From<io::Error> for RunError {
fn from(err: io::Error) -> RunError {
RunError::Io(err)
}
}
impl fmt::Display for RunError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match *self {
RunError::Io(ref err) => err.fmt(f),
_ => write!(f, "{}", self.description()),
}
}
}
impl Error for RunError {
fn description(&self) -> &str {
match *self {
RunError::Io(ref err) => err.description(),
RunError::SetupIncomplete => "Attempted to use a run without calling setup()",
RunError::RunIncomplete => "Attempted to collect outputs or artifacts of an incomplete run",
RunError::OldRun => "Cannot run() an old run, create a new one for the same job",
}
}
}
fn create_new_dir(path: &Path) -> Result<(), RunError> {
match fs::metadata(path) {
Ok(_) => return Err(RunError::Io(io::Error::new(io::ErrorKind::Other, format!("Directory {:?} already exists!", path)))),
Err(_) => {},
}
try!(fs::create_dir_all(path));
Ok(())
}
fn write_output(o: &process::Output, d: &Path) ->io::Result<()> {
fn write_file(path: &Path, data: &[u8]) -> io::Result<()> {
let mut f = try!(File::create(path));
try!(f.write_all(data));
Ok(())
}
try!(write_file(&d.join("exit_status"), format!("{}\n", o.status).as_bytes()));
try!(write_file(&d.join("stdout"), &o.stdout));
try!(write_file(&d.join("stderr"), &o.stderr));
Ok(())
}
fn write_job(j: &job::Job, d: &Path) -> io::Result<()> {
let s = toml::to_string(j).map_err(|e| {
io::Error::new(io::ErrorKind::Other, e.to_string())
})?;
let mut f = try!(File::create(d.join("config.toml")));
try!(f.write_all(s.as_bytes()));
Ok(())
}
impl Run {
pub fn new(job: &job::Job, config: &config::Config) -> Run {
Run {
work_base_dir: config.work_dir.join(&job.name).join("workspace"),
archive_base_dir: config.work_dir.join(&job.name).join("archive"),
work_dir: None,
archive_dir: None,
run_start: None,
run_end: None,
job: job.clone(),
}
}
pub fn old(job: &job::Job, config: &config::Config, time: chrono::DateTime<chrono::offset::Local>) -> Result<Run, RunError> {
unimplemented!();
}
pub fn output(&mut self) -> Result<Output, RunError> {
//make sure this isn't an old run, fail if it is
if let Some(_) = self.run_start {
return Err(RunError::OldRun);
}
self.run_start = Some(chrono::Local::now());
try!(self.setup());
let mut cmd = try!(self.command());
let output = try!(cmd.output());
self.run_end = Some(chrono::Local::now());
let a = match self.archive_dir {
Some(ref x) => x,
None => unreachable!(),
};
try!(write_output(&output, &a));
try!(self.copy_artifacts());
//these unwraps cannot fail, go ahead and panic otherwise
Ok(Output{
work_dir: self.work_dir.clone().unwrap(),
archive_dir: self.archive_dir.clone().unwrap(),
run_start: self.run_start.clone().unwrap(),
run_end: self.run_end.clone().unwrap(),
})
}
fn setup(&mut self) -> Result<(), RunError> {
let mut dir_name = PathBuf::from(self.run_start.unwrap().format("%F").to_string());
dir_name.push(self.run_start.unwrap().format("%H.%M.%S%z").to_string());
let mut w = self.work_base_dir.clone();
w.push(&dir_name);
let mut a = self.archive_base_dir.clone();
a.push(&dir_name);
try!(create_new_dir(&w));
try!(create_new_dir(&a));
//Copy all supplementary files to work dir
for entry in walkdir::WalkDir::new(&self.job.location) {
let entry = match entry {
Ok(item) => item,
Err(_) => continue,
};
if entry.path() == &self.job.location { continue; };
//strip off location from entry path
let mut e_components = entry.path().components();
let base_path = &self.job.location.as_path();
for component in base_path.components() {
//TODO: make this not panic
if component != e_components.next().unwrap() {
unreachable!();
};
}
let relative_path = e_components.as_path();
match entry.metadata() {
Ok(ref m) if m.is_dir() => try!(fs::create_dir_all(w.join(relative_path))),
Ok(_) => { try!(fs::copy(entry.path(), w.join(relative_path))); },
_ => continue,
}
//copy over file or create directory
}
try!(write_job(&self.job, &a));
self.work_dir = Some(w);
self.archive_dir = Some(a);
Ok(())
}
fn command(&self) -> Result<process::Command, RunError> {
let mut cmd = process::Command::new(&self.job.command);
let w = match self.work_dir {
Some(ref x) => x,
None => { return Err(RunError::SetupIncomplete); },
};
cmd.env_clear().current_dir(&w);
for (key, value) in &self.job.environment {
cmd.env(key, value);
}
Ok(cmd)
}
fn copy_artifacts(&self) -> Result<(), RunError> {
if !self.run_end.is_some() {
return Err(RunError::RunIncomplete);
}
for ref artifact_path in &self.job.artifacts {
let from = self.work_dir.clone().unwrap().join(artifact_path);
let to = self.archive_dir.clone().unwrap().join(artifact_path.file_name().unwrap());
try!(fs::copy(from, to));
}
Ok(())
}
}