diff --git a/Cargo.lock b/Cargo.lock index fe73885..874d6b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1329,6 +1329,7 @@ dependencies = [ "anyhow", "badge", "cadence", + "crates-index", "derive_more", "futures", "hyper", diff --git a/Cargo.toml b/Cargo.toml index 96a825e..fd9fe10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ pin-project = "0.4" relative-path = { version = "1.3", features = ["serde"] } route-recognizer = "0.2" rustsec = "0.21" +crates-index = "0.15.0" semver = { version = "0.10", features = ["serde"] } reqwest = { version = "0.10", features = ["json"] } serde = { version = "1", features = ["derive"] } diff --git a/src/engine/mod.rs b/src/engine/mod.rs index c15ce36..5f5afe8 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -7,6 +7,7 @@ use std::{ use anyhow::{anyhow, Error}; use cadence::{MetricSink, NopMetricSink, StatsdClient}; +use crates_index::Index; use futures::{future::try_join_all, stream, StreamExt}; use hyper::service::Service; use once_cell::sync::Lazy; @@ -27,8 +28,7 @@ use crate::utils::cache::Cache; mod fut; mod machines; -use self::fut::analyze_dependencies; -use self::fut::crawl_manifest; +use self::fut::{analyze_dependencies, crawl_manifest}; #[derive(Clone, Debug)] pub struct Engine { @@ -43,12 +43,12 @@ pub struct Engine { } impl Engine { - pub fn new(client: reqwest::Client, logger: Logger) -> Engine { + pub fn new(client: reqwest::Client, index: Index, logger: Logger) -> Engine { let metrics = StatsdClient::from_sink("engine", NopMetricSink); let query_crate = Cache::new( - QueryCrate::new(client.clone()), - Duration::from_secs(300), + QueryCrate::new(index), + Duration::from_secs(10), 500, logger.clone(), ); diff --git a/src/interactors/crates.rs b/src/interactors/crates.rs index f0ad44b..4a1ad01 100644 --- a/src/interactors/crates.rs +++ b/src/interactors/crates.rs @@ -1,17 +1,18 @@ use std::{fmt, str, task::Context, task::Poll}; -use anyhow::Error; +use anyhow::{anyhow, Error}; +use crates_index::{Crate, DependencyKind, Index}; use futures::FutureExt as _; use hyper::service::Service; use semver::{Version, VersionReq}; use serde::Deserialize; +use tokio::task::spawn_blocking; use crate::{ models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease}, BoxFuture, }; -const CRATES_INDEX_BASE_URI: &str = "https://raw.githubusercontent.com/rust-lang/crates.io-index"; const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1"; #[derive(Deserialize, Debug)] @@ -33,27 +34,30 @@ struct RegistryPackage { yanked: bool, } -fn convert_pkgs( - name: &CrateName, - packages: Vec, -) -> Result { - let releases = packages - .into_iter() +fn convert_pkgs(krate: Crate) -> Result { + let name: CrateName = krate.name().parse()?; + + let releases = krate + .versions() + .iter() .map(|package| { let mut deps = CrateDeps::default(); - for dep in package.deps { - let name = dep.package.as_deref().unwrap_or(&dep.name).parse()?; - match dep.kind.as_deref().unwrap_or("normal") { - "normal" => deps.main.insert(name, CrateDep::External(dep.req)), - "dev" => deps.dev.insert(name, CrateDep::External(dep.req)), + for dep in package.dependencies() { + let name = dep.crate_name().parse()?; + let req = VersionReq::parse(dep.requirement())?; + + match dep.kind() { + DependencyKind::Normal => deps.main.insert(name, CrateDep::External(req)), + DependencyKind::Dev => deps.main.insert(name, CrateDep::External(req)), _ => None, }; } + let version = Version::parse(package.version())?; Ok(CrateRelease { name: name.clone(), - version: package.vers, + version, deps, - yanked: package.yanked, + yanked: package.is_yanked(), }) }) .collect::>()?; @@ -68,40 +72,21 @@ pub struct QueryCrateResponse { #[derive(Clone)] pub struct QueryCrate { - client: reqwest::Client, + index: Index, } impl QueryCrate { - pub fn new(client: reqwest::Client) -> Self { - Self { client } + pub fn new(index: Index) -> Self { + Self { index } } - pub async fn query( - client: reqwest::Client, - crate_name: CrateName, - ) -> anyhow::Result { - let lower_name = crate_name.as_ref().to_lowercase(); + pub async fn query(index: Index, crate_name: CrateName) -> anyhow::Result { + let crate_name2 = crate_name.clone(); + let krate = spawn_blocking(move || index.crate_(crate_name2.as_ref())) + .await? + .ok_or_else(|| anyhow!("crate '{}' not found", crate_name.as_ref()))?; - let path = match lower_name.len() { - 1 => format!("1/{}", lower_name), - 2 => format!("2/{}", lower_name), - 3 => format!("3/{}/{}", &lower_name[..1], lower_name), - _ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name), - }; - - let url = format!("{}/HEAD/{}", CRATES_INDEX_BASE_URI, path); - let res = client.get(&url).send().await?.error_for_status()?; - - let string_body = res.text().await?; - - let pkgs = string_body - .lines() - .map(str::trim) - .filter(|s| !s.is_empty()) - .map(serde_json::from_str) - .collect::>()?; - - convert_pkgs(&crate_name, pkgs) + convert_pkgs(krate) } } @@ -121,8 +106,8 @@ impl Service for QueryCrate { } fn call(&mut self, crate_name: CrateName) -> Self::Future { - let client = self.client.clone(); - Self::query(client, crate_name).boxed() + let index = self.index.clone(); + Self::query(index, crate_name).boxed() } } diff --git a/src/main.rs b/src/main.rs index 0f70596..0ba3d4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -28,6 +28,7 @@ mod utils; use self::engine::Engine; use self::server::App; +use self::utils::index::ManagedIndex; /// Future crate's BoxFuture without the explicit lifetime parameter. pub type BoxFuture = Pin + Send>>; @@ -70,7 +71,20 @@ async fn main() { let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port); - let mut engine = Engine::new(client.clone(), logger.new(o!())); + let mut managed_index = ManagedIndex::new(Duration::from_secs(20), logger.clone()); + if let Err(e) = managed_index.initial_clone().await { + error!( + logger, + "failed running initial clone of the crates.io-index: {}", e + ); + } + + let index = managed_index.index(); + tokio::spawn(async move { + managed_index.refresh_at_interval().await; + }); + + let mut engine = Engine::new(client.clone(), index, logger.new(o!())); engine.set_metrics(metrics); let svc_logger = logger.new(o!()); diff --git a/src/utils/index.rs b/src/utils/index.rs new file mode 100644 index 0000000..b602b41 --- /dev/null +++ b/src/utils/index.rs @@ -0,0 +1,64 @@ +use std::time::Duration; + +use anyhow::{Error, Result}; +use crates_index::Index; +use slog::{error, info, Logger}; +use tokio::task::spawn_blocking; +use tokio::time::{self, Interval}; + +pub struct ManagedIndex { + index: Index, + update_interval: Interval, + logger: Logger, +} + +impl ManagedIndex { + pub fn new(update_interval: Duration, logger: Logger) -> Self { + // the index path is configurable through the `CARGO_HOME` env variable + let index = Index::new_cargo_default(); + let update_interval = time::interval(update_interval); + Self { + index, + update_interval, + logger, + } + } + + pub fn index(&self) -> Index { + self.index.clone() + } + + pub async fn initial_clone(&mut self) -> Result<()> { + let index = self.index(); + let logger = self.logger.clone(); + + spawn_blocking(move || { + if !index.exists() { + info!(logger, "Cloning crates.io-index"); + index.retrieve()?; + } + Ok::<_, Error>(()) + }) + .await??; + Ok(()) + } + + pub async fn refresh_at_interval(&mut self) { + loop { + if let Err(e) = self.refresh().await { + error!( + self.logger, + "failed refreshing the crates.io-index, the operation will be retried: {}", e + ); + } + self.update_interval.tick().await; + } + } + + async fn refresh(&self) -> Result<()> { + let index = self.index(); + + spawn_blocking(move || index.retrieve_or_update()).await??; + Ok(()) + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index a5c08fd..bdf34de 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1 +1,2 @@ pub mod cache; +pub mod index;