Commit 73d44476 authored by Brad King's avatar Brad King Committed by Kitware Robot
Browse files

Merge topic 'tokio-uplift'

11b6848b handlers: use `tokio::task::spawn_blocking` to handle jobs
f7c21cf9 cargo: update the test suite to use tokio 1.0
fd436294 cargo: update json-job-dispatch
3ed0121b cargo: drop unnecessary features of `chrono`
9870621f

 cargo: remove `failure` direct dependency
Acked-by: Kitware Robot's avatarKitware Robot <kwrobot@kitware.com>
Tested-by: buildbot's avatarbuildbot <buildbot@kitware.com>
Merge-request: !302
parents 5bf156a4 11b6848b
Pipeline #287134 passed with stages
in 28 minutes and 22 seconds
This diff is collapsed.
......@@ -14,21 +14,21 @@ keywords = ["git", "workflow", "daemon", "ghostflow"]
edition = "2018"
[dev-dependencies]
futures = "~0.1"
async-stream = "~0.3"
futures = "~0.3"
lazy_static = "^1"
serial_test = "~0.6"
tempfile = "^3.2.0"
tokio = "~0.1.8"
tokio-uds = "~0.2"
tokio = { version = "^1.0", default-features = false, features = ["io-util", "net", "rt", "sync", "time"] }
tokio-stream = { version = "~0.1", default-features = false, features = ["net", "time"] }
[dependencies]
async-trait = "~0.1"
boxfnonce = "~0.1"
chrono = "~0.4"
chrono = { version = "~0.4.16", default-features = false }
clap = { version = "^3", features = ["cargo"] }
either = "^1.0"
env_logger = "~0.9"
failure = "~0.1"
#ghostflow = "~0.1"
#ghostflow-gitlab = "~0.1"
#ghostflow-github = "~0.1"
......@@ -39,7 +39,7 @@ git-topic-stage = "^4.0"
git-workarea = "^4.0"
graphql_client = "~0.10"
itertools = "~0.10"
json-job-dispatch = "^3"
json-job-dispatch = "^3.0.1"
lazy-init = "~0.5"
log = "~0.4"
rayon = "^1.0"
......@@ -50,6 +50,7 @@ serde_json = "^1.0"
serde_yaml = "~0.8"
systemd = { version = "~0.10.0", optional = true }
thiserror = "^1.0.2"
tokio = { version = "^1.0", default-features = false, features = ["rt"] }
topological-sort = "~0.1"
#ttl_cache = "~0.5"
ttl_cache = { git = "https://github.com/mathstuf/rust-ttl_cache.git", branch = "add-ttl-value" }
......
......@@ -155,7 +155,7 @@ pub fn host_handler(
/// The handler for Github events.
struct GithubHandler {
/// The host block for this handler.
host: RwLock<Host>,
host: Arc<RwLock<Host>>,
/// The specific `GithubService` for this connection.
///
/// This is required because we need to make our own query due to a lack of information in
......@@ -171,7 +171,7 @@ impl GithubHandler {
/// Create a new handler.
fn new(host: Host, github: Arc<GithubService>, name: String) -> Self {
GithubHandler {
host: RwLock::new(host),
host: Arc::new(RwLock::new(host)),
github,
name,
}
......@@ -209,10 +209,16 @@ impl GithubHandler {
}
/// Handle a job.
fn handle_kind(&self, kind: &str, object: &Value, can_defer: bool) -> JobResult {
match kind {
fn handle_kind(
host: Arc<RwLock<Host>>,
github: Arc<GithubService>,
kind: String,
object: Value,
can_defer: bool,
) -> JobResult {
match kind.as_ref() {
"check_run" => {
Self::parse_object(object, |hook: CheckRunEvent| {
Self::parse_object(&object, |hook: CheckRunEvent| {
info!(
target: "github",
"check run {} {:?} on {}",
......@@ -237,7 +243,7 @@ impl GithubHandler {
})
},
"check_suite" => {
Self::parse_object(object, |hook: CheckSuiteEvent| {
Self::parse_object(&object, |hook: CheckSuiteEvent| {
info!(
target: "github",
"check suite {:?}",
......@@ -250,7 +256,7 @@ impl GithubHandler {
})
},
"github_app_authorization" => {
Self::parse_object(object, |hook: GithubAppAuthorizationEvent| {
Self::parse_object(&object, |hook: GithubAppAuthorizationEvent| {
info!(
target: "github",
"application authorization {:?}",
......@@ -261,7 +267,7 @@ impl GithubHandler {
})
},
"installation" => {
Self::parse_object(object, |hook: InstallationEvent| {
Self::parse_object(&object, |hook: InstallationEvent| {
info!(
target: "github",
"installation hook {:?}: {} by {}",
......@@ -274,7 +280,7 @@ impl GithubHandler {
})
},
"installation_repositories" => {
Self::parse_object(object, |hook: InstallationRepositoriesEvent| {
Self::parse_object(&object, |hook: InstallationRepositoriesEvent| {
info!(
target: "github",
"installation hook {:?} for {:?} repositories: {} by {}: {}{}",
......@@ -290,12 +296,12 @@ impl GithubHandler {
})
},
"issue_comment" => {
Self::parse_object(object, |hook: IssueCommentEvent| {
GithubMergeRequestNoteInfo::from_web_hook(self.github.as_ref(), &hook)
Self::parse_object(&object, |hook: IssueCommentEvent| {
GithubMergeRequestNoteInfo::from_web_hook(github.as_ref(), &hook)
.map(|note| {
if let Some(note) = note {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_merge_request_note(object, &host, &note, can_defer)
let host = host.read().expect(HOST_LOCK_POISONED);
handle_merge_request_note(&object, &host, &note, can_defer)
} else {
JobResult::reject(format!(
"comment on non-pr: {}#{}",
......@@ -307,34 +313,34 @@ impl GithubHandler {
})
},
"member" => {
Self::parse_object(object, |hook: MemberEvent| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_project_membership_refresh(object, &host, &hook.repository.full_name)
Self::parse_object(&object, |hook: MemberEvent| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_project_membership_refresh(&object, &host, &hook.repository.full_name)
})
},
"membership" => {
Self::parse_object(object, |hook: MembershipEvent| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_group_membership_refresh(object, &host, &hook.organization.login)
Self::parse_object(&object, |hook: MembershipEvent| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_group_membership_refresh(&object, &host, &hook.organization.login)
})
},
"organization" => {
Self::parse_object(object, |hook: OrganizationEvent| {
Self::parse_object(&object, |hook: OrganizationEvent| {
if let OrganizationAction::MemberInvited = hook.action {
return JobResult::Accept;
}
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_group_membership_refresh(object, &host, &hook.organization.login)
let host = host.read().expect(HOST_LOCK_POISONED);
handle_group_membership_refresh(&object, &host, &hook.organization.login)
})
},
"pull_request_review" => {
Self::parse_object(object, |hook: PullRequestReviewEvent| {
GithubMergeRequestNoteInfo::from_web_hook_review(self.github.as_ref(), &hook)
Self::parse_object(&object, |hook: PullRequestReviewEvent| {
GithubMergeRequestNoteInfo::from_web_hook_review(github.as_ref(), &hook)
.map(|note_res| {
match note_res {
Ok(note) => {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_merge_request_note(object, &host, &note, can_defer)
let host = host.read().expect(HOST_LOCK_POISONED);
handle_merge_request_note(&object, &host, &note, can_defer)
},
Err(err) => {
if let GithubMergeRequestError::Desync {
......@@ -352,12 +358,12 @@ impl GithubHandler {
})
},
"pull_request" => {
Self::parse_object(object, |hook: PullRequestEvent| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: PullRequestEvent| {
let host = host.read().expect(HOST_LOCK_POISONED);
GithubMergeRequestInfo::from_web_hook(host.service.as_ref(), &hook)
.map(|mr_res| {
match mr_res {
Ok(mr) => handle_merge_request_update(object, &host, &mr),
Ok(mr) => handle_merge_request_update(&object, &host, &mr),
Err(err) => {
if let GithubMergeRequestError::Desync {
..
......@@ -374,53 +380,52 @@ impl GithubHandler {
})
},
"push" => {
Self::parse_object(object, |hook: PushEvent| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: PushEvent| {
let host = host.read().expect(HOST_LOCK_POISONED);
GithubPushInfo::from_web_hook(host.service.as_ref(), hook)
.map(|push| handle_push(object, &host, &push))
.map(|push| handle_push(&object, &host, &push))
.unwrap_or_else(JobResult::fail)
})
},
"team" => {
Self::parse_object(object, |hook: TeamEvent| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: TeamEvent| {
let host = host.read().expect(HOST_LOCK_POISONED);
if let Some(ref repo) = hook.repository {
handle_project_membership_refresh(object, &host, &repo.full_name)
handle_project_membership_refresh(&object, &host, &repo.full_name)
} else if let Some(ref org) = hook.organization {
handle_group_membership_refresh(object, &host, &org.login)
handle_group_membership_refresh(&object, &host, &org.login)
} else {
JobResult::reject("no associated repository or organization")
}
})
},
"team_add" => {
Self::parse_object(object, |hook: TeamAddEvent| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_project_membership_refresh(object, &host, &hook.repository.full_name)
Self::parse_object(&object, |hook: TeamAddEvent| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_project_membership_refresh(&object, &host, &hook.repository.full_name)
})
},
"clear_test_refs" => {
Self::parse_object(object, |data: BatchBranchJob<ClearTestRefs>| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_clear_test_refs(object, &host, data)
Self::parse_object(&object, |data: BatchBranchJob<ClearTestRefs>| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_clear_test_refs(&object, &host, data)
})
},
"tag_stage" => {
Self::parse_object(object, |data: BatchBranchJob<TagStage>| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_stage_tag(object, &host, data)
Self::parse_object(&object, |data: BatchBranchJob<TagStage>| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_stage_tag(&object, &host, data)
})
},
"update_follow_refs" => {
Self::parse_object(object, |data: BatchBranchJob<UpdateFollowRefs>| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_update_follow_refs(object, &host, data)
Self::parse_object(&object, |data: BatchBranchJob<UpdateFollowRefs>| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_update_follow_refs(&object, &host, data)
})
},
"reset_failed_projects" => {
Self::parse_object(object, |_: ResetFailedProjects| {
self.host
.write()
Self::parse_object(&object, |_: ResetFailedProjects| {
host.write()
.expect(HOST_LOCK_POISONED)
.reset_failed_projects();
JobResult::Accept
......@@ -489,6 +494,15 @@ impl Handler for GithubHandler {
Either::Right(res) => return Ok(res),
};
Ok(self.handle_kind(kind, object, can_defer))
let host = self.host.clone();
let github = self.github.clone();
let kind = kind.into();
let object = object.clone();
tokio::task::spawn_blocking(move || {
Self::handle_kind(host, github, kind, object, can_defer)
})
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send + Sync>)
}
}
......@@ -121,9 +121,9 @@ pub fn host_handler(
/// The handler for Gitlab events.
struct GitlabHandler {
/// Handle to a private client.
gitlab: Gitlab,
gitlab: Arc<Gitlab>,
/// The host block for this handler.
host: RwLock<Host>,
host: Arc<RwLock<Host>>,
/// The name to use for this handler.
name: String,
}
......@@ -134,8 +134,8 @@ impl GitlabHandler {
/// Create a new handler.
fn new(gitlab: Gitlab, host: Host, name: String) -> Self {
GitlabHandler {
gitlab,
host: RwLock::new(host),
gitlab: Arc::new(gitlab),
host: Arc::new(RwLock::new(host)),
name,
}
}
......@@ -172,11 +172,17 @@ impl GitlabHandler {
}
/// Handle a job.
fn handle_kind(&self, kind: &str, object: &Value, can_defer: bool) -> JobResult {
match kind {
fn handle_kind(
host: Arc<RwLock<Host>>,
gitlab: Arc<Gitlab>,
kind: String,
object: Value,
can_defer: bool,
) -> JobResult {
match kind.as_ref() {
"merge_request" => {
Self::parse_object(object, |hook: MergeRequestHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: MergeRequestHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
let info = GitlabMergeRequestInfo::from_web_hook(
host.service.as_ref(),
&hook.object_attributes,
......@@ -185,20 +191,20 @@ impl GitlabHandler {
// XXX(gitlab): Only needed until something like this feature has landed:
// https://gitlab.com/gitlab-org/gitlab/merge_requests/18213
if let Ok(mr) = info.as_ref() {
self.unprotect_source_branch(&host, mr);
Self::unprotect_source_branch(&host, &gitlab, mr);
}
info.map(|mr| handle_merge_request_update(object, &host, &mr))
info.map(|mr| handle_merge_request_update(&object, &host, &mr))
.unwrap_or_else(JobResult::fail)
})
},
"note" => {
Self::parse_object(object, |hook: NoteHook| {
Self::parse_object(&object, |hook: NoteHook| {
let note_type = hook.object_attributes.noteable_type;
if let NoteType::MergeRequest = note_type {
let host = self.host.read().expect(HOST_LOCK_POISONED);
let host = host.read().expect(HOST_LOCK_POISONED);
GitlabMergeRequestNoteInfo::from_web_hook(host.service.as_ref(), &hook)
.map(|note| handle_merge_request_note(object, &host, &note, can_defer))
.map(|note| handle_merge_request_note(&object, &host, &note, can_defer))
.unwrap_or_else(JobResult::fail)
} else {
JobResult::reject(format!("unhandled noteable type: {:?}", note_type))
......@@ -206,66 +212,65 @@ impl GitlabHandler {
})
},
"push" => {
Self::parse_object(object, |hook: PushHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: PushHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
GitlabPushInfo::from_web_hook(host.service.as_ref(), hook)
.map(|push| handle_push(object, &host, &push))
.map(|push| handle_push(&object, &host, &push))
.unwrap_or_else(JobResult::fail)
})
},
"project_create" => {
Self::parse_object(object, |hook: ProjectSystemHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: ProjectSystemHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
host.service
.repo(&hook.path_with_namespace)
.map(|project| handle_project_creation(object, &host, &project))
.map(|project| handle_project_creation(&object, &host, &project))
.unwrap_or_else(JobResult::fail)
})
},
"user_add_to_team" | "user_remove_from_team" => {
Self::parse_object(object, |hook: ProjectMemberSystemHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: ProjectMemberSystemHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_project_membership_refresh(
object,
&object,
&host,
&hook.project_path_with_namespace,
)
})
},
"user_add_to_group" | "user_remove_from_group" => {
Self::parse_object(object, |hook: GroupMemberSystemHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_group_membership_refresh(object, &host, &hook.group_name)
Self::parse_object(&object, |hook: GroupMemberSystemHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_group_membership_refresh(&object, &host, &hook.group_name)
})
},
"clear_test_refs" => {
Self::parse_object(object, |data: BatchBranchJob<ClearTestRefs>| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_clear_test_refs(object, &host, data)
Self::parse_object(&object, |data: BatchBranchJob<ClearTestRefs>| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_clear_test_refs(&object, &host, data)
})
},
"tag_stage" => {
Self::parse_object(object, |data: BatchBranchJob<TagStage>| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_stage_tag(object, &host, data)
Self::parse_object(&object, |data: BatchBranchJob<TagStage>| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_stage_tag(&object, &host, data)
})
},
"update_follow_refs" => {
Self::parse_object(object, |data: BatchBranchJob<UpdateFollowRefs>| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_update_follow_refs(object, &host, data)
Self::parse_object(&object, |data: BatchBranchJob<UpdateFollowRefs>| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_update_follow_refs(&object, &host, data)
})
},
"ensure_project_state" => {
Self::parse_object(object, |ensure: support::EnsureProjectState| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
ensure.check_all_projects(&self.gitlab, &host)
Self::parse_object(&object, |ensure: support::EnsureProjectState| {
let host = host.read().expect(HOST_LOCK_POISONED);
ensure.check_all_projects(&gitlab, &host)
})
},
"reset_failed_projects" => {
Self::parse_object(object, |_: ResetFailedProjects| {
self.host
.write()
Self::parse_object(&object, |_: ResetFailedProjects| {
host.write()
.expect(HOST_LOCK_POISONED)
.reset_failed_projects();
JobResult::Accept
......@@ -275,7 +280,7 @@ impl GitlabHandler {
}
}
fn unprotect_source_branch(&self, host: &Host, mr: &Info) {
fn unprotect_source_branch(host: &Host, gitlab: &Gitlab, mr: &Info) {
let project =
if let Ok(project) = utils::get_project(host, &mr.merge_request.target_repo.name) {
project
......@@ -287,7 +292,7 @@ impl GitlabHandler {
return;
}
support::unprotect_source_branch(&self.gitlab, host, mr)
support::unprotect_source_branch(gitlab, host, mr)
}
}
......@@ -344,6 +349,15 @@ impl Handler for GitlabHandler {
Either::Right(res) => return Ok(res),
};
Ok(self.handle_kind(kind, object, can_defer))
let host = self.host.clone();
let gitlab = self.gitlab.clone();
let kind = kind.into();
let object = object.clone();
tokio::task::spawn_blocking(move || {
Self::handle_kind(host, gitlab, kind, object, can_defer)
})
.await
.map_err(|err| Box::new(err) as Box<dyn Error + Send + Sync>)
}
}
......@@ -6,12 +6,11 @@
use std::sync::{self, Arc, RwLock, RwLockReadGuard};
use futures::sync::mpsc::{self, Sender};
use futures::{Future, Sink};
use ghostflow::host::*;
use ghostflow::utils::TrailerRef;
use git_workarea::{CommitId, GitContext, GitError};
use thiserror::Error;
use tokio::sync::mpsc::{self, Sender};
use crate::ghostflow_ext::{AccessLevel, DirectorHostingService, HostingServiceExt, Membership};
use crate::handlers::test::data;
......@@ -28,7 +27,7 @@ enum HostError {
#[error("failed to write to server channel: {}", source)]
WriteChannel {
#[from]
source: mpsc::SendError<ServiceEvent>,
source: mpsc::error::SendError<ServiceEvent>,
},
}
......@@ -103,7 +102,7 @@ impl TestHost {
fn send(&self, event: ServiceEvent) -> Result<(), HostError> {
let sender = self.writer.clone();
sender.send(event).wait()?;
sender.blocking_send(event)?;
Ok(())
}
......
......@@ -8,7 +8,7 @@ use std::cmp::Ordering;
use std::error::Error;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use async_trait::async_trait;
......@@ -27,7 +27,7 @@ use crate::handlers::test::hooks::*;
/// The handler for test service jobs.
pub struct TestHandler {
/// The host block for this handler.
host: RwLock<Host>,
host: Arc<RwLock<Host>>,
}
#[derive(Deserialize, Debug)]
......@@ -67,7 +67,7 @@ impl TestHandler {
/// Create a new handler.
pub fn new(host: Host) -> Self {
TestHandler {
host: RwLock::new(host),
host: Arc::new(RwLock::new(host)),
}
}
......@@ -87,52 +87,57 @@ impl TestHandler {
}
/// Handle a job.
fn handle_kind(&self, kind: &str, object: &Value, can_defer: bool) -> JobResult {
match kind {
fn handle_kind(
host: Arc<RwLock<Host>>,
kind: String,
object: Value,
can_defer: bool,
) -> JobResult {
match kind.as_ref() {
"merge_request" => {
Self::parse_object(object, |hook: MergeRequestHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: MergeRequestHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
if hook.action == MergeRequestAction::Comment {
handle_merge_request_note(object, &host, &hook.into(), can_defer)
handle_merge_request_note(&object, &host, &hook.into(), can_defer)
} else {
handle_merge_request_update(object, &host, &hook.into())
handle_merge_request_update(&object, &host, &hook.into())
}
})
},
"push" => {
Self::parse_object(object, |hook: PushHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
handle_push(object, &host, &hook.into())
Self::parse_object(&object, |hook: PushHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
handle_push(&object, &host, &hook.into())
})
},
"project" => {
Self::parse_object(object, |hook: ProjectHook| {
let host = self.host.read().expect(HOST_LOCK_POISONED);
Self::parse_object(&object, |hook: ProjectHook| {
let host = host.read().expect(HOST_LOCK_POISONED);
let repo = hook.project.into();
handle_project_creation(object, &host, &repo)
handle_project_creation(&object, &host, &repo)
})
},