mirror of
https://github.com/deps-rs/deps.rs.git
synced 2024-11-22 10:26:30 +00:00
introduce lightweight (5 mins) caching for crate metadata
This commit is contained in:
parent
ab3f69fe6e
commit
53b7ed3015
9 changed files with 234 additions and 192 deletions
|
@ -10,6 +10,7 @@ futures = "0.1.18"
|
||||||
hyper = "0.11.15"
|
hyper = "0.11.15"
|
||||||
hyper-tls = "0.1.2"
|
hyper-tls = "0.1.2"
|
||||||
lazy_static = "1.0.0"
|
lazy_static = "1.0.0"
|
||||||
|
lru-cache = "0.1.1"
|
||||||
maud = "0.17.2"
|
maud = "0.17.2"
|
||||||
ordermap = { version = "0.4.0", features = ["serde-1"] }
|
ordermap = { version = "0.4.0", features = ["serde-1"] }
|
||||||
relative-path = { version = "0.3.7", features = ["serde"] }
|
relative-path = { version = "0.3.7", features = ["serde"] }
|
||||||
|
|
|
@ -15,32 +15,40 @@ use tokio_service::Service;
|
||||||
mod machines;
|
mod machines;
|
||||||
mod futures;
|
mod futures;
|
||||||
|
|
||||||
use ::utils::throttle::Throttle;
|
use ::utils::cache::Cache;
|
||||||
|
|
||||||
use ::models::repo::{Repository, RepoPath};
|
use ::models::repo::{Repository, RepoPath};
|
||||||
use ::models::crates::{CrateName, CrateRelease, AnalyzedDependencies};
|
use ::models::crates::{CrateName, CrateRelease, AnalyzedDependencies};
|
||||||
|
|
||||||
use ::interactors::crates::query_crate;
|
use ::interactors::crates::QueryCrate;
|
||||||
use ::interactors::github::retrieve_file_at_path;
|
use ::interactors::github::{GetPopularRepos, RetrieveFileAtPath};
|
||||||
use ::interactors::github::GetPopularRepos;
|
|
||||||
|
|
||||||
use self::futures::AnalyzeDependenciesFuture;
|
use self::futures::AnalyzeDependenciesFuture;
|
||||||
use self::futures::CrawlManifestFuture;
|
use self::futures::CrawlManifestFuture;
|
||||||
|
|
||||||
|
type HttpClient = Client<HttpsConnector<HttpConnector>>;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
client: Client<HttpsConnector<HttpConnector>>,
|
client: HttpClient,
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
|
|
||||||
get_popular_repos: Arc<Throttle<GetPopularRepos<Client<HttpsConnector<HttpConnector>>>>>
|
query_crate: Arc<Cache<QueryCrate<HttpClient>>>,
|
||||||
|
get_popular_repos: Arc<Cache<GetPopularRepos<HttpClient>>>,
|
||||||
|
retrieve_file_at_path: Arc<RetrieveFileAtPath<HttpClient>>
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
pub fn new(client: Client<HttpsConnector<HttpConnector>>, logger: Logger) -> Engine {
|
pub fn new(client: Client<HttpsConnector<HttpConnector>>, logger: Logger) -> Engine {
|
||||||
|
let query_crate = Cache::new(QueryCrate(client.clone()), Duration::from_secs(300), 500);
|
||||||
|
let get_popular_repos = Cache::new(GetPopularRepos(client.clone()), Duration::from_secs(10), 1);
|
||||||
|
|
||||||
Engine {
|
Engine {
|
||||||
client: client.clone(), logger,
|
client: client.clone(), logger,
|
||||||
|
|
||||||
get_popular_repos: Arc::new(Throttle::new(GetPopularRepos(client), Duration::from_secs(10)))
|
query_crate: Arc::new(query_crate),
|
||||||
|
get_popular_repos: Arc::new(get_popular_repos),
|
||||||
|
retrieve_file_at_path: Arc::new(RetrieveFileAtPath(client))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,10 +105,11 @@ impl Engine {
|
||||||
fn fetch_releases<I: IntoIterator<Item=CrateName>>(&self, names: I) ->
|
fn fetch_releases<I: IntoIterator<Item=CrateName>>(&self, names: I) ->
|
||||||
impl Iterator<Item=impl Future<Item=Vec<CrateRelease>, Error=Error>>
|
impl Iterator<Item=impl Future<Item=Vec<CrateRelease>, Error=Error>>
|
||||||
{
|
{
|
||||||
let client = self.client.clone();
|
let engine = self.clone();
|
||||||
names.into_iter().map(move |name| {
|
names.into_iter().map(move |name| {
|
||||||
query_crate(client.clone(), name)
|
engine.query_crate.call(name)
|
||||||
.map(|resp| resp.releases)
|
.from_err()
|
||||||
|
.map(|resp| resp.releases.clone())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +117,7 @@ impl Engine {
|
||||||
impl Future<Item=String, Error=Error>
|
impl Future<Item=String, Error=Error>
|
||||||
{
|
{
|
||||||
let manifest_path = path.join(RelativePath::new("Cargo.toml"));
|
let manifest_path = path.join(RelativePath::new("Cargo.toml"));
|
||||||
retrieve_file_at_path(self.client.clone(), &repo_path, &manifest_path).from_err()
|
self.retrieve_file_at_path.call((repo_path.clone(), manifest_path))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,43 +36,55 @@ pub struct QueryCrateResponse {
|
||||||
pub releases: Vec<CrateRelease>
|
pub releases: Vec<CrateRelease>
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn query_crate<S>(service: S, crate_name: CrateName) ->
|
#[derive(Debug, Clone)]
|
||||||
impl Future<Item=QueryCrateResponse, Error=Error>
|
pub struct QueryCrate<S>(pub S);
|
||||||
where S: Service<Request=Request, Response=Response, Error=HyperError>
|
|
||||||
|
impl<S> Service for QueryCrate<S>
|
||||||
|
where S: Service<Request=Request, Response=Response, Error=HyperError> + Clone + 'static,
|
||||||
|
S::Future: 'static
|
||||||
{
|
{
|
||||||
let lower_name = crate_name.as_ref().to_lowercase();
|
type Request = CrateName;
|
||||||
|
type Response = QueryCrateResponse;
|
||||||
|
type Error = Error;
|
||||||
|
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
|
||||||
|
|
||||||
let path = match lower_name.len() {
|
fn call(&self, crate_name: CrateName) -> Self::Future {
|
||||||
1 => format!("1/{}", lower_name),
|
let service = self.0.clone();
|
||||||
2 => format!("2/{}", lower_name),
|
|
||||||
3 => format!("3/{}/{}", &lower_name[..1], lower_name),
|
|
||||||
_ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name),
|
|
||||||
};
|
|
||||||
|
|
||||||
let uri_future = format!("{}/master/{}", CRATES_INDEX_BASE_URI, path)
|
let lower_name = crate_name.as_ref().to_lowercase();
|
||||||
.parse::<Uri>().into_future().from_err();
|
|
||||||
|
|
||||||
uri_future.and_then(move |uri| {
|
let path = match lower_name.len() {
|
||||||
let request = Request::new(Method::Get, uri.clone());
|
1 => format!("1/{}", lower_name),
|
||||||
|
2 => format!("2/{}", lower_name),
|
||||||
|
3 => format!("3/{}/{}", &lower_name[..1], lower_name),
|
||||||
|
_ => format!("{}/{}/{}", &lower_name[0..2], &lower_name[2..4], lower_name),
|
||||||
|
};
|
||||||
|
|
||||||
service.call(request).from_err().and_then(move |response| {
|
let uri_future = format!("{}/master/{}", CRATES_INDEX_BASE_URI, path)
|
||||||
let status = response.status();
|
.parse::<Uri>().into_future().from_err();
|
||||||
if !status.is_success() {
|
|
||||||
future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri)))
|
Box::new(uri_future.and_then(move |uri| {
|
||||||
} else {
|
let request = Request::new(Method::Get, uri.clone());
|
||||||
let body_future = response.body().concat2().from_err();
|
|
||||||
let decode_future = body_future.and_then(|body| {
|
service.call(request).from_err().and_then(move |response| {
|
||||||
let string_body = str::from_utf8(body.as_ref())?;
|
let status = response.status();
|
||||||
let packages = string_body.lines()
|
if !status.is_success() {
|
||||||
.map(|s| s.trim())
|
future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri)))
|
||||||
.filter(|s| !s.is_empty())
|
} else {
|
||||||
.map(|s| serde_json::from_str::<RegistryPackage>(s))
|
let body_future = response.body().concat2().from_err();
|
||||||
.collect::<Result<_, _>>()?;
|
let decode_future = body_future.and_then(|body| {
|
||||||
Ok(packages)
|
let string_body = str::from_utf8(body.as_ref())?;
|
||||||
});
|
let packages = string_body.lines()
|
||||||
let convert_future = decode_future.and_then(move |pkgs| convert_pkgs(&crate_name, pkgs));
|
.map(|s| s.trim())
|
||||||
future::Either::B(convert_future)
|
.filter(|s| !s.is_empty())
|
||||||
}
|
.map(|s| serde_json::from_str::<RegistryPackage>(s))
|
||||||
})
|
.collect::<Result<_, _>>()?;
|
||||||
})
|
Ok(packages)
|
||||||
|
});
|
||||||
|
let convert_future = decode_future.and_then(move |pkgs| convert_pkgs(&crate_name, pkgs));
|
||||||
|
future::Either::B(convert_future)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,33 +11,46 @@ use ::models::repo::{Repository, RepoPath};
|
||||||
const GITHUB_API_BASE_URI: &'static str = "https://api.github.com";
|
const GITHUB_API_BASE_URI: &'static str = "https://api.github.com";
|
||||||
const GITHUB_USER_CONTENT_BASE_URI: &'static str = "https://raw.githubusercontent.com";
|
const GITHUB_USER_CONTENT_BASE_URI: &'static str = "https://raw.githubusercontent.com";
|
||||||
|
|
||||||
pub fn retrieve_file_at_path<S>(service: S, repo_path: &RepoPath, path: &RelativePathBuf) ->
|
#[derive(Debug, Clone)]
|
||||||
impl Future<Item=String, Error=Error>
|
pub struct RetrieveFileAtPath<S>(pub S);
|
||||||
where S: Service<Request=Request, Response=Response, Error=HyperError>
|
|
||||||
|
impl<S> Service for RetrieveFileAtPath<S>
|
||||||
|
where S: Service<Request=Request, Response=Response, Error=HyperError> + Clone + 'static,
|
||||||
|
S::Future: 'static
|
||||||
{
|
{
|
||||||
let path_str: &str = path.as_ref();
|
type Request = (RepoPath, RelativePathBuf);
|
||||||
let uri_future = format!("{}/{}/{}/HEAD/{}",
|
type Response = String;
|
||||||
GITHUB_USER_CONTENT_BASE_URI,
|
type Error = Error;
|
||||||
repo_path.qual.as_ref(),
|
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
|
||||||
repo_path.name.as_ref(),
|
|
||||||
path_str
|
|
||||||
).parse::<Uri>().into_future().from_err();
|
|
||||||
|
|
||||||
uri_future.and_then(move |uri| {
|
fn call(&self, req: Self::Request) -> Self::Future {
|
||||||
let request = Request::new(Method::Get, uri.clone());
|
let service = self.0.clone();
|
||||||
|
|
||||||
service.call(request).from_err().and_then(move |response| {
|
let (repo_path, path) = req;
|
||||||
let status = response.status();
|
let path_str: &str = path.as_ref();
|
||||||
if !status.is_success() {
|
let uri_future = format!("{}/{}/{}/HEAD/{}",
|
||||||
future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri)))
|
GITHUB_USER_CONTENT_BASE_URI,
|
||||||
} else {
|
repo_path.qual.as_ref(),
|
||||||
let body_future = response.body().concat2().from_err();
|
repo_path.name.as_ref(),
|
||||||
let decode_future = body_future
|
path_str
|
||||||
.and_then(|body| String::from_utf8(body.to_vec()).map_err(|err| err.into()));
|
).parse::<Uri>().into_future().from_err();
|
||||||
future::Either::B(decode_future)
|
|
||||||
}
|
Box::new(uri_future.and_then(move |uri| {
|
||||||
})
|
let request = Request::new(Method::Get, uri.clone());
|
||||||
})
|
|
||||||
|
service.call(request).from_err().and_then(move |response| {
|
||||||
|
let status = response.status();
|
||||||
|
if !status.is_success() {
|
||||||
|
future::Either::A(future::err(format_err!("Status code {} for URI {}", status, uri)))
|
||||||
|
} else {
|
||||||
|
let body_future = response.body().concat2().from_err();
|
||||||
|
let decode_future = body_future
|
||||||
|
.and_then(|body| String::from_utf8(body.to_vec()).map_err(|err| err.into()));
|
||||||
|
future::Either::B(decode_future)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
|
|
|
@ -9,6 +9,7 @@ extern crate base64;
|
||||||
extern crate hyper;
|
extern crate hyper;
|
||||||
extern crate hyper_tls;
|
extern crate hyper_tls;
|
||||||
#[macro_use] extern crate lazy_static;
|
#[macro_use] extern crate lazy_static;
|
||||||
|
extern crate lru_cache;
|
||||||
extern crate maud;
|
extern crate maud;
|
||||||
extern crate ordermap;
|
extern crate ordermap;
|
||||||
extern crate relative_path;
|
extern crate relative_path;
|
||||||
|
|
|
@ -43,7 +43,7 @@ impl FromStr for CrateName {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct CrateRelease {
|
pub struct CrateRelease {
|
||||||
pub name: CrateName,
|
pub name: CrateName,
|
||||||
pub version: Version,
|
pub version: Version,
|
||||||
|
|
127
src/utils/cache.rs
Normal file
127
src/utils/cache.rs
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
|
||||||
|
use std::hash::Hash;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
use std::ops::Deref;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
use failure::{Error, Fail};
|
||||||
|
use futures::{Future, Poll};
|
||||||
|
use futures::future::{Shared, SharedError, SharedItem};
|
||||||
|
use lru_cache::LruCache;
|
||||||
|
use tokio_service::Service;
|
||||||
|
|
||||||
|
pub struct Cache<S>
|
||||||
|
where S: Service<Error=Error>,
|
||||||
|
S::Request: Hash + Eq
|
||||||
|
{
|
||||||
|
inner: S,
|
||||||
|
duration: Duration,
|
||||||
|
cache: Mutex<LruCache<S::Request, (Instant, Shared<S::Future>)>>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Debug for Cache<S>
|
||||||
|
where S: Service<Error=Error> + Debug,
|
||||||
|
S::Request: Hash + Eq
|
||||||
|
{
|
||||||
|
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
|
||||||
|
fmt.debug_struct("Cache")
|
||||||
|
.field("inner", &self.inner)
|
||||||
|
.field("duration", &self.duration)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Cache<S>
|
||||||
|
where S: Service<Error=Error>,
|
||||||
|
S::Request: Hash + Eq
|
||||||
|
{
|
||||||
|
pub fn new(service: S, duration: Duration, capacity: usize) -> Cache<S> {
|
||||||
|
Cache {
|
||||||
|
inner: service,
|
||||||
|
duration: duration,
|
||||||
|
cache: Mutex::new(LruCache::new(capacity))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Service for Cache<S>
|
||||||
|
where S: Service<Error=Error>,
|
||||||
|
S::Request: Clone + Hash + Eq
|
||||||
|
{
|
||||||
|
type Request = S::Request;
|
||||||
|
type Response = CachedItem<S::Response>;
|
||||||
|
type Error = CachedError;
|
||||||
|
type Future = Cached<S::Future>;
|
||||||
|
|
||||||
|
fn call(&self, req: Self::Request) -> Self::Future {
|
||||||
|
let now = Instant::now();
|
||||||
|
let mut cache = self.cache.lock().expect("lock poisoned");
|
||||||
|
if let Some(&mut (valid_until, ref shared_future)) = cache.get_mut(&req) {
|
||||||
|
if valid_until > now {
|
||||||
|
if let Some(Ok(_)) = shared_future.peek() {
|
||||||
|
return Cached(shared_future.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
let shared_future = self.inner.call(req.clone()).shared();
|
||||||
|
cache.insert(req, (now + self.duration, shared_future.clone()));
|
||||||
|
Cached(shared_future)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Cached<F: Future>(Shared<F>);
|
||||||
|
|
||||||
|
impl<F> Debug for Cached<F>
|
||||||
|
where F: Future + Debug,
|
||||||
|
F::Item: Debug,
|
||||||
|
F::Error: Debug
|
||||||
|
{
|
||||||
|
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
|
||||||
|
self.0.fmt(fmt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<F: Future<Error=Error>> Future for Cached<F> {
|
||||||
|
type Item = CachedItem<F::Item>;
|
||||||
|
type Error = CachedError;
|
||||||
|
|
||||||
|
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
||||||
|
self.0.poll()
|
||||||
|
.map_err(CachedError)
|
||||||
|
.map(|async| async.map(CachedItem))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct CachedItem<T>(SharedItem<T>);
|
||||||
|
|
||||||
|
impl<T> Deref for CachedItem<T> {
|
||||||
|
type Target = T;
|
||||||
|
|
||||||
|
fn deref(&self) -> &T {
|
||||||
|
&self.0.deref()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct CachedError(SharedError<Error>);
|
||||||
|
|
||||||
|
impl Fail for CachedError {
|
||||||
|
fn cause(&self) -> Option<&Fail> {
|
||||||
|
Some(self.0.cause())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn backtrace(&self) -> Option<&::failure::Backtrace> {
|
||||||
|
Some(self.0.backtrace())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn causes(&self) -> ::failure::Causes {
|
||||||
|
self.0.causes()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for CachedError {
|
||||||
|
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
||||||
|
Display::fmt(&self.0, f)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1 +1 @@
|
||||||
pub mod throttle;
|
pub mod cache;
|
||||||
|
|
|
@ -1,121 +0,0 @@
|
||||||
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
use std::ops::Deref;
|
|
||||||
use std::sync::Mutex;
|
|
||||||
|
|
||||||
use failure::{Error, Fail};
|
|
||||||
use futures::{Future, Poll};
|
|
||||||
use futures::future::{Shared, SharedError, SharedItem};
|
|
||||||
use tokio_service::Service;
|
|
||||||
|
|
||||||
pub struct Throttle<S>
|
|
||||||
where S: Service<Request=(), Error=Error>
|
|
||||||
{
|
|
||||||
inner: S,
|
|
||||||
duration: Duration,
|
|
||||||
current: Mutex<Option<(Instant, Shared<S::Future>)>>
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Debug for Throttle<S>
|
|
||||||
where S: Service<Request=(), Error=Error> + Debug
|
|
||||||
{
|
|
||||||
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
|
|
||||||
fmt.debug_struct("Throttle")
|
|
||||||
.field("inner", &self.inner)
|
|
||||||
.field("duration", &self.duration)
|
|
||||||
.finish()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Throttle<S>
|
|
||||||
where S: Service<Request=(), Error=Error>
|
|
||||||
{
|
|
||||||
pub fn new(service: S, duration: Duration) -> Throttle<S> {
|
|
||||||
Throttle {
|
|
||||||
inner: service,
|
|
||||||
duration,
|
|
||||||
current: Mutex::new(None)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<S> Service for Throttle<S>
|
|
||||||
where S: Service<Request=(), Error=Error>
|
|
||||||
{
|
|
||||||
type Request = ();
|
|
||||||
type Response = ThrottledItem<S::Response>;
|
|
||||||
type Error = ThrottledError;
|
|
||||||
type Future = Throttled<S::Future>;
|
|
||||||
|
|
||||||
fn call(&self, _: ()) -> Self::Future {
|
|
||||||
let now = Instant::now();
|
|
||||||
let mut current = self.current.lock().expect("lock poisoned");
|
|
||||||
if let Some((valid_until, ref shared_future)) = *current {
|
|
||||||
if valid_until > now {
|
|
||||||
if let Some(Ok(_)) = shared_future.peek() {
|
|
||||||
return Throttled(shared_future.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let shared_future = self.inner.call(()).shared();
|
|
||||||
*current = Some((now + self.duration, shared_future.clone()));
|
|
||||||
Throttled(shared_future)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct Throttled<F: Future>(Shared<F>);
|
|
||||||
|
|
||||||
impl<F> Debug for Throttled<F>
|
|
||||||
where F: Future + Debug,
|
|
||||||
F::Item: Debug,
|
|
||||||
F::Error: Debug
|
|
||||||
{
|
|
||||||
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
|
|
||||||
self.0.fmt(fmt)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<F: Future<Error=Error>> Future for Throttled<F> {
|
|
||||||
type Item = ThrottledItem<F::Item>;
|
|
||||||
type Error = ThrottledError;
|
|
||||||
|
|
||||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
|
|
||||||
self.0.poll()
|
|
||||||
.map_err(ThrottledError)
|
|
||||||
.map(|async| async.map(ThrottledItem))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ThrottledItem<T>(SharedItem<T>);
|
|
||||||
|
|
||||||
impl<T> Deref for ThrottledItem<T> {
|
|
||||||
type Target = T;
|
|
||||||
|
|
||||||
fn deref(&self) -> &T {
|
|
||||||
&self.0.deref()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct ThrottledError(SharedError<Error>);
|
|
||||||
|
|
||||||
impl Fail for ThrottledError {
|
|
||||||
fn cause(&self) -> Option<&Fail> {
|
|
||||||
Some(self.0.cause())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn backtrace(&self) -> Option<&::failure::Backtrace> {
|
|
||||||
Some(self.0.backtrace())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn causes(&self) -> ::failure::Causes {
|
|
||||||
self.0.causes()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for ThrottledError {
|
|
||||||
fn fmt(&self, f: &mut Formatter) -> FmtResult {
|
|
||||||
Display::fmt(&self.0, f)
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue