bug: #15 fix fragmented TCPStream + spawn a tokio task on each connection
This commit is contained in:
parent
6166310283
commit
7336933642
@ -6,72 +6,48 @@ use crate::config::Config;
|
|||||||
use crate::jwt::JWTSigner;
|
use crate::jwt::JWTSigner;
|
||||||
use crate::stores::FileStore;
|
use crate::stores::FileStore;
|
||||||
use crate::stores::Store;
|
use crate::stores::Store;
|
||||||
use lazy_static::lazy_static;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::future::Future;
|
|
||||||
use std::pin::Pin;
|
|
||||||
|
|
||||||
type FuturePinned<HTTPResponse> = Pin<Box<dyn Future<Output = HTTPResponse>>>;
|
async fn handle_get(request: HTTPRequest, config: Config) -> HTTPResponse {
|
||||||
type Handler = fn(HTTPRequest, Config) -> FuturePinned<HTTPResponse>;
|
let mut store = FileStore::new(config.filestore_path.clone());
|
||||||
|
match &request.body {
|
||||||
|
Some(ref b) => {
|
||||||
|
let is_auth = store.is_auth(&b.get_data()).await;
|
||||||
|
if !is_auth {
|
||||||
|
return HTTPResponse::as_403();
|
||||||
|
}
|
||||||
|
|
||||||
fn handle_get(request: HTTPRequest, config: Config) -> FuturePinned<HTTPResponse> {
|
let jwt_signer = {
|
||||||
Box::pin(async move {
|
match JWTSigner::new(config).await {
|
||||||
let mut store = FileStore::new(config.filestore_path.clone());
|
Ok(s) => s,
|
||||||
match &request.body {
|
|
||||||
Some(ref b) => {
|
|
||||||
let is_auth = store.is_auth(&b.get_data()).await;
|
|
||||||
if !is_auth {
|
|
||||||
return HTTPResponse::as_403();
|
|
||||||
}
|
|
||||||
|
|
||||||
let jwt_signer = {
|
|
||||||
match JWTSigner::new(config).await {
|
|
||||||
Ok(s) => s,
|
|
||||||
Err(e) => {
|
|
||||||
let message = HTTPMessage::error(&e);
|
|
||||||
return HTTPResponse::as_500(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
match jwt_signer.sign() {
|
|
||||||
Ok(t) => HTTPResponse::send_token(&t),
|
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let message = HTTPMessage::error(&e);
|
let message = HTTPMessage::error(&e);
|
||||||
return HTTPResponse::as_500(message);
|
return HTTPResponse::as_500(message);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match jwt_signer.sign() {
|
||||||
|
Ok(t) => HTTPResponse::send_token(&t),
|
||||||
|
Err(e) => {
|
||||||
|
let message = HTTPMessage::error(&e);
|
||||||
|
return HTTPResponse::as_500(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
None => HTTPResponse::as_400(),
|
|
||||||
}
|
}
|
||||||
})
|
None => HTTPResponse::as_400(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// validates the token by checking:
|
/// validates the token by checking:
|
||||||
/// * expiration time
|
/// * expiration time
|
||||||
fn handle_validate(request: HTTPRequest, _config: Config) -> FuturePinned<HTTPResponse> {
|
async fn handle_validate(request: HTTPRequest, _config: Config) -> HTTPResponse {
|
||||||
Box::pin(async move {
|
match &request.body {
|
||||||
match &request.body {
|
Some(ref _b) => {
|
||||||
Some(ref _b) => {
|
// TODO: impl the JWT validation
|
||||||
// TODO: impl the JWT validation
|
HTTPResponse::send_token("header.payload.signature")
|
||||||
HTTPResponse::send_token("header.payload.signature")
|
|
||||||
}
|
|
||||||
None => HTTPResponse::as_400(),
|
|
||||||
}
|
}
|
||||||
})
|
None => HTTPResponse::as_400(),
|
||||||
}
|
}
|
||||||
|
|
||||||
lazy_static! {
|
|
||||||
/// defines the map between the URL and its associated callback
|
|
||||||
/// each authorized targets must implement a function returning `FuturePinned<HTTPResponse>`
|
|
||||||
// TODO: a macro should be implemented to mask the implementation details
|
|
||||||
static ref HTTP_METHODS: HashMap<&'static str, Handler> =
|
|
||||||
HashMap::from(
|
|
||||||
[
|
|
||||||
("/get/", handle_get as Handler),
|
|
||||||
("/validate/", handle_validate as Handler)
|
|
||||||
]
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Router;
|
pub struct Router;
|
||||||
@ -81,9 +57,10 @@ impl Router {
|
|||||||
let request = HTTPRequest::from(request_str);
|
let request = HTTPRequest::from(request_str);
|
||||||
let target = request.start_line.get_target();
|
let target = request.start_line.get_target();
|
||||||
|
|
||||||
match HTTP_METHODS.get(target.as_str()) {
|
match target.as_str() {
|
||||||
Some(f) => f(request, config).await,
|
"/get/" => handle_get(request, config).await,
|
||||||
None => HTTPResponse::as_404(),
|
"/validate/" => handle_validate(request, config).await,
|
||||||
|
_ => HTTPResponse::as_404(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
23
src/main.rs
23
src/main.rs
@ -8,6 +8,7 @@ use configparser::ini::Ini;
|
|||||||
use tokio::{
|
use tokio::{
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
net::{TcpListener, TcpStream},
|
net::{TcpListener, TcpStream},
|
||||||
|
time::{timeout, Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
use config::Config;
|
use config::Config;
|
||||||
@ -58,16 +59,32 @@ async fn main() {
|
|||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, _) = listener.accept().await.unwrap();
|
let (stream, _) = listener.accept().await.unwrap();
|
||||||
handle_connection(stream, router_config.clone()).await;
|
let conf = router_config.clone();
|
||||||
|
tokio::spawn(handle_connection(stream, conf.clone()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// parses the incoming request (partial spec implementation) and build an HTTP response
|
/// parses the incoming request (partial spec implementation) and build an HTTP response
|
||||||
async fn handle_connection(mut stream: TcpStream, config: Config) {
|
async fn handle_connection(mut stream: TcpStream, config: Config) {
|
||||||
|
let mut message = vec![];
|
||||||
let mut buffer: [u8; 1024] = [0; 1024];
|
let mut buffer: [u8; 1024] = [0; 1024];
|
||||||
let n = stream.read(&mut buffer).await.unwrap();
|
|
||||||
|
|
||||||
let request_string = std::str::from_utf8(&buffer[0..n]).unwrap();
|
let duration = Duration::from_micros(500);
|
||||||
|
|
||||||
|
// loop until the message is read
|
||||||
|
// the stream can be fragmented so, using a timout (500um should be enough) for the future for completion
|
||||||
|
// after the timeout, the message is "considered" as entirely read
|
||||||
|
loop {
|
||||||
|
match timeout(duration, stream.read(&mut buffer)).await {
|
||||||
|
Ok(v) => {
|
||||||
|
let n = v.unwrap();
|
||||||
|
message.extend_from_slice(&buffer[0..n]);
|
||||||
|
}
|
||||||
|
Err(_e) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let request_string = std::str::from_utf8(&message).unwrap();
|
||||||
let response = ROUTER.route(request_string, config).await;
|
let response = ROUTER.route(request_string, config).await;
|
||||||
let response_str: String = response.into();
|
let response_str: String = response.into();
|
||||||
|
|
||||||
|
|||||||
@ -5,7 +5,7 @@ from datetime import datetime
|
|||||||
|
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
|
|
||||||
URL = "http://localhost:9001"
|
URL = "http://127.0.0.1:9001"
|
||||||
|
|
||||||
|
|
||||||
class TestResponse(TestCase):
|
class TestResponse(TestCase):
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user