All ProjectsHome
jobrunner
jobrunner/src/run.rs
run.rs Raw
extern crate toml;
extern crate walkdir;

use chrono;

use job;
use config;

use std::error::Error;
use std::fmt;
use std::fs;
use std::fs::File;
use std::io;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::process;

#[derive(Debug,Clone)]
pub struct Run {
    work_base_dir: PathBuf,
    work_dir: Option<PathBuf>,
    archive_base_dir: PathBuf,
    archive_dir: Option<PathBuf>,
    run_start: Option<chrono::DateTime<chrono::offset::Local>>,
    run_end: Option<chrono::DateTime<chrono::offset::Local>>,
    job: job::Job,
}

#[derive(Debug,Clone)]
pub struct Output {
    pub archive_dir: PathBuf,
    pub work_dir: PathBuf,
    pub run_start: chrono::DateTime<chrono::offset::Local>,
    pub run_end: chrono::DateTime<chrono::offset::Local>,
}

#[derive(Debug)]
pub enum RunError {
    Io(io::Error),
    SetupIncomplete,
    RunIncomplete,
    OldRun,
}

impl From<io::Error> for RunError {
    fn from(err: io::Error) -> RunError {
        RunError::Io(err)
    }
}

impl fmt::Display for RunError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match *self {
            RunError::Io(ref err) => err.fmt(f),
            _ => write!(f, "{}", self.description()),
        }
    }
}

impl Error for RunError {
    fn description(&self) -> &str {
        match *self {
            RunError::Io(ref err) => err.description(),
            RunError::SetupIncomplete => "Attempted to use a run without calling setup()",
            RunError::RunIncomplete => "Attempted to collect outputs or artifacts of an incomplete run",
            RunError::OldRun => "Cannot run() an old run, create a new one for the same job",
        }
    }
}

fn create_new_dir(path: &Path) -> Result<(), RunError> {
    match fs::metadata(path) {
        Ok(_) => return Err(RunError::Io(io::Error::new(io::ErrorKind::Other, format!("Directory {:?} already exists!", path)))),
        Err(_) => {},
    }
    try!(fs::create_dir_all(path));
    Ok(())
}

fn write_output(o: &process::Output, d: &Path) ->io::Result<()> {
    fn write_file(path: &Path, data: &[u8]) -> io::Result<()> {
        let mut f = try!(File::create(path));
        try!(f.write_all(data));
        Ok(())
    }
    try!(write_file(&d.join("exit_status"), format!("{}\n", o.status).as_bytes()));
    try!(write_file(&d.join("stdout"), &o.stdout));
    try!(write_file(&d.join("stderr"), &o.stderr));
    Ok(())
}

fn write_job(j: &job::Job, d: &Path) -> io::Result<()> {
    let s = toml::to_string(j).map_err(|e| {
        io::Error::new(io::ErrorKind::Other, e.to_string())
    })?;
    let mut f = try!(File::create(d.join("config.toml")));
    try!(f.write_all(s.as_bytes()));
    Ok(())
}

impl Run {
    pub fn new(job: &job::Job, config: &config::Config) -> Run {
        Run {
            work_base_dir: config.work_dir.join(&job.name).join("workspace"),
            archive_base_dir: config.work_dir.join(&job.name).join("archive"),
            work_dir: None,
            archive_dir: None,
            run_start: None,
            run_end: None,
            job: job.clone(),
        }
    }

    pub fn old(job: &job::Job, config: &config::Config, time: chrono::DateTime<chrono::offset::Local>) -> Result<Run, RunError> {
        unimplemented!();
    }

    pub fn output(&mut self) -> Result<Output, RunError> {
        //make sure this isn't an old run, fail if it is
        if let Some(_) = self.run_start {
            return Err(RunError::OldRun);
        }
        self.run_start = Some(chrono::Local::now());
        try!(self.setup());
        let mut cmd = try!(self.command());
        let output = try!(cmd.output());
        self.run_end = Some(chrono::Local::now());
        let a = match self.archive_dir {
            Some(ref x) => x,
            None => unreachable!(),
        };
        try!(write_output(&output, &a));
        try!(self.copy_artifacts());
        //these unwraps cannot fail, go ahead and panic otherwise
        Ok(Output{
            work_dir: self.work_dir.clone().unwrap(),
            archive_dir: self.archive_dir.clone().unwrap(),
            run_start: self.run_start.clone().unwrap(),
            run_end: self.run_end.clone().unwrap(),
        })
    }

    fn setup(&mut self) -> Result<(), RunError> {
        let mut dir_name = PathBuf::from(self.run_start.unwrap().format("%F").to_string());
        dir_name.push(self.run_start.unwrap().format("%H.%M.%S%z").to_string());
        let mut w = self.work_base_dir.clone();
        w.push(&dir_name);
        let mut a = self.archive_base_dir.clone();
        a.push(&dir_name);
        try!(create_new_dir(&w));
        try!(create_new_dir(&a));
        //Copy all supplementary files to work dir
        for entry in walkdir::WalkDir::new(&self.job.location) {
            let entry = match entry {
                Ok(item) => item,
                Err(_) => continue,
            };
            if entry.path() == &self.job.location { continue; };
            //strip off location from entry path
            let mut e_components = entry.path().components();
            let base_path = &self.job.location.as_path();
            for component in base_path.components() {
                //TODO: make this not panic
                if component != e_components.next().unwrap() {
                    unreachable!();
                };
            }
            let relative_path = e_components.as_path();
            match entry.metadata() {
                Ok(ref m) if m.is_dir() => try!(fs::create_dir_all(w.join(relative_path))),
                Ok(_) => { try!(fs::copy(entry.path(), w.join(relative_path))); },
                _ => continue,
            }
            //copy over file or create directory
            
        }
        try!(write_job(&self.job, &a));
        self.work_dir = Some(w);
        self.archive_dir = Some(a);
        Ok(())
    }

    fn command(&self) -> Result<process::Command, RunError> {
        let mut cmd = process::Command::new(&self.job.command);
        let w = match self.work_dir {
            Some(ref x) => x,
            None => { return Err(RunError::SetupIncomplete); },
        };
        cmd.env_clear().current_dir(&w);
        for (key, value) in &self.job.environment {
            cmd.env(key, value);
        }
        Ok(cmd)
    }

    fn copy_artifacts(&self) -> Result<(), RunError> {
        if !self.run_end.is_some() {
            return Err(RunError::RunIncomplete);
        }
        for ref artifact_path in &self.job.artifacts {
            let from = self.work_dir.clone().unwrap().join(artifact_path);
            let to = self.archive_dir.clone().unwrap().join(artifact_path.file_name().unwrap());
            try!(fs::copy(from, to));
        }
        Ok(())
    }
}