From 09f6509050f24e54c9859e0905e8b6731d91f39f Mon Sep 17 00:00:00 2001 From: Till Hoeppner Date: Wed, 29 Jul 2015 00:01:46 +0200 Subject: implement sort and dedup of logs --- src/ageset.rs | 41 ++++++++++++++++++++++++++++++++++++++++ src/event.rs | 49 +++++++++++++++++++++++++++++++++++++++++++++--- src/format/energymech.rs | 12 +++++++----- src/format/weechat3.rs | 12 +++++++----- src/main.rs | 43 ++++++++++++++++++++++++++++++++++++++++-- 5 files changed, 142 insertions(+), 15 deletions(-) create mode 100644 src/ageset.rs (limited to 'src') diff --git a/src/ageset.rs b/src/ageset.rs new file mode 100644 index 0000000..084afba --- /dev/null +++ b/src/ageset.rs @@ -0,0 +1,41 @@ +use std::collections::HashSet; +use std::hash::Hash; + +use blist::BList; + +/// So... this is a rather weird thing. +/// It allows to semi-efficiently check the oldest (earliest insertion) +/// elements for certain criteria and remove them in the order of insertion +/// if the criteria is met. +pub struct AgeSet { + fifo: BList, + set: HashSet +} + +impl AgeSet where T: Eq + Hash + Clone { + pub fn new() -> Self { + AgeSet { + fifo: BList::new(), + set: HashSet::new() + } + } + + pub fn contains(&self, t: &T) -> bool { + self.set.contains(t) + } + + pub fn prune(&mut self, kill: F) where F: Fn(&T) -> bool { + while let Some(ref e) = self.fifo.front().map(T::clone) { + if kill(&e) { + let removed = self.fifo.pop_front().unwrap(); + self.set.remove(&e); + assert!(*e == removed); + } else { break } + } + } + + pub fn push(&mut self, t: T) { + self.fifo.push_back(t.clone()); + self.set.insert(t); + } +} diff --git a/src/event.rs b/src/event.rs index 9c59df2..83bb876 100644 --- a/src/event.rs +++ b/src/event.rs @@ -16,9 +16,12 @@ //! These will be used by all formats for encoding and decoding. use std::borrow::Cow; +use std::cmp::Ordering; +use std::hash::{ Hash, Hasher }; use chrono::naive::time::NaiveTime; use chrono::offset::fixed::FixedOffset; +use chrono::offset::local::Local; use chrono::offset::TimeZone; /// A whole log, in memory. This structure does not specify its @@ -30,7 +33,7 @@ pub struct Log<'a> { /// Different log formats carry different amounts of information. Some might /// hold enough information to calculate precise timestamps, others might /// only suffice for the time of day. -#[derive(Clone, Debug, PartialEq, Hash, RustcEncodable, RustcDecodable)] +#[derive(Clone, Debug, PartialEq, Eq, Ord, Hash, RustcEncodable, RustcDecodable)] pub enum Time { Unknown, Hms(u8, u8, u8), @@ -53,9 +56,39 @@ impl Time { &Time::Timestamp(t) => format!("{}", tz.timestamp(t, 0).format(f)) } } + + pub fn as_timestamp(&self) -> i64 { + use self::Time::*; + match self { + &Unknown => 0, + &Hms(h, m, s) => Local::today() + .and_hms(h as u32, m as u32, s as u32) + .timestamp(), + &Timestamp(i) => i + } + } + + pub fn to_timestamp(&self) -> Time { Time::Timestamp(self.as_timestamp()) } } -#[derive(Clone, Debug, PartialEq, Hash, RustcEncodable, RustcDecodable)] +impl PartialOrd for Time { + fn partial_cmp(&self, other: &Time) -> Option { + use self::Time::*; + match (self, other) { + (&Unknown, _) | (_, &Unknown) => None, + (&Hms(a_h, a_m, a_s), &Hms(b_h, b_m, b_s)) => { + if (a_h >= b_h && a_m >= b_m && a_s > b_s) + || (a_h >= b_h && a_m > b_m && a_s >= b_s) + || (a_h > b_h && a_m >= b_m && a_s >= b_s) + { Some(Ordering::Greater) } else { Some(Ordering::Less) } + }, + (&Timestamp(a), &Timestamp(b)) => Some(a.cmp(&b)), + _ => unimplemented!() + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, RustcEncodable, RustcDecodable)] pub struct Event<'a> { pub ty: Type<'a>, pub time: Time, @@ -64,7 +97,7 @@ pub struct Event<'a> { /// All representable events, such as messages, quits, joins /// and topic changes. -#[derive(Clone, Debug, Hash, PartialEq, RustcEncodable, RustcDecodable)] +#[derive(Clone, Debug, Hash, PartialEq, Eq, RustcEncodable, RustcDecodable)] pub enum Type<'a> { Connect, Disconnect, @@ -116,3 +149,13 @@ pub enum Type<'a> { masks: Cow<'a, str> } } + +#[derive(Clone, Debug, PartialEq, Eq, RustcEncodable, RustcDecodable)] +pub struct NoTimeHash<'a>(pub Event<'a>); + +impl<'a> Hash for NoTimeHash<'a> { + fn hash(&self, state: &mut H) where H: Hasher { + self.0.ty.hash(state); + self.0.channel.hash(state); + } +} diff --git a/src/format/energymech.rs b/src/format/energymech.rs index 0e46dce..df48318 100644 --- a/src/format/energymech.rs +++ b/src/format/energymech.rs @@ -31,7 +31,7 @@ static TIME_FORMAT: &'static str = "%H:%M:%S"; pub struct Iter<'a, R: 'a> where R: BufRead { context: &'a Context, input: R, - buffer: String + buffer: Vec } impl<'a, R: 'a> Iterator for Iter<'a, R> where R: BufRead { @@ -54,18 +54,20 @@ impl<'a, R: 'a> Iterator for Iter<'a, R> where R: BufRead { loop { self.buffer.clear(); - match self.input.read_line(&mut self.buffer) { + match self.input.read_until(b'\n', &mut self.buffer) { Ok(0) | Err(_) => return None, Ok(_) => () } + let buffer = String::from_utf8_lossy(&self.buffer); + let mut split_tokens: Vec = Vec::new(); - let tokens = self.buffer.split( |c: char| { + let tokens = buffer.split( |c: char| { if c.is_whitespace() { split_tokens.push(c); true } else { false } }).collect::>(); if log_enabled!(Info) { - info!("Original: `{}`", self.buffer); + info!("Original: `{}`", buffer); info!("Parsing: {:?}", tokens); } @@ -157,7 +159,7 @@ impl<'a, R: 'a> Decode<'a, R> for Energymech where R: BufRead { Iter { context: context, input: input, - buffer: String::new() + buffer: Vec::new() } } } diff --git a/src/format/weechat3.rs b/src/format/weechat3.rs index 3ff781c..e733a38 100644 --- a/src/format/weechat3.rs +++ b/src/format/weechat3.rs @@ -29,7 +29,7 @@ static TIME_DATE_FORMAT: &'static str = "%Y-%m-%d %H:%M:%S"; pub struct Iter<'a, R: 'a> where R: BufRead { context: &'a Context, input: R, - buffer: String + buffer: Vec } impl<'a, R: 'a> Iterator for Iter<'a, R> where R: BufRead { @@ -41,18 +41,20 @@ impl<'a, R: 'a> Iterator for Iter<'a, R> where R: BufRead { loop { self.buffer.clear(); - match self.input.read_line(&mut self.buffer) { + match self.input.read_until(b'\n', &mut self.buffer) { Ok(0) | Err(_) => return None, Ok(_) => () } + let buffer = String::from_utf8_lossy(&self.buffer); + let mut split_tokens: Vec = Vec::new(); - let tokens = self.buffer.split(|c: char| { + let tokens = buffer.split(|c: char| { if c.is_whitespace() { split_tokens.push(c); true } else { false } }).collect::>(); if log_enabled!(Info) { - info!("Original: `{}`", self.buffer); + info!("Original: `{}`", buffer); info!("Parsing: {:?}", tokens); } @@ -143,7 +145,7 @@ impl<'a, I: 'a> Decode<'a, I> for Weechat3 where I: BufRead { Iter { context: context, input: input, - buffer: String::new() + buffer: Vec::new() } } } diff --git a/src/main.rs b/src/main.rs index aa39b69..9ed0646 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,6 +25,7 @@ extern crate regex; extern crate log; extern crate env_logger; extern crate glob; +extern crate blist; use std::process; use std::io::{ self, Read, BufRead, BufReader, Write, BufWriter }; @@ -42,9 +43,12 @@ use glob::glob; use ilc::context::Context; use ilc::format::{ self, Encode, Decode, DecodeBox }; -use ilc::event::{ Event, Type }; +use ilc::event::{ Event, Type, NoTimeHash }; + +use ageset::AgeSet; mod chain; +mod ageset; static USAGE: &'static str = r#" d8b 888 @@ -62,6 +66,8 @@ Usage: ilc parse [options] [-i FILE...] ilc convert [options] [-i FILE...] ilc freq [options] [-i FILE...] + ilc sort [options] [-i FILE...] + ilc dedup [options] [-i FILE...] ilc (-h | --help | -v | --version) Options: @@ -81,6 +87,8 @@ struct Args { cmd_parse: bool, cmd_convert: bool, cmd_freq: bool, + cmd_sort: bool, + cmd_dedup: bool, arg_file: Vec, flag_in: Vec, flag_out: Option, @@ -180,7 +188,7 @@ fn main() { let encoder = force_encoder(args.flag_outf); for e in decoder.decode_box(&context, &mut input) { match e { - Ok(e) => { let _ = encoder.encode(&context, &mut io::stdout(), &e); }, + Ok(e) => { let _ = encoder.encode(&context, &mut output, &e); }, Err(e) => error(Box::new(e)) } } @@ -235,5 +243,36 @@ fn main() { for &(ref name, ref stat) in stats.iter().take(10) { let _ = write!(&mut output, "{}:\n\tLines: {}\n\tWords: {}\n", name, stat.lines, stat.words); } + } else if args.cmd_sort { + let mut decoder = force_decoder(args.flag_inf); + let encoder = force_encoder(args.flag_outf); + let mut events: Vec = decoder.decode_box(&context, &mut input) + .flat_map(Result::ok) + .collect(); + + events.sort_by(|a, b| a.time.cmp(&b.time)); + for e in events { + let _ = encoder.encode(&context, &mut output, &e); + } + } else if args.cmd_dedup { + let mut decoder = force_decoder(args.flag_inf); + let encoder = force_encoder(args.flag_outf); + let mut backlog = AgeSet::new(); + + for e in decoder.decode_box(&context, &mut input) { + if let Ok(e) = e { + let newest_event = e.clone(); + backlog.prune(move |a: &NoTimeHash| { + let age = newest_event.time.as_timestamp() - a.0.time.as_timestamp(); + age > 5000 + }); + // write `e` if it's a new event + let n = NoTimeHash(e); + if !backlog.contains(&n) { + let _ = encoder.encode(&context, &mut output, &n.0); + backlog.push(n); + } + } + } } } -- cgit v1.2.3