From 8b58821bebdf881551f7709ff8d0c84d54ea1103 Mon Sep 17 00:00:00 2001 From: Paolo Barbolini Date: Mon, 5 Oct 2020 13:47:24 +0200 Subject: [PATCH] Remove Mutex around Cache and move away from deprecated lru-cache (#65) --- Cargo.lock | 17 ++++---------- Cargo.toml | 2 +- src/engine/mod.rs | 55 +++++++++++++++++++--------------------------- src/utils/cache.rs | 52 ++++++++++++++++++------------------------- 4 files changed, 49 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e66672a..eb6c1b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -665,12 +665,6 @@ dependencies = [ "vcpkg", ] -[[package]] -name = "linked-hash-map" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" - [[package]] name = "log" version = "0.4.11" @@ -681,13 +675,10 @@ dependencies = [ ] [[package]] -name = "lru-cache" -version = "0.1.2" +name = "lru_time_cache" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c" -dependencies = [ - "linked-hash-map", -] +checksum = "53883292fdb9976d4666a9ede31ed599c255be9715947a8bd2adc72522e66749" [[package]] name = "matches" @@ -1316,7 +1307,7 @@ dependencies = [ "futures", "hyper", "indexmap", - "lru-cache", + "lru_time_cache", "maud", "once_cell", "pin-project", diff --git a/Cargo.toml b/Cargo.toml index 227f707..5131b51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ derive_more = "0.99" futures = "0.3" hyper = "0.13" indexmap = { version = "1", features = ["serde-1"] } -lru-cache = "0.1" # TODO: replace unmaintained crate +lru_time_cache = "0.11.1" maud = "0.22" once_cell = "1.4" pin-project = "0.4" diff --git a/src/engine/mod.rs b/src/engine/mod.rs index bc0f472..e0df9bc 100644 --- a/src/engine/mod.rs +++ b/src/engine/mod.rs @@ -15,7 +15,6 @@ use rustsec::database::Database; use semver::VersionReq; use slog::Logger; use stream::BoxStream; -use tokio::sync::Mutex; use crate::interactors::crates::{GetPopularCrates, QueryCrate}; use crate::interactors::github::GetPopularRepos; @@ -36,11 +35,11 @@ pub struct Engine { client: reqwest::Client, logger: Logger, metrics: StatsdClient, - query_crate: Arc>>, - get_popular_crates: Arc>>, - get_popular_repos: Arc>>, - retrieve_file_at_path: Arc>, - fetch_advisory_db: Arc>>, + query_crate: Cache, + get_popular_crates: Cache, + get_popular_repos: Cache, + retrieve_file_at_path: RetrieveFileAtPath, + fetch_advisory_db: Cache, } impl Engine { @@ -65,6 +64,7 @@ impl Engine { 1, logger.clone(), ); + let retrieve_file_at_path = RetrieveFileAtPath::new(client.clone()); let fetch_advisory_db = Cache::new( FetchAdvisoryDatabase::new(client.clone()), Duration::from_secs(1800), @@ -73,14 +73,14 @@ impl Engine { ); Engine { - client: client.clone(), + client, logger, metrics, - query_crate: Arc::new(Mutex::new(query_crate)), - get_popular_crates: Arc::new(Mutex::new(get_popular_crates)), - get_popular_repos: Arc::new(Mutex::new(get_popular_repos)), - retrieve_file_at_path: Arc::new(Mutex::new(RetrieveFileAtPath::new(client))), - fetch_advisory_db: Arc::new(Mutex::new(fetch_advisory_db)), + query_crate, + get_popular_crates, + get_popular_repos, + retrieve_file_at_path, + fetch_advisory_db, } } @@ -117,10 +117,7 @@ impl AnalyzeDependenciesOutcome { impl Engine { pub async fn get_popular_repos(&self) -> Result, Error> { - let repos = { - let mut lock = self.get_popular_repos.lock().await; - lock.cached_query(()).await? - }; + let repos = self.get_popular_repos.cached_query(()).await?; let filtered_repos = repos .iter() @@ -132,8 +129,7 @@ impl Engine { } pub async fn get_popular_crates(&self) -> Result, Error> { - let mut lock = self.get_popular_crates.lock().await; - let crates = lock.cached_query(()).await?; + let crates = self.get_popular_crates.cached_query(()).await?; Ok(crates) } @@ -178,10 +174,10 @@ impl Engine { ) -> Result { let start = Instant::now(); - let query_response = { - let mut lock = self.query_crate.lock().await; - lock.cached_query(crate_path.name.clone()).await? - }; + let query_response = self + .query_crate + .cached_query(crate_path.name.clone()) + .await?; let engine = self.clone(); @@ -212,10 +208,7 @@ impl Engine { name: CrateName, req: VersionReq, ) -> Result, Error> { - let query_response = { - let mut lock = self.query_crate.lock().await; - lock.cached_query(name).await? - }; + let query_response = self.query_crate.cached_query(name).await?; let latest = query_response .releases @@ -249,21 +242,19 @@ impl Engine { ) -> Result { let manifest_path = path.join(RelativePath::new("Cargo.toml")); - let mut lock = self.retrieve_file_at_path.lock().await; - Ok(lock.call((repo_path.clone(), manifest_path)).await?) + let mut service = self.retrieve_file_at_path.clone(); + Ok(service.call((repo_path.clone(), manifest_path)).await?) } async fn fetch_advisory_db(&self) -> Result, Error> { - let mut lock = self.fetch_advisory_db.lock().await; - Ok(lock.cached_query(()).await?) + Ok(self.fetch_advisory_db.cached_query(()).await?) } } async fn resolve_crate_with_engine( (crate_name, engine): (CrateName, Engine), ) -> anyhow::Result> { - let mut lock = engine.query_crate.lock().await; - let crate_res = lock.cached_query(crate_name).await?; + let crate_res = engine.query_crate.cached_query(crate_name).await?; Ok(crate_res.releases) } diff --git a/src/utils/cache.rs b/src/utils/cache.rs index df2efbf..5637472 100644 --- a/src/utils/cache.rs +++ b/src/utils/cache.rs @@ -1,12 +1,8 @@ -use std::{ - fmt, - hash::Hash, - time::{Duration, Instant}, -}; +use std::{fmt, sync::Arc, time::Duration}; use derive_more::{Display, Error, From}; use hyper::service::Service; -use lru_cache::LruCache; +use lru_time_cache::LruCache; use slog::{debug, Logger}; use tokio::sync::Mutex; @@ -15,61 +11,54 @@ pub struct CacheError { inner: E, } +#[derive(Clone)] pub struct Cache where S: Service, - Req: Hash + Eq, { inner: S, - duration: Duration, - cache: Mutex>, + cache: Arc>>, logger: Logger, } impl fmt::Debug for Cache where S: Service + fmt::Debug, - Req: Hash + Eq, { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Cache") .field("inner", &self.inner) - .field("duration", &self.duration) .finish() } } impl Cache where - S: Service + fmt::Debug, + S: Service + fmt::Debug + Clone, S::Response: Clone, - Req: Clone + Eq + Hash + fmt::Debug, + Req: Clone + Eq + Ord + fmt::Debug, { - pub fn new(service: S, duration: Duration, capacity: usize, logger: Logger) -> Cache { + pub fn new(service: S, ttl: Duration, capacity: usize, logger: Logger) -> Cache { + let cache = LruCache::with_expiry_duration_and_capacity(ttl, capacity); + Cache { inner: service, - duration, - cache: Mutex::new(LruCache::new(capacity)), + cache: Arc::new(Mutex::new(cache)), logger, } } - pub async fn cached_query(&mut self, req: Req) -> Result { - let now = Instant::now(); - + pub async fn cached_query(&self, req: Req) -> Result { { let mut cache = self.cache.lock().await; - if let Some((ref valid_until, ref cached_response)) = cache.get_mut(&req) { - if *valid_until > now { - debug!( - self.logger, "cache hit"; - "svc" => format!("{:?}", self.inner), - "req" => format!("{:?}", &req) - ); - - return Ok(cached_response.clone()); - } + if let Some(cached_response) = cache.get(&req) { + debug!( + self.logger, "cache hit"; + "svc" => format!("{:?}", self.inner), + "req" => format!("{:?}", &req) + ); + return Ok(cached_response.clone()); } } @@ -79,11 +68,12 @@ where "req" => format!("{:?}", &req) ); - let fresh = self.inner.call(req.clone()).await?; + let mut service = self.inner.clone(); + let fresh = service.call(req.clone()).await?; { let mut cache = self.cache.lock().await; - cache.insert(req, (now + self.duration, fresh.clone())); + cache.insert(req, fresh.clone()); } Ok(fresh)