aboutsummaryrefslogtreecommitdiff
path: root/ops
diff options
context:
space:
mode:
authorTill Höppner2016-03-02 12:57:36 +0100
committerTill Höppner2016-03-02 12:57:36 +0100
commita4db0628a0377b39be02f0e83832b0c3527933e1 (patch)
tree375e33b2942b6374e352b554d7202664812ddf2f /ops
parent52d4c29f5bce85abadeb9fd394f55caf488b37f3 (diff)
downloadilc-0.3.tar.gz
ilc-0.3.tar.xz
ilc-0.3.zip
Merging of any number of logsv0.3.0v0.3
Diffstat (limited to 'ops')
-rw-r--r--ops/Cargo.toml6
-rw-r--r--ops/src/lib.rs78
2 files changed, 81 insertions, 3 deletions
diff --git a/ops/Cargo.toml b/ops/Cargo.toml
index b0b0ce6..9a31871 100644
--- a/ops/Cargo.toml
+++ b/ops/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilc-ops"
-version = "0.1.1"
+version = "0.1.2"
description = "IRC log converter/collector/cruncher"
homepage = "https://github.com/tilpner/ilc"
license = "Apache-2.0"
@@ -8,5 +8,7 @@ repository = "https://github.com/tilpner/ilc"
authors = ["Till Höppner <till@hoeppner.ws>"]
[dependencies]
-ilc-base = "0.1.0"
+log = "0.3.5"
+ilc-base = "~0.2"
blist = "0.0.4"
+bit-set = "0.3.0"
diff --git a/ops/src/lib.rs b/ops/src/lib.rs
index e5d92cb..f148037 100644
--- a/ops/src/lib.rs
+++ b/ops/src/lib.rs
@@ -1,4 +1,7 @@
+#[macro_use]
+extern crate log;
extern crate blist;
+extern crate bit_set;
extern crate ilc_base;
mod ageset;
@@ -13,7 +16,10 @@ pub mod parse {
/// This will return `Err` if the decoder yields `Err`.
pub fn parse(ctx: &Context, input: &mut BufRead, decoder: &mut Decode) -> ilc_base::Result<()> {
for e in decoder.decode(&ctx, input) {
- try!(e);
+ match e {
+ Ok(e) => debug!("{:?}", e),
+ Err(e) => error!("{:?}", e),
+ }
}
Ok(())
}
@@ -149,3 +155,73 @@ pub mod dedup {
Ok(())
}
}
+
+/// "Efficient" n-way merging
+pub mod merge {
+ use std::io::{BufRead, Write};
+ use bit_set::BitSet;
+ use ilc_base::{self, Context, Decode, Encode, Event};
+
+ /// Merge several individually sorted logs, *without* reading everything
+ /// into memory.
+ ///
+ /// The `input` and `decode` parameter will be zipped, so make sure they match up.
+ ///
+ /// Output will be inconsistent if every input isn't sorted by itself.
+ /// Logs with incomplete dates are free to do to weird stuff.
+ pub fn merge<'a>(ctx: &Context,
+ input: Vec<&'a mut BufRead>,
+ decode: &mut Decode,
+ output: &mut Write,
+ encode: &Encode)
+ -> ilc_base::Result<()> {
+ let mut events = input.into_iter()
+ .map(|i| decode.decode(&ctx, i).peekable())
+ .collect::<Vec<_>>();
+ let mut empty = BitSet::with_capacity(events.len());
+
+ loop {
+ if events.is_empty() {
+ return Ok(());
+ }
+
+ let earliest_idx = {
+ // Lifetimes can be a lot easier if you don't nest closures.
+ // Uglier too, yes. But hey, at least it compiles.
+ // Currently earliest event
+ let mut current = None;
+ // Index of stream that has "current" as head
+ let mut stream_idx = None;
+
+ for (idx, stream) in events.iter_mut().enumerate() {
+ let peek = stream.peek();
+ if let Some(ev) = peek {
+ // Ignore errors in stream
+ if let &Ok(ref ev) = ev {
+ if current.map(|c: &Event| ev.time < c.time).unwrap_or(true) {
+ current = Some(ev);
+ stream_idx = Some(idx);
+ }
+ }
+ } else {
+ empty.insert(idx);
+ }
+ }
+ stream_idx
+ };
+
+ // Safe because of matching against Some(&Ok(ref ev)) earlier
+ let earliest = earliest_idx.map(|idx| events[idx].next().unwrap().unwrap());
+
+ if let Some(event) = earliest {
+ try!(encode.encode(&ctx, output, &event));
+ }
+
+ // Keep non-empty streams
+ for (offset, idx) in empty.iter().enumerate() {
+ events.remove(offset + idx);
+ }
+ empty.clear();
+ }
+ }
+}