diff options
Diffstat (limited to 'ops')
-rw-r--r-- | ops/Cargo.toml | 6 | ||||
-rw-r--r-- | ops/src/lib.rs | 78 |
2 files changed, 81 insertions, 3 deletions
diff --git a/ops/Cargo.toml b/ops/Cargo.toml index b0b0ce6..9a31871 100644 --- a/ops/Cargo.toml +++ b/ops/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ilc-ops" -version = "0.1.1" +version = "0.1.2" description = "IRC log converter/collector/cruncher" homepage = "https://github.com/tilpner/ilc" license = "Apache-2.0" @@ -8,5 +8,7 @@ repository = "https://github.com/tilpner/ilc" authors = ["Till Höppner <till@hoeppner.ws>"] [dependencies] -ilc-base = "0.1.0" +log = "0.3.5" +ilc-base = "~0.2" blist = "0.0.4" +bit-set = "0.3.0" diff --git a/ops/src/lib.rs b/ops/src/lib.rs index e5d92cb..f148037 100644 --- a/ops/src/lib.rs +++ b/ops/src/lib.rs @@ -1,4 +1,7 @@ +#[macro_use] +extern crate log; extern crate blist; +extern crate bit_set; extern crate ilc_base; mod ageset; @@ -13,7 +16,10 @@ pub mod parse { /// This will return `Err` if the decoder yields `Err`. pub fn parse(ctx: &Context, input: &mut BufRead, decoder: &mut Decode) -> ilc_base::Result<()> { for e in decoder.decode(&ctx, input) { - try!(e); + match e { + Ok(e) => debug!("{:?}", e), + Err(e) => error!("{:?}", e), + } } Ok(()) } @@ -149,3 +155,73 @@ pub mod dedup { Ok(()) } } + +/// "Efficient" n-way merging +pub mod merge { + use std::io::{BufRead, Write}; + use bit_set::BitSet; + use ilc_base::{self, Context, Decode, Encode, Event}; + + /// Merge several individually sorted logs, *without* reading everything + /// into memory. + /// + /// The `input` and `decode` parameter will be zipped, so make sure they match up. + /// + /// Output will be inconsistent if every input isn't sorted by itself. + /// Logs with incomplete dates are free to do to weird stuff. + pub fn merge<'a>(ctx: &Context, + input: Vec<&'a mut BufRead>, + decode: &mut Decode, + output: &mut Write, + encode: &Encode) + -> ilc_base::Result<()> { + let mut events = input.into_iter() + .map(|i| decode.decode(&ctx, i).peekable()) + .collect::<Vec<_>>(); + let mut empty = BitSet::with_capacity(events.len()); + + loop { + if events.is_empty() { + return Ok(()); + } + + let earliest_idx = { + // Lifetimes can be a lot easier if you don't nest closures. + // Uglier too, yes. But hey, at least it compiles. + // Currently earliest event + let mut current = None; + // Index of stream that has "current" as head + let mut stream_idx = None; + + for (idx, stream) in events.iter_mut().enumerate() { + let peek = stream.peek(); + if let Some(ev) = peek { + // Ignore errors in stream + if let &Ok(ref ev) = ev { + if current.map(|c: &Event| ev.time < c.time).unwrap_or(true) { + current = Some(ev); + stream_idx = Some(idx); + } + } + } else { + empty.insert(idx); + } + } + stream_idx + }; + + // Safe because of matching against Some(&Ok(ref ev)) earlier + let earliest = earliest_idx.map(|idx| events[idx].next().unwrap().unwrap()); + + if let Some(event) = earliest { + try!(encode.encode(&ctx, output, &event)); + } + + // Keep non-empty streams + for (offset, idx) in empty.iter().enumerate() { + events.remove(offset + idx); + } + empty.clear(); + } + } +} |