Skip to content
22 changes: 22 additions & 0 deletions influxdb/src/query/write_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,28 @@ impl From<&str> for Type {
Type::Text(b.into())
}
}

#[cfg(feature = "chrono")]
impl<Tz: chrono::TimeZone> From<chrono::DateTime<Tz>> for Type {
fn from(dt: chrono::DateTime<Tz>) -> Self {
match dt.timestamp_nanos_opt() {
Some(nanos) => Type::SignedInteger(nanos),
None => {
// For dates before 1677-09-21, or after
// 2262-04-11, we're just going to return 0.
Type::SignedInteger(0)
}
}
}
}

#[cfg(feature = "time")]
impl From<time::UtcDateTime> for Type {
fn from(dt: time::UtcDateTime) -> Self {
Type::SignedInteger(dt.unix_timestamp_nanos().try_into().unwrap_or(0))
}
}

impl<T> From<&T> for Type
where
T: Copy + Into<Type>,
Expand Down
31 changes: 31 additions & 0 deletions influxdb/tests/derive_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ struct WeatherReading {
wind_strength: Option<u64>,
}

#[derive(Debug, PartialEq)]
#[cfg_attr(feature = "derive", derive(InfluxDbWriteable))]
struct WeatherReadingWithNonstandardTime {
#[influxdb(time)]
reading_time: DateTime<Utc>,
#[influxdb(ignore)]
time: DateTime<Utc>,
#[influxdb(ignore)]
humidity: i32,
pressure: i32,
#[influxdb(tag)]
wind_strength: Option<u64>,
}

#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
struct WeatherReadingWithoutIgnored {
Expand All @@ -47,6 +61,23 @@ fn test_build_query() {
);
}

#[test]
fn test_build_nonstandard_query() {
let weather_reading = WeatherReadingWithNonstandardTime {
reading_time: Timestamp::Hours(1).into(),
time: Timestamp::Hours(1).into(),
humidity: 30,
pressure: 100,
wind_strength: Some(5),
};
let query = weather_reading.into_query("weather_reading");
let query = query.build().unwrap();
assert_eq!(
query.get(),
"weather_reading,wind_strength=5 pressure=100i 3600000000000"
);
}

#[cfg(feature = "derive")]
/// INTEGRATION TEST
///
Expand Down
81 changes: 58 additions & 23 deletions influxdb_derive/src/writeable.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use proc_macro2::TokenStream;
use quote::{format_ident, quote};
use quote::quote;
use syn::{
parse::{Parse, ParseStream},
punctuated::Punctuated,
Expand All @@ -9,27 +9,32 @@ use syn::{
#[derive(Debug)]
struct WriteableField {
ident: Ident,
is_time: bool,
is_tag: bool,
is_ignore: bool,
}

mod kw {
use syn::custom_keyword;

custom_keyword!(time);
custom_keyword!(tag);
custom_keyword!(ignore);
}

#[allow(dead_code)] // TODO do we need to store the keywords?
enum FieldAttr {
Time(kw::time),
Tag(kw::tag),
Ignore(kw::ignore),
}

impl Parse for FieldAttr {
fn parse(input: ParseStream<'_>) -> syn::Result<Self> {
let lookahead = input.lookahead1();
if lookahead.peek(kw::tag) {
if lookahead.peek(kw::time) {
Ok(Self::Time(input.parse()?))
} else if lookahead.peek(kw::tag) {
Ok(Self::Tag(input.parse()?))
} else if lookahead.peek(kw::ignore) {
Ok(Self::Ignore(input.parse()?))
Expand All @@ -52,6 +57,7 @@ impl TryFrom<Field> for WriteableField {

fn try_from(field: Field) -> syn::Result<WriteableField> {
let ident = field.ident.expect("fields without ident are not supported");
let mut has_time_attr = false;
let mut is_tag = false;
let mut is_ignore = false;

Expand All @@ -60,6 +66,7 @@ impl TryFrom<Field> for WriteableField {
Meta::List(list) if list.path.is_ident("influxdb") => {
for attr in syn::parse2::<FieldAttrs>(list.tokens)?.0 {
match attr {
FieldAttr::Time(_) => has_time_attr = true,
FieldAttr::Tag(_) => is_tag = true,
FieldAttr::Ignore(_) => is_ignore = true,
}
Expand All @@ -69,8 +76,23 @@ impl TryFrom<Field> for WriteableField {
}
}

if [has_time_attr, is_tag, is_ignore]
.iter()
.filter(|&&b| b)
.count()
> 1
{
panic!("only one of time, tag, or ignore can be used");
}

// A field is considered a time field if:
// 1. It has the #[influxdb(time)] attribute, OR
// 2. It's named "time" and doesn't have #[influxdb(ignore)]
let is_time = has_time_attr || (ident == "time" && !is_ignore);

Ok(WriteableField {
ident,
is_time,
is_tag,
is_ignore,
})
Expand All @@ -97,39 +119,52 @@ pub fn expand_writeable(input: DeriveInput) -> syn::Result<TokenStream> {
}
};

let time_field = format_ident!("time");
let time_field_str = time_field.to_string();
#[allow(clippy::cmp_owned)] // that's not how idents work clippy
let fields = match fields {
let writeable_fields: Vec<WriteableField> = match fields {
Fields::Named(fields) => fields
.named
.into_iter()
.filter_map(|f| {
WriteableField::try_from(f)
.map(|wf| {
if !wf.is_ignore && wf.ident.to_string() != time_field_str {
let ident = wf.ident;
Some(match wf.is_tag {
true => quote!(query.add_tag(stringify!(#ident), self.#ident)),
false => quote!(query.add_field(stringify!(#ident), self.#ident)),
})
} else {
None
}
})
.transpose()
})
.map(WriteableField::try_from)
.collect::<syn::Result<Vec<_>>>()?,
_ => panic!("a struct without named fields is not supported"),
_ => panic!("A struct without named fields is not supported!"),
};

// Find the time field
let mut time_field = None;
for wf in &writeable_fields {
if wf.is_time {
if time_field.is_some() {
panic!("multiple time fields found!");
}
time_field = Some(wf.ident.clone());
}
}

// There must be exactly one time field
let time_field = time_field.expect("no time field found");

// Generate field assignments (excluding time and ignored fields)
let field_assignments = writeable_fields
.into_iter()
.filter_map(|wf| {
if wf.is_ignore || wf.is_time {
None
} else {
let ident = wf.ident;
Some(match wf.is_tag {
true => quote!(query.add_tag(stringify!(#ident), self.#ident)),
false => quote!(query.add_field(stringify!(#ident), self.#ident)),
})
}
})
.collect::<Vec<_>>();

Ok(quote! {
impl #impl_generics ::influxdb::InfluxDbWriteable for #ident #ty_generics #where_clause {
fn into_query<I: Into<String>>(self, name: I) -> ::influxdb::WriteQuery {
let timestamp: ::influxdb::Timestamp = self.#time_field.into();
let mut query = timestamp.into_query(name);
#(
query = #fields;
query = #field_assignments;
)*
query
}
Expand Down
Loading