Remove Mutex around Cache and move away from deprecated lru-cache (#65)

Paolo Barbolini 2020-10-05 13:47:24 +02:00 committed by GitHub
parent dcd2e7a421
commit 8b58821beb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 49 additions and 77 deletions
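Summary of the change: Cache now owns its LruCache behind an Arc<Mutex<…>>, lets lru_time_cache handle expiry, and is Clone with cached_query(&self, …), so Engine stores plain Cache values instead of Arc<Mutex<Cache<…>>>. Below is a minimal, self-contained sketch of that pattern; the CachedLookup type and the stand-in lookup are illustrative, not the project's actual Service-based code (assumes tokio with the macros and rt-multi-thread features plus lru_time_cache as dependencies).

// Minimal sketch (not the project's actual code): a Clone-able cache with built-in TTL.
use std::{sync::Arc, time::Duration};

use lru_time_cache::LruCache;
use tokio::sync::Mutex;

// Cloning shares the same underlying cache, so callers no longer need an
// external Mutex around the whole service.
#[derive(Clone)]
struct CachedLookup {
    cache: Arc<Mutex<LruCache<String, String>>>,
}

impl CachedLookup {
    fn new(ttl: Duration, capacity: usize) -> Self {
        // Entries expire after `ttl`; the least recently used entry is evicted
        // once `capacity` is reached. No manual Instant bookkeeping required.
        let cache = LruCache::with_expiry_duration_and_capacity(ttl, capacity);
        CachedLookup {
            cache: Arc::new(Mutex::new(cache)),
        }
    }

    async fn cached_query(&self, key: String) -> String {
        {
            // The guard is dropped at the end of this block, before the slow call.
            let mut cache = self.cache.lock().await;
            if let Some(hit) = cache.get(&key) {
                return hit.clone();
            }
        }
        let fresh = format!("value for {}", key); // stand-in for the real lookup
        self.cache.lock().await.insert(key, fresh.clone());
        fresh
    }
}

#[tokio::main]
async fn main() {
    let lookup = CachedLookup::new(Duration::from_secs(60), 16);
    let first = lookup.cached_query("deps".to_string()).await; // miss: computes and stores
    let second = lookup.cached_query("deps".to_string()).await; // hit: served from the cache
    assert_eq!(first, second);
}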

Cargo.lock (generated)

@@ -665,12 +665,6 @@ dependencies = [
  "vcpkg",
 ]

-[[package]]
-name = "linked-hash-map"
-version = "0.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a"
-
 [[package]]
 name = "log"
 version = "0.4.11"
@@ -681,13 +675,10 @@ dependencies = [
 ]

 [[package]]
-name = "lru-cache"
-version = "0.1.2"
+name = "lru_time_cache"
+version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "31e24f1ad8321ca0e8a1e0ac13f23cb668e6f5466c2c57319f6a5cf1cc8e3b1c"
-dependencies = [
- "linked-hash-map",
-]
+checksum = "53883292fdb9976d4666a9ede31ed599c255be9715947a8bd2adc72522e66749"

 [[package]]
 name = "matches"
@@ -1316,7 +1307,7 @@ dependencies = [
  "futures",
  "hyper",
  "indexmap",
- "lru-cache",
+ "lru_time_cache",
  "maud",
  "once_cell",
  "pin-project",

Cargo.toml

@@ -19,7 +19,7 @@ derive_more = "0.99"
 futures = "0.3"
 hyper = "0.13"
 indexmap = { version = "1", features = ["serde-1"] }
-lru-cache = "0.1" # TODO: replace unmaintained crate
+lru_time_cache = "0.11.1"
 maud = "0.22"
 once_cell = "1.4"
 pin-project = "0.4"


@@ -15,7 +15,6 @@ use rustsec::database::Database;
 use semver::VersionReq;
 use slog::Logger;
 use stream::BoxStream;
-use tokio::sync::Mutex;

 use crate::interactors::crates::{GetPopularCrates, QueryCrate};
 use crate::interactors::github::GetPopularRepos;
@@ -36,11 +35,11 @@ pub struct Engine {
     client: reqwest::Client,
     logger: Logger,
     metrics: StatsdClient,
-    query_crate: Arc<Mutex<Cache<QueryCrate, CrateName>>>,
-    get_popular_crates: Arc<Mutex<Cache<GetPopularCrates, ()>>>,
-    get_popular_repos: Arc<Mutex<Cache<GetPopularRepos, ()>>>,
-    retrieve_file_at_path: Arc<Mutex<RetrieveFileAtPath>>,
-    fetch_advisory_db: Arc<Mutex<Cache<FetchAdvisoryDatabase, ()>>>,
+    query_crate: Cache<QueryCrate, CrateName>,
+    get_popular_crates: Cache<GetPopularCrates, ()>,
+    get_popular_repos: Cache<GetPopularRepos, ()>,
+    retrieve_file_at_path: RetrieveFileAtPath,
+    fetch_advisory_db: Cache<FetchAdvisoryDatabase, ()>,
 }

 impl Engine {
@@ -65,6 +64,7 @@ impl Engine {
             1,
             logger.clone(),
         );
+        let retrieve_file_at_path = RetrieveFileAtPath::new(client.clone());

         let fetch_advisory_db = Cache::new(
             FetchAdvisoryDatabase::new(client.clone()),
             Duration::from_secs(1800),
@@ -73,14 +73,14 @@ impl Engine {
         );

         Engine {
-            client: client.clone(),
+            client,
             logger,
             metrics,
-            query_crate: Arc::new(Mutex::new(query_crate)),
-            get_popular_crates: Arc::new(Mutex::new(get_popular_crates)),
-            get_popular_repos: Arc::new(Mutex::new(get_popular_repos)),
-            retrieve_file_at_path: Arc::new(Mutex::new(RetrieveFileAtPath::new(client))),
-            fetch_advisory_db: Arc::new(Mutex::new(fetch_advisory_db)),
+            query_crate,
+            get_popular_crates,
+            get_popular_repos,
+            retrieve_file_at_path,
+            fetch_advisory_db,
         }
     }
@@ -117,10 +117,7 @@ impl AnalyzeDependenciesOutcome {
 impl Engine {
     pub async fn get_popular_repos(&self) -> Result<Vec<Repository>, Error> {
-        let repos = {
-            let mut lock = self.get_popular_repos.lock().await;
-            lock.cached_query(()).await?
-        };
+        let repos = self.get_popular_repos.cached_query(()).await?;

         let filtered_repos = repos
             .iter()
@@ -132,8 +129,7 @@ impl Engine {
     }

     pub async fn get_popular_crates(&self) -> Result<Vec<CratePath>, Error> {
-        let mut lock = self.get_popular_crates.lock().await;
-        let crates = lock.cached_query(()).await?;
+        let crates = self.get_popular_crates.cached_query(()).await?;
         Ok(crates)
     }
@@ -178,10 +174,10 @@ impl Engine {
     ) -> Result<AnalyzeDependenciesOutcome, Error> {
         let start = Instant::now();

-        let query_response = {
-            let mut lock = self.query_crate.lock().await;
-            lock.cached_query(crate_path.name.clone()).await?
-        };
+        let query_response = self
+            .query_crate
+            .cached_query(crate_path.name.clone())
+            .await?;

         let engine = self.clone();
@@ -212,10 +208,7 @@ impl Engine {
         name: CrateName,
         req: VersionReq,
     ) -> Result<Option<CrateRelease>, Error> {
-        let query_response = {
-            let mut lock = self.query_crate.lock().await;
-            lock.cached_query(name).await?
-        };
+        let query_response = self.query_crate.cached_query(name).await?;

         let latest = query_response
             .releases
@@ -249,21 +242,19 @@ impl Engine {
     ) -> Result<String, Error> {
         let manifest_path = path.join(RelativePath::new("Cargo.toml"));

-        let mut lock = self.retrieve_file_at_path.lock().await;
-        Ok(lock.call((repo_path.clone(), manifest_path)).await?)
+        let mut service = self.retrieve_file_at_path.clone();
+        Ok(service.call((repo_path.clone(), manifest_path)).await?)
     }

     async fn fetch_advisory_db(&self) -> Result<Arc<Database>, Error> {
-        let mut lock = self.fetch_advisory_db.lock().await;
-        Ok(lock.cached_query(()).await?)
+        Ok(self.fetch_advisory_db.cached_query(()).await?)
     }
 }

 async fn resolve_crate_with_engine(
     (crate_name, engine): (CrateName, Engine),
 ) -> anyhow::Result<Vec<CrateRelease>> {
-    let mut lock = engine.query_crate.lock().await;
-    let crate_res = lock.cached_query(crate_name).await?;
+    let crate_res = engine.query_crate.cached_query(crate_name).await?;
     Ok(crate_res.releases)
 }


@@ -1,12 +1,8 @@
-use std::{
-    fmt,
-    hash::Hash,
-    time::{Duration, Instant},
-};
+use std::{fmt, sync::Arc, time::Duration};

 use derive_more::{Display, Error, From};
 use hyper::service::Service;
-use lru_cache::LruCache;
+use lru_time_cache::LruCache;
 use slog::{debug, Logger};
 use tokio::sync::Mutex;
@@ -15,63 +11,56 @@ pub struct CacheError<E> {
     inner: E,
 }

+#[derive(Clone)]
 pub struct Cache<S, Req>
 where
     S: Service<Req>,
-    Req: Hash + Eq,
 {
     inner: S,
-    duration: Duration,
-    cache: Mutex<LruCache<Req, (Instant, S::Response)>>,
+    cache: Arc<Mutex<LruCache<Req, S::Response>>>,
     logger: Logger,
 }

 impl<S, Req> fmt::Debug for Cache<S, Req>
 where
     S: Service<Req> + fmt::Debug,
-    Req: Hash + Eq,
 {
     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
         fmt.debug_struct("Cache")
             .field("inner", &self.inner)
-            .field("duration", &self.duration)
             .finish()
     }
 }

 impl<S, Req> Cache<S, Req>
 where
-    S: Service<Req> + fmt::Debug,
+    S: Service<Req> + fmt::Debug + Clone,
     S::Response: Clone,
-    Req: Clone + Eq + Hash + fmt::Debug,
+    Req: Clone + Eq + Ord + fmt::Debug,
 {
-    pub fn new(service: S, duration: Duration, capacity: usize, logger: Logger) -> Cache<S, Req> {
+    pub fn new(service: S, ttl: Duration, capacity: usize, logger: Logger) -> Cache<S, Req> {
+        let cache = LruCache::with_expiry_duration_and_capacity(ttl, capacity);
         Cache {
             inner: service,
-            duration,
-            cache: Mutex::new(LruCache::new(capacity)),
+            cache: Arc::new(Mutex::new(cache)),
             logger,
         }
     }

-    pub async fn cached_query(&mut self, req: Req) -> Result<S::Response, S::Error> {
-        let now = Instant::now();
-
+    pub async fn cached_query(&self, req: Req) -> Result<S::Response, S::Error> {
         {
             let mut cache = self.cache.lock().await;
-            if let Some((ref valid_until, ref cached_response)) = cache.get_mut(&req) {
-                if *valid_until > now {
-                    debug!(
-                        self.logger, "cache hit";
-                        "svc" => format!("{:?}", self.inner),
-                        "req" => format!("{:?}", &req)
-                    );
-                    return Ok(cached_response.clone());
-                }
-            }
+            if let Some(cached_response) = cache.get(&req) {
+                debug!(
+                    self.logger, "cache hit";
+                    "svc" => format!("{:?}", self.inner),
+                    "req" => format!("{:?}", &req)
+                );
+                return Ok(cached_response.clone());
+            }
         }

         debug!(
             self.logger, "cache miss";
@@ -79,11 +68,12 @@ where
             "req" => format!("{:?}", &req)
         );

-        let fresh = self.inner.call(req.clone()).await?;
+        let mut service = self.inner.clone();
+        let fresh = service.call(req.clone()).await?;

         {
             let mut cache = self.cache.lock().await;
-            cache.insert(req, (now + self.duration, fresh.clone()));
+            cache.insert(req, fresh.clone());
         }

         Ok(fresh)
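For reference, a standalone illustration of the lru_time_cache behavior the new Cache relies on: expiry is handled inside the cache itself (hence no more valid_until tuples or Instant checks), and keys need Ord instead of Hash. The key, value, and durations below are arbitrary examples, not values from the project.

use std::{thread, time::Duration};

use lru_time_cache::LruCache;

fn main() {
    // At most 2 entries, each living for 50 milliseconds.
    let mut cache = LruCache::with_expiry_duration_and_capacity(Duration::from_millis(50), 2);

    cache.insert("a", 1);
    assert_eq!(cache.get(&"a"), Some(&1)); // still fresh

    thread::sleep(Duration::from_millis(60));
    // The entry has expired, so the lookup misses without any manual bookkeeping.
    assert_eq!(cache.get(&"a"), None);
}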