Clone crates.io-index instead of querying it through GitHub's API (#69)

* Clone crates.io-index instead of querying it through GitHub's API

* Implement refreshing the crates.io-index

* Run the initial index clone before starting the server

* Log cloning the crates.io-index

* Disambiguate name of ManagedIndex initial clone fn

* Log errors with cloning or refreshing the index
This commit is contained in:
Paolo Barbolini 2020-11-14 18:29:49 +01:00 committed by GitHub
parent b18c621779
commit 84a7d5154b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 117 additions and 51 deletions

1
Cargo.lock generated
View file

@ -1329,6 +1329,7 @@ dependencies = [
"anyhow", "anyhow",
"badge", "badge",
"cadence", "cadence",
"crates-index",
"derive_more", "derive_more",
"futures", "futures",
"hyper", "hyper",

View file

@ -26,6 +26,7 @@ pin-project = "0.4"
relative-path = { version = "1.3", features = ["serde"] } relative-path = { version = "1.3", features = ["serde"] }
route-recognizer = "0.2" route-recognizer = "0.2"
rustsec = "0.21" rustsec = "0.21"
crates-index = "0.15.0"
semver = { version = "0.10", features = ["serde"] } semver = { version = "0.10", features = ["serde"] }
reqwest = { version = "0.10", features = ["json"] } reqwest = { version = "0.10", features = ["json"] }
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }

View file

@ -7,6 +7,7 @@ use std::{
use anyhow::{anyhow, Error}; use anyhow::{anyhow, Error};
use cadence::{MetricSink, NopMetricSink, StatsdClient}; use cadence::{MetricSink, NopMetricSink, StatsdClient};
use crates_index::Index;
use futures::{future::try_join_all, stream, StreamExt}; use futures::{future::try_join_all, stream, StreamExt};
use hyper::service::Service; use hyper::service::Service;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -27,8 +28,7 @@ use crate::utils::cache::Cache;
mod fut; mod fut;
mod machines; mod machines;
use self::fut::analyze_dependencies; use self::fut::{analyze_dependencies, crawl_manifest};
use self::fut::crawl_manifest;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Engine { pub struct Engine {
@ -43,12 +43,12 @@ pub struct Engine {
} }
impl Engine { impl Engine {
pub fn new(client: reqwest::Client, logger: Logger) -> Engine { pub fn new(client: reqwest::Client, index: Index, logger: Logger) -> Engine {
let metrics = StatsdClient::from_sink("engine", NopMetricSink); let metrics = StatsdClient::from_sink("engine", NopMetricSink);
let query_crate = Cache::new( let query_crate = Cache::new(
QueryCrate::new(client.clone()), QueryCrate::new(index),
Duration::from_secs(300), Duration::from_secs(10),
500, 500,
logger.clone(), logger.clone(),
); );

View file

@ -1,17 +1,18 @@
use std::{fmt, str, task::Context, task::Poll}; use std::{fmt, str, task::Context, task::Poll};
use anyhow::Error; use anyhow::{anyhow, Error};
use crates_index::{Crate, DependencyKind, Index};
use futures::FutureExt as _; use futures::FutureExt as _;
use hyper::service::Service; use hyper::service::Service;
use semver::{Version, VersionReq}; use semver::{Version, VersionReq};
use serde::Deserialize; use serde::Deserialize;
use tokio::task::spawn_blocking;
use crate::{ use crate::{
models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease}, models::crates::{CrateDep, CrateDeps, CrateName, CratePath, CrateRelease},
BoxFuture, BoxFuture,
}; };
const CRATES_INDEX_BASE_URI: &str = "https://raw.githubusercontent.com/rust-lang/crates.io-index";
const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1"; const CRATES_API_BASE_URI: &str = "https://crates.io/api/v1";
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
@ -33,27 +34,30 @@ struct RegistryPackage {
yanked: bool, yanked: bool,
} }
fn convert_pkgs( fn convert_pkgs(krate: Crate) -> Result<QueryCrateResponse, Error> {
name: &CrateName, let name: CrateName = krate.name().parse()?;
packages: Vec<RegistryPackage>,
) -> Result<QueryCrateResponse, Error> { let releases = krate
let releases = packages .versions()
.into_iter() .iter()
.map(|package| { .map(|package| {
let mut deps = CrateDeps::default(); let mut deps = CrateDeps::default();
for dep in package.deps { for dep in package.dependencies() {
let name = dep.package.as_deref().unwrap_or(&dep.name).parse()?; let name = dep.crate_name().parse()?;
match dep.kind.as_deref().unwrap_or("normal") { let req = VersionReq::parse(dep.requirement())?;
"normal" => deps.main.insert(name, CrateDep::External(dep.req)),
"dev" => deps.dev.insert(name, CrateDep::External(dep.req)), match dep.kind() {
DependencyKind::Normal => deps.main.insert(name, CrateDep::External(req)),
DependencyKind::Dev => deps.dev.insert(name, CrateDep::External(req)),
_ => None, _ => None,
}; };
} }
let version = Version::parse(package.version())?;
Ok(CrateRelease { Ok(CrateRelease {
name: name.clone(), name: name.clone(),
version: package.vers, version,
deps, deps,
yanked: package.yanked, yanked: package.is_yanked(),
}) })
}) })
.collect::<Result<_, Error>>()?; .collect::<Result<_, Error>>()?;
@ -68,40 +72,21 @@ pub struct QueryCrateResponse {
#[derive(Clone)] #[derive(Clone)]
pub struct QueryCrate { pub struct QueryCrate {
client: reqwest::Client, index: Index,
} }
impl QueryCrate { impl QueryCrate {
pub fn new(client: reqwest::Client) -> Self { pub fn new(index: Index) -> Self {
Self { client } Self { index }
} }
pub async fn query( pub async fn query(index: Index, crate_name: CrateName) -> anyhow::Result<QueryCrateResponse> {
client: reqwest::Client, let crate_name2 = crate_name.clone();
crate_name: CrateName, let krate = spawn_blocking(move || index.crate_(crate_name2.as_ref()))
) -> anyhow::Result<QueryCrateResponse> { .await?
let lower_name = crate_name.as_ref().to_lowercase(); .ok_or_else(|| anyhow!("crate '{}' not found", crate_name.as_ref()))?;
let path = match lower_name.len() { convert_pkgs(krate)
1 => format!("1/{}", lower_name),
2 => format!("2/{}", lower_name),
3 => format!("3/{}/{}", &lower_name[..1], lower_name),
_ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name),
};
let url = format!("{}/HEAD/{}", CRATES_INDEX_BASE_URI, path);
let res = client.get(&url).send().await?.error_for_status()?;
let string_body = res.text().await?;
let pkgs = string_body
.lines()
.map(str::trim)
.filter(|s| !s.is_empty())
.map(serde_json::from_str)
.collect::<Result<_, _>>()?;
convert_pkgs(&crate_name, pkgs)
} }
} }
@ -121,8 +106,8 @@ impl Service<CrateName> for QueryCrate {
} }
fn call(&mut self, crate_name: CrateName) -> Self::Future { fn call(&mut self, crate_name: CrateName) -> Self::Future {
let client = self.client.clone(); let index = self.index.clone();
Self::query(client, crate_name).boxed() Self::query(index, crate_name).boxed()
} }
} }

View file

@ -28,6 +28,7 @@ mod utils;
use self::engine::Engine; use self::engine::Engine;
use self::server::App; use self::server::App;
use self::utils::index::ManagedIndex;
/// Future crate's BoxFuture without the explicit lifetime parameter. /// Future crate's BoxFuture without the explicit lifetime parameter.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>; pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
@ -70,7 +71,20 @@ async fn main() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port); let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), port);
let mut engine = Engine::new(client.clone(), logger.new(o!())); let mut managed_index = ManagedIndex::new(Duration::from_secs(20), logger.clone());
if let Err(e) = managed_index.initial_clone().await {
error!(
logger,
"failed running initial clone of the crates.io-index: {}", e
);
}
let index = managed_index.index();
tokio::spawn(async move {
managed_index.refresh_at_interval().await;
});
let mut engine = Engine::new(client.clone(), index, logger.new(o!()));
engine.set_metrics(metrics); engine.set_metrics(metrics);
let svc_logger = logger.new(o!()); let svc_logger = logger.new(o!());

64
src/utils/index.rs Normal file
View file

@ -0,0 +1,64 @@
use std::time::Duration;
use anyhow::{Error, Result};
use crates_index::Index;
use slog::{error, info, Logger};
use tokio::task::spawn_blocking;
use tokio::time::{self, Interval};
/// A crates.io-index handle bundled with the timer and logger used to keep it refreshed.
pub struct ManagedIndex {
// Handle to the index; `index()` hands out clones of it.
index: Index,
// Timer awaited between refreshes in `refresh_at_interval`.
update_interval: Interval,
// Receives the clone notice and refresh failure messages.
logger: Logger,
}
impl ManagedIndex {
/// Builds a manager for the default cargo index.
///
/// `update_interval` is the period of the timer awaited between refreshes;
/// `logger` receives the clone and refresh messages.
pub fn new(update_interval: Duration, logger: Logger) -> Self {
    Self {
        // the index path is configurable through the `CARGO_HOME` env variable
        index: Index::new_cargo_default(),
        update_interval: time::interval(update_interval),
        logger,
    }
}
pub fn index(&self) -> Index {
self.index.clone()
}
pub async fn initial_clone(&mut self) -> Result<()> {
let index = self.index();
let logger = self.logger.clone();
spawn_blocking(move || {
if !index.exists() {
info!(logger, "Cloning crates.io-index");
index.retrieve()?;
}
Ok::<_, Error>(())
})
.await??;
Ok(())
}
/// Refreshes the index once per tick of `update_interval`, forever.
///
/// The loop has no exit, so this future never completes. A failed refresh is
/// logged and attempted again on the next tick.
pub async fn refresh_at_interval(&mut self) {
    loop {
        // Wait before refreshing rather than after. tokio documents the first
        // tick of an `Interval` as completing immediately, so ticking after the
        // refresh made the first two refreshes run back to back.
        self.update_interval.tick().await;

        if let Err(e) = self.refresh().await {
            error!(
                self.logger,
                "failed refreshing the crates.io-index, the operation will be retried: {}", e
            );
        }
    }
}
/// Runs `retrieve_or_update` on a clone of the index inside `spawn_blocking`.
///
/// # Errors
///
/// Fails if the blocking task cannot be joined or if the index operation fails.
async fn refresh(&self) -> Result<()> {
    let index = self.index.clone();
    let task = spawn_blocking(move || index.retrieve_or_update());

    // First `?`: join failure. Second `?`: failure reported by the index itself.
    let outcome = task.await?;
    outcome?;
    Ok(())
}
}

View file

@ -1 +1,2 @@
pub mod cache; pub mod cache;
pub mod index;