feat: added Logger and wired it up to warp

This commit is contained in:
meeg_leeto 2022-05-13 16:20:34 +01:00
parent d2109fce65
commit 2aff7bb416
1 changed files with 132 additions and 8 deletions

View File

@ -142,9 +142,9 @@ mod conf {
/// Configuration of logging by lonk.
pub struct LogRules {
/// Where to write error logs to. The file will be appended to.
error_log_file: PathBuf,
pub error_log_file: PathBuf,
/// Where to write access ogs to. The file will be appended to.
access_log_file: PathBuf,
pub access_log_file: PathBuf,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
@ -898,6 +898,104 @@ mod service {
}
}
}
/// Affine to logging
pub mod log {
use std::path::PathBuf;
use tokio::{
fs::OpenOptions,
io::{AsyncWriteExt, BufWriter},
sync,
};
/// A struct responsible for logging events, per messages received from
/// other processes.
pub struct Logger {
access_tx: sync::mpsc::UnboundedSender<String>,
error_tx: sync::mpsc::UnboundedSender<String>,
}
impl Logger {
pub fn from_log_rules(config: &crate::conf::LogRules) -> Self {
// Create the communication channels
let (access_tx, access_rx) = sync::mpsc::unbounded_channel::<String>();
let (error_tx, error_rx) = sync::mpsc::unbounded_channel::<String>();
// Start the logging tasks
tokio::spawn(Self::logging_task(
access_rx,
config.access_log_file.clone(),
));
tokio::spawn(Self::logging_task(error_rx, config.error_log_file.clone()));
// Done
Logger {
access_tx,
error_tx,
}
}
/// Log a message into the access log file.
///
/// Returns a Result with empty type; if posting the log message
/// failed for any reason, it's unlikely to recover, and the user
/// should decide either to stop logging, ignore these errors, or
/// halt the program.
pub fn access(&self, msg: String) -> Result<(), ()> {
self.access_tx.send(msg).map_err(|e| ())
}
/// Log a message into the error log file.
///
/// Returns a Result with empty type; if posting the log message
/// failed for any reason, it's unlikely to recover, and the user
/// should decide either to stop logging, ignore these errors, or
/// halt the program.
pub fn error(&self, msg: String) -> Result<(), ()> {
self.error_tx.send(msg).map_err(|e| ())
}
/// The task responsible for receiving the log messages and actually
/// writing them into the corresponding files. One task is created
/// for each target file.
async fn logging_task(mut rx: sync::mpsc::UnboundedReceiver<String>, into: PathBuf) {
// Open the log file in append mode
let file = OpenOptions::new().append(true).open(into.clone()).await;
if let Err(e) = file {
eprintln!(
concat!(
"Could not open {} for logging, with error:\n",
"{}\n",
"Future logging may result in errors."
),
into.clone().to_string_lossy(),
e
);
return;
}
let mut file = BufWriter::new(file.unwrap());
// Listen to the logging message channel
while let Some(log) = rx.recv().await {
let write_result = file.write(log.as_bytes()).await;
if let Err(e) = write_result {
eprintln!(
concat!(
"Error writing to {}!\n",
"{}\n",
"Continuing, but future logging may error again."
),
into.clone().to_string_lossy(),
e
)
}
}
// All logging tx channels were dropped, close this task
}
}
}
}
use service::*;
@ -1030,6 +1128,9 @@ async fn serve() {
}
};
// Create logger
let logger = log::Logger::from_log_rules(&config.log_rules);
// Create slug factory
let slug_factory = slug::SlugFactory::from_rules(config.slug_rules);
@ -1039,11 +1140,30 @@ async fn serve() {
db::SlugDatabase::from_client(client, config.db.expire_seconds)
};
// We leak the slug factory and the database, because we know that these
// will live forever, and want them to have 'static lifetime so that warp is
// happy.
// We leak the slug factory, the database, and the logger, because we know
// that these will live forever, and want them to have 'static lifetime so
// that warp is happy.
let slug_factory: &'static slug::SlugFactory = Box::leak(Box::new(slug_factory));
let db: &'static db::SlugDatabase = Box::leak(Box::new(db));
let logger: &'static log::Logger = Box::leak(Box::new(logger));
// Warp logging compatibility layer
let log = warp::log::custom(move |info| {
let log_msg = format!(
"{} requested {}/{}, replied with status {}",
info.remote_addr()
.map(|x| x.to_string())
.unwrap_or_else(|| "".to_string()),
info.path(),
info.method(),
info.status().as_u16(),
);
if info.status().is_client_error() || info.status().is_server_error() {
logger.error(log_msg).ok();
} else {
logger.access(log_msg).ok();
}
});
// POST /shorten/ with link in argument
let shorten = warp::post()
@ -1064,7 +1184,8 @@ async fn serve() {
.unwrap(),
Err((status, message)) => Response::builder().status(status).body(message).unwrap(),
}
});
})
.with(log);
// GET /l/:Slug
let link = warp::path("l")
@ -1078,12 +1199,15 @@ async fn serve() {
.unwrap(),
Err((status, message)) => Response::builder().status(status).body(message).unwrap(),
}
});
})
.with(log);
// GET /
// This should be the last thing matched, so that anything that doesn't
// match another filter will try to match a file.
let homepage = warp::get().and(config.serve_rules.dir.to_filter());
let homepage = warp::get()
.and(config.serve_rules.dir.to_filter())
.with(log);
let get_routes = warp::get().and(link.or(homepage));
let post_routes = warp::post().and(shorten);