aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTill Höppner2016-03-02 12:57:36 +0100
committerTill Höppner2016-03-02 12:57:36 +0100
commita4db0628a0377b39be02f0e83832b0c3527933e1 (patch)
tree375e33b2942b6374e352b554d7202664812ddf2f
parent52d4c29f5bce85abadeb9fd394f55caf488b37f3 (diff)
downloadilc-0.3.tar.gz
ilc-0.3.tar.xz
ilc-0.3.zip
Merging of any number of logsv0.3.0v0.3
-rw-r--r--Cargo.toml15
-rw-r--r--base/Cargo.toml2
-rw-r--r--base/src/context.rs10
-rw-r--r--base/src/dummy.rs3
-rw-r--r--base/src/lib.rs6
-rw-r--r--cli/Cargo.toml10
-rw-r--r--cli/src/lib.rs60
-rw-r--r--formats/energymech/Cargo.toml4
-rw-r--r--formats/energymech/src/lib.rs20
-rw-r--r--formats/weechat/Cargo.toml4
-rw-r--r--formats/weechat/src/lib.rs82
-rw-r--r--ops/Cargo.toml6
-rw-r--r--ops/src/lib.rs78
-rw-r--r--src/lib.rs3
-rw-r--r--tests/files/mod.rs25
-rw-r--r--tests/lib.rs46
16 files changed, 296 insertions, 78 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 5c03dac..b7c0a93 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,18 +4,21 @@ description = "IRC log converter/collector/cruncher"
license = "Apache-2.0"
name = "ilc"
repository = "https://github.com/tilpner/ilc"
-version = "0.2.2"
+version = "0.3.0"
[[bin]]
name = "ilc"
doc = false
[dependencies]
-ilc-cli = "0.1.0"
-ilc-base = "0.1.0"
-ilc-ops = "0.1.0"
-ilc-format-weechat = "0.1.0"
-ilc-format-energymech = "0.1.0"
+ilc-cli = "~0.1"
+ilc-base = "~0.2"
+ilc-ops = "~0.1"
+ilc-format-weechat = "~0.2"
+ilc-format-energymech = "~0.2"
+
+[dev-dependencies]
+flate2 = "~0.2"
[profile.release]
debug = false
diff --git a/base/Cargo.toml b/base/Cargo.toml
index 541de02..f82cff0 100644
--- a/base/Cargo.toml
+++ b/base/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilc-base"
-version = "0.1.2"
+version = "0.2.0"
description = "IRC log converter/collector/cruncher"
homepage = "https://github.com/tilpner/ilc"
license = "Apache-2.0"
diff --git a/base/src/context.rs b/base/src/context.rs
index 4393457..fe264ab 100644
--- a/base/src/context.rs
+++ b/base/src/context.rs
@@ -7,3 +7,13 @@ pub struct Context {
pub override_date: Option<NaiveDate>,
pub channel: Option<String>,
}
+
+impl Default for Context {
+ fn default() -> Context {
+ Context {
+ timezone: FixedOffset::west(0),
+ override_date: None,
+ channel: None,
+ }
+ }
+}
diff --git a/base/src/dummy.rs b/base/src/dummy.rs
index 9317c4e..a0ab8b4 100644
--- a/base/src/dummy.rs
+++ b/base/src/dummy.rs
@@ -18,10 +18,11 @@ use std::io::BufRead;
use event::Event;
use context::Context;
+#[derive(Copy, Clone)]
pub struct Dummy;
impl ::Decode for Dummy {
- fn decode<'a>(&'a mut self,
+ fn decode<'a>(&'a self,
_context: &'a Context,
_input: &'a mut BufRead)
-> Box<Iterator<Item = ::Result<Event<'a>>> + 'a> {
diff --git a/base/src/lib.rs b/base/src/lib.rs
index 1f56dcd..f2777e3 100644
--- a/base/src/lib.rs
+++ b/base/src/lib.rs
@@ -26,7 +26,7 @@ pub mod dummy;
use std::io::{BufRead, Write};
pub use context::Context;
-pub use event::Event;
+pub use event::{Event, Time};
pub use error::*;
pub trait Encode {
@@ -37,8 +37,8 @@ pub trait Encode {
-> error::Result<()>;
}
-pub trait Decode {
- fn decode<'a>(&'a mut self,
+pub trait Decode {
+ fn decode<'a>(&'a self,
context: &'a Context,
input: &'a mut BufRead)
-> Box<Iterator<Item = error::Result<Event<'a>>> + 'a>;
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index e984319..8e66b7e 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilc-cli"
-version = "0.1.1"
+version = "0.1.2"
description = "IRC log converter/collector/cruncher"
homepage = "https://github.com/tilpner/ilc"
license = "Apache-2.0"
@@ -16,7 +16,7 @@ clap = "2.1.2"
chrono = "0.2.19"
env_logger = "0.3.2"
glob = "0.2.10"
-ilc-base = "0.1.0"
-ilc-ops = "0.1.0"
-ilc-format-weechat = { optional = true, version = "0.1.0" }
-ilc-format-energymech = { optional = true, version = "0.1.0" }
+ilc-base = "~0.2"
+ilc-ops = "~0.1"
+ilc-format-weechat = { optional = true, version = "~0.2" }
+ilc-format-energymech = { optional = true, version = "~0.2" }
diff --git a/cli/src/lib.rs b/cli/src/lib.rs
index 40bc2e7..9469cfb 100644
--- a/cli/src/lib.rs
+++ b/cli/src/lib.rs
@@ -47,6 +47,10 @@ mod chain;
pub fn main() {
env_logger::init().unwrap();
+ if option_env!("FUSE").is_some() {
+ info!("Compiled with FUSEs")
+ }
+
let args = App::new("ilc")
.version(crate_version!())
.setting(AppSettings::GlobalVersion)
@@ -181,21 +185,22 @@ pub fn main() {
&*e.encoder())
}
("merge", Some(args)) => {
- // TODO: avoid (de-)serialization to weechat
let e = Environment(&args);
- let (ctx, i, d, o, e) = (&e.context(),
- &mut e.input(),
- &mut *e.decoder(),
- &mut *e.output(),
- &*e.encoder());
- let mut buffer = Vec::new();
- match sort::sort(ctx, i, d, &mut buffer, &Weechat) {
- Err(e) => error(Box::new(e)),
- _ => (),
- }
- let mut read = io::Cursor::new(&buffer);
- dedup::dedup(ctx, &mut read, &mut Weechat, o, e)
+ let mut inputs = e.inputs();
+ // let mut decoders = e.decoders();
+
+ let borrowed_inputs = inputs.iter_mut()
+ .map(|a| a as &mut BufRead)
+ .collect();
+ // let borrowed_decoders = decoders.iter_mut()
+ // .map(|a| &mut **a as &mut Decode)
+ // .collect();
+ merge::merge(&e.context(),
+ borrowed_inputs,
+ &mut *e.decoder(),
+ &mut *e.output(),
+ &*e.encoder())
}
(sc, _) if !sc.is_empty() => panic!("Unimplemented subcommand `{}`, this is a bug", sc),
_ => die("No command specified"),
@@ -272,15 +277,44 @@ impl<'a> Environment<'a> {
pub fn context(&self) -> Context {
build_context(self.0)
}
+
pub fn input(&self) -> Box<BufRead> {
open_files(gather_input(self.0))
}
+
+ pub fn inputs(&self) -> Vec<Box<BufRead>> {
+ gather_input(self.0)
+ .iter()
+ .map(|path| {
+ Box::new(BufReader::new(File::open(path)
+ .unwrap_or_else(|e| {
+ error(Box::new(e))
+ }))) as Box<BufRead>
+ })
+ .collect()
+ }
+
pub fn output(&self) -> Box<Write> {
open_output(self.0)
}
+
pub fn decoder(&self) -> Box<Decode> {
force_decoder(self.0.value_of("format").or(self.0.value_of("input_format")))
}
+
+ /* pub fn decoders(&self) -> Vec<Box<Decode>> {
+ * self.0
+ * .value_of("format")
+ * .into_iter()
+ * .chain(self.0
+ * .values_of("input_formats")
+ * .map(|i| Box::new(i) as Box<Iterator<Item = _>>)
+ * .unwrap_or(Box::new(iter::empty()) as Box<Iterator<Item = _>>))
+ * .map(Option::Some)
+ * .map(force_decoder)
+ * .collect()
+ * } */
+
pub fn encoder(&self) -> Box<Encode> {
force_encoder(self.0.value_of("format").or(self.0.value_of("output_format")))
}
diff --git a/formats/energymech/Cargo.toml b/formats/energymech/Cargo.toml
index 09e41b1..b5cad0a 100644
--- a/formats/energymech/Cargo.toml
+++ b/formats/energymech/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilc-format-energymech"
-version = "0.1.1"
+version = "0.2.0"
description = "IRC log converter/collector/cruncher"
homepage = "https://github.com/tilpner/ilc"
license = "Apache-2.0"
@@ -10,4 +10,4 @@ authors = ["Till Höppner <till@hoeppner.ws>"]
[dependencies]
log = "0.3.5"
chrono = "0.2.19"
-ilc-base = "0.1.0"
+ilc-base = "~0.2"
diff --git a/formats/energymech/src/lib.rs b/formats/energymech/src/lib.rs
index e0863e7..ed687f0 100644
--- a/formats/energymech/src/lib.rs
+++ b/formats/energymech/src/lib.rs
@@ -29,6 +29,7 @@ use log::LogLevel::Info;
use chrono::*;
+#[derive(Copy, Clone)]
pub struct Energymech;
static TIME_FORMAT: &'static str = "%H:%M:%S";
@@ -219,12 +220,15 @@ impl<'a> Iterator for Iter<'a> {
channel: self.context.channel.clone().map(Into::into),
}));
}
+ if option_env!("FUSE").is_some() {
+ panic!("Shouldn't reach here, this is a bug!")
+ }
}
}
}
impl Decode for Energymech {
- fn decode<'a>(&'a mut self,
+ fn decode<'a>(&'a self,
context: &'a Context,
input: &'a mut BufRead)
-> Box<Iterator<Item = ilc_base::Result<Event<'a>>> + 'a> {
@@ -250,6 +254,13 @@ impl Encode for Energymech {
from,
content))
}
+ &Event { ty: Type::Notice { ref from, ref content }, ref time, .. } => {
+ try!(writeln!(&mut output,
+ "[{}] -{}- {}",
+ time.with_format(&context.timezone, TIME_FORMAT),
+ from,
+ content))
+ }
&Event { ty: Type::Action { ref from, ref content }, ref time, .. } => {
try!(writeln!(&mut output,
"[{}] * {} {}",
@@ -302,7 +313,12 @@ impl Encode for Energymech {
nick.as_ref().expect("Nick not present, but required."),
new_topic))
}
- _ => (),
+ _ => {
+ if option_env!("FUSE").is_some() {
+ panic!("Shouldn't reach here, this is a bug!")
+ }
+ ()
+ }
}
Ok(())
}
diff --git a/formats/weechat/Cargo.toml b/formats/weechat/Cargo.toml
index 5079d50..244932a 100644
--- a/formats/weechat/Cargo.toml
+++ b/formats/weechat/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilc-format-weechat"
-version = "0.1.1"
+version = "0.2.0"
description = "IRC log converter/collector/cruncher"
homepage = "https://github.com/tilpner/ilc"
license = "Apache-2.0"
@@ -10,4 +10,4 @@ authors = ["Till Höppner <till@hoeppner.ws>"]
[dependencies]
log = "0.3.5"
chrono = "0.2.19"
-ilc-base = "0.1.0"
+ilc-base = "~0.2"
diff --git a/formats/weechat/src/lib.rs b/formats/weechat/src/lib.rs
index 65bb4fa..4e1bd42 100644
--- a/formats/weechat/src/lib.rs
+++ b/formats/weechat/src/lib.rs
@@ -26,6 +26,7 @@ use ilc_base::format::{rejoin, strip_one};
use log::LogLevel::Info;
+#[derive(Copy, Clone)]
pub struct Weechat;
static TIME_DATE_FORMAT: &'static str = "%Y-%m-%d %H:%M:%S";
@@ -76,44 +77,40 @@ impl<'a> Iterator for Iter<'a> {
let len = tokens.len();
- if len >= 6 && tokens[5] == "has" {
- // 2016-02-25 01:15:05 --> Foo (host@mask.foo) has joined #example
- if len >= 8 && tokens[6] == "joined" {
- return Some(Ok(Event {
- ty: Type::Join {
- nick: tokens[3].to_owned().into(),
- mask: Some(strip_one(tokens[4]).into()),
- },
- channel: Some(tokens[7].to_owned().into()),
- time: parse_time(&self.context, tokens[0], tokens[1]),
- }));
- }
- // 2016-02-25 01:36:13 <-- Foo (host@mask.foo) has left #channel (Some reason)
- else if len >= 9 && tokens[6] == "left" {
- return Some(Ok(Event {
- ty: Type::Part {
- nick: tokens[3].to_owned().into(),
- mask: Some(strip_one(&tokens[4]).into()),
- reason: Some(strip_one(&rejoin(&tokens[8..], &split_tokens[8..]))
- .into()),
- },
- channel: Some(tokens[7].to_owned().into()),
- time: parse_time(&self.context, tokens[0], tokens[1]),
- }));
- }
- // 2016-02-25 01:38:55 <-- Foo (host@mask.foo) has quit (Some reason)
- else if len >= 8 && tokens[6] == "quit" {
- return Some(Ok(Event {
- ty: Type::Quit {
- nick: tokens[3].to_owned().into(),
- mask: Some(strip_one(tokens[4]).into()),
- reason: Some(strip_one(&rejoin(&tokens[7..], &split_tokens[7..]))
- .into()),
- },
- time: parse_time(&self.context, tokens[0], tokens[1]),
- channel: self.context.channel.clone().map(Into::into),
- }));
- }
+ // 2016-02-25 01:15:05 --> Foo (host@mask.foo) has joined #example
+ if len >= 8 && tokens[5] == "has" && tokens[6] == "joined" {
+ return Some(Ok(Event {
+ ty: Type::Join {
+ nick: tokens[3].to_owned().into(),
+ mask: Some(strip_one(tokens[4]).into()),
+ },
+ channel: Some(tokens[7].to_owned().into()),
+ time: parse_time(&self.context, tokens[0], tokens[1]),
+ }));
+ }
+ // 2016-02-25 01:36:13 <-- Foo (host@mask.foo) has left #channel (Some reason)
+ else if len >= 9 && tokens[5] == "has" && tokens[6] == "left" {
+ return Some(Ok(Event {
+ ty: Type::Part {
+ nick: tokens[3].to_owned().into(),
+ mask: Some(strip_one(&tokens[4]).into()),
+ reason: Some(strip_one(&rejoin(&tokens[8..], &split_tokens[8..])).into()),
+ },
+ channel: Some(tokens[7].to_owned().into()),
+ time: parse_time(&self.context, tokens[0], tokens[1]),
+ }));
+ }
+ // 2016-02-25 01:38:55 <-- Foo (host@mask.foo) has quit (Some reason)
+ else if len >= 8 && tokens[5] == "has" && tokens[6] == "quit" {
+ return Some(Ok(Event {
+ ty: Type::Quit {
+ nick: tokens[3].to_owned().into(),
+ mask: Some(strip_one(tokens[4]).into()),
+ reason: Some(strip_one(&rejoin(&tokens[7..], &split_tokens[7..])).into()),
+ },
+ time: parse_time(&self.context, tokens[0], tokens[1]),
+ channel: self.context.channel.clone().map(Into::into),
+ }));
} else if len >= 3 && tokens[2] == "--" {
// 2016-02-25 04:32:15 -- Notice(playbot-veno): ""
if len >= 5 && tokens[3].starts_with("Notice(") {
@@ -176,7 +173,7 @@ impl<'a> Iterator for Iter<'a> {
}
impl Decode for Weechat {
- fn decode<'a>(&'a mut self,
+ fn decode<'a>(&'a self,
context: &'a Context,
input: &'a mut BufRead)
-> Box<Iterator<Item = ilc_base::Result<Event<'a>>> + 'a> {
@@ -252,7 +249,12 @@ impl Encode for Weechat {
from,
content))
}
- _ => (),
+ _ => {
+ if option_env!("FUSE").is_some() {
+ panic!("Shouldn't reach here, this is a bug!")
+ }
+ ()
+ }
}
Ok(())
}
diff --git a/ops/Cargo.toml b/ops/Cargo.toml
index b0b0ce6..9a31871 100644
--- a/ops/Cargo.toml
+++ b/ops/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ilc-ops"
-version = "0.1.1"
+version = "0.1.2"
description = "IRC log converter/collector/cruncher"
homepage = "https://github.com/tilpner/ilc"
license = "Apache-2.0"
@@ -8,5 +8,7 @@ repository = "https://github.com/tilpner/ilc"
authors = ["Till Höppner <till@hoeppner.ws>"]
[dependencies]
-ilc-base = "0.1.0"
+log = "0.3.5"
+ilc-base = "~0.2"
blist = "0.0.4"
+bit-set = "0.3.0"
diff --git a/ops/src/lib.rs b/ops/src/lib.rs
index e5d92cb..f148037 100644
--- a/ops/src/lib.rs
+++ b/ops/src/lib.rs
@@ -1,4 +1,7 @@
+#[macro_use]
+extern crate log;
extern crate blist;
+extern crate bit_set;
extern crate ilc_base;
mod ageset;
@@ -13,7 +16,10 @@ pub mod parse {
/// This will return `Err` if the decoder yields `Err`.
pub fn parse(ctx: &Context, input: &mut BufRead, decoder: &mut Decode) -> ilc_base::Result<()> {
for e in decoder.decode(&ctx, input) {
- try!(e);
+ match e {
+ Ok(e) => debug!("{:?}", e),
+ Err(e) => error!("{:?}", e),
+ }
}
Ok(())
}
@@ -149,3 +155,73 @@ pub mod dedup {
Ok(())
}
}
+
+/// "Efficient" n-way merging
+pub mod merge {
+ use std::io::{BufRead, Write};
+ use bit_set::BitSet;
+ use ilc_base::{self, Context, Decode, Encode, Event};
+
+ /// Merge several individually sorted logs, *without* reading everything
+ /// into memory.
+ ///
+ /// The `input` and `decode` parameter will be zipped, so make sure they match up.
+ ///
+ /// Output will be inconsistent if every input isn't sorted by itself.
+ /// Logs with incomplete dates are free to do to weird stuff.
+ pub fn merge<'a>(ctx: &Context,
+ input: Vec<&'a mut BufRead>,
+ decode: &mut Decode,
+ output: &mut Write,
+ encode: &Encode)
+ -> ilc_base::Result<()> {
+ let mut events = input.into_iter()
+ .map(|i| decode.decode(&ctx, i).peekable())
+ .collect::<Vec<_>>();
+ let mut empty = BitSet::with_capacity(events.len());
+
+ loop {
+ if events.is_empty() {
+ return Ok(());
+ }
+
+ let earliest_idx = {
+ // Lifetimes can be a lot easier if you don't nest closures.
+ // Uglier too, yes. But hey, at least it compiles.
+ // Currently earliest event
+ let mut current = None;
+ // Index of stream that has "current" as head
+ let mut stream_idx = None;
+
+ for (idx, stream) in events.iter_mut().enumerate() {
+ let peek = stream.peek();
+ if let Some(ev) = peek {
+ // Ignore errors in stream
+ if let &Ok(ref ev) = ev {
+ if current.map(|c: &Event| ev.time < c.time).unwrap_or(true) {
+ current = Some(ev);
+ stream_idx = Some(idx);
+ }
+ }
+ } else {
+ empty.insert(idx);
+ }
+ }
+ stream_idx
+ };
+
+ // Safe because of matching against Some(&Ok(ref ev)) earlier
+ let earliest = earliest_idx.map(|idx| events[idx].next().unwrap().unwrap());
+
+ if let Some(event) = earliest {
+ try!(encode.encode(&ctx, output, &event));
+ }
+
+ // Keep non-empty streams
+ for (offset, idx) in empty.iter().enumerate() {
+ events.remove(offset + idx);
+ }
+ empty.clear();
+ }
+ }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 724e9f9..ab8040a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -16,3 +16,6 @@ pub use ilc_ops::freq::freq;
pub use ilc_ops::parse::parse;
pub use ilc_ops::seen::seen;
pub use ilc_ops::sort::sort;
+
+pub use ilc_format_weechat::Weechat;
+pub use ilc_format_energymech::Energymech;
diff --git a/tests/files/mod.rs b/tests/files/mod.rs
new file mode 100644
index 0000000..5b25e90
--- /dev/null
+++ b/tests/files/mod.rs
@@ -0,0 +1,25 @@
+extern crate flate2;
+
+use std::fs::File;
+use std::path::Path;
+use std::io::{BufReader, Read, Write};
+
+use self::flate2::FlateReadExt;
+
+#[allow(dead_code)]
+pub fn read(path: &str) -> Vec<u8> {
+ let path = Path::new("tests").join("input").join(&format!("{}.gz", path));
+ let size = path.metadata().expect("Couldn't determine filesize").len();
+ let mut out = Vec::with_capacity(size as usize);
+ let mut input = BufReader::new(File::open(path).expect("Couldn't open file"))
+ .gz_decode()
+ .expect("Couldn't decode GZ stream");
+ input.read_to_end(&mut out).expect("Couldn't read data");
+ out
+}
+
+#[allow(dead_code)]
+pub fn write<P: AsRef<Path>>(p: P, b: &[u8]) {
+ let mut out = File::create(p).expect("Couldn't create file");
+ out.write_all(b).expect("Couldn't write data");
+}
diff --git a/tests/lib.rs b/tests/lib.rs
new file mode 100644
index 0000000..d236f49
--- /dev/null
+++ b/tests/lib.rs
@@ -0,0 +1,46 @@
+extern crate ilc;
+
+use std::default::Default;
+use std::io::Cursor;
+
+use ilc::*;
+
+mod files;
+
+#[test]
+fn identity() {
+ let original = files::read("2016-02-26.log");
+ let mut output = Vec::new();
+
+ convert(&Context::default(),
+ &mut (&original as &[u8]),
+ &mut Energymech,
+ &mut output,
+ &Energymech)
+ .expect("Conversion failed");
+
+ files::write("identity.out", &output);
+
+ // don't assert_eq!, as the failed ouput doesn't help anyone
+ assert!(&original == &output);
+}
+
+/* #[test]
+ * fn merge() {
+ * let part1 = Cursor::new(files::read("2016-02-26.log.1"));
+ * let part2 = Cursor::new(files::read("2016-02-26.log.2"));
+ *
+ * let mut output = Vec::new();
+ *
+ * merge(&Context::default(),
+ * vec![&mut part1, &mut part2],
+ * &mut Energymech,
+ * &mut output,
+ * &Energymech)
+ * .expect("Merge failed");
+ *
+ * files::write("merged.out", &output);
+ *
+ * let original = files::read("2016-02-26.log");
+ * assert!(&original == &output);
+ * } */