From 1b66eddb066143f230d279d01b63971f07fb2ede Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Sat, 3 Oct 2020 13:08:16 +0100 Subject: [PATCH] reqwest client + caching (#58) --- Cargo.lock | 204 ++++++++++++++++++++++++++++++++++++- Cargo.toml | 2 +- src/engine/fut/analyze.rs | 1 - src/engine/fut/crawl.rs | 103 ++++++++----------- src/engine/fut/mod.rs | 2 +- src/engine/mod.rs | 154 ++++++++++++---------------- src/interactors/crates.rs | 157 ++++++++++++---------------- src/interactors/github.rs | 110 +++++++++----------- src/interactors/mod.rs | 85 +++++++--------- src/interactors/rustsec.rs | 87 +++++----------- src/main.rs | 21 +++- src/models/repo.rs | 33 +++--- src/utils/cache.rs | 94 ++++++----------- 13 files changed, 557 insertions(+), 496 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 483ae1f..5e5ffe2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,6 +66,12 @@ dependencies = [ "byte-tools", ] +[[package]] +name = "bumpalo" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" + [[package]] name = "byte-tools" version = "0.3.1" @@ -216,6 +222,21 @@ dependencies = [ "generic-array", ] +[[package]] +name = "dtoa" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" + +[[package]] +name = "encoding_rs" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a51b8cf747471cb9499b6d59e59b0444f4c90eba8968c4e44874e92b5b64ace2" +dependencies = [ + "cfg-if", +] + [[package]] name = "fake-simd" version = "0.1.2" @@ -548,6 +569,12 @@ dependencies = [ "libc", ] +[[package]] +name = "ipnet" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135" + [[package]] name = "itoa" version = "0.4.6" @@ -563,6 +590,15 @@ dependencies = [ "libc", ] +[[package]] +name = "js-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "kernel32-sys" version = "0.2.2" @@ -700,6 +736,22 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mime_guess" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "0.6.22" @@ -1073,6 +1125,42 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "reqwest" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "mime_guess", + "native-tls", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-tls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "route-recognizer" version = "0.2.0" @@ -1243,6 +1331,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +dependencies = [ + "dtoa", + "itoa", + "serde", + "url", +] + [[package]] name = "sha-1" version = "0.8.2" @@ -1265,13 +1365,13 @@ dependencies = [ "derive_more", "futures", "hyper", - "hyper-tls", "indexmap", "lru-cache", "maud", "once_cell", "pin-project", "relative-path", + "reqwest", "route-recognizer", "rustsec", "sass-rs", @@ -1520,6 +1620,15 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -1561,6 +1670,12 @@ version = "0.2.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + [[package]] name = "want" version = "0.3.0" @@ -1583,6 +1698,84 @@ version = "0.10.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" +[[package]] +name = "wasm-bindgen" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" +dependencies = [ + "cfg-if", + "serde", + "serde_json", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" + +[[package]] +name = "web-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.2.8" @@ -1617,6 +1810,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi 0.3.9", +] + [[package]] name = "ws2_32-sys" version = "0.2.1" diff --git a/Cargo.toml b/Cargo.toml index e4f5ab3..f90f2f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,6 @@ cadence = "0.21" derive_more = "0.99" futures = "0.3" hyper = "0.13" -hyper-tls = "0.4" indexmap = { version = "1", features = ["serde-1"] } lru-cache = "0.1" # TODO: replace unmaintained crate maud = "0.22" @@ -27,6 +26,7 @@ pin-project = "0.4" relative-path = { version = "1.3", features = ["serde"] } route-recognizer = "0.2" rustsec = "0.21" +reqwest = { version = "0.10", features = ["json"] } semver = { version = "0.11", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/src/engine/fut/analyze.rs b/src/engine/fut/analyze.rs index d090b59..6bf98ea 100644 --- a/src/engine/fut/analyze.rs +++ b/src/engine/fut/analyze.rs @@ -9,7 +9,6 @@ pub async fn analyze_dependencies( deps: CrateDeps, ) -> Result { let advisory_db = engine.fetch_advisory_db().await?; - let mut analyzer = DependencyAnalyzer::new(&deps, Some(advisory_db)); let main_deps = diff --git a/src/engine/fut/crawl.rs b/src/engine/fut/crawl.rs index afbb10f..e6c13b8 100644 --- a/src/engine/fut/crawl.rs +++ b/src/engine/fut/crawl.rs @@ -1,75 +1,56 @@ -use std::{future::Future, mem, pin::Pin, task::Context, task::Poll}; - use anyhow::Error; -use futures::{future::BoxFuture, ready, Stream}; -use futures::{stream::FuturesOrdered, FutureExt}; +use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt as _, StreamExt as _}; use relative_path::RelativePathBuf; use crate::models::repo::RepoPath; -use super::super::machines::crawler::ManifestCrawler; -pub use super::super::machines::crawler::ManifestCrawlerOutput; -use super::super::Engine; +use crate::engine::{ + machines::crawler::{ManifestCrawler, ManifestCrawlerOutput}, + Engine, +}; -#[pin_project::pin_project] -pub struct CrawlManifestFuture { - repo_path: RepoPath, +pub async fn crawl_manifest( engine: Engine, - crawler: ManifestCrawler, - #[pin] - futures: FuturesOrdered>>, -} + repo_path: RepoPath, + entry_point: RelativePathBuf, +) -> anyhow::Result { + let mut crawler = ManifestCrawler::new(); + let mut futures: FuturesOrdered>> = + FuturesOrdered::new(); + + let engine2 = engine.clone(); + let repo_path2 = repo_path.clone(); + + let fut = async move { + let contents = engine2 + .retrieve_manifest_at_path(&repo_path2, &entry_point) + .await?; + Ok((entry_point, contents)) + } + .boxed(); + + futures.push(fut); + + while let Some(item) = futures.next().await { + let (path, raw_manifest) = item?; + let output = crawler.step(path, raw_manifest)?; -impl CrawlManifestFuture { - pub fn new(engine: &Engine, repo_path: RepoPath, entry_point: RelativePathBuf) -> Self { let engine = engine.clone(); - let crawler = ManifestCrawler::new(); - let mut futures = FuturesOrdered::new(); + let repo_path = repo_path.clone(); - let future: Pin + Send>> = Box::pin( - engine - .retrieve_manifest_at_path(&repo_path, &entry_point) - .map(move |contents| contents.map(|c| (entry_point, c))), - ); - futures.push(future); + for path in output.paths_of_interest { + let engine = engine.clone(); + let repo_path = repo_path.clone(); - CrawlManifestFuture { - repo_path, - engine, - crawler, - futures, - } - } -} - -impl Future for CrawlManifestFuture { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().project(); - - match ready!(this.futures.poll_next(cx)) { - None => { - let crawler = mem::replace(&mut self.crawler, ManifestCrawler::new()); - Poll::Ready(Ok(crawler.finalize())) - } - - Some(Ok((path, raw_manifest))) => { - let output = self.crawler.step(path, raw_manifest)?; - - for path in output.paths_of_interest.into_iter() { - let future: Pin + Send>> = Box::pin( - self.engine - .retrieve_manifest_at_path(&self.repo_path, &path) - .map(move |contents| contents.map(|c| (path, c))), - ); - self.futures.push(future); - } - - self.poll(cx) - } - - Some(Err(err)) => Poll::Ready(Err(err)), + let fut = async move { + let contents = engine.retrieve_manifest_at_path(&repo_path, &path).await?; + Ok((path, contents)) + } + .boxed(); + + futures.push(fut); } } + + Ok(crawler.finalize()) } diff --git a/src/engine/fut/mod.rs b/src/engine/fut/mod.rs index a2fc499..9112e18 100644 --- a/src/engine/fut/mod.rs +++ b/src/engine/fut/mod.rs @@ -2,4 +2,4 @@ mod analyze; mod crawl; pub use self::analyze::analyze_dependencies; -pub use self::crawl::CrawlManifestFuture; +pub use self::crawl::crawl_manifest; diff --git a/src/engine/mod.rs b/src/engine/mod.rs index 0761f32..bc0f472 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -1,25 +1,21 @@ use std::{ collections::HashSet, panic::RefUnwindSafe, - sync::{Arc, Mutex}, - task::{Context, Poll}, + sync::Arc, time::{Duration, Instant}, }; use anyhow::{anyhow, Error}; use cadence::{MetricSink, NopMetricSink, StatsdClient}; -use futures::{future::try_join_all, stream::FuturesUnordered, Future, FutureExt, Stream}; -use hyper::{ - client::{HttpConnector, ResponseFuture}, - service::Service, - Body, Client, Request, Response, -}; -use hyper_tls::HttpsConnector; +use futures::{future::try_join_all, stream, StreamExt}; +use hyper::service::Service; use once_cell::sync::Lazy; use relative_path::{RelativePath, RelativePathBuf}; use rustsec::database::Database; use semver::VersionReq; use slog::Logger; +use stream::BoxStream; +use tokio::sync::Mutex; use crate::interactors::crates::{GetPopularCrates, QueryCrate}; use crate::interactors::github::GetPopularRepos; @@ -33,77 +29,57 @@ mod fut; mod machines; use self::fut::analyze_dependencies; -use self::fut::CrawlManifestFuture; - -type HttpClient = Client>; -// type HttpClient = Client; - -// workaround for hyper 0.12 not implementing Service for Client -#[derive(Debug, Clone)] -struct ServiceHttpClient(HttpClient); - -impl Service> for ServiceHttpClient { - type Response = Response; - type Error = hyper::Error; - type Future = ResponseFuture; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx) - } - - fn call(&mut self, req: Request) -> Self::Future { - self.0.request(req) - } -} +use self::fut::crawl_manifest; #[derive(Clone, Debug)] pub struct Engine { - client: HttpClient, + client: reqwest::Client, logger: Logger, metrics: StatsdClient, - // TODO: use futures aware mutex - query_crate: Arc, CrateName>>>, - get_popular_crates: Arc, ()>>>, - get_popular_repos: Arc, ()>>>, - retrieve_file_at_path: Arc>>, - fetch_advisory_db: Arc, ()>>>, + query_crate: Arc>>, + get_popular_crates: Arc>>, + get_popular_repos: Arc>>, + retrieve_file_at_path: Arc>, + fetch_advisory_db: Arc>>, } impl Engine { - pub fn new(client: HttpClient, logger: Logger) -> Engine { + pub fn new(client: reqwest::Client, logger: Logger) -> Engine { let metrics = StatsdClient::from_sink("engine", NopMetricSink); - let service_client = ServiceHttpClient(client.clone()); - let query_crate = Cache::new( - QueryCrate(service_client.clone()), + QueryCrate::new(client.clone()), Duration::from_secs(300), 500, + logger.clone(), ); let get_popular_crates = Cache::new( - GetPopularCrates(service_client.clone()), - Duration::from_secs(10), + GetPopularCrates::new(client.clone()), + Duration::from_secs(120), 1, + logger.clone(), ); let get_popular_repos = Cache::new( - GetPopularRepos(service_client.clone()), - Duration::from_secs(10), + GetPopularRepos::new(client.clone()), + Duration::from_secs(120), 1, + logger.clone(), ); let fetch_advisory_db = Cache::new( - FetchAdvisoryDatabase(service_client.clone()), - Duration::from_secs(300), + FetchAdvisoryDatabase::new(client.clone()), + Duration::from_secs(1800), 1, + logger.clone(), ); Engine { - client, + client: client.clone(), logger, metrics, query_crate: Arc::new(Mutex::new(query_crate)), get_popular_crates: Arc::new(Mutex::new(get_popular_crates)), get_popular_repos: Arc::new(Mutex::new(get_popular_repos)), - retrieve_file_at_path: Arc::new(Mutex::new(RetrieveFileAtPath(service_client))), + retrieve_file_at_path: Arc::new(Mutex::new(RetrieveFileAtPath::new(client))), fetch_advisory_db: Arc::new(Mutex::new(fetch_advisory_db)), } } @@ -141,8 +117,10 @@ impl AnalyzeDependenciesOutcome { impl Engine { pub async fn get_popular_repos(&self) -> Result, Error> { - let repos = self.get_popular_repos.lock().unwrap().call(()); - let repos = repos.await?; + let repos = { + let mut lock = self.get_popular_repos.lock().await; + lock.cached_query(()).await? + }; let filtered_repos = repos .iter() @@ -154,8 +132,8 @@ impl Engine { } pub async fn get_popular_crates(&self) -> Result, Error> { - let crates = self.get_popular_crates.lock().unwrap().call(()); - let crates = crates.await?; + let mut lock = self.get_popular_crates.lock().await; + let crates = lock.cached_query(()).await?; Ok(crates) } @@ -168,8 +146,7 @@ impl Engine { let entry_point = RelativePath::new("/").to_relative_path_buf(); let engine = self.clone(); - let manifest_future = CrawlManifestFuture::new(self, repo_path.clone(), entry_point); - let manifest_output = manifest_future.await?; + let manifest_output = crawl_manifest(self.clone(), repo_path.clone(), entry_point).await?; let engine_for_analyze = engine.clone(); let futures = manifest_output @@ -201,12 +178,10 @@ impl Engine { ) -> Result { let start = Instant::now(); - let query_response = self - .query_crate - .lock() - .unwrap() - .call(crate_path.name.clone()); - let query_response = query_response.await?; + let query_response = { + let mut lock = self.query_crate.lock().await; + lock.cached_query(crate_path.name.clone()).await? + }; let engine = self.clone(); @@ -237,8 +212,10 @@ impl Engine { name: CrateName, req: VersionReq, ) -> Result, Error> { - let query_response = self.query_crate.lock().unwrap().call(name); - let query_response = query_response.await?; + let query_response = { + let mut lock = self.query_crate.lock().await; + lock.cached_query(name).await? + }; let latest = query_response .releases @@ -250,43 +227,46 @@ impl Engine { Ok(latest) } - fn fetch_releases>( - &self, - names: I, - ) -> impl Stream, Error>> { + fn fetch_releases<'a, I>(&'a self, names: I) -> BoxStream<'a, anyhow::Result>> + where + I: IntoIterator, + ::IntoIter: Send + 'a, + { let engine = self.clone(); - names - .into_iter() - .map(|name| { - engine - .query_crate - .lock() - .unwrap() - .call(name) - .map(|resp| resp.map(|r| r.releases)) - }) - .collect::>() + let s = stream::iter(names) + .zip(stream::repeat(engine)) + .map(resolve_crate_with_engine) + .buffer_unordered(25); + + Box::pin(s) } - fn retrieve_manifest_at_path( + async fn retrieve_manifest_at_path( &self, repo_path: &RepoPath, path: &RelativePathBuf, - ) -> impl Future> { + ) -> Result { let manifest_path = path.join(RelativePath::new("Cargo.toml")); - self.retrieve_file_at_path - .lock() - .unwrap() - .call((repo_path.clone(), manifest_path)) + let mut lock = self.retrieve_file_at_path.lock().await; + Ok(lock.call((repo_path.clone(), manifest_path)).await?) } - fn fetch_advisory_db(&self) -> impl Future, Error>> { - self.fetch_advisory_db.lock().unwrap().call(()) + async fn fetch_advisory_db(&self) -> Result, Error> { + let mut lock = self.fetch_advisory_db.lock().await; + Ok(lock.cached_query(()).await?) } } +async fn resolve_crate_with_engine( + (crate_name, engine): (CrateName, Engine), +) -> anyhow::Result> { + let mut lock = engine.query_crate.lock().await; + let crate_res = lock.cached_query(crate_name).await?; + Ok(crate_res.releases) +} + static POPULAR_REPO_BLOCK_LIST: Lazy> = Lazy::new(|| { vec![ RepoPath::from_parts("github", "rust-lang", "rust"), diff --git a/src/interactors/crates.rs b/src/interactors/crates.rs index d2a021b..f95ff31 100644 --- a/src/interactors/crates.rs +++ b/src/interactors/crates.rs @@ -1,17 +1,15 @@ use std::{str, task::Context, task::Poll}; -use anyhow::{anyhow, Error}; -use futures::{ - future::{err, ok, ready, BoxFuture}, - TryFutureExt, -}; -use hyper::{ - body, header::USER_AGENT, service::Service, Body, Error as HyperError, Request, Response, Uri, -}; +use anyhow::Error; +use futures::FutureExt as _; +use hyper::service::Service; use semver::{Version, VersionReq}; use serde::Deserialize; -use crate::models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease}; +use crate::{ + models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease}, + BoxFuture, +}; const CRATES_INDEX_BASE_URI: &str = "https://raw.githubusercontent.com/rust-lang/crates.io-index"; const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1"; @@ -69,22 +67,19 @@ pub struct QueryCrateResponse { } #[derive(Debug, Clone)] -pub struct QueryCrate(pub S); +pub struct QueryCrate { + client: reqwest::Client, +} -impl Service for QueryCrate -where - S: Service, Response = Response, Error = HyperError> + Clone, - S::Future: Send + 'static, -{ - type Response = QueryCrateResponse; - type Error = Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(|err| err.into()) +impl QueryCrate { + pub fn new(client: reqwest::Client) -> Self { + Self { client } } - fn call(&mut self, crate_name: CrateName) -> Self::Future { + pub async fn query( + client: reqwest::Client, + crate_name: CrateName, + ) -> anyhow::Result { let lower_name = crate_name.as_ref().to_lowercase(); let path = match lower_name.len() { @@ -94,41 +89,34 @@ where _ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name), }; - let uri = format!("{}/master/{}", CRATES_INDEX_BASE_URI, path); - let uri = uri.parse::().expect("TODO: MAP ERROR PROPERLY"); + let url = format!("{}/HEAD/{}", CRATES_INDEX_BASE_URI, path); + let res = client.get(&url).send().await?.error_for_status()?; - let request = Request::get(uri.clone()) - .header(USER_AGENT, "deps.rs") - .body(Body::empty()) - .unwrap(); + let string_body = res.text().await?; - Box::pin( - self.0 - .call(request) - .map_err(|err| err.into()) - .and_then(move |response| { - let status = response.status(); - if !status.is_success() { - return err(anyhow!("Status code {} for URI {}", status, uri)); - } + let pkgs = string_body + .lines() + .map(str::trim) + .filter(|s| !s.is_empty()) + .map(serde_json::from_str) + .collect::>()?; - ok(response) - }) - .and_then(|response| body::to_bytes(response.into_body()).err_into()) - .and_then(|body| ready(String::from_utf8(body.to_vec())).err_into()) - .and_then(|string_body| { - ready( - string_body - .lines() - .map(|s| s.trim()) - .filter(|s| !s.is_empty()) - .map(serde_json::from_str) - .collect::>(), - ) - .err_into() - }) - .and_then(move |pkgs| ready(convert_pkgs(&crate_name, pkgs))), - ) + convert_pkgs(&crate_name, pkgs) + } +} + +impl Service for QueryCrate { + type Response = QueryCrateResponse; + type Error = Error; + type Future = BoxFuture>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, crate_name: CrateName) -> Self::Future { + let client = self.client.clone(); + Self::query(client, crate_name).boxed() } } @@ -157,49 +145,36 @@ fn convert_summary(response: SummaryResponse) -> Result, Error> { .collect() } -#[derive(Debug, Clone)] -pub struct GetPopularCrates(pub S); +#[derive(Debug, Clone, Default)] +pub struct GetPopularCrates { + client: reqwest::Client, +} -impl Service<()> for GetPopularCrates -where - S: Service, Response = Response, Error = HyperError> + Clone, - S::Future: Send + 'static, -{ +impl GetPopularCrates { + pub fn new(client: reqwest::Client) -> Self { + Self { client } + } + + pub async fn query(client: reqwest::Client) -> anyhow::Result> { + let url = format!("{}/summary", CRATES_API_BASE_URI); + let res = client.get(&url).send().await?.error_for_status()?; + + let summary: SummaryResponse = res.json().await?; + convert_summary(summary) + } +} + +impl Service<()> for GetPopularCrates { type Response = Vec; type Error = Error; - type Future = BoxFuture<'static, Result>; + type Future = BoxFuture>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(|err| err.into()) + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _req: ()) -> Self::Future { - let mut service = self.0.clone(); - - let uri = format!("{}/summary", CRATES_API_BASE_URI); - let uri = uri.parse::().unwrap(); - let request = Request::get(uri.clone()) - .header(USER_AGENT, "deps.rs") - .body(Body::empty()) - .unwrap(); - - Box::pin( - service - .call(request) - .map_err(|err| err.into()) - .and_then(move |response| { - let status = response.status(); - if !status.is_success() { - err(anyhow!("Status code {} for URI {}", status, uri)) - } else { - ok(response) - } - }) - .and_then(|response| body::to_bytes(response.into_body()).err_into()) - .and_then(|bytes| { - ready(serde_json::from_slice::(&bytes)).err_into() - }) - .and_then(|summary| ready(convert_summary(summary)).err_into()), - ) + let client = self.client.clone(); + Self::query(client).boxed() } } diff --git a/src/interactors/github.rs b/src/interactors/github.rs index a15bb4b..a81c068 100644 --- a/src/interactors/github.rs +++ b/src/interactors/github.rs @@ -1,16 +1,15 @@ -use std::{task::Context, task::Poll}; +use std::task::{Context, Poll}; -use anyhow::{anyhow, Error}; -use futures::{ - future::{err, ok, ready, BoxFuture}, - TryFutureExt, -}; -use hyper::{ - body, header::USER_AGENT, service::Service, Body, Error as HyperError, Request, Response, Uri, -}; +use anyhow::Error; + +use futures::FutureExt as _; +use hyper::service::Service; use serde::Deserialize; -use crate::models::repo::{RepoPath, Repository}; +use crate::{ + models::repo::{RepoPath, Repository}, + BoxFuture, +}; const GITHUB_API_BASE_URI: &str = "https://api.github.com"; @@ -32,65 +31,50 @@ struct GithubOwner { } #[derive(Debug, Clone)] -pub struct GetPopularRepos(pub S); +pub struct GetPopularRepos { + client: reqwest::Client, +} -impl Service<()> for GetPopularRepos -where - S: Service, Response = Response, Error = HyperError> + Clone, - S::Future: Send + 'static, -{ +impl GetPopularRepos { + pub fn new(client: reqwest::Client) -> Self { + Self { client } + } + + pub async fn query(client: reqwest::Client) -> anyhow::Result> { + let url = format!( + "{}/search/repositories?q=language:rust&sort=stars", + GITHUB_API_BASE_URI + ); + + let res = client.get(&url).send().await?.error_for_status()?; + let summary: GithubSearchResponse = res.json().await?; + + summary + .items + .into_iter() + .map(|item| { + let path = RepoPath::from_parts("github", &item.owner.login, &item.name)?; + + Ok(Repository { + path, + description: item.description, + }) + }) + .collect::, Error>>() + } +} + +impl Service<()> for GetPopularRepos { type Response = Vec; type Error = Error; - type Future = BoxFuture<'static, Result>; + type Future = BoxFuture>; - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(|err| err.into()) + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) } fn call(&mut self, _req: ()) -> Self::Future { - let uri = format!( - "{}/search/repositories?q=language:rust&sort=stars", - GITHUB_API_BASE_URI - ) - .parse::() - .expect("TODO: handle error properly"); - - let request = Request::get(uri) - .header(USER_AGENT, "deps.rs") - .body(Body::empty()) - .unwrap(); - - Box::pin( - self.0 - .call(request) - .err_into() - .and_then(|response| { - let status = response.status(); - if !status.is_success() { - return err(anyhow!("Status code {} for popular repo search", status)); - } - - ok(response) - }) - .and_then(|response| body::to_bytes(response.into_body()).err_into()) - .and_then(|bytes| ready(serde_json::from_slice(bytes.as_ref())).err_into()) - .and_then(|search_response: GithubSearchResponse| { - ready( - search_response - .items - .into_iter() - .map(|item| { - let path = - RepoPath::from_parts("github", &item.owner.login, &item.name)?; - - Ok(Repository { - path, - description: item.description, - }) - }) - .collect::, Error>>(), - ) - }), - ) + let client = self.client.clone(); + Self::query(client).boxed() } } diff --git a/src/interactors/mod.rs b/src/interactors/mod.rs index 9007c0b..df3085d 100644 --- a/src/interactors/mod.rs +++ b/src/interactors/mod.rs @@ -1,66 +1,53 @@ -use std::{task::Context, task::Poll}; +use std::task::{Context, Poll}; use anyhow::{anyhow, Error}; -use futures::{ - future::{err, ok, ready, BoxFuture}, - TryFutureExt, -}; -use hyper::{ - body, header::USER_AGENT, service::Service, Body, Error as HyperError, Request, Response, -}; +use futures::FutureExt as _; +use hyper::service::Service; use relative_path::RelativePathBuf; -use crate::models::repo::RepoPath; +use crate::{models::repo::RepoPath, BoxFuture}; pub mod crates; pub mod github; pub mod rustsec; #[derive(Debug, Clone)] -pub struct RetrieveFileAtPath(pub S); +pub struct RetrieveFileAtPath { + client: reqwest::Client, +} -impl Service<(RepoPath, RelativePathBuf)> for RetrieveFileAtPath -where - S: Service, Response = Response, Error = HyperError> + Clone, - S::Future: Send + 'static, -{ - type Response = String; - type Error = Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_ready(cx).map_err(|err| err.into()) +impl RetrieveFileAtPath { + pub fn new(client: reqwest::Client) -> Self { + Self { client } } - fn call(&mut self, req: (RepoPath, RelativePathBuf)) -> Self::Future { - let (repo_path, path) = req; + pub async fn query( + client: reqwest::Client, + repo_path: RepoPath, + path: RelativePathBuf, + ) -> anyhow::Result { + let url = repo_path.to_usercontent_file_url(&path); + let res = client.get(&url).send().await?; - let uri = repo_path.to_usercontent_file_uri(&path); - let uri = match uri { - Ok(uri) => uri, - Err(error) => return Box::pin(err(error)), - }; + if !res.status().is_success() { + return Err(anyhow!("Status code {} for URI {}", res.status(), url)); + } - let request = Request::get(uri.clone()) - .header(USER_AGENT, "deps.rs") - .body(Body::empty()) - .unwrap(); - - Box::pin( - self.0 - .call(request) - .err_into() - .and_then(move |response| { - let status = response.status(); - - if status.is_success() { - ok(response) - } else { - err(anyhow!("Status code {} for URI {}", status, uri)) - } - }) - .and_then(|response| body::to_bytes(response.into_body()).err_into()) - .and_then(|bytes| ready(String::from_utf8(bytes.to_vec())).err_into()), - ) + Ok(res.text().await?) + } +} + +impl Service<(RepoPath, RelativePathBuf)> for RetrieveFileAtPath { + type Response = String; + type Error = Error; + type Future = BoxFuture>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, (repo_path, path): (RepoPath, RelativePathBuf)) -> Self::Future { + let client = self.client.clone(); + Self::query(client, repo_path, path).boxed() } } diff --git a/src/interactors/rustsec.rs b/src/interactors/rustsec.rs index c74f16e..c670557 100644 --- a/src/interactors/rustsec.rs +++ b/src/interactors/rustsec.rs @@ -1,76 +1,39 @@ use std::{sync::Arc, task::Context, task::Poll}; use anyhow::Error; -use futures::{future::ready, future::BoxFuture}; -use hyper::{service::Service, Body, Error as HyperError, Request, Response}; +use futures::FutureExt as _; +use hyper::service::Service; use rustsec::database::Database; +use crate::BoxFuture; + #[derive(Debug, Clone)] -pub struct FetchAdvisoryDatabase(pub S); +pub struct FetchAdvisoryDatabase { + client: reqwest::Client, +} -impl Service<()> for FetchAdvisoryDatabase -where - S: Service, Response = Response, Error = HyperError> + Clone, - S::Future: 'static, -{ - type Response = Arc; - type Error = Error; - type Future = BoxFuture<'static, Result>; - - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - // TODO: should be this when async client is used again - // self.0.poll_ready(cx).map_err(|err| err.into()) - Poll::Ready(Ok(())) +impl FetchAdvisoryDatabase { + pub fn new(client: reqwest::Client) -> Self { + Self { client } } - // TODO: make fetch async again - fn call(&mut self, _req: ()) -> Self::Future { - let _service = self.0.clone(); - - Box::pin(ready( - rustsec::Database::fetch().map(Arc::new).map_err(Into::into), - )) + pub async fn fetch(_client: reqwest::Client) -> anyhow::Result> { + // TODO: make fetch async + Ok(rustsec::Database::fetch().map(Arc::new)?) } } -// #[derive(Debug, Clone)] -// pub struct FetchAdvisoryDatabase(pub S); +impl Service<()> for FetchAdvisoryDatabase { + type Response = Arc; + type Error = Error; + type Future = BoxFuture>; -// impl Service for FetchAdvisoryDatabase -// where -// S: Service + Clone, -// S::Future: 'static, -// { -// type Request = (); -// type Response = Arc; -// type Error = Error; -// type Future = Box>; + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } -// fn call(&self, _req: ()) -> Self::Future { -// let service = self.0.clone(); - -// let uri_future = DEFAULT_URL.parse().into_future().from_err(); - -// Box::new(uri_future.and_then(move |uri| { -// let request = Request::new(Method::Get, uri); - -// service.call(request).from_err().and_then(|response| { -// let status = response.status(); -// if !status.is_success() { -// future::Either::A(future::err(anyhow!( -// "Status code {} when fetching advisory db", -// status -// ))) -// } else { -// let body_future = response.body().concat2().from_err(); -// let decode_future = body_future.and_then(|body| { -// Ok(Arc::new(Database::from_toml(str::from_utf8( -// &body, -// )?)?)) -// }); -// future::Either::B(decode_future) -// } -// }) -// })) -// } -// } + fn call(&mut self, _req: ()) -> Self::Future { + let client = self.client.clone(); + Self::fetch(client).boxed() + } +} diff --git a/src/main.rs b/src/main.rs index 53c2c60..b207d99 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,17 +3,21 @@ use std::{ env, + future::Future, net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket}, + pin::Pin, sync::Mutex, + time::Duration, }; use cadence::{QueuingMetricSink, UdpMetricSink}; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, - Client, Server, + Server, }; -use hyper_tls::HttpsConnector; + +use reqwest::redirect::Policy as RedirectPolicy; use slog::{o, Drain}; mod engine; @@ -26,6 +30,11 @@ mod utils; use self::engine::Engine; use self::server::App; +/// Future crate's BoxFuture without the explicit lifetime parameter. +pub type BoxFuture = Pin + Send>>; + +const DEPS_RS_UA: &str = "deps.rs"; + fn init_metrics() -> QueuingMetricSink { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); socket.set_nonblocking(true).unwrap(); @@ -42,7 +51,13 @@ async fn main() { ); let metrics = init_metrics(); - let client = Client::builder().build(HttpsConnector::new()); + + let client = reqwest::Client::builder() + .user_agent(DEPS_RS_UA) + .redirect(RedirectPolicy::limited(5)) + .timeout(Duration::from_secs(5)) + .build() + .unwrap(); let port = env::var("PORT") .unwrap_or_else(|_| "8080".to_string()) diff --git a/src/models/repo.rs b/src/models/repo.rs index c3d5d9a..f97c7e2 100644 --- a/src/models/repo.rs +++ b/src/models/repo.rs @@ -1,7 +1,6 @@ -use std::str::FromStr; +use std::{fmt, str::FromStr}; use anyhow::{anyhow, ensure, Error}; -use hyper::Uri; use relative_path::RelativePath; #[derive(Clone, Debug)] @@ -26,17 +25,27 @@ impl RepoPath { }) } - pub fn to_usercontent_file_uri(&self, path: &RelativePath) -> Result { - let url = format!( + pub fn to_usercontent_file_url(&self, path: &RelativePath) -> String { + format!( "{}/{}/{}/{}/{}", self.site.to_usercontent_base_uri(), self.qual.as_ref(), self.name.as_ref(), self.site.to_usercontent_repo_suffix(), path.normalize() - ); + ) + } +} - Ok(url.parse::()?) +impl fmt::Display for RepoPath { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!( + f, + "{} => {}/{}", + self.site.as_ref(), + self.qual.as_ref(), + self.name.as_ref() + ) } } @@ -156,9 +165,7 @@ mod tests { for (input, expected) in &paths { let repo = RepoPath::from_parts("github", "deps-rs", "deps.rs").unwrap(); - let out = repo - .to_usercontent_file_uri(RelativePath::new(input)) - .unwrap(); + let out = repo.to_usercontent_file_url(RelativePath::new(input)); let exp = format!( "https://raw.githubusercontent.com/deps-rs/deps.rs/HEAD/{}", @@ -169,9 +176,7 @@ mod tests { for (input, expected) in &paths { let repo = RepoPath::from_parts("gitlab", "deps-rs", "deps.rs").unwrap(); - let out = repo - .to_usercontent_file_uri(RelativePath::new(input)) - .unwrap(); + let out = repo.to_usercontent_file_url(RelativePath::new(input)); let exp = format!("https://gitlab.com/deps-rs/deps.rs/raw/HEAD/{}", expected); assert_eq!(out.to_string(), exp); @@ -179,9 +184,7 @@ mod tests { for (input, expected) in &paths { let repo = RepoPath::from_parts("bitbucket", "deps-rs", "deps.rs").unwrap(); - let out = repo - .to_usercontent_file_uri(RelativePath::new(input)) - .unwrap(); + let out = repo.to_usercontent_file_url(RelativePath::new(input)); let exp = format!( "https://bitbucket.org/deps-rs/deps.rs/raw/HEAD/{}", diff --git a/src/utils/cache.rs b/src/utils/cache.rs index 87d0b07..ac801fd 100644 --- a/src/utils/cache.rs +++ b/src/utils/cache.rs @@ -1,15 +1,19 @@ use std::{ fmt::{Debug, Formatter, Result as FmtResult}, hash::Hash, - sync::Mutex, - task::Context, - task::Poll, time::{Duration, Instant}, }; -use anyhow::Error; +use derive_more::{Display, Error, From}; use hyper::service::Service; use lru_cache::LruCache; +use slog::{debug, Logger}; +use tokio::sync::Mutex; + +#[derive(Debug, Clone, Display, From, Error)] +pub struct CacheError { + inner: E, +} pub struct Cache where @@ -18,8 +22,8 @@ where { inner: S, duration: Duration, - #[allow(unused)] cache: Mutex>, + logger: Logger, } impl Debug for Cache @@ -38,73 +42,41 @@ where impl Cache where S: Service, - Req: Hash + Eq, + S::Response: Clone, + Req: Clone + Eq + Hash, { - pub fn new(service: S, duration: Duration, capacity: usize) -> Cache { + pub fn new(service: S, duration: Duration, capacity: usize, logger: Logger) -> Cache { Cache { inner: service, duration, cache: Mutex::new(LruCache::new(capacity)), + logger, } } -} -impl Service for Cache -where - S: Service, - S::Response: Clone, - Req: Clone + Hash + Eq, -{ - type Response = S::Response; - type Error = Error; - // WAS: type Future = Cached; - // type Future = Pin>>>; - type Future = S::Future; + pub async fn cached_query(&mut self, req: Req) -> Result { + let now = Instant::now(); - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } + { + let mut cache = self.cache.lock().await; - fn call(&mut self, req: Req) -> Self::Future { - // TODO: re-add caching - // Box::pin({ - // let now = Instant::now(); - // let mut cache = self.cache.lock().expect("lock poisoned"); + if let Some((ref valid_until, ref cached_response)) = cache.get_mut(&req) { + if *valid_until > now { + debug!(self.logger, "cache hit"); + return Ok(cached_response.clone()); + } + } + } - // if let Some(&mut (valid_until, ref cached_response)) = cache.get_mut(&req) { - // if valid_until > now { - // return Box::pin(ok(cached_response.clone())); - // } - // } + debug!(self.logger, "cache miss"); - self.inner.call(req) - // .and_then(|response| { - // // cache.insert(req, (now + self.duration, response.clone())); - // ok(response) - // }) - // }) + let fresh = self.inner.call(req.clone()).await?; + + { + let mut cache = self.cache.lock().await; + cache.insert(req, (now + self.duration, fresh.clone())); + } + + Ok(fresh) } } - -// pub struct Cached(Shared); - -// impl Debug for Cached -// where -// F: Future + Debug, -// F::Output: Debug, -// { -// fn fmt(&self, fmt: &mut Formatter<'_>) -> FmtResult { -// self.0.fmt(fmt) -// } -// } - -// // WAS: impl> Future for Cached { -// impl Future for Cached { -// type Output = Result; - -// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { -// self.0 -// .poll() -// .map_err(|_err| anyhow!("TODO: shared error not clone-able")) -// } -// }