feat: merge v1.0.0 from dev

commit 5a9d4f6098
meeg_leeto, 2022-05-13 17:21:49 +01:00
6 changed files with 269 additions and 34 deletions

.gitignore

@@ -1 +1,2 @@
/target
data/log/*.log
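The new rule ignores generated log files; the empty data/log/.gitinclude added below keeps the otherwise-empty log directory tracked in the repository.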

Cargo.lock (generated)

@@ -838,7 +838,7 @@ dependencies = [
[[package]]
name = "lonk"
version = "0.1.0"
version = "1.0.0"
dependencies = [
"argh",
"async-object-pool",

Cargo.toml

@@ -1,6 +1,6 @@
[package]
name = "lonk"
version = "0.1.0"
version = "1.0.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

(JSON configuration file)

@@ -1,4 +1,5 @@
{
"version": 1,
"db": {
"address": "redis://redis:6379",
"expire_seconds": 259200
@@ -7,6 +8,10 @@
"length": 5,
"chars": "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-"
},
"log_rules": {
"error_log_file": "/data/log/error.log",
"access_log_file": "/data/log/access.log"
},
"serve_rules": {
"dir": {
"Dir": "/data/served"

data/log/.gitinclude (new empty file)

src/main.rs

@@ -1,5 +1,4 @@
use argh::FromArgs;
use core::panic;
use validators::traits::ValidateString;
use warp::{http::Response, hyper::StatusCode, Filter};
@@ -138,6 +137,15 @@ mod conf {
pub addr: ServeAddr,
}
#[derive(Deserialize, Serialize, Debug, Clone)]
/// Configuration of logging by lonk.
pub struct LogRules {
/// Where to write error logs to. The file will be appended to.
pub error_log_file: PathBuf,
/// Where to write access logs to. The file will be appended to.
pub access_log_file: PathBuf,
}
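// For reference, this struct deserializes from the "log_rules" object added
// to the example configuration file earlier in this diff:
//
// "log_rules": {
//     "error_log_file": "/data/log/error.log",
//     "access_log_file": "/data/log/access.log"
// }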
#[derive(Deserialize, Serialize, Debug, Clone)]
/// Configuration struct. This struct is a typed representation of the
/// configuration file, with each of the domain-specific configurations
@@ -153,6 +161,8 @@ mod conf {
pub version: Option<usize>,
/// Configuration regarding the Redis database.
pub db: DbConfig,
/// Configuration regarding logging.
pub log_rules: LogRules,
/// Configuration regarding the types of (URL shorten) slugs produced.
pub slug_rules: SlugRules,
/// Configuration regarding where and how the HTTP server is served.
@@ -171,6 +181,8 @@ mod conf {
ServeFileNotExists(PathBuf),
ServeDirNotDir(PathBuf),
ServeDirNotExists(PathBuf),
AccessLogDirectoryNotExists(PathBuf),
ErrorLogDirectoryNotExists(PathBuf),
}
impl Config {
@@ -210,68 +222,128 @@ mod conf {
}
}
// Check access and error log parent directories
// - Access log file
let weak_canonical = normalize_path(&self.log_rules.access_log_file);
if let Some(parent) = weak_canonical.parent() {
if !parent.exists() {
return Err(ConfigParseError::AccessLogDirectoryNotExists(
parent.to_path_buf(),
));
}
}
// - Error log file
let weak_canonical = normalize_path(&self.log_rules.error_log_file);
if let Some(parent) = weak_canonical.parent() {
if !parent.exists() {
return Err(ConfigParseError::ErrorLogDirectoryNotExists(
parent.to_path_buf(),
));
}
}
Ok(self)
}
}
/// Yanked from the source of Cargo. Weaker than std::fs::canonicalize,
/// because it doesn't require the target file to exist.
fn normalize_path(path: &std::path::Path) -> PathBuf {
use std::path::*;
let mut components = path.components().peekable();
let mut ret = if let Some(c @ Component::Prefix(..)) = components.peek().cloned() {
components.next();
PathBuf::from(c.as_os_str())
} else {
PathBuf::new()
};
for component in components {
match component {
Component::Prefix(..) => unreachable!(),
Component::RootDir => {
ret.push(component.as_os_str());
}
Component::CurDir => {}
Component::ParentDir => {
ret.pop();
}
Component::Normal(c) => {
ret.push(c);
}
}
}
ret
}
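// A minimal illustration of normalize_path (not part of this commit; the
// paths are made up). Unlike std::fs::canonicalize, nothing here touches the
// filesystem and symlinks are not resolved:
//
// assert_eq!(
//     normalize_path(Path::new("/data/log/./../log/error.log")),
//     PathBuf::from("/data/log/error.log")
// );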
impl ConfigParseError {
pub fn panic_with_message(self, config_file_name: &str) -> ! {
match self {
ConfigParseError::SerdeError(err) => match err.classify() {
serde_json::error::Category::Io => {
panic!("IO error when reading configuration file.")
eprintln!("IO error when reading configuration file.")
}
serde_json::error::Category::Syntax => panic!(
"Configuration file is syntactically incorrect.
See {}:line {}, column {}.",
serde_json::error::Category::Syntax => eprintln!(
concat!(
"Configuration file is syntactically incorrect.\n",
"See {}:{}:{}."
),
config_file_name,
err.line(),
err.column()
),
serde_json::error::Category::Data => panic!(
"Error deserializing configuration file; expected different data type.
See {}:line {}, column {}.",
serde_json::error::Category::Data => eprintln!(
concat!("Error deserializing configuration file; expected different data type.\n",
"See {}:{}:{}."),
config_file_name,
err.line(),
err.column()
),
serde_json::error::Category::Eof => {
panic!("Unexpected end of file when reading configuration file.")
eprintln!("Unexpected end of file when reading configuration file.")
}
},
ConfigParseError::OldVersion(old_version) => {
panic!(
"Configuration file has outdated version.
Expected version field to be {} but got {}.",
eprintln!(
concat!("Configuration file has outdated version.\n",
"Expected version field to be {} but got {}."),
config_version(),
old_version
);
}
ConfigParseError::ServeDirNotExists(dir) => {
panic!(
eprintln!(
"Configuration file indicates directory {} should be served, but it does not exist.",
dir.to_string_lossy()
)
}
ConfigParseError::ServeDirNotDir(dir) => {
panic!(
eprintln!(
"Configuration file indicates directory {} should be served, but it is not a directory.",
dir.to_string_lossy()
)
}
ConfigParseError::ServeFileNotExists(file) => {
panic!(
eprintln!(
"Configuration file indicates file {} should be served, but it does not exist.",
file.to_string_lossy()
)
}
ConfigParseError::ServeFileNotFile(file) => {
panic!(
eprintln!(
"Configuration file indicates file {} should be served, but it is not a file.",
file.to_string_lossy()
)
}
ConfigParseError::AccessLogDirectoryNotExists(dir) => {
eprintln!("Access log file should have parent directory {}, but this directory does not exist.", dir.to_string_lossy())
}
ConfigParseError::ErrorLogDirectoryNotExists(dir) => {
eprintln!("Error log file should have parent directory {}, but this directory does not exist.", dir.to_string_lossy())
}
}
std::process::exit(1);
}
}
@@ -329,6 +401,15 @@ mod conf {
}
}
impl Default for LogRules {
fn default() -> Self {
Self {
error_log_file: "/etc/lonk/log/error.log".into(),
access_log_file: "/etc/lonk/log/access.log".into(),
}
}
}
impl Default for Config {
fn default() -> Self {
Self {
@@ -336,6 +417,7 @@ mod conf {
db: Default::default(),
slug_rules: Default::default(),
serve_rules: Default::default(),
log_rules: Default::default(),
}
}
}
@@ -606,10 +688,12 @@ mod service {
// Collision!
response_channel.send(AddResult::Fail).ok();
eprintln!(
"Collision for slug {}!
Slug space may have been exhausted.
If you see this message repeatedly,
consider increasing the slug size.",
concat!(
"Collision for slug {}!\n",
"Slug space may have been exhausted.\n",
"If you see this message repeatedly,",
"consider increasing the slug size."
),
slug_key
);
return;
@@ -828,6 +912,116 @@ mod service {
}
}
}
/// Functionality related to logging.
pub mod log {
use std::path::PathBuf;
use tokio::{fs::OpenOptions, io::AsyncWriteExt, sync};
/// A struct responsible for logging events, based on messages received
/// from other tasks.
pub struct Logger {
access_tx: sync::mpsc::UnboundedSender<String>,
error_tx: sync::mpsc::UnboundedSender<String>,
}
impl Logger {
pub fn from_log_rules(config: &crate::conf::LogRules) -> Self {
// Create the communication channels
let (access_tx, access_rx) = sync::mpsc::unbounded_channel::<String>();
let (error_tx, error_rx) = sync::mpsc::unbounded_channel::<String>();
// Start the logging tasks
tokio::spawn(Self::logging_task(
access_rx,
config.access_log_file.clone(),
));
tokio::spawn(Self::logging_task(error_rx, config.error_log_file.clone()));
// Done
Logger {
access_tx,
error_tx,
}
}
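// Hypothetical usage (not part of this diff), assuming an already-parsed
// `config: Config`. Because from_log_rules spawns its writer tasks with
// tokio::spawn, it must be called from within a tokio runtime:
//
// let logger = Logger::from_log_rules(&config.log_rules);
// logger.access("slug abcde -> https://example.com\n".to_string()).ok();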
/// Log a message into the access log file.
///
/// Returns a Result with an empty error type; if posting the log
/// message fails for any reason, recovery is unlikely, and the caller
/// should decide whether to stop logging, ignore the error, or halt
/// the program.
pub fn access(&self, msg: String) -> Result<(), ()> {
self.access_tx.send(msg).map_err(|_| ())
}
/// Log a message into the error log file.
///
/// Returns a Result with an empty error type; if posting the log
/// message fails for any reason, recovery is unlikely, and the caller
/// should decide whether to stop logging, ignore the error, or halt
/// the program.
pub fn error(&self, msg: String) -> Result<(), ()> {
self.error_tx.send(msg).map_err(|_| ())
}
/// The task responsible for receiving the log messages and actually
/// writing them into the corresponding file. One task is spawned for
/// each target file.
async fn logging_task(mut rx: sync::mpsc::UnboundedReceiver<String>, into: PathBuf) {
// Open the log file in append mode
let file = OpenOptions::new()
.append(true)
.create(true)
.open(into.clone())
.await;
if let Err(e) = file {
eprintln!(
concat!(
"Could not open {} for logging, with error:\n",
"{}\n",
"Future logging may result in errors."
),
into.clone().to_string_lossy(),
e
);
return;
}
let mut file = file.unwrap();
// Listen to the logging message channel
while let Some(log) = rx.recv().await {
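// Each entry is written as the current Unix timestamp in seconds,
// followed by a space and then the message itself.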
let write_result = file
.write_buf(
&mut format!(
"{} ",
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("Bad system time")
.as_secs()
)
.as_bytes(),
)
.await
.and(file.write_buf(&mut log.as_bytes()).await);
if let Err(e) = write_result {
eprintln!(
concat!(
"Error writing to {}!\n",
"{}\n",
"Continuing, but future logging may error again."
),
into.clone().to_string_lossy(),
e
)
}
}
// All logging tx channels were dropped, close this task
}
}
}
}
use service::*;
@@ -837,6 +1031,7 @@ async fn shorten(
slug_factory: &slug::SlugFactory,
db: &db::SlugDatabase,
b64str: &str,
logger: &log::Logger,
) -> Result<slug::Slug, (StatusCode, String)> {
// Parse the URL given by the user. It should arrive as a Base64 string,
// and anything other than this should cleanly result in an HTTP rejection.
@@ -868,10 +1063,15 @@
// ...and attempt to insert it into the database.
// Failure to do so is reported to the user.
let insert_result = db.insert_slug(new_slug, url).await;
let insert_result = db.insert_slug(new_slug, url.clone()).await;
match insert_result {
Ok(result) => match result {
service::db::AddResult::Success(slug) => Ok(slug),
service::db::AddResult::Success(slug) => {
logger
.access(format!("{} -> {}\n", slug.inner_str(), url))
.ok();
Ok(slug)
}
service::db::AddResult::Fail => Err((
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
debuginfo!("Got insertion response, but it was error.").into(),
@@ -936,16 +1136,17 @@ async fn serve() {
let config_file = std::fs::File::open(config_file_name.clone()).unwrap_or_else(|err| {
match err.kind() {
std::io::ErrorKind::NotFound => {
panic!("Configuration file {} does not exist.", config_file_name)
eprintln!("Configuration file {} does not exist.", config_file_name)
}
std::io::ErrorKind::PermissionDenied => {
panic!("Read permission to {} was denied.", config_file_name)
eprintln!("Read permission to {} was denied.", config_file_name)
}
_ => panic!(
_ => eprintln!(
"Error when trying to read configuration file {}: {}",
config_file_name, err
),
};
std::process::exit(1);
});
let parse_result = tokio::task::spawn_blocking(move || {
conf::Config::from_sync_buffer(std::io::BufReader::new(config_file))
@@ -959,6 +1160,9 @@ async fn serve() {
}
};
// Create logger
let logger = log::Logger::from_log_rules(&config.log_rules);
// Create slug factory
let slug_factory = slug::SlugFactory::from_rules(config.slug_rules);
@@ -968,11 +1172,32 @@ async fn serve() {
db::SlugDatabase::from_client(client, config.db.expire_seconds)
};
// We leak the slug factory and the database, because we know that these
// will live forever, and want them to have 'static lifetime so that warp is
// happy.
// We leak the slug factory, the database, and the logger, because we know
// that these will live forever, and want them to have 'static lifetime so
// that warp is happy.
let slug_factory: &'static slug::SlugFactory = Box::leak(Box::new(slug_factory));
let db: &'static db::SlugDatabase = Box::leak(Box::new(db));
let logger: &'static log::Logger = Box::leak(Box::new(logger));
// Warp logging compatibility layer
let log = warp::log::custom(move |info| {
let log_msg = format!(
"{} ({}) {} {}, replied with status {}\n",
info.remote_addr()
.map(|x| x.to_string())
.unwrap_or_else(|| "<Unknown remote address>".to_string()),
info.user_agent()
.unwrap_or_else(|| "<No user agent provided>"),
info.method(),
info.path(),
info.status().as_u16(),
);
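// Client (4xx) and server (5xx) errors go to the error log; everything
// else goes to the access log.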
if info.status().is_client_error() || info.status().is_server_error() {
logger.error(log_msg).ok();
} else {
logger.access(log_msg).ok();
}
});
// POST /shorten/ with link in argument
let shorten = warp::post()
@@ -987,13 +1212,14 @@ async fn serve() {
.body(String::new())
.unwrap();
}
match shorten(&slug_factory, &db, b64str.unwrap()).await {
match shorten(&slug_factory, &db, b64str.unwrap(), logger).await {
Ok(slug) => Response::builder()
.body(slug.inner_str().to_string())
.unwrap(),
Err((status, message)) => Response::builder().status(status).body(message).unwrap(),
}
});
})
.with(log);
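// Illustrative request (not part of this diff; the host and port depend on
// the serve_rules configuration):
//
// curl -X POST http://localhost:8080/shorten/aHR0cHM6Ly9leGFtcGxlLmNvbQ==
//
// The path argument is base64("https://example.com"); on success, the
// response body is the newly generated slug.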
// GET /l/:Slug
let link = warp::path("l")
@@ -1007,12 +1233,15 @@ async fn serve() {
.unwrap(),
Err((status, message)) => Response::builder().status(status).body(message).unwrap(),
}
});
})
.with(log);
// GET /
// This should be the last thing matched, so that anything that doesn't
// match another filter will try to match a file.
let homepage = warp::get().and(config.serve_rules.dir.to_filter());
let homepage = warp::get()
.and(config.serve_rules.dir.to_filter())
.with(log);
let get_routes = warp::get().and(link.or(homepage));
let post_routes = warp::post().and(shorten);