Commit 482e147b authored by boxdot's avatar boxdot Committed by Ben Boeckel
Browse files

Implement AsyncQuery for Endpoint, Ignore, Paged, Raw.

parent 7fb170ef
......@@ -6,10 +6,11 @@
use std::borrow::Cow;
use async_trait::async_trait;
use http::{self, header, Method, Request};
use serde::de::DeserializeOwned;
use crate::api::{query, ApiError, BodyError, Client, Query, QueryParams};
use crate::api::{query, ApiError, AsyncClient, AsyncQuery, BodyError, Client, Query, QueryParams};
/// A trait for providing the necessary information for a single REST API endpoint.
pub trait Endpoint {
......@@ -61,6 +62,37 @@ where
}
}
#[async_trait]
impl<E, T, C> AsyncQuery<T, C> for E
where
E: Endpoint + Sync,
T: DeserializeOwned + 'static,
C: AsyncClient + Sync,
{
async fn query_async(&self, client: &C) -> Result<T, ApiError<C::Error>> {
let mut url = client.rest_endpoint(&self.endpoint())?;
self.parameters().add_to_url(&mut url);
let req = Request::builder()
.method(self.method())
.uri(query::url_to_http_uri(url));
let (req, data) = if let Some((mime, data)) = self.body()? {
let req = req.header(header::CONTENT_TYPE, mime);
(req, data)
} else {
(req, Vec::new())
};
let rsp = client.rest_async(req, data).await?;
let status = rsp.status();
let v = serde_json::from_slice(rsp.body())?;
if !status.is_success() {
return Err(ApiError::from_gitlab(v));
}
serde_json::from_value::<T>(v).map_err(ApiError::data_type::<T>)
}
}
#[cfg(test)]
mod tests {
use http::StatusCode;
......
......@@ -4,9 +4,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use async_trait::async_trait;
use http::{header, Request};
use crate::api::{query, ApiError, Client, Endpoint, Query};
use crate::api::{query, ApiError, AsyncClient, AsyncQuery, Client, Endpoint, Query};
/// A query modifier that ignores the data returned from an endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
......@@ -49,6 +50,35 @@ where
}
}
#[async_trait]
impl<E, C> AsyncQuery<(), C> for Ignore<E>
where
E: Endpoint + Sync,
C: AsyncClient + Sync,
{
async fn query_async(&self, client: &C) -> Result<(), ApiError<C::Error>> {
let mut url = client.rest_endpoint(&self.endpoint.endpoint())?;
self.endpoint.parameters().add_to_url(&mut url);
let req = Request::builder()
.method(self.endpoint.method())
.uri(query::url_to_http_uri(url));
let (req, data) = if let Some((mime, data)) = self.endpoint.body()? {
let req = req.header(header::CONTENT_TYPE, mime);
(req, data)
} else {
(req, Vec::new())
};
let rsp = client.rest_async(req, data).await?;
if !rsp.status().is_success() {
let v = serde_json::from_slice(rsp.body())?;
return Err(ApiError::from_gitlab(v));
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use http::StatusCode;
......
......@@ -4,12 +4,15 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use http::{header, HeaderMap, Request};
use serde::de::DeserializeOwned;
use thiserror::Error;
use url::Url;
use crate::api::{query, ApiError, Client, Endpoint, Query};
use crate::api::{query, ApiError, AsyncClient, AsyncQuery, Client, Endpoint, Query};
/// Errors which may occur with pagination.
#[non_exhaustive]
......@@ -262,6 +265,104 @@ where
}
}
#[async_trait]
impl<E, T, C> AsyncQuery<Vec<T>, C> for Paged<E>
where
E: Endpoint + Sync,
E: Pageable,
T: DeserializeOwned + Send,
C: AsyncClient + Sync,
{
async fn query_async(&self, client: &C) -> Result<Vec<T>, ApiError<C::Error>> {
let url = {
let mut url = client.rest_endpoint(&self.endpoint.endpoint())?;
self.endpoint.parameters().add_to_url(&mut url);
url
};
let mut page_num = 1;
let per_page = self.pagination.page_limit();
let per_page_str = format!("{}", per_page);
let results = Arc::new(Mutex::new(Vec::new()));
let mut next_url = None;
let use_keyset_pagination = self.endpoint.use_keyset_pagination();
let body = self.endpoint.body()?;
loop {
let page_url = if let Some(url) = next_url.take() {
url
} else {
let page_str = format!("{}", page_num);
let mut page_url = url.clone();
{
let mut pairs = page_url.query_pairs_mut();
pairs.append_pair("per_page", &per_page_str);
if use_keyset_pagination {
pairs.append_pair("pagination", "keyset");
} else {
pairs.append_pair("page", &page_str);
}
}
page_url
};
let req = Request::builder()
.method(self.endpoint.method())
.uri(query::url_to_http_uri(page_url));
let (req, data) = if let Some((mime, data)) = body.as_ref() {
let req = req.header(header::CONTENT_TYPE, *mime);
(req, data.clone())
} else {
(req, Vec::new())
};
let rsp = client.rest_async(req, data).await?;
let status = rsp.status();
if use_keyset_pagination {
next_url = next_page_from_headers(rsp.headers())?;
}
let v = serde_json::from_slice(rsp.body())?;
if !status.is_success() {
return Err(ApiError::from_gitlab(v));
}
let page =
serde_json::from_value::<Vec<T>>(v).map_err(ApiError::data_type::<Vec<T>>)?;
let page_len = page.len();
// Gitlab used to have issues returning paginated results; these have been fixed since,
// but if it is needed, the bug manifests as Gitlab returning *all* results instead of
// just the requested results. This can cause an infinite loop here if the number of
// total results is exactly equal to `per_page`.
let is_last_page = {
let mut locked_results = results.lock().expect("poisoned results");
locked_results.extend(page);
self.pagination.is_last_page(page_len, &locked_results)
};
if is_last_page {
break;
}
if use_keyset_pagination {
if next_url.is_none() {
break;
}
} else {
page_num += 1;
}
}
let mut locked_results = results.lock().expect("poisoned results");
Ok(std::mem::take(&mut locked_results))
}
}
fn next_page_from_headers(headers: &HeaderMap) -> Result<Option<Url>, PaginationError> {
let link_headers = headers.get_all(reqwest::header::LINK).iter();
// GitLab 14.0 will deprecate this header in preference for the W3C spec's `Link` header. Make
......@@ -277,7 +378,7 @@ fn next_page_from_headers(headers: &HeaderMap) -> Result<Option<Url>, Pagination
})
.collect::<Result<Vec<_>, PaginationError>>()?
.into_iter()
.filter_map(|header| {
.find_map(|header| {
let is_next_link = header
.params
.into_iter()
......@@ -289,7 +390,6 @@ fn next_page_from_headers(headers: &HeaderMap) -> Result<Option<Url>, Pagination
None
}
})
.next()
.transpose()
}
......
......@@ -4,9 +4,10 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use async_trait::async_trait;
use http::{header, Request};
use crate::api::{query, ApiError, Client, Endpoint, Query};
use crate::api::{query, ApiError, AsyncClient, AsyncQuery, Client, Endpoint, Query};
/// A query modifier that returns the raw data from the endpoint.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
......@@ -49,6 +50,35 @@ where
}
}
#[async_trait]
impl<E, C> AsyncQuery<Vec<u8>, C> for Raw<E>
where
E: Endpoint + Sync,
C: AsyncClient + Sync,
{
async fn query_async(&self, client: &C) -> Result<Vec<u8>, ApiError<C::Error>> {
let mut url = client.rest_endpoint(&self.endpoint.endpoint())?;
self.endpoint.parameters().add_to_url(&mut url);
let req = Request::builder()
.method(self.endpoint.method())
.uri(query::url_to_http_uri(url));
let (req, data) = if let Some((mime, data)) = self.endpoint.body()? {
let req = req.header(header::CONTENT_TYPE, mime);
(req, data)
} else {
(req, Vec::new())
};
let rsp = client.rest_async(req, data).await?;
if !rsp.status().is_success() {
let v = serde_json::from_slice(rsp.body())?;
return Err(ApiError::from_gitlab(v));
}
Ok(rsp.into_body().as_ref().into())
}
}
#[cfg(test)]
mod tests {
use http::StatusCode;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment