reqwest client + caching (#58)

This commit is contained in:
Rob Ede 2020-10-03 13:08:16 +01:00 committed by GitHub
parent b3fcdabeba
commit 1b66eddb06
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 557 additions and 496 deletions

204
Cargo.lock generated
View file

@ -66,6 +66,12 @@ dependencies = [
"byte-tools",
]
[[package]]
name = "bumpalo"
version = "3.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820"
[[package]]
name = "byte-tools"
version = "0.3.1"
@ -216,6 +222,21 @@ dependencies = [
"generic-array",
]
[[package]]
name = "dtoa"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b"
[[package]]
name = "encoding_rs"
version = "0.8.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a51b8cf747471cb9499b6d59e59b0444f4c90eba8968c4e44874e92b5b64ace2"
dependencies = [
"cfg-if",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
@ -548,6 +569,12 @@ dependencies = [
"libc",
]
[[package]]
name = "ipnet"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135"
[[package]]
name = "itoa"
version = "0.4.6"
@ -563,6 +590,15 @@ dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
@ -700,6 +736,22 @@ version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "mime_guess"
version = "2.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "mio"
version = "0.6.22"
@ -1073,6 +1125,42 @@ dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "reqwest"
version = "0.10.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e"
dependencies = [
"base64",
"bytes",
"encoding_rs",
"futures-core",
"futures-util",
"http",
"http-body",
"hyper",
"hyper-tls",
"ipnet",
"js-sys",
"lazy_static",
"log",
"mime",
"mime_guess",
"native-tls",
"percent-encoding",
"pin-project-lite",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-tls",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg",
]
[[package]]
name = "route-recognizer"
version = "0.2.0"
@ -1243,6 +1331,18 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_urlencoded"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97"
dependencies = [
"dtoa",
"itoa",
"serde",
"url",
]
[[package]]
name = "sha-1"
version = "0.8.2"
@ -1265,13 +1365,13 @@ dependencies = [
"derive_more",
"futures",
"hyper",
"hyper-tls",
"indexmap",
"lru-cache",
"maud",
"once_cell",
"pin-project",
"relative-path",
"reqwest",
"route-recognizer",
"rustsec",
"sass-rs",
@ -1520,6 +1620,15 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56dee185309b50d1f11bfedef0fe6d036842e3fb77413abef29f8f8d1c5d4c1c"
[[package]]
name = "unicase"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.4"
@ -1561,6 +1670,12 @@ version = "0.2.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c"
[[package]]
name = "version_check"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed"
[[package]]
name = "want"
version = "0.3.0"
@ -1583,6 +1698,84 @@ version = "0.10.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
[[package]]
name = "wasm-bindgen"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42"
dependencies = [
"cfg-if",
"serde",
"serde_json",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68"
dependencies = [
"bumpalo",
"lazy_static",
"log",
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da"
dependencies = [
"cfg-if",
"js-sys",
"wasm-bindgen",
"web-sys",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe"
dependencies = [
"proc-macro2",
"quote",
"syn",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307"
[[package]]
name = "web-sys"
version = "0.3.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d"
dependencies = [
"js-sys",
"wasm-bindgen",
]
[[package]]
name = "winapi"
version = "0.2.8"
@ -1617,6 +1810,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "winreg"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69"
dependencies = [
"winapi 0.3.9",
]
[[package]]
name = "ws2_32-sys"
version = "0.2.1"

View file

@ -18,7 +18,6 @@ cadence = "0.21"
derive_more = "0.99"
futures = "0.3"
hyper = "0.13"
hyper-tls = "0.4"
indexmap = { version = "1", features = ["serde-1"] }
lru-cache = "0.1" # TODO: replace unmaintained crate
maud = "0.22"
@ -27,6 +26,7 @@ pin-project = "0.4"
relative-path = { version = "1.3", features = ["serde"] }
route-recognizer = "0.2"
rustsec = "0.21"
reqwest = { version = "0.10", features = ["json"] }
semver = { version = "0.11", features = ["serde"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

View file

@ -9,7 +9,6 @@ pub async fn analyze_dependencies(
deps: CrateDeps,
) -> Result<AnalyzedDependencies, Error> {
let advisory_db = engine.fetch_advisory_db().await?;
let mut analyzer = DependencyAnalyzer::new(&deps, Some(advisory_db));
let main_deps =

View file

@ -1,75 +1,56 @@
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll};
use anyhow::Error;
use futures::{future::BoxFuture, ready, Stream};
use futures::{stream::FuturesOrdered, FutureExt};
use futures::{future::BoxFuture, stream::FuturesOrdered, FutureExt as _, StreamExt as _};
use relative_path::RelativePathBuf;
use crate::models::repo::RepoPath;
use super::super::machines::crawler::ManifestCrawler;
pub use super::super::machines::crawler::ManifestCrawlerOutput;
use super::super::Engine;
use crate::engine::{
machines::crawler::{ManifestCrawler, ManifestCrawlerOutput},
Engine,
};
#[pin_project::pin_project]
pub struct CrawlManifestFuture {
repo_path: RepoPath,
pub async fn crawl_manifest(
engine: Engine,
crawler: ManifestCrawler,
#[pin]
futures: FuturesOrdered<BoxFuture<'static, Result<(RelativePathBuf, String), Error>>>,
}
repo_path: RepoPath,
entry_point: RelativePathBuf,
) -> anyhow::Result<ManifestCrawlerOutput> {
let mut crawler = ManifestCrawler::new();
let mut futures: FuturesOrdered<BoxFuture<'static, Result<(RelativePathBuf, String), Error>>> =
FuturesOrdered::new();
let engine2 = engine.clone();
let repo_path2 = repo_path.clone();
let fut = async move {
let contents = engine2
.retrieve_manifest_at_path(&repo_path2, &entry_point)
.await?;
Ok((entry_point, contents))
}
.boxed();
futures.push(fut);
while let Some(item) = futures.next().await {
let (path, raw_manifest) = item?;
let output = crawler.step(path, raw_manifest)?;
impl CrawlManifestFuture {
pub fn new(engine: &Engine, repo_path: RepoPath, entry_point: RelativePathBuf) -> Self {
let engine = engine.clone();
let crawler = ManifestCrawler::new();
let mut futures = FuturesOrdered::new();
let repo_path = repo_path.clone();
let future: Pin<Box<dyn Future<Output = _> + Send>> = Box::pin(
engine
.retrieve_manifest_at_path(&repo_path, &entry_point)
.map(move |contents| contents.map(|c| (entry_point, c))),
);
futures.push(future);
for path in output.paths_of_interest {
let engine = engine.clone();
let repo_path = repo_path.clone();
CrawlManifestFuture {
repo_path,
engine,
crawler,
futures,
}
}
}
impl Future for CrawlManifestFuture {
type Output = Result<ManifestCrawlerOutput, Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_mut().project();
match ready!(this.futures.poll_next(cx)) {
None => {
let crawler = mem::replace(&mut self.crawler, ManifestCrawler::new());
Poll::Ready(Ok(crawler.finalize()))
}
Some(Ok((path, raw_manifest))) => {
let output = self.crawler.step(path, raw_manifest)?;
for path in output.paths_of_interest.into_iter() {
let future: Pin<Box<dyn Future<Output = _> + Send>> = Box::pin(
self.engine
.retrieve_manifest_at_path(&self.repo_path, &path)
.map(move |contents| contents.map(|c| (path, c))),
);
self.futures.push(future);
}
self.poll(cx)
}
Some(Err(err)) => Poll::Ready(Err(err)),
let fut = async move {
let contents = engine.retrieve_manifest_at_path(&repo_path, &path).await?;
Ok((path, contents))
}
.boxed();
futures.push(fut);
}
}
Ok(crawler.finalize())
}

View file

@ -2,4 +2,4 @@ mod analyze;
mod crawl;
pub use self::analyze::analyze_dependencies;
pub use self::crawl::CrawlManifestFuture;
pub use self::crawl::crawl_manifest;

View file

@ -1,25 +1,21 @@
use std::{
collections::HashSet,
panic::RefUnwindSafe,
sync::{Arc, Mutex},
task::{Context, Poll},
sync::Arc,
time::{Duration, Instant},
};
use anyhow::{anyhow, Error};
use cadence::{MetricSink, NopMetricSink, StatsdClient};
use futures::{future::try_join_all, stream::FuturesUnordered, Future, FutureExt, Stream};
use hyper::{
client::{HttpConnector, ResponseFuture},
service::Service,
Body, Client, Request, Response,
};
use hyper_tls::HttpsConnector;
use futures::{future::try_join_all, stream, StreamExt};
use hyper::service::Service;
use once_cell::sync::Lazy;
use relative_path::{RelativePath, RelativePathBuf};
use rustsec::database::Database;
use semver::VersionReq;
use slog::Logger;
use stream::BoxStream;
use tokio::sync::Mutex;
use crate::interactors::crates::{GetPopularCrates, QueryCrate};
use crate::interactors::github::GetPopularRepos;
@ -33,77 +29,57 @@ mod fut;
mod machines;
use self::fut::analyze_dependencies;
use self::fut::CrawlManifestFuture;
type HttpClient = Client<HttpsConnector<HttpConnector>>;
// type HttpClient = Client<HttpConnector>;
// workaround for hyper 0.12 not implementing Service for Client
#[derive(Debug, Clone)]
struct ServiceHttpClient(HttpClient);
impl Service<Request<Body>> for ServiceHttpClient {
type Response = Response<Body>;
type Error = hyper::Error;
type Future = ResponseFuture;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
self.0.request(req)
}
}
use self::fut::crawl_manifest;
#[derive(Clone, Debug)]
pub struct Engine {
client: HttpClient,
client: reqwest::Client,
logger: Logger,
metrics: StatsdClient,
// TODO: use futures aware mutex
query_crate: Arc<Mutex<Cache<QueryCrate<ServiceHttpClient>, CrateName>>>,
get_popular_crates: Arc<Mutex<Cache<GetPopularCrates<ServiceHttpClient>, ()>>>,
get_popular_repos: Arc<Mutex<Cache<GetPopularRepos<ServiceHttpClient>, ()>>>,
retrieve_file_at_path: Arc<Mutex<RetrieveFileAtPath<ServiceHttpClient>>>,
fetch_advisory_db: Arc<Mutex<Cache<FetchAdvisoryDatabase<ServiceHttpClient>, ()>>>,
query_crate: Arc<Mutex<Cache<QueryCrate, CrateName>>>,
get_popular_crates: Arc<Mutex<Cache<GetPopularCrates, ()>>>,
get_popular_repos: Arc<Mutex<Cache<GetPopularRepos, ()>>>,
retrieve_file_at_path: Arc<Mutex<RetrieveFileAtPath>>,
fetch_advisory_db: Arc<Mutex<Cache<FetchAdvisoryDatabase, ()>>>,
}
impl Engine {
pub fn new(client: HttpClient, logger: Logger) -> Engine {
pub fn new(client: reqwest::Client, logger: Logger) -> Engine {
let metrics = StatsdClient::from_sink("engine", NopMetricSink);
let service_client = ServiceHttpClient(client.clone());
let query_crate = Cache::new(
QueryCrate(service_client.clone()),
QueryCrate::new(client.clone()),
Duration::from_secs(300),
500,
logger.clone(),
);
let get_popular_crates = Cache::new(
GetPopularCrates(service_client.clone()),
Duration::from_secs(10),
GetPopularCrates::new(client.clone()),
Duration::from_secs(120),
1,
logger.clone(),
);
let get_popular_repos = Cache::new(
GetPopularRepos(service_client.clone()),
Duration::from_secs(10),
GetPopularRepos::new(client.clone()),
Duration::from_secs(120),
1,
logger.clone(),
);
let fetch_advisory_db = Cache::new(
FetchAdvisoryDatabase(service_client.clone()),
Duration::from_secs(300),
FetchAdvisoryDatabase::new(client.clone()),
Duration::from_secs(1800),
1,
logger.clone(),
);
Engine {
client,
client: client.clone(),
logger,
metrics,
query_crate: Arc::new(Mutex::new(query_crate)),
get_popular_crates: Arc::new(Mutex::new(get_popular_crates)),
get_popular_repos: Arc::new(Mutex::new(get_popular_repos)),
retrieve_file_at_path: Arc::new(Mutex::new(RetrieveFileAtPath(service_client))),
retrieve_file_at_path: Arc::new(Mutex::new(RetrieveFileAtPath::new(client))),
fetch_advisory_db: Arc::new(Mutex::new(fetch_advisory_db)),
}
}
@ -141,8 +117,10 @@ impl AnalyzeDependenciesOutcome {
impl Engine {
pub async fn get_popular_repos(&self) -> Result<Vec<Repository>, Error> {
let repos = self.get_popular_repos.lock().unwrap().call(());
let repos = repos.await?;
let repos = {
let mut lock = self.get_popular_repos.lock().await;
lock.cached_query(()).await?
};
let filtered_repos = repos
.iter()
@ -154,8 +132,8 @@ impl Engine {
}
pub async fn get_popular_crates(&self) -> Result<Vec<CratePath>, Error> {
let crates = self.get_popular_crates.lock().unwrap().call(());
let crates = crates.await?;
let mut lock = self.get_popular_crates.lock().await;
let crates = lock.cached_query(()).await?;
Ok(crates)
}
@ -168,8 +146,7 @@ impl Engine {
let entry_point = RelativePath::new("/").to_relative_path_buf();
let engine = self.clone();
let manifest_future = CrawlManifestFuture::new(self, repo_path.clone(), entry_point);
let manifest_output = manifest_future.await?;
let manifest_output = crawl_manifest(self.clone(), repo_path.clone(), entry_point).await?;
let engine_for_analyze = engine.clone();
let futures = manifest_output
@ -201,12 +178,10 @@ impl Engine {
) -> Result<AnalyzeDependenciesOutcome, Error> {
let start = Instant::now();
let query_response = self
.query_crate
.lock()
.unwrap()
.call(crate_path.name.clone());
let query_response = query_response.await?;
let query_response = {
let mut lock = self.query_crate.lock().await;
lock.cached_query(crate_path.name.clone()).await?
};
let engine = self.clone();
@ -237,8 +212,10 @@ impl Engine {
name: CrateName,
req: VersionReq,
) -> Result<Option<CrateRelease>, Error> {
let query_response = self.query_crate.lock().unwrap().call(name);
let query_response = query_response.await?;
let query_response = {
let mut lock = self.query_crate.lock().await;
lock.cached_query(name).await?
};
let latest = query_response
.releases
@ -250,43 +227,46 @@ impl Engine {
Ok(latest)
}
fn fetch_releases<I: IntoIterator<Item = CrateName>>(
&self,
names: I,
) -> impl Stream<Item = Result<Vec<CrateRelease>, Error>> {
fn fetch_releases<'a, I>(&'a self, names: I) -> BoxStream<'a, anyhow::Result<Vec<CrateRelease>>>
where
I: IntoIterator<Item = CrateName>,
<I as IntoIterator>::IntoIter: Send + 'a,
{
let engine = self.clone();
names
.into_iter()
.map(|name| {
engine
.query_crate
.lock()
.unwrap()
.call(name)
.map(|resp| resp.map(|r| r.releases))
})
.collect::<FuturesUnordered<_>>()
let s = stream::iter(names)
.zip(stream::repeat(engine))
.map(resolve_crate_with_engine)
.buffer_unordered(25);
Box::pin(s)
}
fn retrieve_manifest_at_path(
async fn retrieve_manifest_at_path(
&self,
repo_path: &RepoPath,
path: &RelativePathBuf,
) -> impl Future<Output = Result<String, Error>> {
) -> Result<String, Error> {
let manifest_path = path.join(RelativePath::new("Cargo.toml"));
self.retrieve_file_at_path
.lock()
.unwrap()
.call((repo_path.clone(), manifest_path))
let mut lock = self.retrieve_file_at_path.lock().await;
Ok(lock.call((repo_path.clone(), manifest_path)).await?)
}
fn fetch_advisory_db(&self) -> impl Future<Output = Result<Arc<Database>, Error>> {
self.fetch_advisory_db.lock().unwrap().call(())
async fn fetch_advisory_db(&self) -> Result<Arc<Database>, Error> {
let mut lock = self.fetch_advisory_db.lock().await;
Ok(lock.cached_query(()).await?)
}
}
async fn resolve_crate_with_engine(
(crate_name, engine): (CrateName, Engine),
) -> anyhow::Result<Vec<CrateRelease>> {
let mut lock = engine.query_crate.lock().await;
let crate_res = lock.cached_query(crate_name).await?;
Ok(crate_res.releases)
}
static POPULAR_REPO_BLOCK_LIST: Lazy<HashSet<RepoPath>> = Lazy::new(|| {
vec![
RepoPath::from_parts("github", "rust-lang", "rust"),

View file

@ -1,17 +1,15 @@
use std::{str, task::Context, task::Poll};
use anyhow::{anyhow, Error};
use futures::{
future::{err, ok, ready, BoxFuture},
TryFutureExt,
};
use hyper::{
body, header::USER_AGENT, service::Service, Body, Error as HyperError, Request, Response, Uri,
};
use anyhow::Error;
use futures::FutureExt as _;
use hyper::service::Service;
use semver::{Version, VersionReq};
use serde::Deserialize;
use crate::models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease};
use crate::{
models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease},
BoxFuture,
};
const CRATES_INDEX_BASE_URI: &str = "https://raw.githubusercontent.com/rust-lang/crates.io-index";
const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1";
@ -69,22 +67,19 @@ pub struct QueryCrateResponse {
}
#[derive(Debug, Clone)]
pub struct QueryCrate<S>(pub S);
pub struct QueryCrate {
client: reqwest::Client,
}
impl<S> Service<CrateName> for QueryCrate<S>
where
S: Service<Request<Body>, Response = Response<Body>, Error = HyperError> + Clone,
S::Future: Send + 'static,
{
type Response = QueryCrateResponse;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(|err| err.into())
impl QueryCrate {
pub fn new(client: reqwest::Client) -> Self {
Self { client }
}
fn call(&mut self, crate_name: CrateName) -> Self::Future {
pub async fn query(
client: reqwest::Client,
crate_name: CrateName,
) -> anyhow::Result<QueryCrateResponse> {
let lower_name = crate_name.as_ref().to_lowercase();
let path = match lower_name.len() {
@ -94,41 +89,34 @@ where
_ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name),
};
let uri = format!("{}/master/{}", CRATES_INDEX_BASE_URI, path);
let uri = uri.parse::<Uri>().expect("TODO: MAP ERROR PROPERLY");
let url = format!("{}/HEAD/{}", CRATES_INDEX_BASE_URI, path);
let res = client.get(&url).send().await?.error_for_status()?;
let request = Request::get(uri.clone())
.header(USER_AGENT, "deps.rs")
.body(Body::empty())
.unwrap();
let string_body = res.text().await?;
Box::pin(
self.0
.call(request)
.map_err(|err| err.into())
.and_then(move |response| {
let status = response.status();
if !status.is_success() {
return err(anyhow!("Status code {} for URI {}", status, uri));
}
let pkgs = string_body
.lines()
.map(str::trim)
.filter(|s| !s.is_empty())
.map(serde_json::from_str)
.collect::<Result<_, _>>()?;
ok(response)
})
.and_then(|response| body::to_bytes(response.into_body()).err_into())
.and_then(|body| ready(String::from_utf8(body.to_vec())).err_into())
.and_then(|string_body| {
ready(
string_body
.lines()
.map(|s| s.trim())
.filter(|s| !s.is_empty())
.map(serde_json::from_str)
.collect::<Result<_, _>>(),
)
.err_into()
})
.and_then(move |pkgs| ready(convert_pkgs(&crate_name, pkgs))),
)
convert_pkgs(&crate_name, pkgs)
}
}
impl Service<CrateName> for QueryCrate {
type Response = QueryCrateResponse;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, crate_name: CrateName) -> Self::Future {
let client = self.client.clone();
Self::query(client, crate_name).boxed()
}
}
@ -157,49 +145,36 @@ fn convert_summary(response: SummaryResponse) -> Result<Vec<CratePath>, Error> {
.collect()
}
#[derive(Debug, Clone)]
pub struct GetPopularCrates<S>(pub S);
#[derive(Debug, Clone, Default)]
pub struct GetPopularCrates {
client: reqwest::Client,
}
impl<S> Service<()> for GetPopularCrates<S>
where
S: Service<Request<Body>, Response = Response<Body>, Error = HyperError> + Clone,
S::Future: Send + 'static,
{
impl GetPopularCrates {
pub fn new(client: reqwest::Client) -> Self {
Self { client }
}
pub async fn query(client: reqwest::Client) -> anyhow::Result<Vec<CratePath>> {
let url = format!("{}/summary", CRATES_API_BASE_URI);
let res = client.get(&url).send().await?.error_for_status()?;
let summary: SummaryResponse = res.json().await?;
convert_summary(summary)
}
}
impl Service<()> for GetPopularCrates {
type Response = Vec<CratePath>;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(|err| err.into())
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _req: ()) -> Self::Future {
let mut service = self.0.clone();
let uri = format!("{}/summary", CRATES_API_BASE_URI);
let uri = uri.parse::<Uri>().unwrap();
let request = Request::get(uri.clone())
.header(USER_AGENT, "deps.rs")
.body(Body::empty())
.unwrap();
Box::pin(
service
.call(request)
.map_err(|err| err.into())
.and_then(move |response| {
let status = response.status();
if !status.is_success() {
err(anyhow!("Status code {} for URI {}", status, uri))
} else {
ok(response)
}
})
.and_then(|response| body::to_bytes(response.into_body()).err_into())
.and_then(|bytes| {
ready(serde_json::from_slice::<SummaryResponse>(&bytes)).err_into()
})
.and_then(|summary| ready(convert_summary(summary)).err_into()),
)
let client = self.client.clone();
Self::query(client).boxed()
}
}

View file

@ -1,16 +1,15 @@
use std::{task::Context, task::Poll};
use std::task::{Context, Poll};
use anyhow::{anyhow, Error};
use futures::{
future::{err, ok, ready, BoxFuture},
TryFutureExt,
};
use hyper::{
body, header::USER_AGENT, service::Service, Body, Error as HyperError, Request, Response, Uri,
};
use anyhow::Error;
use futures::FutureExt as _;
use hyper::service::Service;
use serde::Deserialize;
use crate::models::repo::{RepoPath, Repository};
use crate::{
models::repo::{RepoPath, Repository},
BoxFuture,
};
const GITHUB_API_BASE_URI: &str = "https://api.github.com";
@ -32,65 +31,50 @@ struct GithubOwner {
}
#[derive(Debug, Clone)]
pub struct GetPopularRepos<S>(pub S);
pub struct GetPopularRepos {
client: reqwest::Client,
}
impl<S> Service<()> for GetPopularRepos<S>
where
S: Service<Request<Body>, Response = Response<Body>, Error = HyperError> + Clone,
S::Future: Send + 'static,
{
impl GetPopularRepos {
pub fn new(client: reqwest::Client) -> Self {
Self { client }
}
pub async fn query(client: reqwest::Client) -> anyhow::Result<Vec<Repository>> {
let url = format!(
"{}/search/repositories?q=language:rust&sort=stars",
GITHUB_API_BASE_URI
);
let res = client.get(&url).send().await?.error_for_status()?;
let summary: GithubSearchResponse = res.json().await?;
summary
.items
.into_iter()
.map(|item| {
let path = RepoPath::from_parts("github", &item.owner.login, &item.name)?;
Ok(Repository {
path,
description: item.description,
})
})
.collect::<Result<Vec<_>, Error>>()
}
}
impl Service<()> for GetPopularRepos {
type Response = Vec<Repository>;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(|err| err.into())
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, _req: ()) -> Self::Future {
let uri = format!(
"{}/search/repositories?q=language:rust&sort=stars",
GITHUB_API_BASE_URI
)
.parse::<Uri>()
.expect("TODO: handle error properly");
let request = Request::get(uri)
.header(USER_AGENT, "deps.rs")
.body(Body::empty())
.unwrap();
Box::pin(
self.0
.call(request)
.err_into()
.and_then(|response| {
let status = response.status();
if !status.is_success() {
return err(anyhow!("Status code {} for popular repo search", status));
}
ok(response)
})
.and_then(|response| body::to_bytes(response.into_body()).err_into())
.and_then(|bytes| ready(serde_json::from_slice(bytes.as_ref())).err_into())
.and_then(|search_response: GithubSearchResponse| {
ready(
search_response
.items
.into_iter()
.map(|item| {
let path =
RepoPath::from_parts("github", &item.owner.login, &item.name)?;
Ok(Repository {
path,
description: item.description,
})
})
.collect::<Result<Vec<_>, Error>>(),
)
}),
)
let client = self.client.clone();
Self::query(client).boxed()
}
}

View file

@ -1,66 +1,53 @@
use std::{task::Context, task::Poll};
use std::task::{Context, Poll};
use anyhow::{anyhow, Error};
use futures::{
future::{err, ok, ready, BoxFuture},
TryFutureExt,
};
use hyper::{
body, header::USER_AGENT, service::Service, Body, Error as HyperError, Request, Response,
};
use futures::FutureExt as _;
use hyper::service::Service;
use relative_path::RelativePathBuf;
use crate::models::repo::RepoPath;
use crate::{models::repo::RepoPath, BoxFuture};
pub mod crates;
pub mod github;
pub mod rustsec;
#[derive(Debug, Clone)]
pub struct RetrieveFileAtPath<S>(pub S);
pub struct RetrieveFileAtPath {
client: reqwest::Client,
}
impl<S> Service<(RepoPath, RelativePathBuf)> for RetrieveFileAtPath<S>
where
S: Service<Request<Body>, Response = Response<Body>, Error = HyperError> + Clone,
S::Future: Send + 'static,
{
type Response = String;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.0.poll_ready(cx).map_err(|err| err.into())
impl RetrieveFileAtPath {
pub fn new(client: reqwest::Client) -> Self {
Self { client }
}
fn call(&mut self, req: (RepoPath, RelativePathBuf)) -> Self::Future {
let (repo_path, path) = req;
pub async fn query(
client: reqwest::Client,
repo_path: RepoPath,
path: RelativePathBuf,
) -> anyhow::Result<String> {
let url = repo_path.to_usercontent_file_url(&path);
let res = client.get(&url).send().await?;
let uri = repo_path.to_usercontent_file_uri(&path);
let uri = match uri {
Ok(uri) => uri,
Err(error) => return Box::pin(err(error)),
};
if !res.status().is_success() {
return Err(anyhow!("Status code {} for URI {}", res.status(), url));
}
let request = Request::get(uri.clone())
.header(USER_AGENT, "deps.rs")
.body(Body::empty())
.unwrap();
Box::pin(
self.0
.call(request)
.err_into()
.and_then(move |response| {
let status = response.status();
if status.is_success() {
ok(response)
} else {
err(anyhow!("Status code {} for URI {}", status, uri))
}
})
.and_then(|response| body::to_bytes(response.into_body()).err_into())
.and_then(|bytes| ready(String::from_utf8(bytes.to_vec())).err_into()),
)
Ok(res.text().await?)
}
}
impl Service<(RepoPath, RelativePathBuf)> for RetrieveFileAtPath {
type Response = String;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, (repo_path, path): (RepoPath, RelativePathBuf)) -> Self::Future {
let client = self.client.clone();
Self::query(client, repo_path, path).boxed()
}
}

View file

@ -1,76 +1,39 @@
use std::{sync::Arc, task::Context, task::Poll};
use anyhow::Error;
use futures::{future::ready, future::BoxFuture};
use hyper::{service::Service, Body, Error as HyperError, Request, Response};
use futures::FutureExt as _;
use hyper::service::Service;
use rustsec::database::Database;
use crate::BoxFuture;
#[derive(Debug, Clone)]
pub struct FetchAdvisoryDatabase<S>(pub S);
pub struct FetchAdvisoryDatabase {
client: reqwest::Client,
}
impl<S> Service<()> for FetchAdvisoryDatabase<S>
where
S: Service<Request<Body>, Response = Response<Body>, Error = HyperError> + Clone,
S::Future: 'static,
{
type Response = Arc<Database>;
type Error = Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// TODO: should be this when async client is used again
// self.0.poll_ready(cx).map_err(|err| err.into())
Poll::Ready(Ok(()))
impl FetchAdvisoryDatabase {
pub fn new(client: reqwest::Client) -> Self {
Self { client }
}
// TODO: make fetch async again
fn call(&mut self, _req: ()) -> Self::Future {
let _service = self.0.clone();
Box::pin(ready(
rustsec::Database::fetch().map(Arc::new).map_err(Into::into),
))
pub async fn fetch(_client: reqwest::Client) -> anyhow::Result<Arc<Database>> {
// TODO: make fetch async
Ok(rustsec::Database::fetch().map(Arc::new)?)
}
}
// #[derive(Debug, Clone)]
// pub struct FetchAdvisoryDatabase<S>(pub S);
impl Service<()> for FetchAdvisoryDatabase {
type Response = Arc<Database>;
type Error = Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;
// impl<S> Service for FetchAdvisoryDatabase<S>
// where
// S: Service<Request = Request, Response = Response, Error = HyperError> + Clone,
// S::Future: 'static,
// {
// type Request = ();
// type Response = Arc<Database>;
// type Error = Error;
// type Future = Box<dyn Future<Item = Self::Response, Error = Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
// fn call(&self, _req: ()) -> Self::Future {
// let service = self.0.clone();
// let uri_future = DEFAULT_URL.parse().into_future().from_err();
// Box::new(uri_future.and_then(move |uri| {
// let request = Request::new(Method::Get, uri);
// service.call(request).from_err().and_then(|response| {
// let status = response.status();
// if !status.is_success() {
// future::Either::A(future::err(anyhow!(
// "Status code {} when fetching advisory db",
// status
// )))
// } else {
// let body_future = response.body().concat2().from_err();
// let decode_future = body_future.and_then(|body| {
// Ok(Arc::new(Database::from_toml(str::from_utf8(
// &body,
// )?)?))
// });
// future::Either::B(decode_future)
// }
// })
// }))
// }
// }
fn call(&mut self, _req: ()) -> Self::Future {
let client = self.client.clone();
Self::fetch(client).boxed()
}
}

View file

@ -3,17 +3,21 @@
use std::{
env,
future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr, UdpSocket},
pin::Pin,
sync::Mutex,
time::Duration,
};
use cadence::{QueuingMetricSink, UdpMetricSink};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Client, Server,
Server,
};
use hyper_tls::HttpsConnector;
use reqwest::redirect::Policy as RedirectPolicy;
use slog::{o, Drain};
mod engine;
@ -26,6 +30,11 @@ mod utils;
use self::engine::Engine;
use self::server::App;
/// Future crate's BoxFuture without the explicit lifetime parameter.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
const DEPS_RS_UA: &str = "deps.rs";
fn init_metrics() -> QueuingMetricSink {
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
socket.set_nonblocking(true).unwrap();
@ -42,7 +51,13 @@ async fn main() {
);
let metrics = init_metrics();
let client = Client::builder().build(HttpsConnector::new());
let client = reqwest::Client::builder()
.user_agent(DEPS_RS_UA)
.redirect(RedirectPolicy::limited(5))
.timeout(Duration::from_secs(5))
.build()
.unwrap();
let port = env::var("PORT")
.unwrap_or_else(|_| "8080".to_string())

View file

@ -1,7 +1,6 @@
use std::str::FromStr;
use std::{fmt, str::FromStr};
use anyhow::{anyhow, ensure, Error};
use hyper::Uri;
use relative_path::RelativePath;
#[derive(Clone, Debug)]
@ -26,17 +25,27 @@ impl RepoPath {
})
}
pub fn to_usercontent_file_uri(&self, path: &RelativePath) -> Result<Uri, Error> {
let url = format!(
pub fn to_usercontent_file_url(&self, path: &RelativePath) -> String {
format!(
"{}/{}/{}/{}/{}",
self.site.to_usercontent_base_uri(),
self.qual.as_ref(),
self.name.as_ref(),
self.site.to_usercontent_repo_suffix(),
path.normalize()
);
)
}
}
Ok(url.parse::<Uri>()?)
impl fmt::Display for RepoPath {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{} => {}/{}",
self.site.as_ref(),
self.qual.as_ref(),
self.name.as_ref()
)
}
}
@ -156,9 +165,7 @@ mod tests {
for (input, expected) in &paths {
let repo = RepoPath::from_parts("github", "deps-rs", "deps.rs").unwrap();
let out = repo
.to_usercontent_file_uri(RelativePath::new(input))
.unwrap();
let out = repo.to_usercontent_file_url(RelativePath::new(input));
let exp = format!(
"https://raw.githubusercontent.com/deps-rs/deps.rs/HEAD/{}",
@ -169,9 +176,7 @@ mod tests {
for (input, expected) in &paths {
let repo = RepoPath::from_parts("gitlab", "deps-rs", "deps.rs").unwrap();
let out = repo
.to_usercontent_file_uri(RelativePath::new(input))
.unwrap();
let out = repo.to_usercontent_file_url(RelativePath::new(input));
let exp = format!("https://gitlab.com/deps-rs/deps.rs/raw/HEAD/{}", expected);
assert_eq!(out.to_string(), exp);
@ -179,9 +184,7 @@ mod tests {
for (input, expected) in &paths {
let repo = RepoPath::from_parts("bitbucket", "deps-rs", "deps.rs").unwrap();
let out = repo
.to_usercontent_file_uri(RelativePath::new(input))
.unwrap();
let out = repo.to_usercontent_file_url(RelativePath::new(input));
let exp = format!(
"https://bitbucket.org/deps-rs/deps.rs/raw/HEAD/{}",

View file

@ -1,15 +1,19 @@
use std::{
fmt::{Debug, Formatter, Result as FmtResult},
hash::Hash,
sync::Mutex,
task::Context,
task::Poll,
time::{Duration, Instant},
};
use anyhow::Error;
use derive_more::{Display, Error, From};
use hyper::service::Service;
use lru_cache::LruCache;
use slog::{debug, Logger};
use tokio::sync::Mutex;
#[derive(Debug, Clone, Display, From, Error)]
pub struct CacheError<E> {
inner: E,
}
pub struct Cache<S, Req>
where
@ -18,8 +22,8 @@ where
{
inner: S,
duration: Duration,
#[allow(unused)]
cache: Mutex<LruCache<Req, (Instant, S::Response)>>,
logger: Logger,
}
impl<S, Req> Debug for Cache<S, Req>
@ -38,73 +42,41 @@ where
impl<S, Req> Cache<S, Req>
where
S: Service<Req>,
Req: Hash + Eq,
S::Response: Clone,
Req: Clone + Eq + Hash,
{
pub fn new(service: S, duration: Duration, capacity: usize) -> Cache<S, Req> {
pub fn new(service: S, duration: Duration, capacity: usize, logger: Logger) -> Cache<S, Req> {
Cache {
inner: service,
duration,
cache: Mutex::new(LruCache::new(capacity)),
logger,
}
}
}
impl<S, Req> Service<Req> for Cache<S, Req>
where
S: Service<Req, Error = Error>,
S::Response: Clone,
Req: Clone + Hash + Eq,
{
type Response = S::Response;
type Error = Error;
// WAS: type Future = Cached<S::Future>;
// type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>>>>;
type Future = S::Future;
pub async fn cached_query(&mut self, req: Req) -> Result<S::Response, S::Error> {
let now = Instant::now();
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
{
let mut cache = self.cache.lock().await;
fn call(&mut self, req: Req) -> Self::Future {
// TODO: re-add caching
// Box::pin({
// let now = Instant::now();
// let mut cache = self.cache.lock().expect("lock poisoned");
if let Some((ref valid_until, ref cached_response)) = cache.get_mut(&req) {
if *valid_until > now {
debug!(self.logger, "cache hit");
return Ok(cached_response.clone());
}
}
}
// if let Some(&mut (valid_until, ref cached_response)) = cache.get_mut(&req) {
// if valid_until > now {
// return Box::pin(ok(cached_response.clone()));
// }
// }
debug!(self.logger, "cache miss");
self.inner.call(req)
// .and_then(|response| {
// // cache.insert(req, (now + self.duration, response.clone()));
// ok(response)
// })
// })
let fresh = self.inner.call(req.clone()).await?;
{
let mut cache = self.cache.lock().await;
cache.insert(req, (now + self.duration, fresh.clone()));
}
Ok(fresh)
}
}
// pub struct Cached<F: Future>(Shared<F>);
// impl<F> Debug for Cached<F>
// where
// F: Future + Debug,
// F::Output: Debug,
// {
// fn fmt(&self, fmt: &mut Formatter<'_>) -> FmtResult {
// self.0.fmt(fmt)
// }
// }
// // WAS: impl<F: Future<Error = Error>> Future for Cached<F> {
// impl<F: Future> Future for Cached<F> {
// type Output = Result<F::Output, Error>;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// self.0
// .poll()
// .map_err(|_err| anyhow!("TODO: shared error not clone-able"))
// }
// }