2022-02-24 20:24:08 +01:00
|
|
|
//! The combined data provider.
|
|
|
|
//!
|
|
|
|
//! This combines and collates data using the other providers.
|
|
|
|
|
|
|
|
use cached::proc_macro::cached;
|
|
|
|
use chrono::serde::ts_seconds;
|
|
|
|
use chrono::{DateTime, Utc};
|
|
|
|
use rocket::serde::Serialize;
|
|
|
|
|
|
|
|
pub(crate) use super::buienradar::{self, Sample as BuienradarSample};
|
|
|
|
pub(crate) use super::luchtmeetnet::{self, Item as LuchtmeetnetItem};
|
|
|
|
use crate::maps::MapsHandle;
|
|
|
|
use crate::position::Position;
|
2022-06-06 14:53:56 +02:00
|
|
|
use crate::{Error, Metric};
|
|
|
|
|
|
|
|
/// The possible merge errors that can occur.
|
|
|
|
#[allow(clippy::enum_variant_names)]
|
|
|
|
#[derive(Debug, thiserror::Error, PartialEq)]
|
|
|
|
pub(crate) enum MergeError {
|
|
|
|
/// No AQI item found.
|
|
|
|
#[error("No AQI item found")]
|
|
|
|
NoAqiItemFound,
|
|
|
|
|
|
|
|
/// No pollen item found.
|
|
|
|
#[error("No pollen item found")]
|
|
|
|
NoPollenItemFound,
|
|
|
|
|
|
|
|
/// No AQI item found within 30 minutes of first pollen item.
|
|
|
|
#[error("No AQI item found within 30 minutes of first pollen item")]
|
|
|
|
NoCloseAqiItemFound,
|
|
|
|
|
|
|
|
/// No pollen item found within 30 minutes of first AQI item.
|
|
|
|
#[error("No pollen item found within 30 minutes of first AQI item")]
|
|
|
|
NoClosePollenItemFound,
|
|
|
|
}
|
2022-02-24 20:24:08 +01:00
|
|
|
|
|
|
|
/// The combined data item.
|
2022-05-08 12:53:09 +02:00
|
|
|
#[derive(Clone, Debug, PartialEq, Serialize)]
|
2022-02-24 20:24:08 +01:00
|
|
|
#[serde(crate = "rocket::serde")]
|
|
|
|
pub(crate) struct Item {
|
|
|
|
/// The time(stamp) of the forecast.
|
|
|
|
#[serde(serialize_with = "ts_seconds::serialize")]
|
|
|
|
time: DateTime<Utc>,
|
|
|
|
|
|
|
|
/// The forecasted value.
|
|
|
|
value: f32,
|
|
|
|
}
|
|
|
|
|
2022-05-08 13:54:17 +02:00
|
|
|
impl Item {
|
|
|
|
#[cfg(test)]
|
|
|
|
pub(crate) fn new(time: DateTime<Utc>, value: f32) -> Self {
|
|
|
|
Self { time, value }
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-02-24 20:24:08 +01:00
|
|
|
/// Merges pollen samples and AQI items into combined items.
|
|
|
|
///
|
2022-05-07 21:37:11 +02:00
|
|
|
/// The merging drops items from either the pollen samples or from the AQI items if they are not
|
2022-06-06 14:53:56 +02:00
|
|
|
/// stamped within half an hour of the first item of the latest starting series, thus lining them
|
2022-05-07 21:37:11 +02:00
|
|
|
/// before they are combined.
|
2022-02-24 20:24:08 +01:00
|
|
|
fn merge(
|
|
|
|
pollen_samples: Vec<BuienradarSample>,
|
|
|
|
aqi_items: Vec<LuchtmeetnetItem>,
|
2022-06-06 14:53:56 +02:00
|
|
|
) -> Result<Vec<Item>, MergeError> {
|
2022-02-24 20:24:08 +01:00
|
|
|
let mut pollen_samples = pollen_samples;
|
|
|
|
let mut aqi_items = aqi_items;
|
|
|
|
|
2022-05-10 12:24:19 +02:00
|
|
|
// Only retain samples/items that have timestamps that are at least an hour ago.
|
2022-05-08 12:53:32 +02:00
|
|
|
let now = Utc::now();
|
2022-05-10 12:24:19 +02:00
|
|
|
pollen_samples.retain(|smp| smp.time.signed_duration_since(now).num_seconds() > -3600);
|
|
|
|
aqi_items.retain(|item| item.time.signed_duration_since(now).num_seconds() > -3600);
|
2022-05-08 12:53:32 +02:00
|
|
|
|
2022-02-24 20:24:08 +01:00
|
|
|
// Align the iterators based on the (hourly) timestamps!
|
2022-06-06 14:53:56 +02:00
|
|
|
let pollen_first_time = pollen_samples
|
|
|
|
.first()
|
|
|
|
.ok_or(MergeError::NoPollenItemFound)?
|
|
|
|
.time;
|
|
|
|
let aqi_first_time = aqi_items.first().ok_or(MergeError::NoAqiItemFound)?.time;
|
2022-02-24 20:24:08 +01:00
|
|
|
if pollen_first_time < aqi_first_time {
|
|
|
|
// Drain one or more pollen samples to line up.
|
2022-06-06 14:53:56 +02:00
|
|
|
let idx = pollen_samples
|
|
|
|
.iter()
|
|
|
|
.position(|smp| {
|
|
|
|
smp.time
|
|
|
|
.signed_duration_since(aqi_first_time)
|
|
|
|
.num_seconds()
|
|
|
|
.abs()
|
|
|
|
< 1800
|
|
|
|
})
|
|
|
|
.ok_or(MergeError::NoCloseAqiItemFound)?;
|
2022-02-24 20:24:08 +01:00
|
|
|
pollen_samples.drain(..idx);
|
|
|
|
} else {
|
|
|
|
// Drain one or more AQI items to line up.
|
2022-06-06 14:53:56 +02:00
|
|
|
let idx = aqi_items
|
|
|
|
.iter()
|
|
|
|
.position(|item| {
|
|
|
|
item.time
|
|
|
|
.signed_duration_since(pollen_first_time)
|
|
|
|
.num_seconds()
|
|
|
|
.abs()
|
|
|
|
< 1800
|
|
|
|
})
|
|
|
|
.ok_or(MergeError::NoClosePollenItemFound)?;
|
2022-02-24 20:24:08 +01:00
|
|
|
aqi_items.drain(..idx);
|
|
|
|
}
|
|
|
|
|
|
|
|
// Combine the samples with items by taking the maximum of pollen sample score and AQI item
|
|
|
|
// value.
|
|
|
|
let items = pollen_samples
|
|
|
|
.into_iter()
|
|
|
|
.zip(aqi_items.into_iter())
|
|
|
|
.map(|(pollen_sample, aqi_item)| {
|
|
|
|
let time = pollen_sample.time;
|
|
|
|
let value = (pollen_sample.score as f32).max(aqi_item.value);
|
|
|
|
|
|
|
|
Item { time, value }
|
|
|
|
})
|
|
|
|
.collect();
|
|
|
|
|
2022-06-06 14:53:56 +02:00
|
|
|
Ok(items)
|
2022-02-24 20:24:08 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
/// Retrieves the combined forecasted items for the provided position and metric.
|
|
|
|
///
|
|
|
|
/// It supports the following metric:
|
|
|
|
/// * [`Metric::PAQI`]
|
|
|
|
#[cached(
|
|
|
|
time = 1800,
|
|
|
|
key = "(Position, Metric)",
|
|
|
|
convert = r#"{ (position, metric) }"#,
|
2022-06-06 14:53:56 +02:00
|
|
|
result = true
|
2022-02-24 20:24:08 +01:00
|
|
|
)]
|
|
|
|
pub(crate) async fn get(
|
|
|
|
position: Position,
|
|
|
|
metric: Metric,
|
|
|
|
maps_handle: &MapsHandle,
|
2022-06-06 14:53:56 +02:00
|
|
|
) -> Result<Vec<Item>, Error> {
|
2022-02-24 20:24:08 +01:00
|
|
|
if metric != Metric::PAQI {
|
2022-06-06 14:53:56 +02:00
|
|
|
return Err(Error::UnsupportedMetric(metric));
|
2022-02-24 20:24:08 +01:00
|
|
|
};
|
2022-06-06 14:53:56 +02:00
|
|
|
let pollen_items = buienradar::get_samples(position, Metric::Pollen, maps_handle).await?;
|
|
|
|
let aqi_items = luchtmeetnet::get(position, Metric::AQI).await?;
|
|
|
|
let items = merge(pollen_items, aqi_items)?;
|
2022-02-24 20:24:08 +01:00
|
|
|
|
2022-06-06 14:53:56 +02:00
|
|
|
Ok(items)
|
2022-02-24 20:24:08 +01:00
|
|
|
}
|
2022-05-08 12:53:32 +02:00
|
|
|
|
|
|
|
#[cfg(test)]
mod tests {
    use chrono::{Duration, Timelike};

    use super::*;

    /// Returns a copy of the pollen samples with every timestamp moved `by` into the future.
    fn shift_samples(samples: &[BuienradarSample], by: Duration) -> Vec<BuienradarSample> {
        samples
            .iter()
            .cloned()
            .map(|mut sample| {
                sample.time = sample.time.checked_add_signed(by).unwrap();
                sample
            })
            .collect()
    }

    /// Returns a copy of the AQI items with every timestamp moved `by` into the future.
    fn shift_items(items: &[LuchtmeetnetItem], by: Duration) -> Vec<LuchtmeetnetItem> {
        items
            .iter()
            .cloned()
            .map(|mut item| {
                item.time = item.time.checked_add_signed(by).unwrap();
                item
            })
            .collect()
    }

    #[test]
    fn merge() {
        // Work with whole minutes so the expected timestamps can be compared exactly.
        let t_now = Utc::now()
            .with_second(0)
            .and_then(|t| t.with_nanosecond(0))
            .unwrap();
        let t_m2 = t_now - Duration::days(1);
        let t_m1 = t_now - Duration::hours(2);
        let t_0 = t_now + Duration::minutes(12);
        let t_1 = t_now + Duration::minutes(72);
        let t_2 = t_now + Duration::minutes(132);

        let pollen_samples = vec![
            BuienradarSample::new(t_m2, 4),
            BuienradarSample::new(t_m1, 5),
            BuienradarSample::new(t_0, 1),
            BuienradarSample::new(t_1, 3),
            BuienradarSample::new(t_2, 2),
        ];
        let aqi_items = vec![
            LuchtmeetnetItem::new(t_m2, 4.0),
            LuchtmeetnetItem::new(t_m1, 5.0),
            LuchtmeetnetItem::new(t_0, 1.1),
            LuchtmeetnetItem::new(t_1, 2.9),
            LuchtmeetnetItem::new(t_2, 2.4),
        ];

        // A regular merge: the old samples/items are dropped, the rest is combined.
        assert_eq!(
            super::merge(pollen_samples.clone(), aqi_items.clone()),
            Ok(vec![
                Item::new(t_0, 1.1),
                Item::new(t_1, 3.0),
                Item::new(t_2, 2.4),
            ])
        );

        // The pollen samples start one hour later than the AQI items.
        let shifted_pollen_samples = shift_samples(&pollen_samples[2..], Duration::hours(1));
        assert_eq!(
            super::merge(shifted_pollen_samples, aqi_items.clone()),
            Ok(vec![Item::new(t_1, 2.9), Item::new(t_2, 3.0)])
        );

        // The AQI items start one hour later than the pollen samples.
        let shifted_aqi_items = shift_items(&aqi_items[2..], Duration::hours(1));
        assert_eq!(
            super::merge(pollen_samples.clone(), shifted_aqi_items),
            Ok(vec![Item::new(t_1, 3.0), Item::new(t_2, 2.9)])
        );

        // The combined items should not extend beyond the end of the shortest series.
        assert_eq!(
            super::merge(pollen_samples[..3].to_vec(), aqi_items.clone()),
            Ok(vec![Item::new(t_0, 1.1)])
        );
        assert_eq!(
            super::merge(pollen_samples.clone(), aqi_items[..3].to_vec()),
            Ok(vec![Item::new(t_0, 1.1)])
        );

        // Merging fails if the series are too far (6 hours) apart to be lined up.
        let far_aqi_items = shift_items(&aqi_items, Duration::hours(6));
        assert_eq!(
            super::merge(pollen_samples.clone(), far_aqi_items),
            Err(MergeError::NoCloseAqiItemFound)
        );
        let far_pollen_samples = shift_samples(&pollen_samples, Duration::hours(6));
        assert_eq!(
            super::merge(far_pollen_samples, aqi_items.clone()),
            Err(MergeError::NoClosePollenItemFound)
        );

        // There are no pollen samples at all, or all of them are too old.
        assert_eq!(
            super::merge(Vec::new(), aqi_items.clone()),
            Err(MergeError::NoPollenItemFound)
        );
        assert_eq!(
            super::merge(pollen_samples[0..2].to_vec(), aqi_items.clone()),
            Err(MergeError::NoPollenItemFound)
        );

        // There are no AQI items at all, or all of them are too old.
        assert_eq!(
            super::merge(pollen_samples.clone(), Vec::new()),
            Err(MergeError::NoAqiItemFound)
        );
        assert_eq!(
            super::merge(pollen_samples, aqi_items[0..2].to_vec()),
            Err(MergeError::NoAqiItemFound)
        );
    }
}
|