We need to add the following dependencies:
cargo add tokio --features full
cargo add warp
cargo add serde_json
cargo add serde --features derive
cargo add bytes
Using `full` as a feature in `tokio` is a bit lazy, but we do it here
for the sake of simplicity. Furthermore, we do not bother with TLS
for now. We will, however, use asynchronous handler functions
instead of synchronous closures, since this is more realistic for
larger projects.
use bytes::Bytes;
use serde::{Serialize, Deserialize};
use std::net::IpAddr;
use warp::{Filter, Rejection};
use warp::http::StatusCode;
/// A person as exchanged with clients; converted to and from JSON via serde.
#[derive(Debug, Deserialize, Serialize)]
struct Person {
    /// Name of the person.
    name: String,
    /// Age of the person.
    age: i32,
}
/// Serializable error payload: a numeric code plus a textual message.
#[derive(Debug, Deserialize, Serialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: i32,
    /// Textual description of the error.
    error_message: String,
}
/// Handler for GET requests carrying a person name.
///
/// Logs the requested name and always replies `200 OK` with a JSON
/// `Person` that has the given name and a fixed age of 54.
async fn get_handler(
    name: String,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let person = Person { name, age: 54 };
    let json_body = warp::reply::json(&person);
    Ok(warp::reply::with_status(json_body, StatusCode::OK))
}
/// Handler for POST requests whose body is a JSON-encoded `Person`.
///
/// Logs the decoded person and echoes it back as JSON with `200 OK`.
///
/// # Panics
///
/// Panics when the body cannot be decoded into a `Person`, because the
/// parse result is unwrapped.
async fn post_handler(
    body: Bytes,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let person: Person = serde_json::from_slice(&body).unwrap();
    println!("Got POST for Person: {:?}", person);
    let json_body = warp::reply::json(&person);
    Ok(warp::reply::with_status(json_body, StatusCode::OK))
}
/// Builds the GET and POST person routes and serves them on `0.0.0.0:8000`.
#[tokio::main]
async fn main() {
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and_then(get_handler);

    // POST /api/person, handing the raw body bytes to the handler
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and_then(post_handler);

    let routes = get_route.or(post_route);

    // 0.0.0.0: listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0u8, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}
use bytes::Bytes;
use serde::{Serialize, Deserialize};
use std::net::IpAddr;
use warp::{Filter, Rejection};
use warp::http::StatusCode;
/// A person as exchanged with clients; converted to and from JSON via serde.
#[derive(Debug, Deserialize, Serialize)]
struct Person {
    /// Name of the person.
    name: String,
    /// Age of the person.
    age: i32,
}
/// JSON error payload sent to the client when a request cannot be served.
#[derive(Debug, Deserialize, Serialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: i32,
    /// Textual description of the error.
    error_message: String,
}
/// Handler for GET requests carrying a person name.
///
/// Logs the requested name and always replies `200 OK` with a JSON
/// `Person` that has the given name and a fixed age of 54.
async fn get_handler(
    name: String,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let person = Person { name, age: 54 };
    let json_body = warp::reply::json(&person);
    Ok(warp::reply::with_status(json_body, StatusCode::OK))
}
/// Handler for POST requests whose body should be a JSON-encoded `Person`.
///
/// On success the person is logged and echoed back with `200 OK`. If the
/// body cannot be decoded, the failure is logged and a JSON `ErrorBody`
/// (code 1, debug rendering of the parse error) is returned with
/// `400 Bad Request`. This handler never produces a rejection.
async fn post_handler(
    body: Bytes,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let parse_error = match serde_json::from_slice::<Person>(&body) {
        Ok(person) => {
            println!("Got POST for Person: {:?}", person);
            return Ok(warp::reply::with_status(
                warp::reply::json(&person),
                StatusCode::OK,
            ));
        }
        Err(e) => e,
    };

    println!("Got bad POST for Person: {:?}", parse_error);
    let error_body = ErrorBody {
        error_code: 1,
        error_message: format!("{:?}", parse_error),
    };
    Ok(warp::reply::with_status(
        warp::reply::json(&error_body),
        StatusCode::BAD_REQUEST,
    ))
}
/// Builds the GET and POST person routes and serves them on `0.0.0.0:8000`.
#[tokio::main]
async fn main() {
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and_then(get_handler);

    // POST /api/person, handing the raw body bytes to the handler
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and_then(post_handler);

    let routes = get_route.or(post_route);

    // 0.0.0.0: listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0u8, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}
## Rejections and centralized error handling
```rust
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::net::IpAddr;
use warp::http::StatusCode;
use warp::{Filter, Rejection};
/// A person as exchanged with clients; converted to and from JSON via serde.
#[derive(Debug, Deserialize, Serialize)]
struct Person {
    /// Name of the person.
    name: String,
    /// Age of the person.
    age: i32,
}
/// JSON error payload; also usable as a custom warp rejection.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Textual description of the error.
    error_message: String,
}

// Marker impl so an `ErrorBody` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}
/// Handler for GET requests carrying a person name.
///
/// Logs the requested name and always replies `200 OK` with a JSON
/// `Person` that has the given name and a fixed age of 54.
async fn get_handler(
    name: String,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let person = Person { name, age: 54 };
    let json_body = warp::reply::json(&person);
    Ok(warp::reply::with_status(json_body, StatusCode::OK))
}
/// Handler for POST requests whose body should be a JSON-encoded `Person`.
///
/// On success the person is logged and echoed back with `200 OK`. If the
/// body cannot be decoded, the failure is logged and the request is
/// rejected with a custom `ErrorBody` (code 1, debug rendering of the
/// parse error).
async fn post_handler(
    body: Bytes,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    serde_json::from_slice::<Person>(&body)
        .map(|person| {
            println!("Got POST for Person: {:?}", person);
            warp::reply::with_status(warp::reply::json(&person), StatusCode::OK)
        })
        .map_err(|e| {
            println!("Got bad POST for Person: {:?}", e);
            warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", e),
            })
        })
}
/// Central recovery function: turns any `Rejection` into a JSON error reply.
///
/// The rejection is classified in this order:
///
/// 1. not found            -> `404 Not Found`
/// 2. `MethodNotAllowed`   -> `405 Method Not Allowed`
/// 3. custom `ErrorBody`   -> `400 Bad Request`, body is the rejection's own `ErrorBody`
/// 4. anything else        -> `500 Internal Server Error`
///
/// The only place this program raises an `ErrorBody` rejection is a POST
/// body that fails to parse, which is a client error; it is therefore
/// reported as `400 Bad Request` rather than `401 Unauthorized`.
///
/// Never fails: the error type is `Infallible`.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (code, eb) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let code = StatusCode::METHOD_NOT_ALLOWED;
        (
            code,
            ErrorBody {
                error_code: code.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(e) = err.find::<ErrorBody>() {
        // Malformed request body: a client error, not an authentication failure.
        (StatusCode::BAD_REQUEST, e.clone())
    } else {
        let code = StatusCode::INTERNAL_SERVER_ERROR;
        (
            code,
            ErrorBody {
                error_code: code.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&eb), code))
}
/// Builds the person routes, attaches the central error handler and serves
/// everything on `0.0.0.0:8000`.
#[tokio::main]
async fn main() {
    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and_then(get_handler);

    // POST /api/person, handing the raw body bytes to the handler
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and_then(post_handler);

    // Rejections from either route are converted to replies by `handle_errors`.
    let routes = get_route.or(post_route).recover(handle_errors);

    // 0.0.0.0: listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0u8, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::{Arc, RwLock};
use warp::http::StatusCode;
use warp::{Filter, Rejection};
/// A person as stored in the in-memory database and exchanged as JSON.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Person {
    /// Name of the person; used as the key in `PersonBase`.
    name: String,
    /// Age of the person.
    age: i32,
}
/// JSON error payload; also usable as a custom warp rejection.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Textual description of the error.
    error_message: String,
}

// Marker impl so an `ErrorBody` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}
/// In-memory store of persons.
struct PersonBase {
    /// Persons keyed by their name.
    map: HashMap<String, Person>,
}
/// Builds a filter that matches every request and extracts a clone of the
/// shared `PersonBase` handle, so handlers can receive it as an argument.
fn with_person_base(
    pb: Arc<RwLock<PersonBase>>,
) -> impl Filter<Extract = (Arc<RwLock<PersonBase>>,), Error = Infallible> + Clone {
    // Cloning the `Arc` only bumps the reference count; the store is shared.
    warp::any().map(move || Arc::clone(&pb))
}
/// Handler for GET requests: looks a person up by name.
///
/// Replies `200 OK` with the stored `Person` as JSON, or `404 Not Found`
/// with a JSON `ErrorBody` (code 2) when no person has that name.
///
/// # Panics
///
/// Panics if the store's lock is poisoned.
async fn get_handler(
    name: String,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    println!("Got GET request for name {}", name);
    let store = pb.read().unwrap();

    if let Some(person) = store.map.get(&name) {
        return Ok(warp::reply::with_status(
            warp::reply::json(person),
            StatusCode::OK,
        ));
    }

    let not_found = ErrorBody {
        error_code: 2,
        error_message: format!("No person with name {name} in database!"),
    };
    Ok(warp::reply::with_status(
        warp::reply::json(&not_found),
        StatusCode::NOT_FOUND,
    ))
}
/// Handler for POST requests: stores a new person.
///
/// * Body does not decode into a `Person`: the failure is logged and the
///   request is rejected with a custom `ErrorBody` (code 1).
/// * Name already present: `409 Conflict` with a JSON `ErrorBody` (code 3);
///   the stored entry is left untouched.
/// * Otherwise the person is inserted and echoed back with `200 OK`.
///
/// # Panics
///
/// Panics if the store's lock is poisoned.
async fn post_handler(
    body: Bytes,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    let person: Person = match serde_json::from_slice(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            println!("Got bad POST for Person: {:?}", e);
            return Err(warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", e),
            }));
        }
    };
    println!("Got POST for Person: {:?}", person);

    let mut store = pb.write().unwrap();
    if store.map.contains_key(&person.name) {
        let conflict = ErrorBody {
            error_code: 3,
            error_message: format!("Person with name {} already in database!", person.name),
        };
        return Ok(warp::reply::with_status(
            warp::reply::json(&conflict),
            StatusCode::CONFLICT,
        ));
    }

    store.map.insert(person.name.clone(), person.clone());
    Ok(warp::reply::with_status(
        warp::reply::json(&person),
        StatusCode::OK,
    ))
}
/// Central recovery function: turns any `Rejection` into a JSON error reply.
///
/// The rejection is classified in this order:
///
/// 1. not found            -> `404 Not Found`
/// 2. `MethodNotAllowed`   -> `405 Method Not Allowed`
/// 3. custom `ErrorBody`   -> `400 Bad Request`, body is the rejection's own `ErrorBody`
/// 4. anything else        -> `500 Internal Server Error`
///
/// Never fails: the error type is `Infallible`.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (status, payload) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let status = StatusCode::METHOD_NOT_ALLOWED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(custom) = err.find::<ErrorBody>() {
        (StatusCode::BAD_REQUEST, custom.clone())
    } else {
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&payload), status))
}
/// Creates the shared in-memory store, wires the person routes with the
/// central error handler and serves everything on `0.0.0.0:8000`.
#[tokio::main]
async fn main() {
    // Starts empty; shared between both routes.
    let store = Arc::new(RwLock::new(PersonBase {
        map: HashMap::new(),
    }));

    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and(with_person_base(store.clone()))
        .and_then(get_handler);

    // POST /api/person, handing the raw body bytes to the handler
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::body::bytes())
        .and(with_person_base(store.clone()))
        .and_then(post_handler);

    // Rejections from either route are converted to replies by `handle_errors`.
    let routes = get_route.or(post_route).recover(handle_errors);

    // 0.0.0.0: listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0u8, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::{Arc, RwLock};
use warp::http::StatusCode;
use warp::{Filter, Rejection};
/// A person as stored in the in-memory database and exchanged as JSON.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Person {
    /// Name of the person; used as the key in `PersonBase`.
    name: String,
    /// Age of the person.
    age: i32,
}
/// JSON error payload; also usable as a custom warp rejection.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Textual description of the error.
    error_message: String,
}

// Marker impl so an `ErrorBody` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}
/// Custom rejection raised when a request fails the authorization check.
#[derive(Debug)]
struct Unauthorized {
    /// Explanation of why the request was refused.
    error: String,
}

// Marker impl so an `Unauthorized` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for Unauthorized {}
/// In-memory store of persons.
struct PersonBase {
    /// Persons keyed by their name.
    map: HashMap<String, Person>,
}
/// Builds a filter that matches every request and extracts a clone of the
/// shared `PersonBase` handle, so handlers can receive it as an argument.
fn with_person_base(
    pb: Arc<RwLock<PersonBase>>,
) -> impl Filter<Extract = (Arc<RwLock<PersonBase>>,), Error = Infallible> + Clone {
    // Cloning the `Arc` only bumps the reference count; the store is shared.
    warp::any().map(move || Arc::clone(&pb))
}
/// Placeholder authorization check on the optional `authorization` header value.
///
/// Returns `Ok(())` when the request may proceed, otherwise an
/// `Unauthorized` rejection:
///
/// * no header value          -> "Missing auth header!"
/// * header value is `"xxx"`  -> "Bad header!"
///
/// Every other header value is accepted.
fn authorize(auth: &Option<String>) -> Result<(), Rejection> {
    match auth.as_deref() {
        None => Err(warp::reject::custom(Unauthorized {
            error: "Missing auth header!".to_string(),
        })),
        Some(header) => {
            // NOTE(review): this prints the raw credential to stdout; fine for a
            // demo, presumably not wanted in production.
            println!("Auth header:{}<<<", header);
            // Put your authentication here...
            // NOTE(review): as written, only the literal value "xxx" is refused.
            if header == "xxx" {
                Err(warp::reject::custom(Unauthorized {
                    error: "Bad header!".to_string(),
                }))
            } else {
                Ok(())
            }
        }
    }
}
/// Handler for GET requests: looks a person up by name.
///
/// The request is first passed through `authorize`; its rejection is
/// propagated unchanged. Afterwards replies `200 OK` with the stored
/// `Person` as JSON, or `404 Not Found` with a JSON `ErrorBody` (code 2)
/// when no person has that name.
///
/// # Panics
///
/// Panics if the store's lock is poisoned.
async fn get_handler(
    name: String,
    auth: Option<String>,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;
    println!("Got GET request for name {}", name);
    let store = pb.read().unwrap();

    if let Some(person) = store.map.get(&name) {
        return Ok(warp::reply::with_status(
            warp::reply::json(person),
            StatusCode::OK,
        ));
    }

    let not_found = ErrorBody {
        error_code: 2,
        error_message: format!("No person with name {name} in database!"),
    };
    Ok(warp::reply::with_status(
        warp::reply::json(&not_found),
        StatusCode::NOT_FOUND,
    ))
}
/// Handler for POST requests: stores a new person.
///
/// The request is first passed through `authorize`; its rejection is
/// propagated unchanged. Then:
///
/// * Body does not decode into a `Person`: the failure is logged and the
///   request is rejected with a custom `ErrorBody` (code 1).
/// * Name already present: `409 Conflict` with a JSON `ErrorBody` (code 3);
///   the stored entry is left untouched.
/// * Otherwise the person is inserted and echoed back with `200 OK`.
///
/// # Panics
///
/// Panics if the store's lock is poisoned.
async fn post_handler(
    auth: Option<String>,
    body: Bytes,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;

    let person: Person = match serde_json::from_slice(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            println!("Got bad POST for Person: {:?}", e);
            return Err(warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", e),
            }));
        }
    };
    println!("Got POST for Person: {:?}", person);

    let mut store = pb.write().unwrap();
    if store.map.contains_key(&person.name) {
        let conflict = ErrorBody {
            error_code: 3,
            error_message: format!("Person with name {} already in database!", person.name),
        };
        return Ok(warp::reply::with_status(
            warp::reply::json(&conflict),
            StatusCode::CONFLICT,
        ));
    }

    store.map.insert(person.name.clone(), person.clone());
    Ok(warp::reply::with_status(
        warp::reply::json(&person),
        StatusCode::OK,
    ))
}
/// Central recovery function: turns any `Rejection` into a JSON error reply.
///
/// The rejection is classified in this order:
///
/// 1. not found            -> `404 Not Found`
/// 2. `MethodNotAllowed`   -> `405 Method Not Allowed`
/// 3. custom `Unauthorized`-> `401 Unauthorized`, message taken from the rejection
/// 4. custom `ErrorBody`   -> `400 Bad Request`, body is the rejection's own `ErrorBody`
/// 5. anything else        -> `500 Internal Server Error`
///
/// Never fails: the error type is `Infallible`.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (status, payload) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let status = StatusCode::METHOD_NOT_ALLOWED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(denied) = err.find::<Unauthorized>() {
        let status = StatusCode::UNAUTHORIZED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: denied.error.clone(),
            },
        )
    } else if let Some(custom) = err.find::<ErrorBody>() {
        (StatusCode::BAD_REQUEST, custom.clone())
    } else {
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&payload), status))
}
/// Creates the shared in-memory store, wires the person routes (each also
/// extracting the optional `authorization` header) with the central error
/// handler and serves everything on `0.0.0.0:8000`.
#[tokio::main]
async fn main() {
    // Starts empty; shared between both routes.
    let store = Arc::new(RwLock::new(PersonBase {
        map: HashMap::new(),
    }));

    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(with_person_base(store.clone()))
        .and_then(get_handler);

    // POST /api/person, handing the raw body bytes to the handler
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(warp::body::bytes())
        .and(with_person_base(store.clone()))
        .and_then(post_handler);

    // Rejections from either route are converted to replies by `handle_errors`.
    let routes = get_route.or(post_route).recover(handle_errors);

    // 0.0.0.0: listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0u8, 0, 0, 0]);
    warp::serve(routes).run((listen_ip, 8000)).await;
}
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::IpAddr;
use std::sync::{Arc, Mutex, RwLock};
use tokio::sync::oneshot;
use warp::http::StatusCode;
use warp::{Filter, Rejection};
/// A person as stored in the in-memory database and exchanged as JSON.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct Person {
    /// Name of the person; used as the key in `PersonBase`.
    name: String,
    /// Age of the person.
    age: i32,
}
/// JSON error payload; also usable as a custom warp rejection.
#[derive(Clone, Debug, Deserialize, Serialize)]
struct ErrorBody {
    /// Numeric error code.
    error_code: u16,
    /// Textual description of the error.
    error_message: String,
}

// Marker impl so an `ErrorBody` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for ErrorBody {}
/// Custom rejection raised when a request fails the authorization check.
#[derive(Debug)]
struct Unauthorized {
    /// Explanation of why the request was refused.
    error: String,
}

// Marker impl so an `Unauthorized` can be passed to `warp::reject::custom`.
impl warp::reject::Reject for Unauthorized {}
/// In-memory store of persons.
struct PersonBase {
    /// Persons keyed by their name.
    map: HashMap<String, Person>,
}
/// Builds a filter that matches every request and extracts a clone of the
/// shared `PersonBase` handle, so handlers can receive it as an argument.
fn with_person_base(
    pb: Arc<RwLock<PersonBase>>,
) -> impl Filter<Extract = (Arc<RwLock<PersonBase>>,), Error = Infallible> + Clone {
    // Cloning the `Arc` only bumps the reference count; the store is shared.
    warp::any().map(move || Arc::clone(&pb))
}
/// Placeholder authorization check on the optional `authorization` header value.
///
/// Returns `Ok(())` when the request may proceed, otherwise an
/// `Unauthorized` rejection:
///
/// * no header value          -> "Missing auth header!"
/// * header value is `"xxx"`  -> "Bad header!"
///
/// Every other header value is accepted.
fn authorize(auth: &Option<String>) -> Result<(), Rejection> {
    match auth.as_deref() {
        None => Err(warp::reject::custom(Unauthorized {
            error: "Missing auth header!".to_string(),
        })),
        Some(header) => {
            // NOTE(review): this prints the raw credential to stdout; fine for a
            // demo, presumably not wanted in production.
            println!("Auth header:{}<<<", header);
            // Put your authentication here...
            // NOTE(review): as written, only the literal value "xxx" is refused.
            if header == "xxx" {
                Err(warp::reject::custom(Unauthorized {
                    error: "Bad header!".to_string(),
                }))
            } else {
                Ok(())
            }
        }
    }
}
/// Handler for the shutdown route: signals the server to stop.
///
/// The request is first passed through `authorize`; its rejection is
/// propagated unchanged. The one-shot sender is then taken out of the
/// shared slot and fired. Because the slot is left empty afterwards, only
/// the first authorized request sends the signal; later ones are no-ops.
/// In every authorized case the reply is `200 OK` with an `ErrorBody` of
/// code 0 and an empty message.
///
/// # Panics
///
/// Panics if the sender's mutex is poisoned.
async fn shutdown_handler(
    auth: Option<String>,
    tx_arc: Arc<Mutex<Option<tokio::sync::oneshot::Sender<()>>>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;

    // The guard is a temporary of this statement, so the lock is released
    // before the signal is sent and before the reply is built.
    let sender = tx_arc.lock().unwrap().take();

    if let Some(tx) = sender {
        // `send` only fails when the receiving half is already gone, i.e.
        // nobody is waiting for the signal any more. That is not worth a
        // panic inside a request handler, so the result is ignored.
        let _ = tx.send(());
    }

    Ok(warp::reply::with_status(
        warp::reply::json(&ErrorBody {
            error_code: 0,
            error_message: "".to_string(),
        }),
        StatusCode::OK,
    ))
}
/// Handler for GET requests: looks a person up by name.
///
/// The request is first passed through `authorize`; its rejection is
/// propagated unchanged. Afterwards replies `200 OK` with the stored
/// `Person` as JSON, or `404 Not Found` with a JSON `ErrorBody` (code 2)
/// when no person has that name.
///
/// # Panics
///
/// Panics if the store's lock is poisoned.
async fn get_handler(
    name: String,
    auth: Option<String>,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;
    println!("Got GET request for name {}", name);
    let store = pb.read().unwrap();

    if let Some(person) = store.map.get(&name) {
        return Ok(warp::reply::with_status(
            warp::reply::json(person),
            StatusCode::OK,
        ));
    }

    let not_found = ErrorBody {
        error_code: 2,
        error_message: format!("No person with name {name} in database!"),
    };
    Ok(warp::reply::with_status(
        warp::reply::json(&not_found),
        StatusCode::NOT_FOUND,
    ))
}
/// Handler for POST requests: stores a new person.
///
/// The request is first passed through `authorize`; its rejection is
/// propagated unchanged. Then:
///
/// * Body does not decode into a `Person`: the failure is logged and the
///   request is rejected with a custom `ErrorBody` (code 1).
/// * Name already present: `409 Conflict` with a JSON `ErrorBody` (code 3);
///   the stored entry is left untouched.
/// * Otherwise the person is inserted and echoed back with `200 OK`.
///
/// # Panics
///
/// Panics if the store's lock is poisoned.
async fn post_handler(
    auth: Option<String>,
    body: Bytes,
    pb: Arc<RwLock<PersonBase>>,
) -> Result<warp::reply::WithStatus<warp::reply::Json>, Rejection> {
    authorize(&auth)?;

    let person: Person = match serde_json::from_slice(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            println!("Got bad POST for Person: {:?}", e);
            return Err(warp::reject::custom(ErrorBody {
                error_code: 1,
                error_message: format!("{:?}", e),
            }));
        }
    };
    println!("Got POST for Person: {:?}", person);

    let mut store = pb.write().unwrap();
    if store.map.contains_key(&person.name) {
        let conflict = ErrorBody {
            error_code: 3,
            error_message: format!("Person with name {} already in database!", person.name),
        };
        return Ok(warp::reply::with_status(
            warp::reply::json(&conflict),
            StatusCode::CONFLICT,
        ));
    }

    store.map.insert(person.name.clone(), person.clone());
    Ok(warp::reply::with_status(
        warp::reply::json(&person),
        StatusCode::OK,
    ))
}
/// Central recovery function: turns any `Rejection` into a JSON error reply.
///
/// The rejection is classified in this order:
///
/// 1. not found            -> `404 Not Found`
/// 2. `MethodNotAllowed`   -> `405 Method Not Allowed`
/// 3. custom `Unauthorized`-> `401 Unauthorized`, message taken from the rejection
/// 4. custom `ErrorBody`   -> `400 Bad Request`, body is the rejection's own `ErrorBody`
/// 5. anything else        -> `500 Internal Server Error`
///
/// Never fails: the error type is `Infallible`.
pub async fn handle_errors(err: Rejection) -> Result<impl warp::Reply, Infallible> {
    let (status, payload) = if err.is_not_found() {
        (
            StatusCode::NOT_FOUND,
            ErrorBody {
                error_code: 404,
                error_message: "NOT_FOUND".to_string(),
            },
        )
    } else if err.find::<warp::reject::MethodNotAllowed>().is_some() {
        let status = StatusCode::METHOD_NOT_ALLOWED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "METHOD_NOT_ALLOWED".to_string(),
            },
        )
    } else if let Some(denied) = err.find::<Unauthorized>() {
        let status = StatusCode::UNAUTHORIZED;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: denied.error.clone(),
            },
        )
    } else if let Some(custom) = err.find::<ErrorBody>() {
        (StatusCode::BAD_REQUEST, custom.clone())
    } else {
        let status = StatusCode::INTERNAL_SERVER_ERROR;
        (
            status,
            ErrorBody {
                error_code: status.as_u16(),
                error_message: "INTERNAL_SERVER_ERROR".to_string(),
            },
        )
    };
    Ok(warp::reply::with_status(warp::reply::json(&payload), status))
}
/// Creates the shared store and the shutdown channel, wires the shutdown
/// and person routes with the central error handler, and runs the server
/// on `0.0.0.0:8000` until the shutdown signal arrives.
#[tokio::main]
async fn main() {
    // Starts empty; shared between the person routes.
    let store = Arc::new(RwLock::new(PersonBase {
        map: HashMap::new(),
    }));

    // One-shot channel: the shutdown handler fires the sender, the server
    // waits on the receiver. The sender sits in a `Mutex<Option<_>>` so the
    // handler can take it out of the shared slot.
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
    let shared_tx = Arc::new(Mutex::new(Some(shutdown_tx)));

    // DELETE /api/shutdown
    let shutdown_route = warp::path!("api" / "shutdown")
        .and(warp::delete())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(warp::any().map(move || shared_tx.clone()))
        .and_then(shutdown_handler);

    // GET /api/person/<name>
    let get_route = warp::path!("api" / "person" / String)
        .and(warp::get())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(with_person_base(store.clone()))
        .and_then(get_handler);

    // POST /api/person, handing the raw body bytes to the handler
    let post_route = warp::path!("api" / "person")
        .and(warp::post())
        .and(warp::filters::header::optional::<String>("authorization"))
        .and(warp::body::bytes())
        .and(with_person_base(store.clone()))
        .and_then(post_handler);

    // Rejections from any route are converted to replies by `handle_errors`.
    let routes = shutdown_route
        .or(get_route)
        .or(post_route)
        .recover(handle_errors);

    // 0.0.0.0: listen on all IPv4 interfaces.
    let listen_ip = IpAddr::from([0u8, 0, 0, 0]);
    let (_addr, server) =
        warp::serve(routes).bind_with_graceful_shutdown((listen_ip, 8000), async {
            // The outcome of the receive is discarded; completion alone is the signal.
            shutdown_rx.await.ok();
        });

    // Run the server as its own task and wait for it to finish.
    tokio::task::spawn(server).await.unwrap();
}
- Warp: https://github.com/seanmonstar/warp
- Warp documentation: https://docs.rs/warp/latest/warp/
- `Arc`: https://doc.rust-lang.org/std/sync/struct.Arc.html
- `RwLock`: https://doc.rust-lang.org/std/sync/struct.RwLock.html
- tokio: https://tokio.rs/
- Video: https://youtu.be/lRqxRprCDGU
- Overview: https://gist.github.com/max-itzpapalotl/18f7675a60f6f9603250367bcb63992e