throttle calls to get popular repos from github

This commit is contained in:
Sam Rijs 2018-02-05 19:38:04 +11:00
parent 106fa95a84
commit b101a3fade
6 changed files with 198 additions and 41 deletions

View file

@ -1,10 +1,16 @@
mod analyzer; use std::sync::Arc;
use std::time::Duration;
use futures::{Future, Stream, stream}; use futures::{Future, Stream, stream};
use hyper::Client; use hyper::Client;
use hyper::client::HttpConnector; use hyper::client::HttpConnector;
use hyper_tls::HttpsConnector; use hyper_tls::HttpsConnector;
use slog::Logger; use slog::Logger;
use tokio_service::Service;
mod analyzer;
use ::utils::throttle::{Throttle, ThrottledError};
use ::models::repo::{Repository, RepoPath}; use ::models::repo::{Repository, RepoPath};
use ::models::crates::{CrateName, CrateRelease, CrateManifest, AnalyzedDependencies}; use ::models::crates::{CrateName, CrateRelease, CrateManifest, AnalyzedDependencies};
@ -13,17 +19,31 @@ use ::parsers::manifest::{ManifestParseError, parse_manifest_toml};
use ::interactors::crates::{QueryCrateError, query_crate}; use ::interactors::crates::{QueryCrateError, query_crate};
use ::interactors::github::{RetrieveFileAtPathError, retrieve_file_at_path}; use ::interactors::github::{RetrieveFileAtPathError, retrieve_file_at_path};
use ::interactors::github::get_popular_repos; use ::interactors::github::GetPopularRepos;
pub use ::interactors::github::GetPopularReposError;
use self::analyzer::DependencyAnalyzer; use self::analyzer::DependencyAnalyzer;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Engine { pub struct Engine {
pub client: Client<HttpsConnector<HttpConnector>>, client: Client<HttpsConnector<HttpConnector>>,
pub logger: Logger logger: Logger,
get_popular_repos: Arc<Throttle<GetPopularRepos<Client<HttpsConnector<HttpConnector>>>>>
} }
impl Engine {
pub fn new(client: Client<HttpsConnector<HttpConnector>>, logger: Logger) -> Engine {
Engine {
client: client.clone(), logger,
get_popular_repos: Arc::new(Throttle::new(GetPopularRepos(client), Duration::from_secs(10)))
}
}
}
#[derive(Debug)]
pub struct GetPopularReposError(ThrottledError<::interactors::github::GetPopularReposError>);
#[derive(Debug)] #[derive(Debug)]
pub enum AnalyzeDependenciesError { pub enum AnalyzeDependenciesError {
QueryCrate(QueryCrateError), QueryCrate(QueryCrateError),
@ -42,7 +62,9 @@ impl Engine {
pub fn get_popular_repos(&self) -> pub fn get_popular_repos(&self) ->
impl Future<Item=Vec<Repository>, Error=GetPopularReposError> impl Future<Item=Vec<Repository>, Error=GetPopularReposError>
{ {
get_popular_repos(self.client.clone()) self.get_popular_repos.call(())
.map_err(GetPopularReposError)
.map(|repos| repos.clone())
} }
pub fn analyze_dependencies(&self, repo_path: RepoPath) -> pub fn analyze_dependencies(&self, repo_path: RepoPath) ->

View file

@ -74,33 +74,46 @@ struct GithubOwner {
login: String login: String
} }
pub fn get_popular_repos<S>(service: S) -> #[derive(Debug, Clone)]
impl Future<Item=Vec<Repository>, Error=GetPopularReposError> pub struct GetPopularRepos<S>(pub S);
where S: Service<Request=Request, Response=Response, Error=HyperError>
impl<S> Service for GetPopularRepos<S>
where S: Service<Request=Request, Response=Response, Error=HyperError> + Clone + 'static,
S::Future: 'static
{ {
let uri_future = format!("{}/search/repositories?q=language:rust&sort=stars", GITHUB_API_BASE_URI) type Request = ();
.parse().into_future().map_err(GetPopularReposError::Uri); type Response = Vec<Repository>;
type Error = GetPopularReposError;
type Future = Box<Future<Item=Self::Response, Error=Self::Error>>;
uri_future.and_then(move |uri| { fn call(&self, _req: ()) -> Self::Future {
let mut request = Request::new(Method::Get, uri); println!("call api");
request.headers_mut().set(UserAgent::new("deps.rs")); let service = self.0.clone();
service.call(request).map_err(GetPopularReposError::Transport).and_then(|response| { let uri_future = format!("{}/search/repositories?q=language:rust&sort=stars", GITHUB_API_BASE_URI)
let status = response.status(); .parse().into_future().map_err(GetPopularReposError::Uri);
if !status.is_success() {
future::Either::A(future::err(GetPopularReposError::Status(status))) Box::new(uri_future.and_then(move |uri| {
} else { let mut request = Request::new(Method::Get, uri);
let body_future = response.body().concat2().map_err(GetPopularReposError::Transport); request.headers_mut().set(UserAgent::new("deps.rs"));
let decode_future = body_future
.and_then(|body| serde_json::from_slice(body.as_ref()).map_err(GetPopularReposError::Decode)); service.call(request).map_err(GetPopularReposError::Transport).and_then(|response| {
future::Either::B(decode_future.and_then(|search_response: GithubSearchResponse| { let status = response.status();
search_response.items.into_iter().map(|item| { if !status.is_success() {
let path = RepoPath::from_parts("github", &item.owner.login, &item.name) future::Either::A(future::err(GetPopularReposError::Status(status)))
.map_err(GetPopularReposError::Validate)?; } else {
Ok(Repository { path, description: item.description }) let body_future = response.body().concat2().map_err(GetPopularReposError::Transport);
}).collect::<Result<Vec<_>, _>>() let decode_future = body_future
})) .and_then(|body| serde_json::from_slice(body.as_ref()).map_err(GetPopularReposError::Decode));
} future::Either::B(decode_future.and_then(|search_response: GithubSearchResponse| {
}) search_response.items.into_iter().map(|item| {
}) let path = RepoPath::from_parts("github", &item.owner.login, &item.name)
.map_err(GetPopularReposError::Validate)?;
Ok(Repository { path, description: item.description })
}).collect::<Result<Vec<_>, _>>()
}))
}
})
}))
}
} }

View file

@ -20,6 +20,7 @@ extern crate tokio_core;
extern crate tokio_service; extern crate tokio_service;
extern crate toml; extern crate toml;
mod utils;
mod models; mod models;
mod parsers; mod parsers;
mod interactors; mod interactors;
@ -65,10 +66,7 @@ fn main() {
let http = Http::new(); let http = Http::new();
let engine = Engine { let engine = Engine::new(client.clone(), logger.clone());
client: client.clone(),
logger: logger.clone()
};
let server = Server::new(engine); let server = Server::new(engine);

View file

@ -1,12 +1,12 @@
use std::str::FromStr; use std::str::FromStr;
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct Repository { pub struct Repository {
pub path: RepoPath, pub path: RepoPath,
pub description: String pub description: String
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct RepoPath { pub struct RepoPath {
pub site: RepoSite, pub site: RepoSite,
pub qual: RepoQualifier, pub qual: RepoQualifier,
@ -26,7 +26,7 @@ impl RepoPath {
#[derive(Debug)] #[derive(Debug)]
pub struct RepoValidationError; pub struct RepoValidationError;
#[derive(Clone, Copy)] #[derive(Clone, Copy, Debug)]
pub enum RepoSite { pub enum RepoSite {
Github Github
} }
@ -58,7 +58,7 @@ impl AsRef<str> for RepoSite {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct RepoQualifier(String); pub struct RepoQualifier(String);
impl FromStr for RepoQualifier { impl FromStr for RepoQualifier {
@ -83,7 +83,7 @@ impl AsRef<str> for RepoQualifier {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug)]
pub struct RepoName(String); pub struct RepoName(String);
impl FromStr for RepoName { impl FromStr for RepoName {

1
src/utils/mod.rs Normal file
View file

@ -0,0 +1 @@
pub mod throttle;

123
src/utils/throttle.rs Normal file
View file

@ -0,0 +1,123 @@
use std::error::Error;
use std::fmt::{Debug, Display, Formatter, Result as FmtResult};
use std::time::{Duration, Instant};
use std::ops::Deref;
use std::sync::Mutex;
use futures::{Future, Poll};
use futures::future::{Shared, SharedError, SharedItem};
use tokio_service::Service;
pub struct Throttle<S: Service<Request=()>> {
inner: S,
duration: Duration,
current: Mutex<Option<(Instant, Shared<S::Future>)>>
}
impl<S> Debug for Throttle<S>
where S: Service<Request=()> + Debug,
{
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
fmt.debug_struct("Throttle")
.field("inner", &self.inner)
.field("duration", &self.duration)
.finish()
}
}
impl<S: Service<Request=()>> Throttle<S> {
pub fn new(service: S, duration: Duration) -> Throttle<S> {
Throttle {
inner: service,
duration,
current: Mutex::new(None)
}
}
}
impl<S: Service<Request=()>> Service for Throttle<S> {
type Request = ();
type Response = ThrottledItem<S::Response>;
type Error = ThrottledError<S::Error>;
type Future = Throttled<S::Future>;
fn call(&self, _: ()) -> Self::Future {
let now = Instant::now();
let mut current = self.current.lock().expect("lock poisoned");
if let Some((valid_until, ref shared_future)) = *current {
if valid_until > now {
if let Some(Ok(_)) = shared_future.peek() {
return Throttled(shared_future.clone());
}
}
}
let shared_future = self.inner.call(()).shared();
*current = Some((now + self.duration, shared_future.clone()));
Throttled(shared_future)
}
}
pub struct Throttled<F: Future>(Shared<F>);
impl<F> Debug for Throttled<F>
where F: Future + Debug,
F::Item: Debug,
F::Error: Debug
{
fn fmt(&self, fmt: &mut Formatter) -> FmtResult {
self.0.fmt(fmt)
}
}
impl<F: Future> Future for Throttled<F> {
type Item = ThrottledItem<F::Item>;
type Error = ThrottledError<F::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.0.poll()
.map_err(ThrottledError)
.map(|async| async.map(ThrottledItem))
}
}
#[derive(Debug)]
pub struct ThrottledItem<T>(SharedItem<T>);
impl<T> Deref for ThrottledItem<T> {
type Target = T;
fn deref(&self) -> &T {
&self.0.deref()
}
}
#[derive(Debug)]
pub struct ThrottledError<E>(SharedError<E>);
impl<E> Deref for ThrottledError<E> {
type Target = E;
fn deref(&self) -> &E {
&self.0.deref()
}
}
impl<E> Display for ThrottledError<E>
where E: Display,
{
fn fmt(&self, f: &mut Formatter) -> FmtResult {
self.0.fmt(f)
}
}
impl<E> Error for ThrottledError<E>
where E: Error,
{
fn description(&self) -> &str {
self.0.description()
}
fn cause(&self) -> Option<&Error> {
self.0.cause()
}
}