diff --git a/Cargo.toml b/Cargo.toml index 3d68ad7..5983baf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ futures = "0.1.18" hyper = "0.11.15" hyper-tls = "0.1.2" lazy_static = "1.0.0" +lru-cache = "0.1.1" maud = "0.17.2" ordermap = { version = "0.4.0", features = ["serde-1"] } relative-path = { version = "0.3.7", features = ["serde"] } diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 9ff4108..b9f78a6 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -15,32 +15,40 @@ use tokio_service::Service; mod machines; mod futures; -use ::utils::throttle::Throttle; +use ::utils::cache::Cache; use ::models::repo::{Repository, RepoPath}; use ::models::crates::{CrateName, CrateRelease, AnalyzedDependencies}; -use ::interactors::crates::query_crate; -use ::interactors::github::retrieve_file_at_path; -use ::interactors::github::GetPopularRepos; +use ::interactors::crates::QueryCrate; +use ::interactors::github::{GetPopularRepos, RetrieveFileAtPath}; use self::futures::AnalyzeDependenciesFuture; use self::futures::CrawlManifestFuture; +type HttpClient = Client>; + #[derive(Clone, Debug)] pub struct Engine { - client: Client>, + client: HttpClient, logger: Logger, - get_popular_repos: Arc>>>> + query_crate: Arc>>, + get_popular_repos: Arc>>, + retrieve_file_at_path: Arc> } impl Engine { pub fn new(client: Client>, logger: Logger) -> Engine { + let query_crate = Cache::new(QueryCrate(client.clone()), Duration::from_secs(300), 500); + let get_popular_repos = Cache::new(GetPopularRepos(client.clone()), Duration::from_secs(10), 1); + Engine { client: client.clone(), logger, - get_popular_repos: Arc::new(Throttle::new(GetPopularRepos(client), Duration::from_secs(10))) + query_crate: Arc::new(query_crate), + get_popular_repos: Arc::new(get_popular_repos), + retrieve_file_at_path: Arc::new(RetrieveFileAtPath(client)) } } } @@ -97,10 +105,11 @@ impl Engine { fn fetch_releases>(&self, names: I) -> impl Iterator, Error=Error>> { - let client = self.client.clone(); + let engine = self.clone(); names.into_iter().map(move |name| { - query_crate(client.clone(), name) - .map(|resp| resp.releases) + engine.query_crate.call(name) + .from_err() + .map(|resp| resp.releases.clone()) }) } @@ -108,7 +117,7 @@ impl Engine { impl Future { let manifest_path = path.join(RelativePath::new("Cargo.toml")); - retrieve_file_at_path(self.client.clone(), &repo_path, &manifest_path).from_err() + self.retrieve_file_at_path.call((repo_path.clone(), manifest_path)) } } diff --git a/src/interactors/crates.rs b/src/interactors/crates.rs index 496ffab..774ac73 100644 --- a/src/interactors/crates.rs +++ b/src/interactors/crates.rs @@ -36,43 +36,55 @@ pub struct QueryCrateResponse { pub releases: Vec } -pub fn query_crate(service: S, crate_name: CrateName) -> - impl Future - where S: Service +#[derive(Debug, Clone)] +pub struct QueryCrate(pub S); + +impl Service for QueryCrate + where S: Service + Clone + 'static, + S::Future: 'static { - let lower_name = crate_name.as_ref().to_lowercase(); + type Request = CrateName; + type Response = QueryCrateResponse; + type Error = Error; + type Future = Box>; - let path = match lower_name.len() { - 1 => format!("1/{}", lower_name), - 2 => format!("2/{}", lower_name), - 3 => format!("3/{}/{}", &lower_name[..1], lower_name), - _ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name), - }; + fn call(&self, crate_name: CrateName) -> Self::Future { + let service = self.0.clone(); - let uri_future = format!("{}/master/{}", CRATES_INDEX_BASE_URI, path) - .parse::().into_future().from_err(); + let lower_name = crate_name.as_ref().to_lowercase(); - uri_future.and_then(move |uri| { - let request = Request::new(Method::Get, uri.clone()); + let path = match lower_name.len() { + 1 => format!("1/{}", lower_name), + 2 => format!("2/{}", lower_name), + 3 => format!("3/{}/{}", &lower_name[..1], lower_name), + _ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name), + }; - service.call(request).from_err().and_then(move |response| { - let status = response.status(); - if !status.is_success() { - future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri))) - } else { - let body_future = response.body().concat2().from_err(); - let decode_future = body_future.and_then(|body| { - let string_body = str::from_utf8(body.as_ref())?; - let packages = string_body.lines() - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .map(|s| serde_json::from_str::(s)) - .collect::>()?; - Ok(packages) - }); - let convert_future = decode_future.and_then(move |pkgs| convert_pkgs(&crate_name, pkgs)); - future::Either::B(convert_future) - } - }) - }) + let uri_future = format!("{}/master/{}", CRATES_INDEX_BASE_URI, path) + .parse::().into_future().from_err(); + + Box::new(uri_future.and_then(move |uri| { + let request = Request::new(Method::Get, uri.clone()); + + service.call(request).from_err().and_then(move |response| { + let status = response.status(); + if !status.is_success() { + future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri))) + } else { + let body_future = response.body().concat2().from_err(); + let decode_future = body_future.and_then(|body| { + let string_body = str::from_utf8(body.as_ref())?; + let packages = string_body.lines() + .map(|s| s.trim()) + .filter(|s| !s.is_empty()) + .map(|s| serde_json::from_str::(s)) + .collect::>()?; + Ok(packages) + }); + let convert_future = decode_future.and_then(move |pkgs| convert_pkgs(&crate_name, pkgs)); + future::Either::B(convert_future) + } + }) + })) + } } diff --git a/src/interactors/github.rs b/src/interactors/github.rs index 386e5cb..363605b 100644 --- a/src/interactors/github.rs +++ b/src/interactors/github.rs @@ -11,33 +11,46 @@ use ::models::repo::{Repository, RepoPath}; const GITHUB_API_BASE_URI: &'static str = "https://api.github.com"; const GITHUB_USER_CONTENT_BASE_URI: &'static str = "https://raw.githubusercontent.com"; -pub fn retrieve_file_at_path(service: S, repo_path: &RepoPath, path: &RelativePathBuf) -> - impl Future - where S: Service +#[derive(Debug, Clone)] +pub struct RetrieveFileAtPath(pub S); + +impl Service for RetrieveFileAtPath + where S: Service + Clone + 'static, + S::Future: 'static { - let path_str: &str = path.as_ref(); - let uri_future = format!("{}/{}/{}/HEAD/{}", - GITHUB_USER_CONTENT_BASE_URI, - repo_path.qual.as_ref(), - repo_path.name.as_ref(), - path_str - ).parse::().into_future().from_err(); + type Request = (RepoPath, RelativePathBuf); + type Response = String; + type Error = Error; + type Future = Box>; - uri_future.and_then(move |uri| { - let request = Request::new(Method::Get, uri.clone()); + fn call(&self, req: Self::Request) -> Self::Future { + let service = self.0.clone(); - service.call(request).from_err().and_then(move |response| { - let status = response.status(); - if !status.is_success() { - future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri))) - } else { - let body_future = response.body().concat2().from_err(); - let decode_future = body_future - .and_then(|body| String::from_utf8(body.to_vec()).map_err(|err| err.into())); - future::Either::B(decode_future) - } - }) - }) + let (repo_path, path) = req; + let path_str: &str = path.as_ref(); + let uri_future = format!("{}/{}/{}/HEAD/{}", + GITHUB_USER_CONTENT_BASE_URI, + repo_path.qual.as_ref(), + repo_path.name.as_ref(), + path_str + ).parse::().into_future().from_err(); + + Box::new(uri_future.and_then(move |uri| { + let request = Request::new(Method::Get, uri.clone()); + + service.call(request).from_err().and_then(move |response| { + let status = response.status(); + if !status.is_success() { + future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri))) + } else { + let body_future = response.body().concat2().from_err(); + let decode_future = body_future + .and_then(|body| String::from_utf8(body.to_vec()).map_err(|err| err.into())); + future::Either::B(decode_future) + } + }) + })) + } } #[derive(Deserialize)] diff --git a/src/main.rs b/src/main.rs index 95483b5..f48016a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ extern crate base64; extern crate hyper; extern crate hyper_tls; #[macro_use] extern crate lazy_static; +extern crate lru_cache; extern crate maud; extern crate ordermap; extern crate relative_path; diff --git a/src/models/crates.rs b/src/models/crates.rs index 2ebe7a8..e75706f 100644 --- a/src/models/crates.rs +++ b/src/models/crates.rs @@ -43,7 +43,7 @@ impl FromStr for CrateName { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct CrateRelease { pub name: CrateName, pub version: Version, diff --git a/src/utils/cache.rs b/src/utils/cache.rs new file mode 100644 index 0000000..89d2aab --- /dev/null +++ b/src/utils/cache.rs @@ -0,0 +1,127 @@ +use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; +use std::hash::Hash; +use std::time::{Duration, Instant}; +use std::ops::Deref; +use std::sync::Mutex; + +use failure::{Error, Fail}; +use futures::{Future, Poll}; +use futures::future::{Shared, SharedError, SharedItem}; +use lru_cache::LruCache; +use tokio_service::Service; + +pub struct Cache + where S: Service, + S::Request: Hash + Eq +{ + inner: S, + duration: Duration, + cache: Mutex)>> +} + +impl Debug for Cache + where S: Service + Debug, + S::Request: Hash + Eq +{ + fn fmt(&self, fmt: &mut Formatter) -> FmtResult { + fmt.debug_struct("Cache") + .field("inner", &self.inner) + .field("duration", &self.duration) + .finish() + } +} + +impl Cache + where S: Service, + S::Request: Hash + Eq +{ + pub fn new(service: S, duration: Duration, capacity: usize) -> Cache { + Cache { + inner: service, + duration: duration, + cache: Mutex::new(LruCache::new(capacity)) + } + } +} + +impl Service for Cache + where S: Service, + S::Request: Clone + Hash + Eq +{ + type Request = S::Request; + type Response = CachedItem; + type Error = CachedError; + type Future = Cached; + + fn call(&self, req: Self::Request) -> Self::Future { + let now = Instant::now(); + let mut cache = self.cache.lock().expect("lock poisoned"); + if let Some(&mut (valid_until, ref shared_future)) = cache.get_mut(&req) { + if valid_until > now { + if let Some(Ok(_)) = shared_future.peek() { + return Cached(shared_future.clone()); + } + } + } + let shared_future = self.inner.call(req.clone()).shared(); + cache.insert(req, (now + self.duration, shared_future.clone())); + Cached(shared_future) + } +} + +pub struct Cached(Shared); + +impl Debug for Cached + where F: Future + Debug, + F::Item: Debug, + F::Error: Debug +{ + fn fmt(&self, fmt: &mut Formatter) -> FmtResult { + self.0.fmt(fmt) + } +} + +impl> Future for Cached { + type Item = CachedItem; + type Error = CachedError; + + fn poll(&mut self) -> Poll { + self.0.poll() + .map_err(CachedError) + .map(|async| async.map(CachedItem)) + } +} + +#[derive(Debug)] +pub struct CachedItem(SharedItem); + +impl Deref for CachedItem { + type Target = T; + + fn deref(&self) -> &T { + &self.0.deref() + } +} + +#[derive(Debug)] +pub struct CachedError(SharedError); + +impl Fail for CachedError { + fn cause(&self) -> Option<&Fail> { + Some(self.0.cause()) + } + + fn backtrace(&self) -> Option<&::failure::Backtrace> { + Some(self.0.backtrace()) + } + + fn causes(&self) -> ::failure::Causes { + self.0.causes() + } +} + +impl Display for CachedError { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + Display::fmt(&self.0, f) + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index dbec1e4..a5c08fd 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1 +1 @@ -pub mod throttle; +pub mod cache; diff --git a/src/utils/throttle.rs b/src/utils/throttle.rs deleted file mode 100644 index 21d89a5..0000000 --- a/src/utils/throttle.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::fmt::{Debug, Display, Formatter, Result as FmtResult}; -use std::time::{Duration, Instant}; -use std::ops::Deref; -use std::sync::Mutex; - -use failure::{Error, Fail}; -use futures::{Future, Poll}; -use futures::future::{Shared, SharedError, SharedItem}; -use tokio_service::Service; - -pub struct Throttle - where S: Service -{ - inner: S, - duration: Duration, - current: Mutex)>> -} - -impl Debug for Throttle - where S: Service + Debug -{ - fn fmt(&self, fmt: &mut Formatter) -> FmtResult { - fmt.debug_struct("Throttle") - .field("inner", &self.inner) - .field("duration", &self.duration) - .finish() - } -} - -impl Throttle - where S: Service -{ - pub fn new(service: S, duration: Duration) -> Throttle { - Throttle { - inner: service, - duration, - current: Mutex::new(None) - } - } -} - -impl Service for Throttle - where S: Service -{ - type Request = (); - type Response = ThrottledItem; - type Error = ThrottledError; - type Future = Throttled; - - fn call(&self, _: ()) -> Self::Future { - let now = Instant::now(); - let mut current = self.current.lock().expect("lock poisoned"); - if let Some((valid_until, ref shared_future)) = *current { - if valid_until > now { - if let Some(Ok(_)) = shared_future.peek() { - return Throttled(shared_future.clone()); - } - } - } - let shared_future = self.inner.call(()).shared(); - *current = Some((now + self.duration, shared_future.clone())); - Throttled(shared_future) - } -} - -pub struct Throttled(Shared); - -impl Debug for Throttled - where F: Future + Debug, - F::Item: Debug, - F::Error: Debug -{ - fn fmt(&self, fmt: &mut Formatter) -> FmtResult { - self.0.fmt(fmt) - } -} - -impl> Future for Throttled { - type Item = ThrottledItem; - type Error = ThrottledError; - - fn poll(&mut self) -> Poll { - self.0.poll() - .map_err(ThrottledError) - .map(|async| async.map(ThrottledItem)) - } -} - -#[derive(Debug)] -pub struct ThrottledItem(SharedItem); - -impl Deref for ThrottledItem { - type Target = T; - - fn deref(&self) -> &T { - &self.0.deref() - } -} - -#[derive(Debug)] -pub struct ThrottledError(SharedError); - -impl Fail for ThrottledError { - fn cause(&self) -> Option<&Fail> { - Some(self.0.cause()) - } - - fn backtrace(&self) -> Option<&::failure::Backtrace> { - Some(self.0.backtrace()) - } - - fn causes(&self) -> ::failure::Causes { - self.0.causes() - } -} - -impl Display for ThrottledError { - fn fmt(&self, f: &mut Formatter) -> FmtResult { - Display::fmt(&self.0, f) - } -}