From b101a3faded68bc162d023100757e87b07328cf3 Mon Sep 17 00:00:00 2001 From: Sam Rijs Date: Mon, 5 Feb 2018 19:38:04 +1100 Subject: [PATCH] throttle calls to get popular repos from github --- src/engine/mod.rs | 34 +++++++++-- src/interactors/github.rs | 65 ++++++++++++-------- src/main.rs | 6 +- src/models/repo.rs | 10 ++-- src/utils/mod.rs | 1 + src/utils/throttle.rs | 123 ++++++++++++++++++++++++++++++++++++++ 6 files changed, 198 insertions(+), 41 deletions(-) create mode 100644 src/utils/mod.rs create mode 100644 src/utils/throttle.rs diff --git a/src/engine/mod.rs b/src/engine/mod.rs index f5c531f..96af9bc 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -1,10 +1,16 @@ -mod analyzer; +use std::sync::Arc; +use std::time::Duration; use futures::{Future, Stream, stream}; use hyper::Client; use hyper::client::HttpConnector; use hyper_tls::HttpsConnector; use slog::Logger; +use tokio_service::Service; + +mod analyzer; + +use ::utils::throttle::{Throttle, ThrottledError}; use ::models::repo::{Repository, RepoPath}; use ::models::crates::{CrateName, CrateRelease, CrateManifest, AnalyzedDependencies}; @@ -13,17 +19,31 @@ use ::parsers::manifest::{ManifestParseError, parse_manifest_toml}; use ::interactors::crates::{QueryCrateError, query_crate}; use ::interactors::github::{RetrieveFileAtPathError, retrieve_file_at_path}; -use ::interactors::github::get_popular_repos; -pub use ::interactors::github::GetPopularReposError; +use ::interactors::github::GetPopularRepos; use self::analyzer::DependencyAnalyzer; #[derive(Clone, Debug)] pub struct Engine { - pub client: Client>, - pub logger: Logger + client: Client>, + logger: Logger, + + get_popular_repos: Arc>>>> } +impl Engine { + pub fn new(client: Client>, logger: Logger) -> Engine { + Engine { + client: client.clone(), logger, + + get_popular_repos: Arc::new(Throttle::new(GetPopularRepos(client), Duration::from_secs(10))) + } + } +} + +#[derive(Debug)] +pub struct GetPopularReposError(ThrottledError<::interactors::github::GetPopularReposError>); + #[derive(Debug)] pub enum AnalyzeDependenciesError { QueryCrate(QueryCrateError), @@ -42,7 +62,9 @@ impl Engine { pub fn get_popular_repos(&self) -> impl Future, Error=GetPopularReposError> { - get_popular_repos(self.client.clone()) + self.get_popular_repos.call(()) + .map_err(GetPopularReposError) + .map(|repos| repos.clone()) } pub fn analyze_dependencies(&self, repo_path: RepoPath) -> diff --git a/src/interactors/github.rs b/src/interactors/github.rs index fdb4016..f22169f 100644 --- a/src/interactors/github.rs +++ b/src/interactors/github.rs @@ -74,33 +74,46 @@ struct GithubOwner { login: String } -pub fn get_popular_repos(service: S) -> - impl Future, Error=GetPopularReposError> - where S: Service +#[derive(Debug, Clone)] +pub struct GetPopularRepos(pub S); + +impl Service for GetPopularRepos + where S: Service + Clone + 'static, + S::Future: 'static { - let uri_future = format!("{}/search/repositories?q=language:rust&sort=stars", GITHUB_API_BASE_URI) - .parse().into_future().map_err(GetPopularReposError::Uri); + type Request = (); + type Response = Vec; + type Error = GetPopularReposError; + type Future = Box>; - uri_future.and_then(move |uri| { - let mut request = Request::new(Method::Get, uri); - request.headers_mut().set(UserAgent::new("deps.rs")); + fn call(&self, _req: ()) -> Self::Future { + println!("call api"); + let service = self.0.clone(); - service.call(request).map_err(GetPopularReposError::Transport).and_then(|response| { - let status = response.status(); - if !status.is_success() { - future::Either::A(future::err(GetPopularReposError::Status(status))) - } else { - let body_future = response.body().concat2().map_err(GetPopularReposError::Transport); - let decode_future = body_future - .and_then(|body| serde_json::from_slice(body.as_ref()).map_err(GetPopularReposError::Decode)); - future::Either::B(decode_future.and_then(|search_response: GithubSearchResponse| { - search_response.items.into_iter().map(|item| { - let path = RepoPath::from_parts("github", &item.owner.login, &item.name) - .map_err(GetPopularReposError::Validate)?; - Ok(Repository { path, description: item.description }) - }).collect::, _>>() - })) - } - }) - }) + let uri_future = format!("{}/search/repositories?q=language:rust&sort=stars", GITHUB_API_BASE_URI) + .parse().into_future().map_err(GetPopularReposError::Uri); + + Box::new(uri_future.and_then(move |uri| { + let mut request = Request::new(Method::Get, uri); + request.headers_mut().set(UserAgent::new("deps.rs")); + + service.call(request).map_err(GetPopularReposError::Transport).and_then(|response| { + let status = response.status(); + if !status.is_success() { + future::Either::A(future::err(GetPopularReposError::Status(status))) + } else { + let body_future = response.body().concat2().map_err(GetPopularReposError::Transport); + let decode_future = body_future + .and_then(|body| serde_json::from_slice(body.as_ref()).map_err(GetPopularReposError::Decode)); + future::Either::B(decode_future.and_then(|search_response: GithubSearchResponse| { + search_response.items.into_iter().map(|item| { + let path = RepoPath::from_parts("github", &item.owner.login, &item.name) + .map_err(GetPopularReposError::Validate)?; + Ok(Repository { path, description: item.description }) + }).collect::, _>>() + })) + } + }) + })) + } } diff --git a/src/main.rs b/src/main.rs index 156fadf..02d8377 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,6 +20,7 @@ extern crate tokio_core; extern crate tokio_service; extern crate toml; +mod utils; mod models; mod parsers; mod interactors; @@ -65,10 +66,7 @@ fn main() { let http = Http::new(); - let engine = Engine { - client: client.clone(), - logger: logger.clone() - }; + let engine = Engine::new(client.clone(), logger.clone()); let server = Server::new(engine); diff --git a/src/models/repo.rs b/src/models/repo.rs index 3e29cc6..6f39bc5 100644 --- a/src/models/repo.rs +++ b/src/models/repo.rs @@ -1,12 +1,12 @@ use std::str::FromStr; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Repository { pub path: RepoPath, pub description: String } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RepoPath { pub site: RepoSite, pub qual: RepoQualifier, @@ -26,7 +26,7 @@ impl RepoPath { #[derive(Debug)] pub struct RepoValidationError; -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub enum RepoSite { Github } @@ -58,7 +58,7 @@ impl AsRef for RepoSite { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RepoQualifier(String); impl FromStr for RepoQualifier { @@ -83,7 +83,7 @@ impl AsRef for RepoQualifier { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RepoName(String); impl FromStr for RepoName { diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..dbec1e4 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +pub mod throttle; diff --git a/src/utils/throttle.rs b/src/utils/throttle.rs new file mode 100644 index 0000000..5f3b620 --- /dev/null +++ b/src/utils/throttle.rs @@ -0,0 +1,123 @@ +use std::error::Error; +use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; +use std::time::{Duration, Instant}; +use std::ops::Deref; +use std::sync::Mutex; + +use futures::{Future, Poll}; +use futures::future::{Shared, SharedError, SharedItem}; +use tokio_service::Service; + +pub struct Throttle> { + inner: S, + duration: Duration, + current: Mutex)>> +} + +impl Debug for Throttle + where S: Service + Debug, +{ + fn fmt(&self, fmt: &mut Formatter) -> FmtResult { + fmt.debug_struct("Throttle") + .field("inner", &self.inner) + .field("duration", &self.duration) + .finish() + } +} + +impl> Throttle { + pub fn new(service: S, duration: Duration) -> Throttle { + Throttle { + inner: service, + duration, + current: Mutex::new(None) + } + } +} + +impl> Service for Throttle { + type Request = (); + type Response = ThrottledItem; + type Error = ThrottledError; + type Future = Throttled; + + fn call(&self, _: ()) -> Self::Future { + let now = Instant::now(); + let mut current = self.current.lock().expect("lock poisoned"); + if let Some((valid_until, ref shared_future)) = *current { + if valid_until > now { + if let Some(Ok(_)) = shared_future.peek() { + return Throttled(shared_future.clone()); + } + } + } + let shared_future = self.inner.call(()).shared(); + *current = Some((now + self.duration, shared_future.clone())); + Throttled(shared_future) + } +} + +pub struct Throttled(Shared); + +impl Debug for Throttled + where F: Future + Debug, + F::Item: Debug, + F::Error: Debug +{ + fn fmt(&self, fmt: &mut Formatter) -> FmtResult { + self.0.fmt(fmt) + } +} + +impl Future for Throttled { + type Item = ThrottledItem; + type Error = ThrottledError; + + fn poll(&mut self) -> Poll { + self.0.poll() + .map_err(ThrottledError) + .map(|async| async.map(ThrottledItem)) + } +} + +#[derive(Debug)] +pub struct ThrottledItem(SharedItem); + +impl Deref for ThrottledItem { + type Target = T; + + fn deref(&self) -> &T { + &self.0.deref() + } +} + +#[derive(Debug)] +pub struct ThrottledError(SharedError); + +impl Deref for ThrottledError { + type Target = E; + + fn deref(&self) -> &E { + &self.0.deref() + } +} + +impl Display for ThrottledError + where E: Display, +{ + fn fmt(&self, f: &mut Formatter) -> FmtResult { + self.0.fmt(f) + } +} + +impl Error for ThrottledError + where E: Error, +{ + fn description(&self) -> &str { + self.0.description() + } + + fn cause(&self) -> Option<&Error> { + self.0.cause() + } +}