From a4db0628a0377b39be02f0e83832b0c3527933e1 Mon Sep 17 00:00:00 2001 From: Till Höppner Date: Wed, 2 Mar 2016 12:57:36 +0100 Subject: Merging of any number of logs --- Cargo.toml | 15 ++++---- base/Cargo.toml | 2 +- base/src/context.rs | 10 ++++++ base/src/dummy.rs | 3 +- base/src/lib.rs | 6 ++-- cli/Cargo.toml | 10 +++--- cli/src/lib.rs | 60 ++++++++++++++++++++++++------- formats/energymech/Cargo.toml | 4 +-- formats/energymech/src/lib.rs | 20 +++++++++-- formats/weechat/Cargo.toml | 4 +-- formats/weechat/src/lib.rs | 82 ++++++++++++++++++++++--------------------- ops/Cargo.toml | 6 ++-- ops/src/lib.rs | 78 +++++++++++++++++++++++++++++++++++++++- src/lib.rs | 3 ++ tests/files/mod.rs | 25 +++++++++++++ tests/lib.rs | 46 ++++++++++++++++++++++++ 16 files changed, 296 insertions(+), 78 deletions(-) create mode 100644 tests/files/mod.rs create mode 100644 tests/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 5c03dac..b7c0a93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,18 +4,21 @@ description = "IRC log converter/collector/cruncher" license = "Apache-2.0" name = "ilc" repository = "https://github.com/tilpner/ilc" -version = "0.2.2" +version = "0.3.0" [[bin]] name = "ilc" doc = false [dependencies] -ilc-cli = "0.1.0" -ilc-base = "0.1.0" -ilc-ops = "0.1.0" -ilc-format-weechat = "0.1.0" -ilc-format-energymech = "0.1.0" +ilc-cli = "~0.1" +ilc-base = "~0.2" +ilc-ops = "~0.1" +ilc-format-weechat = "~0.2" +ilc-format-energymech = "~0.2" + +[dev-dependencies] +flate2 = "~0.2" [profile.release] debug = false diff --git a/base/Cargo.toml b/base/Cargo.toml index 541de02..f82cff0 100644 --- a/base/Cargo.toml +++ b/base/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilc-base" -version = "0.1.2" +version = "0.2.0" description = "IRC log converter/collector/cruncher" homepage = "https://github.com/tilpner/ilc" license = "Apache-2.0" diff --git a/base/src/context.rs b/base/src/context.rs index 4393457..fe264ab 100644 --- a/base/src/context.rs +++ b/base/src/context.rs @@ -7,3 +7,13 @@ pub struct Context { pub override_date: Option, pub channel: Option, } + +impl Default for Context { + fn default() -> Context { + Context { + timezone: FixedOffset::west(0), + override_date: None, + channel: None, + } + } +} diff --git a/base/src/dummy.rs b/base/src/dummy.rs index 9317c4e..a0ab8b4 100644 --- a/base/src/dummy.rs +++ b/base/src/dummy.rs @@ -18,10 +18,11 @@ use std::io::BufRead; use event::Event; use context::Context; +#[derive(Copy, Clone)] pub struct Dummy; impl ::Decode for Dummy { - fn decode<'a>(&'a mut self, + fn decode<'a>(&'a self, _context: &'a Context, _input: &'a mut BufRead) -> Box>> + 'a> { diff --git a/base/src/lib.rs b/base/src/lib.rs index 1f56dcd..f2777e3 100644 --- a/base/src/lib.rs +++ b/base/src/lib.rs @@ -26,7 +26,7 @@ pub mod dummy; use std::io::{BufRead, Write}; pub use context::Context; -pub use event::Event; +pub use event::{Event, Time}; pub use error::*; pub trait Encode { @@ -37,8 +37,8 @@ pub trait Encode { -> error::Result<()>; } -pub trait Decode { - fn decode<'a>(&'a mut self, +pub trait Decode { + fn decode<'a>(&'a self, context: &'a Context, input: &'a mut BufRead) -> Box>> + 'a>; diff --git a/cli/Cargo.toml b/cli/Cargo.toml index e984319..8e66b7e 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilc-cli" -version = "0.1.1" +version = "0.1.2" description = "IRC log converter/collector/cruncher" homepage = "https://github.com/tilpner/ilc" license = "Apache-2.0" @@ -16,7 +16,7 @@ clap = "2.1.2" chrono = "0.2.19" env_logger = "0.3.2" glob = "0.2.10" -ilc-base = "0.1.0" -ilc-ops = "0.1.0" -ilc-format-weechat = { optional = true, version = "0.1.0" } -ilc-format-energymech = { optional = true, version = "0.1.0" } +ilc-base = "~0.2" +ilc-ops = "~0.1" +ilc-format-weechat = { optional = true, version = "~0.2" } +ilc-format-energymech = { optional = true, version = "~0.2" } diff --git a/cli/src/lib.rs b/cli/src/lib.rs index 40bc2e7..9469cfb 100644 --- a/cli/src/lib.rs +++ b/cli/src/lib.rs @@ -47,6 +47,10 @@ mod chain; pub fn main() { env_logger::init().unwrap(); + if option_env!("FUSE").is_some() { + info!("Compiled with FUSEs") + } + let args = App::new("ilc") .version(crate_version!()) .setting(AppSettings::GlobalVersion) @@ -181,21 +185,22 @@ pub fn main() { &*e.encoder()) } ("merge", Some(args)) => { - // TODO: avoid (de-)serialization to weechat let e = Environment(&args); - let (ctx, i, d, o, e) = (&e.context(), - &mut e.input(), - &mut *e.decoder(), - &mut *e.output(), - &*e.encoder()); - let mut buffer = Vec::new(); - match sort::sort(ctx, i, d, &mut buffer, &Weechat) { - Err(e) => error(Box::new(e)), - _ => (), - } - let mut read = io::Cursor::new(&buffer); - dedup::dedup(ctx, &mut read, &mut Weechat, o, e) + let mut inputs = e.inputs(); + // let mut decoders = e.decoders(); + + let borrowed_inputs = inputs.iter_mut() + .map(|a| a as &mut BufRead) + .collect(); + // let borrowed_decoders = decoders.iter_mut() + // .map(|a| &mut **a as &mut Decode) + // .collect(); + merge::merge(&e.context(), + borrowed_inputs, + &mut *e.decoder(), + &mut *e.output(), + &*e.encoder()) } (sc, _) if !sc.is_empty() => panic!("Unimplemented subcommand `{}`, this is a bug", sc), _ => die("No command specified"), @@ -272,15 +277,44 @@ impl<'a> Environment<'a> { pub fn context(&self) -> Context { build_context(self.0) } + pub fn input(&self) -> Box { open_files(gather_input(self.0)) } + + pub fn inputs(&self) -> Vec> { + gather_input(self.0) + .iter() + .map(|path| { + Box::new(BufReader::new(File::open(path) + .unwrap_or_else(|e| { + error(Box::new(e)) + }))) as Box + }) + .collect() + } + pub fn output(&self) -> Box { open_output(self.0) } + pub fn decoder(&self) -> Box { force_decoder(self.0.value_of("format").or(self.0.value_of("input_format"))) } + + /* pub fn decoders(&self) -> Vec> { + * self.0 + * .value_of("format") + * .into_iter() + * .chain(self.0 + * .values_of("input_formats") + * .map(|i| Box::new(i) as Box>) + * .unwrap_or(Box::new(iter::empty()) as Box>)) + * .map(Option::Some) + * .map(force_decoder) + * .collect() + * } */ + pub fn encoder(&self) -> Box { force_encoder(self.0.value_of("format").or(self.0.value_of("output_format"))) } diff --git a/formats/energymech/Cargo.toml b/formats/energymech/Cargo.toml index 09e41b1..b5cad0a 100644 --- a/formats/energymech/Cargo.toml +++ b/formats/energymech/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilc-format-energymech" -version = "0.1.1" +version = "0.2.0" description = "IRC log converter/collector/cruncher" homepage = "https://github.com/tilpner/ilc" license = "Apache-2.0" @@ -10,4 +10,4 @@ authors = ["Till Höppner "] [dependencies] log = "0.3.5" chrono = "0.2.19" -ilc-base = "0.1.0" +ilc-base = "~0.2" diff --git a/formats/energymech/src/lib.rs b/formats/energymech/src/lib.rs index e0863e7..ed687f0 100644 --- a/formats/energymech/src/lib.rs +++ b/formats/energymech/src/lib.rs @@ -29,6 +29,7 @@ use log::LogLevel::Info; use chrono::*; +#[derive(Copy, Clone)] pub struct Energymech; static TIME_FORMAT: &'static str = "%H:%M:%S"; @@ -219,12 +220,15 @@ impl<'a> Iterator for Iter<'a> { channel: self.context.channel.clone().map(Into::into), })); } + if option_env!("FUSE").is_some() { + panic!("Shouldn't reach here, this is a bug!") + } } } } impl Decode for Energymech { - fn decode<'a>(&'a mut self, + fn decode<'a>(&'a self, context: &'a Context, input: &'a mut BufRead) -> Box>> + 'a> { @@ -250,6 +254,13 @@ impl Encode for Energymech { from, content)) } + &Event { ty: Type::Notice { ref from, ref content }, ref time, .. } => { + try!(writeln!(&mut output, + "[{}] -{}- {}", + time.with_format(&context.timezone, TIME_FORMAT), + from, + content)) + } &Event { ty: Type::Action { ref from, ref content }, ref time, .. } => { try!(writeln!(&mut output, "[{}] * {} {}", @@ -302,7 +313,12 @@ impl Encode for Energymech { nick.as_ref().expect("Nick not present, but required."), new_topic)) } - _ => (), + _ => { + if option_env!("FUSE").is_some() { + panic!("Shouldn't reach here, this is a bug!") + } + () + } } Ok(()) } diff --git a/formats/weechat/Cargo.toml b/formats/weechat/Cargo.toml index 5079d50..244932a 100644 --- a/formats/weechat/Cargo.toml +++ b/formats/weechat/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilc-format-weechat" -version = "0.1.1" +version = "0.2.0" description = "IRC log converter/collector/cruncher" homepage = "https://github.com/tilpner/ilc" license = "Apache-2.0" @@ -10,4 +10,4 @@ authors = ["Till Höppner "] [dependencies] log = "0.3.5" chrono = "0.2.19" -ilc-base = "0.1.0" +ilc-base = "~0.2" diff --git a/formats/weechat/src/lib.rs b/formats/weechat/src/lib.rs index 65bb4fa..4e1bd42 100644 --- a/formats/weechat/src/lib.rs +++ b/formats/weechat/src/lib.rs @@ -26,6 +26,7 @@ use ilc_base::format::{rejoin, strip_one}; use log::LogLevel::Info; +#[derive(Copy, Clone)] pub struct Weechat; static TIME_DATE_FORMAT: &'static str = "%Y-%m-%d %H:%M:%S"; @@ -76,44 +77,40 @@ impl<'a> Iterator for Iter<'a> { let len = tokens.len(); - if len >= 6 && tokens[5] == "has" { - // 2016-02-25 01:15:05 --> Foo (host@mask.foo) has joined #example - if len >= 8 && tokens[6] == "joined" { - return Some(Ok(Event { - ty: Type::Join { - nick: tokens[3].to_owned().into(), - mask: Some(strip_one(tokens[4]).into()), - }, - channel: Some(tokens[7].to_owned().into()), - time: parse_time(&self.context, tokens[0], tokens[1]), - })); - } - // 2016-02-25 01:36:13 <-- Foo (host@mask.foo) has left #channel (Some reason) - else if len >= 9 && tokens[6] == "left" { - return Some(Ok(Event { - ty: Type::Part { - nick: tokens[3].to_owned().into(), - mask: Some(strip_one(&tokens[4]).into()), - reason: Some(strip_one(&rejoin(&tokens[8..], &split_tokens[8..])) - .into()), - }, - channel: Some(tokens[7].to_owned().into()), - time: parse_time(&self.context, tokens[0], tokens[1]), - })); - } - // 2016-02-25 01:38:55 <-- Foo (host@mask.foo) has quit (Some reason) - else if len >= 8 && tokens[6] == "quit" { - return Some(Ok(Event { - ty: Type::Quit { - nick: tokens[3].to_owned().into(), - mask: Some(strip_one(tokens[4]).into()), - reason: Some(strip_one(&rejoin(&tokens[7..], &split_tokens[7..])) - .into()), - }, - time: parse_time(&self.context, tokens[0], tokens[1]), - channel: self.context.channel.clone().map(Into::into), - })); - } + // 2016-02-25 01:15:05 --> Foo (host@mask.foo) has joined #example + if len >= 8 && tokens[5] == "has" && tokens[6] == "joined" { + return Some(Ok(Event { + ty: Type::Join { + nick: tokens[3].to_owned().into(), + mask: Some(strip_one(tokens[4]).into()), + }, + channel: Some(tokens[7].to_owned().into()), + time: parse_time(&self.context, tokens[0], tokens[1]), + })); + } + // 2016-02-25 01:36:13 <-- Foo (host@mask.foo) has left #channel (Some reason) + else if len >= 9 && tokens[5] == "has" && tokens[6] == "left" { + return Some(Ok(Event { + ty: Type::Part { + nick: tokens[3].to_owned().into(), + mask: Some(strip_one(&tokens[4]).into()), + reason: Some(strip_one(&rejoin(&tokens[8..], &split_tokens[8..])).into()), + }, + channel: Some(tokens[7].to_owned().into()), + time: parse_time(&self.context, tokens[0], tokens[1]), + })); + } + // 2016-02-25 01:38:55 <-- Foo (host@mask.foo) has quit (Some reason) + else if len >= 8 && tokens[5] == "has" && tokens[6] == "quit" { + return Some(Ok(Event { + ty: Type::Quit { + nick: tokens[3].to_owned().into(), + mask: Some(strip_one(tokens[4]).into()), + reason: Some(strip_one(&rejoin(&tokens[7..], &split_tokens[7..])).into()), + }, + time: parse_time(&self.context, tokens[0], tokens[1]), + channel: self.context.channel.clone().map(Into::into), + })); } else if len >= 3 && tokens[2] == "--" { // 2016-02-25 04:32:15 -- Notice(playbot-veno): "" if len >= 5 && tokens[3].starts_with("Notice(") { @@ -176,7 +173,7 @@ impl<'a> Iterator for Iter<'a> { } impl Decode for Weechat { - fn decode<'a>(&'a mut self, + fn decode<'a>(&'a self, context: &'a Context, input: &'a mut BufRead) -> Box>> + 'a> { @@ -252,7 +249,12 @@ impl Encode for Weechat { from, content)) } - _ => (), + _ => { + if option_env!("FUSE").is_some() { + panic!("Shouldn't reach here, this is a bug!") + } + () + } } Ok(()) } diff --git a/ops/Cargo.toml b/ops/Cargo.toml index b0b0ce6..9a31871 100644 --- a/ops/Cargo.toml +++ b/ops/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilc-ops" -version = "0.1.1" +version = "0.1.2" description = "IRC log converter/collector/cruncher" homepage = "https://github.com/tilpner/ilc" license = "Apache-2.0" @@ -8,5 +8,7 @@ repository = "https://github.com/tilpner/ilc" authors = ["Till Höppner "] [dependencies] -ilc-base = "0.1.0" +log = "0.3.5" +ilc-base = "~0.2" blist = "0.0.4" +bit-set = "0.3.0" diff --git a/ops/src/lib.rs b/ops/src/lib.rs index e5d92cb..f148037 100644 --- a/ops/src/lib.rs +++ b/ops/src/lib.rs @@ -1,4 +1,7 @@ +#[macro_use] +extern crate log; extern crate blist; +extern crate bit_set; extern crate ilc_base; mod ageset; @@ -13,7 +16,10 @@ pub mod parse { /// This will return `Err` if the decoder yields `Err`. pub fn parse(ctx: &Context, input: &mut BufRead, decoder: &mut Decode) -> ilc_base::Result<()> { for e in decoder.decode(&ctx, input) { - try!(e); + match e { + Ok(e) => debug!("{:?}", e), + Err(e) => error!("{:?}", e), + } } Ok(()) } @@ -149,3 +155,73 @@ pub mod dedup { Ok(()) } } + +/// "Efficient" n-way merging +pub mod merge { + use std::io::{BufRead, Write}; + use bit_set::BitSet; + use ilc_base::{self, Context, Decode, Encode, Event}; + + /// Merge several individually sorted logs, *without* reading everything + /// into memory. + /// + /// The `input` and `decode` parameter will be zipped, so make sure they match up. + /// + /// Output will be inconsistent if every input isn't sorted by itself. + /// Logs with incomplete dates are free to do to weird stuff. + pub fn merge<'a>(ctx: &Context, + input: Vec<&'a mut BufRead>, + decode: &mut Decode, + output: &mut Write, + encode: &Encode) + -> ilc_base::Result<()> { + let mut events = input.into_iter() + .map(|i| decode.decode(&ctx, i).peekable()) + .collect::>(); + let mut empty = BitSet::with_capacity(events.len()); + + loop { + if events.is_empty() { + return Ok(()); + } + + let earliest_idx = { + // Lifetimes can be a lot easier if you don't nest closures. + // Uglier too, yes. But hey, at least it compiles. + // Currently earliest event + let mut current = None; + // Index of stream that has "current" as head + let mut stream_idx = None; + + for (idx, stream) in events.iter_mut().enumerate() { + let peek = stream.peek(); + if let Some(ev) = peek { + // Ignore errors in stream + if let &Ok(ref ev) = ev { + if current.map(|c: &Event| ev.time < c.time).unwrap_or(true) { + current = Some(ev); + stream_idx = Some(idx); + } + } + } else { + empty.insert(idx); + } + } + stream_idx + }; + + // Safe because of matching against Some(&Ok(ref ev)) earlier + let earliest = earliest_idx.map(|idx| events[idx].next().unwrap().unwrap()); + + if let Some(event) = earliest { + try!(encode.encode(&ctx, output, &event)); + } + + // Keep non-empty streams + for (offset, idx) in empty.iter().enumerate() { + events.remove(offset + idx); + } + empty.clear(); + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 724e9f9..ab8040a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,3 +16,6 @@ pub use ilc_ops::freq::freq; pub use ilc_ops::parse::parse; pub use ilc_ops::seen::seen; pub use ilc_ops::sort::sort; + +pub use ilc_format_weechat::Weechat; +pub use ilc_format_energymech::Energymech; diff --git a/tests/files/mod.rs b/tests/files/mod.rs new file mode 100644 index 0000000..5b25e90 --- /dev/null +++ b/tests/files/mod.rs @@ -0,0 +1,25 @@ +extern crate flate2; + +use std::fs::File; +use std::path::Path; +use std::io::{BufReader, Read, Write}; + +use self::flate2::FlateReadExt; + +#[allow(dead_code)] +pub fn read(path: &str) -> Vec { + let path = Path::new("tests").join("input").join(&format!("{}.gz", path)); + let size = path.metadata().expect("Couldn't determine filesize").len(); + let mut out = Vec::with_capacity(size as usize); + let mut input = BufReader::new(File::open(path).expect("Couldn't open file")) + .gz_decode() + .expect("Couldn't decode GZ stream"); + input.read_to_end(&mut out).expect("Couldn't read data"); + out +} + +#[allow(dead_code)] +pub fn write>(p: P, b: &[u8]) { + let mut out = File::create(p).expect("Couldn't create file"); + out.write_all(b).expect("Couldn't write data"); +} diff --git a/tests/lib.rs b/tests/lib.rs new file mode 100644 index 0000000..d236f49 --- /dev/null +++ b/tests/lib.rs @@ -0,0 +1,46 @@ +extern crate ilc; + +use std::default::Default; +use std::io::Cursor; + +use ilc::*; + +mod files; + +#[test] +fn identity() { + let original = files::read("2016-02-26.log"); + let mut output = Vec::new(); + + convert(&Context::default(), + &mut (&original as &[u8]), + &mut Energymech, + &mut output, + &Energymech) + .expect("Conversion failed"); + + files::write("identity.out", &output); + + // don't assert_eq!, as the failed ouput doesn't help anyone + assert!(&original == &output); +} + +/* #[test] + * fn merge() { + * let part1 = Cursor::new(files::read("2016-02-26.log.1")); + * let part2 = Cursor::new(files::read("2016-02-26.log.2")); + * + * let mut output = Vec::new(); + * + * merge(&Context::default(), + * vec![&mut part1, &mut part2], + * &mut Energymech, + * &mut output, + * &Energymech) + * .expect("Merge failed"); + * + * files::write("merged.out", &output); + * + * let original = files::read("2016-02-26.log"); + * assert!(&original == &output); + * } */ -- cgit v1.2.3