From 64106c4d3d4ddba8c7bc2af75376e6d3d3d75601 Mon Sep 17 00:00:00 2001 From: Date: Mon, 29 Jun 2015 20:16:15 +0000 Subject: Update documentation --- src/carboxyl/lib.rs.html | 419 ++++++++++ src/carboxyl/lift.rs.html | 605 ++++++++++++++ src/carboxyl/pending.rs.html | 271 +++++++ src/carboxyl/readonly.rs.html | 173 ++++ src/carboxyl/signal.rs.html | 1601 +++++++++++++++++++++++++++++++++++++ src/carboxyl/source.rs.html | 359 +++++++++ src/carboxyl/stream.rs.html | 1645 ++++++++++++++++++++++++++++++++++++++ src/carboxyl/transaction.rs.html | 459 +++++++++++ 8 files changed, 5532 insertions(+) create mode 100644 src/carboxyl/lib.rs.html create mode 100644 src/carboxyl/lift.rs.html create mode 100644 src/carboxyl/pending.rs.html create mode 100644 src/carboxyl/readonly.rs.html create mode 100644 src/carboxyl/signal.rs.html create mode 100644 src/carboxyl/source.rs.html create mode 100644 src/carboxyl/stream.rs.html create mode 100644 src/carboxyl/transaction.rs.html (limited to 'src/carboxyl') diff --git a/src/carboxyl/lib.rs.html b/src/carboxyl/lib.rs.html new file mode 100644 index 0000000..912fa0d --- /dev/null +++ b/src/carboxyl/lib.rs.html @@ -0,0 +1,419 @@ + + + + + + + + + + lib.rs.html -- source + + + + + + + + + + + + + + + +
  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+ 21
+ 22
+ 23
+ 24
+ 25
+ 26
+ 27
+ 28
+ 29
+ 30
+ 31
+ 32
+ 33
+ 34
+ 35
+ 36
+ 37
+ 38
+ 39
+ 40
+ 41
+ 42
+ 43
+ 44
+ 45
+ 46
+ 47
+ 48
+ 49
+ 50
+ 51
+ 52
+ 53
+ 54
+ 55
+ 56
+ 57
+ 58
+ 59
+ 60
+ 61
+ 62
+ 63
+ 64
+ 65
+ 66
+ 67
+ 68
+ 69
+ 70
+ 71
+ 72
+ 73
+ 74
+ 75
+ 76
+ 77
+ 78
+ 79
+ 80
+ 81
+ 82
+ 83
+ 84
+ 85
+ 86
+ 87
+ 88
+ 89
+ 90
+ 91
+ 92
+ 93
+ 94
+ 95
+ 96
+ 97
+ 98
+ 99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+
+//! *Carboxyl* provides primitives for functional reactive programming in Rust.
+//! It draws inspiration from the [Sodium][sodium] libraries and Push-Pull FRP,
+//! as described by [Elliott (2009)][elliott_push_pull].
+//!
+//! [sodium]: https://github.com/SodiumFRP/sodium/
+//! [elliott_push_pull]: http://conal.net/papers/push-pull-frp/push-pull-frp.pdf
+//!
+//!
+//! # Overview
+//!
+//! Functional reactive programming (FRP) is a composable and modular
+//! abstraction for creating dynamic and reactive systems. In its most general
+//! form it models these systems as a composition of two basic primitives:
+//! *streams* are a series of singular events and *signals* are continuously
+//! changing values.
+//!
+//! *Carboxyl* is an imperative, hybrid push- and pull-based implementation of
+//! FRP. Streams and the discrete components of signals are data-driven, i.e.
+//! whenever an event occurs the resulting changes are propagated to everything
+//! that depends on it.
+//!
+//! However, the continuous components of signals are demand-driven. Internally,
+//! *Carboxyl* stores the state of a signal as a function. This function has to
+//! be evaluated by consumers of a signal to obtain a concrete value.
+//!
+//! Nonetheless, *Carboxyl* has no explicit notion of time. Its signals are
+//! functions that can be evaluated at any time, but they do not carry any
+//! inherent notion of time. Synchronization and atomicity is achieved by a
+//! transaction system.
+//!
+//!
+//! # Functional reactive primitives
+//!
+//! This library provides two basic types: `Stream` and `Signal`. A stream is a
+//! discrete sequence of events, a signal is a container for values that change
+//! (discretely) over time.
+//!
+//! The FRP primitives are mostly implemented as methods of the basic types to
+//! ease method chaining, except for the various lifting functions, as they do
+//! not really belong to any type in particular.
+//!
+//! In addition, the `Sink` type allows one to create a stream of events by
+//! sending values into it. It is the only way to create a stream from scratch,
+//! i.e. without using any of the other primitives.
+//!
+//!
+//! # Usage example
+//!
+//! Here is a simple example of how you can use the primitives provided by
+//! *Carboxyl*. First of all, events can be sent into a *sink*. From a sink one
+//! can create a *stream* of events. Streams can also be filtered, mapped and
+//! merged. One can e.g. hold the last event from a stream as a signal.
+//!
+//! ```
+//! use carboxyl::Sink;
+//!
+//! let sink = Sink::new();
+//! let stream = sink.stream();
+//! let signal = stream.hold(3);
+//!
+//! // The current value of the signal is initially 3
+//! assert_eq!(signal.sample(), 3);
+//!
+//! // When we fire an event, the signal get updated accordingly
+//! sink.send(5);
+//! assert_eq!(signal.sample(), 5);
+//! ```
+//!
+//! One can also directly iterate over the stream instead of holding it in a
+//! signal:
+//!
+//! ```
+//! # use carboxyl::Sink;
+//! # let sink = Sink::new();
+//! # let stream = sink.stream();
+//! let mut events = stream.events();
+//! sink.send(4);
+//! assert_eq!(events.next(), Some(4));
+//! ```
+//!
+//! Streams and signals can be combined using various primitives. We can map a
+//! stream to another stream using a function:
+//!
+//! ```
+//! # use carboxyl::Sink;
+//! # let sink: Sink<i32> = Sink::new();
+//! # let stream = sink.stream();
+//! let squares = stream.map(|x| x * x).hold(0);
+//! sink.send(4);
+//! assert_eq!(squares.sample(), 16);
+//! ```
+//!
+//! Or we can filter a stream to create a new one that only contains events that
+//! satisfy a certain predicate:
+//!
+//! ```
+//! # use carboxyl::Sink;
+//! # let sink: Sink<i32> = Sink::new();
+//! # let stream = sink.stream();
+//! let negatives = stream.filter(|&x| x < 0).hold(0);
+//!
+//! // This won't arrive at the signal.
+//! sink.send(4);
+//! assert_eq!(negatives.sample(), 0);
+//!
+//! // But this will!
+//! sink.send(-3);
+//! assert_eq!(negatives.sample(), -3);
+//! ```
+//!
+//! There are some other methods on streams and signals, that you can find in
+//! their respective APIs.
+//!
+//! Note that all these objects are `Send + Sync + Clone`. This means you can
+//! easily pass them around in your code, make clones, give them to another
+//! thread, and they will still be updated correctly.
+//!
+//! You may have noticed that certain primitives take a function as an argument.
+//! There is a limitation on what kind of functions can and should be used here.
+//! In general, as FRP provides an abstraction around mutable state, they should
+//! be pure functions (i.e. free of side effects).
+//!
+//! For the most part this is guaranteed by Rust's type system. A static
+//! function with a matching signature always works. A closure though is very
+//! restricted: it must not borrow its environment, as it is impossible to
+//! satisfy the lifetime requirements for that. So you can only move stuff into
+//! it from the environment. However, the moved contents of the closure may also
+//! not be altered, which is guaranteed by the `Fn(…) -> …)` trait bound.
+//!
+//! However, both closures and functions could still have side effects such as
+//! I/O, changing mutable state via `Mutex` or `RefCell`, etc. While Rust's type
+//! system cannot prevent this, you should generally not pass such functions to
+//! the FRP primitives, as they break the benefits you get from using FRP.
+//! (An exception here is debugging output.)
+
+#![feature(arc_weak, fnbox)]
+#![cfg_attr(test, feature(test, scoped))]
+#![warn(missing_docs)]
+
+#[cfg(test)]
+extern crate test;
+#[cfg(test)]
+extern crate rand;
+#[cfg(test)]
+extern crate quickcheck;
+#[macro_use(lazy_static)]
+extern crate lazy_static;
+
+pub use stream::{ Sink, Stream };
+pub use signal::{ Signal, SignalMut };
+
+mod transaction;
+mod source;
+mod pending;
+mod readonly;
+mod stream;
+mod signal;
+#[macro_use]
+pub mod lift;
+#[cfg(test)]
+mod testing;
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/lift.rs.html b/src/carboxyl/lift.rs.html new file mode 100644 index 0000000..224dbe8 --- /dev/null +++ b/src/carboxyl/lift.rs.html @@ -0,0 +1,605 @@ + + + + + + + + + + lift.rs.html -- source + + + + + + + + + + + + + + + +
  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+ 21
+ 22
+ 23
+ 24
+ 25
+ 26
+ 27
+ 28
+ 29
+ 30
+ 31
+ 32
+ 33
+ 34
+ 35
+ 36
+ 37
+ 38
+ 39
+ 40
+ 41
+ 42
+ 43
+ 44
+ 45
+ 46
+ 47
+ 48
+ 49
+ 50
+ 51
+ 52
+ 53
+ 54
+ 55
+ 56
+ 57
+ 58
+ 59
+ 60
+ 61
+ 62
+ 63
+ 64
+ 65
+ 66
+ 67
+ 68
+ 69
+ 70
+ 71
+ 72
+ 73
+ 74
+ 75
+ 76
+ 77
+ 78
+ 79
+ 80
+ 81
+ 82
+ 83
+ 84
+ 85
+ 86
+ 87
+ 88
+ 89
+ 90
+ 91
+ 92
+ 93
+ 94
+ 95
+ 96
+ 97
+ 98
+ 99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+162
+163
+164
+165
+166
+167
+168
+169
+170
+171
+172
+173
+174
+175
+176
+177
+178
+179
+180
+181
+182
+183
+184
+185
+186
+187
+188
+189
+190
+191
+192
+193
+194
+195
+196
+197
+198
+199
+200
+201
+202
+203
+204
+205
+206
+207
+208
+209
+210
+211
+212
+213
+214
+215
+216
+217
+218
+219
+220
+221
+222
+223
+224
+225
+226
+227
+228
+229
+230
+231
+232
+233
+234
+235
+236
+237
+238
+239
+240
+241
+242
+243
+244
+245
+246
+247
+248
+249
+250
+251
+252
+253
+254
+
+//! Lifting of n-ary functions.
+//!
+//! A lift maps a function on values to a function on signals. Given a function of
+//! type `F: Fn(A, B, …) -> R` and signals of types `Signal<A>, Signal<B>, …` the
+//! `lift!` macro creates a `Signal<R>`, whose content is computed using the
+//! function.
+//!
+//! Currently lift is only implemented for functions with up to four arguments.
+//! This limitation is due to the current implementation strategy (and maybe
+//! limitations of Rust's type system), but it can be increased to arbitrary but
+//! finite arity if required.
+//!
+//! # Example
+//!
+//! ```
+//! # #[macro_use] extern crate carboxyl;
+//! # fn main() {
+//! # use carboxyl::Sink;
+//! let sink_a = Sink::new();
+//! let sink_b = Sink::new();
+//! let product = lift!(
+//!     |a, b| a * b,
+//!     &sink_a.stream().hold(0),
+//!     &sink_b.stream().hold(0)
+//! );
+//! assert_eq!(product.sample(), 0);
+//! sink_a.send(3);
+//! sink_b.send(5);
+//! assert_eq!(product.sample(), 15);
+//! # }
+//! ```
+
+use std::sync::Arc;
+use signal::{ Signal, SignalFn, signal_build, signal_current, signal_source, reg_signal, sample_raw };
+use transaction::commit;
+
+
+#[macro_export]
+macro_rules! lift {
+    ($f: expr)
+        => ( $crate::lift::lift0($f) );
+
+    ($f: expr, $a: expr)
+        => ( $crate::lift::lift1($f, $a) );
+
+    ($f: expr, $a: expr, $b: expr)
+        => ( $crate::lift::lift2($f, $a, $b) );
+
+    ($f: expr, $a: expr, $b: expr, $c: expr)
+        => ( $crate::lift::lift3($f, $a, $b, $c) );
+
+    ($f: expr, $a: expr, $b: expr, $c: expr, $d: expr)
+        => ( $crate::lift::lift4($f, $a, $b, $c, $d) );
+}
+
+
+/// Lift a 0-ary function.
+pub fn lift0<A, F>(f: F) -> Signal<A>
+    where F: Fn() -> A + Send + Sync + 'static
+{
+    commit(|| signal_build(SignalFn::from_fn(f), ()))
+}
+
+
+/// Lift a unary function.
+pub fn lift1<A, B, F>(f: F, sa: &Signal<A>) -> Signal<B>
+    where A: Send + Sync + Clone + 'static,
+          B: Send + Sync + Clone + 'static,
+          F: Fn(A) -> B + Send + Sync + 'static,
+{
+    fn make_callback<A, B, F>(f: &Arc<F>, parent: &Signal<A>) -> SignalFn<B>
+        where A: Send + Sync + Clone + 'static,
+              B: Send + Sync + Clone + 'static,
+              F: Fn(A) -> B + Send + Sync + 'static,
+    {
+        let pclone = parent.clone();
+        let f = f.clone();
+        match *signal_current(&parent).read().unwrap().future() {
+            SignalFn::Const(ref a) => SignalFn::Const(f(a.clone())),
+            SignalFn::Func(_) => SignalFn::from_fn(move || f(sample_raw(&pclone))),
+        }
+    }
+
+    commit(|| {
+        let f = Arc::new(f);
+        let signal = signal_build(make_callback(&f, &sa), ());
+        let sa_clone = sa.clone();
+        reg_signal(&mut signal_source(&sa).write().unwrap(), &signal,
+            move |_| make_callback(&f, &sa_clone));
+        signal
+    })
+}
+
+
+/// Lift a binary function.
+pub fn lift2<A, B, C, F>(f: F, sa: &Signal<A>, sb: &Signal<B>) -> Signal<C>
+    where A: Send + Sync + Clone + 'static,
+          B: Send + Sync + Clone + 'static,
+          C: Send + Sync + Clone + 'static,
+          F: Fn(A, B) -> C + Send + Sync + 'static,
+{
+    fn make_callback<A, B, C, F>(f: &Arc<F>, sa: &Signal<A>, sb: &Signal<B>) -> SignalFn<C>
+        where A: Send + Sync + Clone + 'static,
+              B: Send + Sync + Clone + 'static,
+              C: Send + Sync + Clone + 'static,
+              F: Fn(A, B) -> C + Send + Sync + 'static,
+    {
+        use signal::SignalFn::{ Const, Func };
+        let sa_clone = sa.clone();
+        let sb_clone = sb.clone();
+        let f = f.clone();
+        match (
+            signal_current(&sa).read().unwrap().future(),
+            signal_current(&sb).read().unwrap().future(),
+        ) {
+            (&Const(ref a), &Const(ref b)) => Const(f(a.clone(), b.clone())),
+            (&Const(ref a), &Func(_)) => {
+                let a = a.clone();
+                SignalFn::from_fn(move || f(a.clone(), sample_raw(&sb_clone)))
+            },
+            (&Func(_), &Const(ref b)) => {
+                let b = b.clone();
+                SignalFn::from_fn(move || f(sample_raw(&sa_clone), b.clone()))
+            },
+            (&Func(_), &Func(_)) => SignalFn::from_fn(
+                move || f(sample_raw(&sa_clone), sample_raw(&sb_clone))
+            ),
+        }
+    }
+
+    commit(move || {
+        let f = Arc::new(f);
+        let signal = signal_build(make_callback(&f, &sa, &sb), ());
+        reg_signal(&mut signal_source(&sa).write().unwrap(), &signal, {
+            let sa_clone = sa.clone();
+            let sb_clone = sb.clone();
+            let f = f.clone();
+            move |_| make_callback(&f, &sa_clone, &sb_clone)
+        });
+        reg_signal(&mut signal_source(&sb).write().unwrap(), &signal, {
+            let sa_clone = sa.clone();
+            let sb_clone = sb.clone();
+            move |_| make_callback(&f, &sa_clone, &sb_clone)
+        });
+        signal
+    })
+}
+
+/// Lift a ternary function.
+pub fn lift3<F, A, B, C, Ret>(f: F, ca: &Signal<A>, cb: &Signal<B>, cc: &Signal<C>)
+    -> Signal<Ret>
+where F: Fn(A, B, C) -> Ret + Send + Sync + 'static,
+      A: Send + Sync + Clone + 'static,
+      B: Send + Sync + Clone + 'static,
+      C: Send + Sync + Clone + 'static,
+      Ret: Send + Sync + Clone + 'static,
+{
+    lift2(move |(a, b), c| f(a, b, c), &lift2(|a, b| (a, b), ca, cb), cc)
+}
+
+/// Lift a quarternary function.
+pub fn lift4<F, A, B, C, D, Ret>(f: F, ca: &Signal<A>, cb: &Signal<B>, cc: &Signal<C>, cd: &Signal<D>)
+    -> Signal<Ret>
+where F: Fn(A, B, C, D) -> Ret + Send + Sync + 'static,
+      A: Send + Sync + Clone + 'static,
+      B: Send + Sync + Clone + 'static,
+      C: Send + Sync + Clone + 'static,
+      D: Send + Sync + Clone + 'static,
+      Ret: Send + Sync + Clone + 'static,
+{
+    lift2(
+        move |(a, b), (c, d)| f(a, b, c, d),
+        &lift2(|a, b| (a, b), ca, cb),
+        &lift2(|c, d| (c, d), cc, cd)
+    )
+}
+
+
+#[cfg(test)]
+mod test {
+    use stream::Sink;
+    use signal::Signal;
+
+    #[test]
+    fn lift0() {
+        let signal = lift!(|| 3);
+        assert_eq!(signal.sample(), 3);
+    }
+
+    #[test]
+    fn lift1() {
+        let sig2 = lift!(|n| n + 2, &Signal::new(3));
+        assert_eq!(sig2.sample(), 5);
+    }
+
+    #[test]
+    fn lift2() {
+        let sink1 = Sink::new();
+        let sink2 = Sink::new();
+        let lifted = lift!(|a, b| a + b, &sink1.stream().hold(0),
+            &sink2.stream().hold(3));
+        assert_eq!(lifted.sample(), 3);
+        sink1.send(1);
+        assert_eq!(lifted.sample(), 4);
+        sink2.send(11);
+        assert_eq!(lifted.sample(), 12);
+    }
+
+    #[test]
+    fn lift2_identical() {
+        let sig = Signal::new(16);
+        let sig2 = lift!(|a, b| a + b, &sig, &sig);
+        assert_eq!(sig2.sample(), 32);
+    }
+
+    #[test]
+    fn lift3() {
+        let sink = Sink::new();
+        assert_eq!(
+            lift!(|x, y, z| x + 2 * y + z,
+                &sink.stream().hold(5),
+                &sink.stream().hold(3),
+                &sink.stream().hold(-4)
+            ).sample(),
+            7
+        );
+    }
+
+    #[test]
+    fn lift4() {
+        let sink = Sink::new();
+        assert_eq!(
+            lift!(|w, x, y, z| 4 * w + x + 2 * y + z,
+                &sink.stream().hold(-2),
+                &sink.stream().hold(5),
+                &sink.stream().hold(3),
+                &sink.stream().hold(-4)
+            ).sample(),
+            -1
+        );
+    }
+
+    #[test]
+    fn lift0_equal_within_transaction() {
+        use rand::random;
+        // Generate a completely random signal
+        let rnd = lift!(random::<i64>);
+        // Make a tuple with itself
+        let gather = lift!(|a, b| (a, b), &rnd, &rnd);
+        // Both components should be equal
+        let (a, b) = gather.sample();
+        assert_eq!(a, b);
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/pending.rs.html b/src/carboxyl/pending.rs.html new file mode 100644 index 0000000..6e0f11f --- /dev/null +++ b/src/carboxyl/pending.rs.html @@ -0,0 +1,271 @@ + + + + + + + + + + pending.rs.html -- source + + + + + + + + + + + + + + + +
 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
+21
+22
+23
+24
+25
+26
+27
+28
+29
+30
+31
+32
+33
+34
+35
+36
+37
+38
+39
+40
+41
+42
+43
+44
+45
+46
+47
+48
+49
+50
+51
+52
+53
+54
+55
+56
+57
+58
+59
+60
+61
+62
+63
+64
+65
+66
+67
+68
+69
+70
+71
+72
+73
+74
+75
+76
+77
+78
+79
+80
+81
+82
+83
+84
+85
+86
+87
+
+//! Pending wrapper
+
+use std::ops::Deref;
+
+
+/// A pending value. This is a wrapper type that allows one to queue one new
+/// value without actually overwriting the old value. Later the most recently
+/// queued value can be updated.
+pub struct Pending<T> {
+    current: T,
+    update: Option<T>,
+}
+
+impl<T> Pending<T> {
+    /// Create a new pending value.
+    pub fn new(t: T) -> Pending<T> {
+        Pending { current: t, update: None }
+    }
+
+    /// Put an item in the queue. Ignores any previously queued items.
+    pub fn queue(&mut self, new: T) {
+        self.update = Some(new);
+    }
+
+    /// Updates any update pending.
+    pub fn update(&mut self) {
+        if let Some(t) = self.update.take() {
+            self.current = t;
+        }
+    }
+
+    /// Get the future value.
+    pub fn future(&self) -> &T {
+        self.update.as_ref().unwrap_or(&self.current)
+    }
+}
+
+impl<T> Deref for Pending<T> {
+    type Target = T;
+    fn deref(&self) -> &T { &self.current }
+}
+
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn new_derefs_identical() {
+        assert_eq!(*Pending::new(3), 3);
+    }
+
+    #[test]
+    fn queue_does_not_affect_deref() {
+        let mut p = Pending::new(2);
+        p.queue(4);
+        assert_eq!(*p, 2);
+    }
+
+    #[test]
+    fn new_future_identical() {
+        assert_eq!(*Pending::new(5).future(), 5);
+    }
+
+    #[test]
+    fn queue_affects_future() {
+        let mut p = Pending::new(10);
+        p.queue(6);
+        assert_eq!(*p.future(), 6);
+    }
+
+    #[test]
+    fn updated_deref() {
+        let mut p = Pending::new(-2);
+        p.queue(2);
+        p.update();
+        assert_eq!(*p, 2);
+    }
+
+    #[test]
+    fn updated_future() {
+        let mut p = Pending::new(-7);
+        p.queue(0);
+        p.update();
+        assert_eq!(*p.future(), 0);
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/readonly.rs.html b/src/carboxyl/readonly.rs.html new file mode 100644 index 0000000..84fce7f --- /dev/null +++ b/src/carboxyl/readonly.rs.html @@ -0,0 +1,173 @@ + + + + + + + + + + readonly.rs.html -- source + + + + + + + + + + + + + + + +
 1
+ 2
+ 3
+ 4
+ 5
+ 6
+ 7
+ 8
+ 9
+10
+11
+12
+13
+14
+15
+16
+17
+18
+19
+20
+21
+22
+23
+24
+25
+26
+27
+28
+29
+30
+31
+32
+33
+34
+35
+36
+37
+38
+
+//! Thread-safe read-only smart pointer.
+
+use std::sync::{ Arc, RwLock, RwLockReadGuard };
+use std::ops::Deref;
+
+/// Guards read-access into a read-only pointer.
+pub struct ReadOnlyGuard<'a, T: 'a> {
+    guard: RwLockReadGuard<'a, T>,
+}
+
+impl<'a, T> Deref for ReadOnlyGuard<'a, T> {
+    type Target = T;
+    fn deref(&self) -> &T { &self.guard }
+}
+
+/// A thread-safe read-only smart pointer.
+pub struct ReadOnly<T> {
+    ptr: Arc<RwLock<T>>,
+}
+
+impl<T> Clone for ReadOnly<T> {
+    fn clone(&self) -> ReadOnly<T> {
+        ReadOnly { ptr: self.ptr.clone() }
+    }
+}
+
+/// Create a new read-only pointer.
+pub fn create<T>(ptr: Arc<RwLock<T>>) -> ReadOnly<T> { ReadOnly { ptr: ptr } }
+
+impl<T> ReadOnly<T> {
+    /// Gain read-access to the stored value.
+    ///
+    /// In case, the underlying data structure has been poisoned, it returns
+    /// `None`.
+    pub fn read(&self) -> Option<ReadOnlyGuard<T>> {
+        self.ptr.read().ok().map(|g| ReadOnlyGuard { guard: g })
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/signal.rs.html b/src/carboxyl/signal.rs.html new file mode 100644 index 0000000..659a0a7 --- /dev/null +++ b/src/carboxyl/signal.rs.html @@ -0,0 +1,1601 @@ + + + + + + + + + + signal.rs.html -- source + + + + + + + + + + + + + + + +
  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+ 21
+ 22
+ 23
+ 24
+ 25
+ 26
+ 27
+ 28
+ 29
+ 30
+ 31
+ 32
+ 33
+ 34
+ 35
+ 36
+ 37
+ 38
+ 39
+ 40
+ 41
+ 42
+ 43
+ 44
+ 45
+ 46
+ 47
+ 48
+ 49
+ 50
+ 51
+ 52
+ 53
+ 54
+ 55
+ 56
+ 57
+ 58
+ 59
+ 60
+ 61
+ 62
+ 63
+ 64
+ 65
+ 66
+ 67
+ 68
+ 69
+ 70
+ 71
+ 72
+ 73
+ 74
+ 75
+ 76
+ 77
+ 78
+ 79
+ 80
+ 81
+ 82
+ 83
+ 84
+ 85
+ 86
+ 87
+ 88
+ 89
+ 90
+ 91
+ 92
+ 93
+ 94
+ 95
+ 96
+ 97
+ 98
+ 99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+162
+163
+164
+165
+166
+167
+168
+169
+170
+171
+172
+173
+174
+175
+176
+177
+178
+179
+180
+181
+182
+183
+184
+185
+186
+187
+188
+189
+190
+191
+192
+193
+194
+195
+196
+197
+198
+199
+200
+201
+202
+203
+204
+205
+206
+207
+208
+209
+210
+211
+212
+213
+214
+215
+216
+217
+218
+219
+220
+221
+222
+223
+224
+225
+226
+227
+228
+229
+230
+231
+232
+233
+234
+235
+236
+237
+238
+239
+240
+241
+242
+243
+244
+245
+246
+247
+248
+249
+250
+251
+252
+253
+254
+255
+256
+257
+258
+259
+260
+261
+262
+263
+264
+265
+266
+267
+268
+269
+270
+271
+272
+273
+274
+275
+276
+277
+278
+279
+280
+281
+282
+283
+284
+285
+286
+287
+288
+289
+290
+291
+292
+293
+294
+295
+296
+297
+298
+299
+300
+301
+302
+303
+304
+305
+306
+307
+308
+309
+310
+311
+312
+313
+314
+315
+316
+317
+318
+319
+320
+321
+322
+323
+324
+325
+326
+327
+328
+329
+330
+331
+332
+333
+334
+335
+336
+337
+338
+339
+340
+341
+342
+343
+344
+345
+346
+347
+348
+349
+350
+351
+352
+353
+354
+355
+356
+357
+358
+359
+360
+361
+362
+363
+364
+365
+366
+367
+368
+369
+370
+371
+372
+373
+374
+375
+376
+377
+378
+379
+380
+381
+382
+383
+384
+385
+386
+387
+388
+389
+390
+391
+392
+393
+394
+395
+396
+397
+398
+399
+400
+401
+402
+403
+404
+405
+406
+407
+408
+409
+410
+411
+412
+413
+414
+415
+416
+417
+418
+419
+420
+421
+422
+423
+424
+425
+426
+427
+428
+429
+430
+431
+432
+433
+434
+435
+436
+437
+438
+439
+440
+441
+442
+443
+444
+445
+446
+447
+448
+449
+450
+451
+452
+453
+454
+455
+456
+457
+458
+459
+460
+461
+462
+463
+464
+465
+466
+467
+468
+469
+470
+471
+472
+473
+474
+475
+476
+477
+478
+479
+480
+481
+482
+483
+484
+485
+486
+487
+488
+489
+490
+491
+492
+493
+494
+495
+496
+497
+498
+499
+500
+501
+502
+503
+504
+505
+506
+507
+508
+509
+510
+511
+512
+513
+514
+515
+516
+517
+518
+519
+520
+521
+522
+523
+524
+525
+526
+527
+528
+529
+530
+531
+532
+533
+534
+535
+536
+537
+538
+539
+540
+541
+542
+543
+544
+545
+546
+547
+548
+549
+550
+551
+552
+553
+554
+555
+556
+557
+558
+559
+560
+561
+562
+563
+564
+565
+566
+567
+568
+569
+570
+571
+572
+573
+574
+575
+576
+577
+578
+579
+580
+581
+582
+583
+584
+585
+586
+587
+588
+589
+590
+591
+592
+593
+594
+595
+596
+597
+598
+599
+600
+601
+602
+603
+604
+605
+606
+607
+608
+609
+610
+611
+612
+613
+614
+615
+616
+617
+618
+619
+620
+621
+622
+623
+624
+625
+626
+627
+628
+629
+630
+631
+632
+633
+634
+635
+636
+637
+638
+639
+640
+641
+642
+643
+644
+645
+646
+647
+648
+649
+650
+651
+652
+653
+654
+655
+656
+657
+658
+659
+660
+661
+662
+663
+664
+665
+666
+667
+668
+669
+670
+671
+672
+673
+674
+675
+676
+677
+678
+679
+680
+681
+682
+683
+684
+685
+686
+687
+688
+689
+690
+691
+692
+693
+694
+695
+696
+697
+698
+699
+700
+701
+702
+703
+704
+705
+706
+707
+708
+709
+710
+711
+712
+713
+714
+715
+716
+717
+718
+719
+720
+721
+722
+723
+724
+725
+726
+727
+728
+729
+730
+731
+732
+733
+734
+735
+736
+737
+738
+739
+740
+741
+742
+743
+744
+745
+746
+747
+748
+749
+750
+751
+752
+
+//! Continuous time signals
+
+use std::sync::{ Arc, Mutex, RwLock };
+use std::ops::Deref;
+use std::fmt;
+#[cfg(test)]
+use quickcheck::{ Arbitrary, Gen };
+
+use source::{ Source, with_weak, CallbackError };
+use stream::{ self, BoxClone, Stream };
+use transaction::{ commit, end };
+use pending::Pending;
+use readonly::{ self, ReadOnly };
+use lift;
+#[cfg(test)]
+use testing::ArcFn;
+
+
+/// A functional signal. Caches its return value during a transaction.
+struct FuncSignal<A> {
+    func: Box<Fn() -> A + Send + Sync + 'static>,
+    cache: Arc<Mutex<Option<A>>>,
+}
+
+impl<A> FuncSignal<A> {
+    pub fn new<F: Fn() -> A + Send + Sync + 'static>(f: F) -> FuncSignal<A> {
+        FuncSignal {
+            func: Box::new(f),
+            cache: Arc::new(Mutex::new(None)),
+        }
+    }
+}
+
+impl<A: Clone + 'static> FuncSignal<A> {
+    /// Call the function or fetch the cached value if present.
+    pub fn call(&self) -> A {
+        let mut cached = self.cache.lock().unwrap();
+        match &mut *cached {
+            &mut Some(ref value) => value.clone(),
+            cached => {
+                // Register callback to reset cache at the end of the transaction
+                let cache = self.cache.clone();
+                end(move || {
+                    let mut live = cache.lock().unwrap();
+                    *live = None;
+                });
+                // Calculate & cache value
+                let value = (self.func)();
+                *cached = Some(value.clone());
+                value
+            },
+        }
+    }
+}
+
+
+pub enum SignalFn<A> {
+    Const(A),
+    Func(FuncSignal<A>),
+}
+
+impl<A> SignalFn<A> {
+    pub fn from_fn<F: Fn() -> A + Send + Sync + 'static>(f: F) -> SignalFn<A> {
+        SignalFn::Func(FuncSignal::new(f))
+    }
+}
+
+impl<A: Clone + 'static> SignalFn<A> {
+    pub fn call(&self) -> A {
+        match *self {
+            SignalFn::Const(ref a) => a.clone(),
+            SignalFn::Func(ref f) => f.call(),
+        }
+    }
+}
+
+
+/// Helper function to register callback handlers related to signal construction.
+pub fn reg_signal<A, B, F>(parent_source: &mut Source<A>, signal: &Signal<B>, handler: F)
+    where A: Send + Sync + 'static,
+          B: Send + Sync + 'static,
+          F: Fn(A) -> SignalFn<B> + Send + Sync + 'static,
+{
+    let weak_source = signal.source.downgrade();
+    let weak_current = signal.current.downgrade();
+    parent_source.register(move |a|
+        weak_current.upgrade().map(|cur| end(
+            move || { let _ = cur.write().map(|mut cur| cur.update()); }))
+            .ok_or(CallbackError::Disappeared)
+        .and(with_weak(&weak_current, |cur| cur.queue(handler(a))))
+        .and(with_weak(&weak_source, |src| src.send(())))
+    );
+}
+
+
+/// External helper function to build a signal.
+pub fn signal_build<A, K>(func: SignalFn<A>, keep_alive: K) -> Signal<A>
+        where K: Send + Sync + Clone + 'static
+{
+    Signal::build(func, keep_alive)
+}
+
+/// External accessor to current state of a signal.
+pub fn signal_current<A>(signal: &Signal<A>) -> &Arc<RwLock<Pending<SignalFn<A>>>> {
+    &signal.current
+}
+
+/// External accessor to signal source.
+pub fn signal_source<A>(signal: &Signal<A>) -> &Arc<RwLock<Source<()>>> {
+    &signal.source
+}
+
+/// Sample the value of the signal without committing it as a transaction.
+pub fn sample_raw<A: Clone + 'static>(signal: &Signal<A>) -> A {
+    signal.current.read().unwrap().call()
+}
+
+
+/// A continuous signal that changes over time.
+///
+/// Signals can be thought of as values that change over time. They have both a
+/// continuous and a discrete component. This means that their current value is
+/// defined by a function that can be called at any time. That function is only
+/// evaluated on-demand, when the signal's current value is sampled. (This is
+/// also called pull semantics in the literature on FRP.)
+///
+/// In addition, the current function used to sample a signal may change
+/// discretely in reaction to some event. For instance, it is possible to create
+/// a signal from an event stream, by holding the last event occurence as the
+/// current value of the stream.
+///
+/// # Algebraic laws
+///
+/// Signals come with some primitive methods to compose them with each other and
+/// with streams. Some of these primitives give the signals an algebraic
+/// structure.
+///
+/// ## Functor
+///
+/// Signals form a functor under unary lifting. Thus, the following laws hold:
+///
+/// - Preservation of identity: `lift!(|x| x, &a) == a`,
+/// - Function composition: `lift!(|x| g(f(x)), &a) == lift!(g, &lift!(f, &a))`.
+///
+/// ## Applicative functor
+///
+/// By extension, using the notion of a signal of a function, signals also
+/// become an [applicative][ghc-applicative] using `Signal::new` as `pure` and
+/// `|sf, sa| lift!(|f, a| f(a), &sf, &sa)` as `<*>`.
+///
+/// *TODO: Expand on this and replace the Haskell reference.*
+///
+/// [ghc-applicative]: https://downloads.haskell.org/~ghc/latest/docs/html/libraries/base/Control-Applicative.html
+pub struct Signal<A> {
+    current: Arc<RwLock<Pending<SignalFn<A>>>>,
+    source: Arc<RwLock<Source<()>>>,
+    #[allow(dead_code)]
+    keep_alive: Box<BoxClone>,
+}
+
+impl<A> Clone for Signal<A> {
+    fn clone(&self) -> Signal<A> {
+        Signal {
+            current: self.current.clone(),
+            source: self.source.clone(),
+            keep_alive: self.keep_alive.box_clone(),
+        }
+    }
+}
+
+impl<A> Signal<A> {
+    fn build<K>(func: SignalFn<A>, keep_alive: K) -> Signal<A>
+        where K: Send + Sync + Clone + 'static
+    {
+        Signal {
+            current: Arc::new(RwLock::new(Pending::new(func))),
+            source: Arc::new(RwLock::new(Source::new())),
+            keep_alive: Box::new(keep_alive),
+        }
+    }
+}
+
+impl<A: Clone + 'static> Signal<A> {
+    /// Create a constant signal.
+    pub fn new(a: A) -> Signal<A> {
+        Signal::build(SignalFn::Const(a), ())
+    }
+
+    /// Sample the current value of the signal.
+    pub fn sample(&self) -> A {
+        commit(|| sample_raw(self))
+    }
+}
+
+impl<A: Clone + Send + Sync + 'static> Signal<A> {
+    /// Create a signal with a cyclic definition.
+    ///
+    /// The closure gets an undefined forward-declaration of a signal. It is
+    /// supposed to return a self-referential definition of the same signal.
+    ///
+    /// Sampling the forward-declared signal, before it is properly defined,
+    /// will cause a run-time panic.
+    ///
+    /// This pattern is useful to implement accumulators, counters and other
+    /// loops that depend on the sampling behaviour of a signal before a
+    /// transaction.
+    pub fn cyclic<F>(def: F) -> Signal<A>
+        where F: FnOnce(&Signal<A>) -> Signal<A>
+    {
+        commit(|| {
+            let cycle = SignalCycle::new();
+            let finished = def(&cycle);
+            cycle.define(finished)
+        })
+    }
+
+    /// Combine the signal with a stream in a snapshot.
+    ///
+    /// `snapshot` creates a new stream given a signal and a stream. Whenever
+    /// the input stream fires an event, the output stream fires an event
+    /// created from the signal's current value and that event using the
+    /// supplied function.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink1: Sink<i32> = Sink::new();
+    /// let sink2: Sink<f64> = Sink::new();
+    /// let mut events = sink1.stream().hold(1)
+    ///     .snapshot(&sink2.stream(), |a, b| (a, b))
+    ///     .events();
+    ///
+    /// // Updating its signal does not cause the snapshot to fire
+    /// sink1.send(4);
+    ///
+    /// // However sending an event down the stream does
+    /// sink2.send(3.0);
+    /// assert_eq!(events.next(), Some((4, 3.0)));
+    /// ```
+    pub fn snapshot<B, C, F>(&self, stream: &Stream<B>, f: F) -> Stream<C>
+        where B: Clone + Send + Sync + 'static,
+              C: Clone + Send + Sync + 'static,
+              F: Fn(A, B) -> C + Send + Sync + 'static,
+    {
+        stream::snapshot(self, stream, f)
+    }
+}
+
+impl<A: Clone + Send + Sync + 'static> Signal<Signal<A>> {
+    /// Switch between signals.
+    ///
+    /// This transforms a `Signal<Signal<A>>` into a `Signal<A>`. The nested
+    /// signal can be thought of as a representation of a switch between different
+    /// input signals, that allows one to change the structure of the dependency
+    /// graph at run-time. `switch` provides a way to access the inner value of
+    /// the currently active signal.
+    ///
+    /// The following example demonstrates how to use this to switch between two
+    /// input signals based on a `Button` event stream:
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// // Button type
+    /// #[derive(Clone, Show)]
+    /// enum Button { A, B };
+    ///
+    /// // The input sinks
+    /// let sink_a = Sink::<i32>::new();
+    /// let sink_b = Sink::<i32>::new();
+    ///
+    /// // The button sink
+    /// let sink_button = Sink::<Button>::new();
+    ///
+    /// // Create the output
+    /// let output = {
+    ///
+    ///     // Hold input sinks in a signal with some initials
+    ///     let channel_a = sink_a.stream().hold(1);
+    ///     let channel_b = sink_b.stream().hold(2);
+    ///
+    ///     // A trivial default channel used before any button event
+    ///     let default_channel = Sink::new().stream().hold(0);
+    ///
+    ///     // Map button to the channel signals, hold with the default channel as
+    ///     // initial value and switch between the signals
+    ///     sink_button
+    ///         .stream()
+    ///         .map(move |b| match b {
+    ///             Button::A => channel_a.clone(),
+    ///             Button::B => channel_b.clone(),
+    ///         })
+    ///         .hold(default_channel)
+    ///         .switch()
+    /// };
+    ///
+    /// // In the beginning, output will come from the default channel
+    /// assert_eq!(output.sample(), 0);
+    ///
+    /// // Let's switch to channel A
+    /// sink_button.send(Button::A);
+    /// assert_eq!(output.sample(), 1);
+    ///
+    /// // And to channel B
+    /// sink_button.send(Button::B);
+    /// assert_eq!(output.sample(), 2);
+    ///
+    /// // The channels can change, too, of course
+    /// for k in 4..13 {
+    ///     sink_b.send(k);
+    ///     assert_eq!(output.sample(), k);
+    /// }
+    /// sink_button.send(Button::A);
+    /// for k in 21..77 {
+    ///     sink_a.send(k);
+    ///     assert_eq!(output.sample(), k);
+    /// }
+    /// ```
+    pub fn switch(&self) -> Signal<A> {
+        fn make_callback<A>(parent: &Signal<Signal<A>>) -> SignalFn<A>
+            where A: Send + Clone + Sync + 'static,
+        {
+            // TODO: use information on inner value
+            let current_signal = parent.current.clone();
+            SignalFn::from_fn(move ||
+                sample_raw(&current_signal.read().unwrap().call())
+            )
+        }
+        commit(|| {
+            let signal = Signal::build(make_callback(self), ());
+            let parent = self.clone();
+            reg_signal(&mut self.source.write().unwrap(), &signal,
+                move |_| make_callback(&parent));
+            signal
+        })
+    }
+}
+
+#[cfg(test)]
+impl<A, B> Signal<ArcFn<A, B>>
+    where A: Clone + Send + Sync + 'static,
+          B: Clone + Send + Sync + 'static,
+{
+    /// Applicative functionality. Applies a signal of function to a signal of
+    /// its argument.
+    fn apply(&self, signal: &Signal<A>) -> Signal<B> {
+        lift::lift2(|f, a| f(a), self, signal)
+    }
+}
+
+#[cfg(test)]
+impl<A: Arbitrary + Sync + Clone + 'static> Arbitrary for Signal<A> {
+    fn arbitrary<G: Gen>(g: &mut G) -> Signal<A> {
+        let values = Vec::<A>::arbitrary(g);
+        if values.is_empty() {
+            Signal::new(Arbitrary::arbitrary(g))
+        } else {
+            let n = Mutex::new(0);
+            lift::lift0(move || {
+                let mut n = n.lock().unwrap();
+                *n += 1;
+                if *n >= values.len() { *n = 0 }
+                values[*n].clone()
+            })
+        }
+    }
+}
+
+impl<A: fmt::Debug + Clone + 'static> fmt::Debug for Signal<A> {
+    fn fmt(&self, fmt: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+        commit(|| match **self.current.read().unwrap() {
+            SignalFn::Const(ref a) =>
+                fmt.debug_struct("Signal::const").field("value", &a).finish(),
+            SignalFn::Func(ref f) =>
+                fmt.debug_struct("Signal::fn").field("current", &f.call()).finish(),
+        })
+    }
+}
+
+
+/// Forward declaration of a signal to create value loops.
+struct SignalCycle<A> {
+    signal: Signal<A>,
+}
+
+impl<A: Send + Sync + Clone + 'static> SignalCycle<A> {
+    /// Forward-declare a new signal.
+    pub fn new() -> SignalCycle<A> {
+        const ERR: &'static str = "sampled on forward-declaration of signal";
+        SignalCycle { signal: Signal::build(SignalFn::from_fn(|| panic!(ERR)), ()) }
+    }
+
+    /// Provide the signal with a definition.
+    pub fn define(self, definition: Signal<A>) -> Signal<A> {
+        /// Generate a callback from the signal definition's current value.
+        fn make_callback<A>(current_def: &Arc<RwLock<Pending<SignalFn<A>>>>) -> SignalFn<A>
+            where A: Send + Sync + Clone + 'static
+        {
+            match *current_def.read().unwrap().future() {
+                SignalFn::Const(ref a) => SignalFn::Const(a.clone()),
+                SignalFn::Func(_) => SignalFn::from_fn({
+                    let sig = current_def.downgrade();
+                    move || {
+                        let strong = sig.upgrade().unwrap();
+                        let ret = strong.read().unwrap().call();
+                        ret
+                    }
+                }),
+            }
+        }
+        commit(move || {
+            *self.signal.current.write().unwrap() = Pending::new(make_callback(&definition.current));
+            let weak_parent = definition.current.downgrade();
+            reg_signal(&mut definition.source.write().unwrap(), &self.signal,
+                move |_| make_callback(&weak_parent.upgrade().unwrap()));
+            Signal { keep_alive: Box::new(definition), ..self.signal }
+        })
+    }
+}
+
+impl<A> Deref for SignalCycle<A> {
+    type Target = Signal<A>;
+    fn deref(&self) -> &Signal<A> { &self.signal }
+}
+
+
+/// Signal variant using inner mutability for efficient in-place updates.
+///
+/// This is the only kind of primitive that allows non-`Clone` types to be
+/// wrapped into functional reactive abstractions. The API is somewhat different
+/// from that of a regular signal to accommodate this.
+///
+/// One cannot directly sample a `SignalMut` as this would require a clone.
+/// Instead it comes with a couple of adaptor methods that mimick a subset of
+/// the `Signal` API. However, all functions passed to these methods take the
+/// argument coming from the `SignalMut` by reference.
+pub struct SignalMut<A> {
+    inner: Signal<ReadOnly<A>>,
+}
+
+impl<A: Send + Sync + 'static> SignalMut<A> {
+    /// Semantically the same as `Signal::snapshot`
+    ///
+    /// The key difference here is, that the combining function takes its first
+    /// argument by reference, as it can't be moved out of the `SignalMut`.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink1 = Sink::new();
+    /// let sink2 = Sink::new();
+    /// // Collect values in a mutable `Vec`
+    /// let values = sink1.stream().scan_mut(vec![], |v, a| v.push(a));
+    /// // Snapshot some value from it
+    /// let mut index = values.snapshot(&sink2.stream(),
+    ///     |v, k| v.get(k).map(|x| *x)
+    /// ).events();
+    ///
+    /// sink1.send(4);
+    /// sink1.send(5);
+    /// sink2.send(0);
+    /// assert_eq!(index.next(), Some(Some(4)));
+    ///
+    /// sink2.send(1);
+    /// assert_eq!(index.next(), Some(Some(5)));
+    ///
+    /// sink2.send(2);
+    /// assert_eq!(index.next(), Some(None));
+    /// ```
+    pub fn snapshot<B, C, F>(&self, stream: &Stream<B>, f: F) -> Stream<C>
+        where B: Clone + Send + Sync + 'static,
+              C: Clone + Send + Sync + 'static,
+              F: Fn(&A, B) -> C + Send + Sync + 'static,
+    {
+        self.inner.snapshot(stream, move |a, b| f(&a.read().unwrap(), b))
+    }
+
+    /// Similar to `lift2`. Combines a `SignalMut` with a `Signal` using a
+    /// function. The function takes its first argument by reference.
+    pub fn combine<B, C, F>(&self, signal: &Signal<B>, f: F) -> Signal<C>
+        where B: Clone + Send + Sync + 'static,
+              C: Clone + Send + Sync + 'static,
+              F: Fn(&A, B) -> C + Send + Sync + 'static,
+    {
+        lift::lift2(
+            move |a, b| f(&a.read().unwrap(), b),
+            &self.inner, &signal
+        )
+    }
+
+    /// Similar to `lift2`, but combines two `SignalMut` using a function. The
+    /// supplied function takes both arguments by reference.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink: Sink<i32> = Sink::new();
+    /// let sum = sink.stream().scan_mut(0, |sum, a| *sum += a);
+    /// let product = sink.stream().scan_mut(1, |prod, a| *prod *= a);
+    /// let combo = sum.combine_mut(&product, |s, p| (*s, *p));
+    ///
+    /// sink.send(1);
+    /// assert_eq!(combo.sample(), (1, 1));
+    ///
+    /// sink.send(3);
+    /// assert_eq!(combo.sample(), (4, 3));
+    ///
+    /// sink.send(5);
+    /// assert_eq!(combo.sample(), (9, 15));
+    /// ```
+    pub fn combine_mut<B, C, F>(&self, other: &SignalMut<B>, f: F) -> Signal<C>
+        where B: Clone + Send + Sync + 'static,
+              C: Clone + Send + Sync + 'static,
+              F: Fn(&A, &B) -> C + Send + Sync + 'static,
+    {
+        lift::lift2(
+            move |a, b| f(&a.read().unwrap(), &b.read().unwrap()),
+            &self.inner, &other.inner
+        )
+    }
+}
+
+
+/// Same as Stream::hold.
+pub fn hold<A>(initial: A, stream: &Stream<A>) -> Signal<A>
+    where A: Send + Sync + 'static,
+{
+    commit(|| {
+        let signal = Signal::build(SignalFn::Const(initial), stream.clone());
+        reg_signal(&mut stream::source(&stream).write().unwrap(), &signal, SignalFn::Const);
+        signal
+    })
+}
+
+
+/// Same as Stream::scan_mut.
+pub fn scan_mut<A, B, F>(stream: &Stream<A>, initial: B, f: F) -> SignalMut<B>
+    where A: Send + Sync + 'static,
+          B: Send + Sync + 'static,
+          F: Fn(&mut B, A) + Send + Sync + 'static,
+{
+    commit(move || {
+        let state = Arc::new(RwLock::new(initial));
+        let signal = Signal::build(SignalFn::Const(readonly::create(state.clone())), stream.clone());
+        reg_signal(&mut stream::source(&stream).write().unwrap(), &signal,
+            move |a| { f(&mut state.write().unwrap(), a); SignalFn::Const(readonly::create(state.clone())) });
+        SignalMut { inner: signal }
+    })
+}
+
+
+#[cfg(test)]
+mod test {
+    use quickcheck::quickcheck;
+
+    use ::stream::Sink;
+    use ::signal::{ self, Signal, SignalCycle };
+    use ::lift::lift1;
+    use ::testing::{ ArcFn, signal_eq, id, pure_fn, partial_comp };
+
+    #[test]
+    fn functor_identity() {
+        fn check(signal: Signal<i32>) -> bool {
+            let eq = signal_eq(&signal, &lift1(id, &signal));
+            (0..10).all(|_| eq.sample())
+        }
+        quickcheck(check as fn(Signal<i32>) -> bool);
+    }
+
+    #[test]
+    fn functor_composition() {
+        fn check(signal: Signal<i32>) -> bool {
+            fn f(n: i32) -> i32 { 3 * n }
+            fn g(n: i32) -> i32 { n + 2 }
+            let eq = signal_eq(
+                &lift1(|n| f(g(n)), &signal),
+                &lift1(f, &lift1(g, &signal))
+            );
+            (0..10).all(|_| eq.sample())
+        }
+        quickcheck(check as fn(Signal<i32>) -> bool);
+    }
+
+    #[test]
+    fn applicative_identity() {
+        fn check(signal: Signal<i32>) -> bool {
+            let eq = signal_eq(&pure_fn(id).apply(&signal), &signal);
+            (0..10).all(|_| eq.sample())
+        }
+        quickcheck(check as fn(Signal<i32>) -> bool);
+    }
+
+    #[test]
+    fn applicative_composition() {
+        fn check(signal: Signal<i32>) -> bool {
+            fn f(n: i32) -> i32 { n * 4 }
+            fn g(n: i32) -> i32 { n - 3 }
+            let u = pure_fn(f);
+            let v = pure_fn(g);
+            let eq = signal_eq(
+                &pure_fn(partial_comp).apply(&u).apply(&v).apply(&signal),
+                &u.apply(&v.apply(&signal))
+            );
+            (0..10).all(|_| eq.sample())
+        }
+        quickcheck(check as fn(Signal<i32>) -> bool);
+    }
+
+    #[test]
+    fn applicative_homomorphism() {
+        fn check(x: i32) -> bool {
+            fn f(x: i32) -> i32 { x * (-5) }
+            let eq = signal_eq(
+                &pure_fn(f).apply(&Signal::new(x)),
+                &Signal::new(f(x))
+            );
+            (0..10).all(|_| eq.sample())
+        }
+        quickcheck(check as fn(i32) -> bool);
+    }
+
+    #[test]
+    fn applicative_interchange() {
+        fn check(x: i32) -> bool {
+            fn f(x: i32) -> i32 { x * 2 - 7 }
+            let u = pure_fn(f);
+            let eq = signal_eq(
+                &u.apply(&Signal::new(x)),
+                &pure_fn(move |f: ArcFn<i32, i32>| f(x)).apply(&u)
+            );
+            (0..10).all(|_| eq.sample())
+        }
+        quickcheck(check as fn(i32) -> bool);
+    }
+
+    #[test]
+    fn clone() {
+        let b = Signal::new(3);
+        assert_eq!(b.clone().sample(), 3);
+    }
+
+    #[test]
+    fn hold() {
+        let sink = Sink::new();
+        let signal = sink.stream().hold(3);
+        assert_eq!(signal.sample(), 3);
+        sink.send(4);
+        assert_eq!(signal.sample(), 4);
+    }
+
+    #[test]
+    fn hold_implicit_stream() {
+        let sink = Sink::new();
+        let signal = signal::hold(0, &sink.stream().map(|n| 2 * n));
+        assert_eq!(signal.sample(), 0);
+        sink.send(4);
+        assert_eq!(signal.sample(), 8);
+    }
+
+    #[test]
+    fn snapshot() {
+        let sink1: Sink<i32> = Sink::new();
+        let sink2: Sink<f64> = Sink::new();
+        let mut snap_events = sink1.stream().hold(1)
+            .snapshot(&sink2.stream().map(|x| x + 3.0), |a, b| (a, b))
+            .events();
+        sink2.send(4.0);
+        assert_eq!(snap_events.next(), Some((1, 7.0)));
+    }
+
+    #[test]
+    fn snapshot_2() {
+        let ev1 = Sink::new();
+        let beh1 = ev1.stream().hold(5);
+        let ev2 = Sink::new();
+        let snap = beh1.snapshot(&ev2.stream(), |a, b| (a, b));
+        let mut events = snap.events();
+        ev2.send(4);
+        assert_eq!(events.next(), Some((5, 4)));
+        ev1.send(-2);
+        ev2.send(6);
+        assert_eq!(events.next(), Some((-2, 6)));
+    }
+
+    #[test]
+    fn cyclic_snapshot_accum() {
+        let sink = Sink::new();
+        let stream = sink.stream();
+        let accum = SignalCycle::new();
+        let def = accum.snapshot(&stream, |a, s| a + s).hold(0);
+        let accum = accum.define(def);
+        assert_eq!(accum.sample(), 0);
+        sink.send(3);
+        assert_eq!(accum.sample(), 3);
+        sink.send(7);
+        assert_eq!(accum.sample(), 10);
+        sink.send(-21);
+        assert_eq!(accum.sample(), -11);
+    }
+
+    #[test]
+    fn snapshot_order_standard() {
+        let sink = Sink::new();
+        let signal = sink.stream().hold(0);
+        let mut events = signal
+            .snapshot(&sink.stream(), |a, b| (a, b))
+            .events();
+        sink.send(1);
+        assert_eq!(events.next(), Some((0, 1)));
+    }
+
+    #[test]
+    fn snapshot_lift_order_standard() {
+        let sink = Sink::new();
+        let signal = sink.stream().hold(0);
+        let mut events = lift1(|x| x, &signal)
+            .snapshot(&sink.stream(), |a, b| (a, b))
+            .events();
+        sink.send(1);
+        assert_eq!(events.next(), Some((0, 1)));
+    }
+
+    #[test]
+    fn snapshot_order_alternative() {
+        let sink = Sink::new();
+        // Invert the "natural" order of the registry by declaring the stream before
+        // the signal, which are both used by the snapshot.
+        let first = sink.stream().map(|x| x);
+        let signal = sink.stream().hold(0);
+        let mut events = signal.snapshot(&first, |a, b| (a, b)).events();
+        sink.send(1);
+        assert_eq!(events.next(), Some((0, 1)));
+    }
+
+    #[test]
+    fn cyclic_signal_intermediate() {
+        let sink = Sink::new();
+        let stream = sink.stream();
+        let mut snap = None;
+        let sum = Signal::cyclic(|a| {
+            let my_snap = a.snapshot(&stream, |a, e| e + a);
+            snap = Some(my_snap.clone());
+            my_snap.hold(0)
+        });
+        let snap = snap.unwrap();
+        let mut events = snap.events();
+
+        sink.send(3);
+        assert_eq!(sum.sample(), 3);
+        assert_eq!(events.next(), Some(3));
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/source.rs.html b/src/carboxyl/source.rs.html new file mode 100644 index 0000000..5aab27b --- /dev/null +++ b/src/carboxyl/source.rs.html @@ -0,0 +1,359 @@ + + + + + + + + + + source.rs.html -- source + + + + + + + + + + + + + + + +
  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+ 21
+ 22
+ 23
+ 24
+ 25
+ 26
+ 27
+ 28
+ 29
+ 30
+ 31
+ 32
+ 33
+ 34
+ 35
+ 36
+ 37
+ 38
+ 39
+ 40
+ 41
+ 42
+ 43
+ 44
+ 45
+ 46
+ 47
+ 48
+ 49
+ 50
+ 51
+ 52
+ 53
+ 54
+ 55
+ 56
+ 57
+ 58
+ 59
+ 60
+ 61
+ 62
+ 63
+ 64
+ 65
+ 66
+ 67
+ 68
+ 69
+ 70
+ 71
+ 72
+ 73
+ 74
+ 75
+ 76
+ 77
+ 78
+ 79
+ 80
+ 81
+ 82
+ 83
+ 84
+ 85
+ 86
+ 87
+ 88
+ 89
+ 90
+ 91
+ 92
+ 93
+ 94
+ 95
+ 96
+ 97
+ 98
+ 99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+
+//! Event sources and callbacks.
+//!
+//! This is a light-weight implementation of the observer pattern. Subjects are
+//! modelled as the `Source` type and observers as boxed closures.
+
+use std::sync::{ RwLock, Weak };
+
+/// An error that can occur with a weakly referenced callback.
+#[derive(PartialEq, Eq, Clone, Copy, Debug)]
+pub enum CallbackError {
+    Disappeared,
+    Poisoned,
+}
+
+/// Shorthand for common callback results.
+pub type CallbackResult<T=()> = Result<T, CallbackError>;
+
+/// A boxed callback.
+type Callback<A> = Box<FnMut(A) -> CallbackResult + Send + Sync + 'static>;
+
+
+/// Perform some callback on a weak reference to a mutex and handle errors
+/// gracefully.
+pub fn with_weak<T, U, F: FnOnce(&mut T) -> U>(weak: &Weak<RwLock<T>>, f: F) -> CallbackResult<U> {
+    weak.upgrade()
+        .ok_or(CallbackError::Disappeared)
+        .and_then(|mutex| mutex.write()
+            .map(|mut t| f(&mut t))
+            .map_err(|_| CallbackError::Poisoned)
+        )
+}
+
+
+/// An event source.
+pub struct Source<A> {
+    callbacks: Vec<Callback<A>>,
+}
+
+impl<A> Source<A> {
+    /// Create a new source.
+    pub fn new() -> Source<A> {
+        Source { callbacks: vec![] }
+    }
+
+    /// Register a callback. The callback will be a mutable closure that takes
+    /// an event and must return a result. To unsubscribe from further events,
+    /// the callback has to return an error.
+    pub fn register<F>(&mut self, callback: F)
+        where F: FnMut(A) -> CallbackResult + Send + Sync + 'static
+    {
+        self.callbacks.push(Box::new(callback));
+    }
+}
+
+impl<A: Send + Sync + Clone + 'static> Source<A> {
+    /// Make the source send an event to all its observers.
+    pub fn send(&mut self, a: A) {
+        use std::mem;
+        let mut new_callbacks = vec!();
+        mem::swap(&mut new_callbacks, &mut self.callbacks);
+        self.callbacks = new_callbacks
+            .into_iter()
+            .filter_map(|mut callback| {
+                let result = callback(a.clone());
+                match result {
+                    Ok(_) => Some(callback),
+                    Err(_) => None,
+                }
+            })
+            .collect();
+    }
+}
+
+
+#[cfg(test)]
+mod test {
+    use std::sync::{ Arc, RwLock };
+    use std::thread;
+    use super::*;
+
+    #[test]
+    fn with_weak_no_error() {
+        let a = Arc::new(RwLock::new(3));
+        let weak = a.downgrade();
+        assert_eq!(with_weak(&weak, |a| { *a = 4; }), Ok(()));
+        assert_eq!(*a.read().unwrap(), 4);
+    }
+
+    #[test]
+    fn with_weak_disappeared() {
+        let weak = Arc::new(RwLock::new(3)).downgrade();
+        assert_eq!(with_weak(&weak, |_| ()), Err(CallbackError::Disappeared));
+    }
+
+    #[test]
+    fn with_weak_poisoned() {
+        let a = Arc::new(RwLock::new(3));
+        let a2 = a.clone();
+        let weak = a.downgrade();
+        let _ = thread::spawn(move || {
+            let _g = a2.write().unwrap();
+            panic!();
+        }).join();
+        assert_eq!(with_weak(&weak, |_| ()), Err(CallbackError::Poisoned));
+    }
+
+    #[test]
+    fn source_register_and_send() {
+        let mut src = Source::new();
+        let a = Arc::new(RwLock::new(3));
+        {
+            let a = a.clone();
+            src.register(move |x| {
+                *a.write().unwrap() = x;
+                Ok(())
+            });
+        }
+        assert_eq!(src.callbacks.len(), 1);
+        src.send(4);
+        assert_eq!(*a.read().unwrap(), 4);
+    }
+
+    #[test]
+    fn source_unregister() {
+        let mut src = Source::new();
+        src.register(|_| Err(CallbackError::Disappeared));
+        assert_eq!(src.callbacks.len(), 1);
+        src.send(());
+        assert_eq!(src.callbacks.len(), 0);
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/stream.rs.html b/src/carboxyl/stream.rs.html new file mode 100644 index 0000000..8290275 --- /dev/null +++ b/src/carboxyl/stream.rs.html @@ -0,0 +1,1645 @@ + + + + + + + + + + stream.rs.html -- source + + + + + + + + + + + + + + + +
  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+ 21
+ 22
+ 23
+ 24
+ 25
+ 26
+ 27
+ 28
+ 29
+ 30
+ 31
+ 32
+ 33
+ 34
+ 35
+ 36
+ 37
+ 38
+ 39
+ 40
+ 41
+ 42
+ 43
+ 44
+ 45
+ 46
+ 47
+ 48
+ 49
+ 50
+ 51
+ 52
+ 53
+ 54
+ 55
+ 56
+ 57
+ 58
+ 59
+ 60
+ 61
+ 62
+ 63
+ 64
+ 65
+ 66
+ 67
+ 68
+ 69
+ 70
+ 71
+ 72
+ 73
+ 74
+ 75
+ 76
+ 77
+ 78
+ 79
+ 80
+ 81
+ 82
+ 83
+ 84
+ 85
+ 86
+ 87
+ 88
+ 89
+ 90
+ 91
+ 92
+ 93
+ 94
+ 95
+ 96
+ 97
+ 98
+ 99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+162
+163
+164
+165
+166
+167
+168
+169
+170
+171
+172
+173
+174
+175
+176
+177
+178
+179
+180
+181
+182
+183
+184
+185
+186
+187
+188
+189
+190
+191
+192
+193
+194
+195
+196
+197
+198
+199
+200
+201
+202
+203
+204
+205
+206
+207
+208
+209
+210
+211
+212
+213
+214
+215
+216
+217
+218
+219
+220
+221
+222
+223
+224
+225
+226
+227
+228
+229
+230
+231
+232
+233
+234
+235
+236
+237
+238
+239
+240
+241
+242
+243
+244
+245
+246
+247
+248
+249
+250
+251
+252
+253
+254
+255
+256
+257
+258
+259
+260
+261
+262
+263
+264
+265
+266
+267
+268
+269
+270
+271
+272
+273
+274
+275
+276
+277
+278
+279
+280
+281
+282
+283
+284
+285
+286
+287
+288
+289
+290
+291
+292
+293
+294
+295
+296
+297
+298
+299
+300
+301
+302
+303
+304
+305
+306
+307
+308
+309
+310
+311
+312
+313
+314
+315
+316
+317
+318
+319
+320
+321
+322
+323
+324
+325
+326
+327
+328
+329
+330
+331
+332
+333
+334
+335
+336
+337
+338
+339
+340
+341
+342
+343
+344
+345
+346
+347
+348
+349
+350
+351
+352
+353
+354
+355
+356
+357
+358
+359
+360
+361
+362
+363
+364
+365
+366
+367
+368
+369
+370
+371
+372
+373
+374
+375
+376
+377
+378
+379
+380
+381
+382
+383
+384
+385
+386
+387
+388
+389
+390
+391
+392
+393
+394
+395
+396
+397
+398
+399
+400
+401
+402
+403
+404
+405
+406
+407
+408
+409
+410
+411
+412
+413
+414
+415
+416
+417
+418
+419
+420
+421
+422
+423
+424
+425
+426
+427
+428
+429
+430
+431
+432
+433
+434
+435
+436
+437
+438
+439
+440
+441
+442
+443
+444
+445
+446
+447
+448
+449
+450
+451
+452
+453
+454
+455
+456
+457
+458
+459
+460
+461
+462
+463
+464
+465
+466
+467
+468
+469
+470
+471
+472
+473
+474
+475
+476
+477
+478
+479
+480
+481
+482
+483
+484
+485
+486
+487
+488
+489
+490
+491
+492
+493
+494
+495
+496
+497
+498
+499
+500
+501
+502
+503
+504
+505
+506
+507
+508
+509
+510
+511
+512
+513
+514
+515
+516
+517
+518
+519
+520
+521
+522
+523
+524
+525
+526
+527
+528
+529
+530
+531
+532
+533
+534
+535
+536
+537
+538
+539
+540
+541
+542
+543
+544
+545
+546
+547
+548
+549
+550
+551
+552
+553
+554
+555
+556
+557
+558
+559
+560
+561
+562
+563
+564
+565
+566
+567
+568
+569
+570
+571
+572
+573
+574
+575
+576
+577
+578
+579
+580
+581
+582
+583
+584
+585
+586
+587
+588
+589
+590
+591
+592
+593
+594
+595
+596
+597
+598
+599
+600
+601
+602
+603
+604
+605
+606
+607
+608
+609
+610
+611
+612
+613
+614
+615
+616
+617
+618
+619
+620
+621
+622
+623
+624
+625
+626
+627
+628
+629
+630
+631
+632
+633
+634
+635
+636
+637
+638
+639
+640
+641
+642
+643
+644
+645
+646
+647
+648
+649
+650
+651
+652
+653
+654
+655
+656
+657
+658
+659
+660
+661
+662
+663
+664
+665
+666
+667
+668
+669
+670
+671
+672
+673
+674
+675
+676
+677
+678
+679
+680
+681
+682
+683
+684
+685
+686
+687
+688
+689
+690
+691
+692
+693
+694
+695
+696
+697
+698
+699
+700
+701
+702
+703
+704
+705
+706
+707
+708
+709
+710
+711
+712
+713
+714
+715
+716
+717
+718
+719
+720
+721
+722
+723
+724
+725
+726
+727
+728
+729
+730
+731
+732
+733
+734
+735
+736
+737
+738
+739
+740
+741
+742
+743
+744
+745
+746
+747
+748
+749
+750
+751
+752
+753
+754
+755
+756
+757
+758
+759
+760
+761
+762
+763
+764
+765
+766
+767
+768
+769
+770
+771
+772
+773
+774
+
+//! Streams of discrete events
+
+use std::sync::{ Arc, RwLock, Mutex, Weak };
+use std::sync::mpsc::{ Receiver, channel };
+use std::thread;
+use source::{ Source, CallbackError, CallbackResult, with_weak };
+use signal::{ self, Signal, SignalMut, sample_raw };
+use transaction::{ commit, later };
+
+
+/// An event sink.
+///
+/// This primitive is a way of generating streams of events. One can send
+/// input values into a sink and generate a stream that fires all these inputs
+/// as events:
+///
+/// ```
+/// # use carboxyl::Sink;
+/// // A new sink
+/// let sink = Sink::new();
+///
+/// // Make an iterator over a stream.
+/// let mut events = sink.stream().events();
+///
+/// // Send a value into the sink
+/// sink.send(5);
+///
+/// // The stream
+/// assert_eq!(events.next(), Some(5));
+/// ```
+///
+/// You can also feed a sink with an iterator:
+///
+/// ```
+/// # use carboxyl::Sink;
+/// # let sink = Sink::new();
+/// # let mut events = sink.stream().events();
+/// sink.feed(20..40);
+/// assert_eq!(events.take(4).collect::<Vec<_>>(), vec![20, 21, 22, 23]);
+/// ```
+///
+/// # Asynchronous calls
+///
+/// It is possible to send events into the sink asynchronously using the methods
+/// `send_async` and `feed_async`. Note though, that this will void some
+/// guarantees on the order of events. In the following example, it is unclear,
+/// which event is the first in the stream:
+///
+/// ```
+/// # use carboxyl::Sink;
+/// let sink = Sink::new();
+/// let mut events = sink.stream().events();
+/// sink.send_async(13);
+/// sink.send_async(22);
+/// let first = events.next().unwrap();
+/// assert!(first == 13 || first == 22);
+/// ```
+///
+/// `feed_async` provides a workaround, as it preserves the order of events from
+/// the iterator. However, any event sent into the sink after a call to it, may
+/// come at any point between the iterator events.
+pub struct Sink<A> {
+    source: Arc<RwLock<Source<A>>>,
+}
+
+impl<A> Clone for Sink<A> {
+    fn clone(&self) -> Sink<A> {
+        Sink { source: self.source.clone() }
+    }
+}
+
+impl<A: Send + Sync> Sink<A> {
+    /// Create a new sink.
+    pub fn new() -> Sink<A> {
+        Sink { source: Arc::new(RwLock::new(Source::new())) }
+    }
+
+    /// Generate a stream that fires all events sent into the sink.
+    pub fn stream(&self) -> Stream<A> {
+        Stream { source: self.source.clone(), keep_alive: Box::new(()), }
+    }
+}
+
+impl<A: Send + Sync + Clone + 'static> Sink<A> {
+    /// Asynchronous send.
+    ///
+    /// Same as `send`, but it spawns a new thread to process the updates to
+    /// dependent streams and signals.
+    pub fn send_async(&self, a: A) {
+        let clone = self.clone();
+        thread::spawn(move || clone.send(a));
+    }
+
+    /// Feed values from an iterator into the sink.
+    ///
+    /// This method feeds events into the sink from an iterator.
+    pub fn feed<I: IntoIterator<Item=A>>(&self, iterator: I) {
+        for event in iterator {
+            self.send(event);
+        }
+    }
+
+    /// Asynchronous feed.
+    ///
+    /// This is the same as `feed`, but it does not block, since it spawns the
+    /// feeding as a new task. This is useful, if the provided iterator is large
+    /// or even infinite (e.g. an I/O event loop).
+    pub fn feed_async<I: IntoIterator<Item=A> + Send + 'static>(&self, iterator: I) {
+        let clone = self.clone();
+        thread::spawn(move || clone.feed(iterator));
+    }
+
+    /// Send a value into the sink.
+    ///
+    /// When a value is sent into the sink, an event is fired in all dependent
+    /// streams.
+    pub fn send(&self, a: A) {
+        commit(|| self.source.write().unwrap().send(a))
+    }
+}
+
+
+/// Trait to wrap cloning of boxed values in a object-safe manner
+pub trait BoxClone: Sync + Send {
+    /// Clone the object as a boxed trait object
+    fn box_clone(&self) -> Box<BoxClone>; 
+}
+
+impl<T: Sync + Send + Clone + 'static> BoxClone for T {
+    fn box_clone(&self) -> Box<BoxClone> {
+        Box::new(self.clone())
+    }
+}
+
+
+/// Access a stream's source.
+///
+/// This is not defined as a method, so that it can be public to other modules
+/// in this crate while being private outside the crate.
+pub fn source<A>(stream: &Stream<A>) -> &Arc<RwLock<Source<A>>> {
+    &stream.source
+}
+
+
+/// A stream of events.
+///
+/// Conceptually a stream can be thought of as a series of discrete events that
+/// occur at specific times. They are ordered by a transaction system. This
+/// means that firings of disjoint events can not interfere with each other. The
+/// consequences of one event are atomically reflected in dependent quantities.
+///
+/// Streams provide a number of primitive operations. These can be used to
+/// compose streams and combine them with signals. For instance, streams can be
+/// mapped over with a function, merged with another stream of the same type or
+/// filtered by some predicate.
+///
+/// # Algebraic laws
+///
+/// Furthermore, streams satisfy certain algebraic properties that are useful to
+/// reason about them.
+///
+/// ## Monoid
+///
+/// For once, streams of the same type form a **monoid** under merging. The
+/// neutral element in this context is `Stream::never()`. So the following laws
+/// always hold for streams `a`, `b` and `c` of the same type:
+///
+/// - Left identity: `Stream::never().merge(&a) == a`,
+/// - Right identity: `a.merge(&Stream::never()) == a`,
+/// - Associativity: `a.merge(&b).merge(&c) == a.merge(&b.merge(&c))`.
+///
+/// *Note that equality in this context is not actually implemented as such,
+/// since comparing two (potentially infinite) streams is a prohibitive
+/// operation. Instead, the expressions above can be used interchangably and
+/// behave identically.*
+///
+/// ## Functor
+///
+/// Under the mapping operation streams also become a functor. A functor is a
+/// generic type like `Stream` with some mapping operation that takes a function
+/// `Fn(A) -> B` to map a `Stream<A>` to a `Stream<B>`. Algebraically it
+/// satisfies the following laws:
+///
+/// - The identity function is preserved: `a.map(|x| x) == a`,
+/// - Function composition is respected: `a.map(f).map(g) == a.map(|x| g(f(x)))`.
+pub struct Stream<A> {
+    source: Arc<RwLock<Source<A>>>,
+    #[allow(dead_code)]
+    keep_alive: Box<BoxClone>,
+}
+
+impl<A> Clone for Stream<A> {
+    fn clone(&self) -> Stream<A> {
+        Stream {
+            source: self.source.clone(),
+            keep_alive: self.keep_alive.box_clone(),
+        }
+    }
+}
+
+impl<A: Clone + Send + Sync + 'static> Stream<A> {
+    /// Create a stream that never fires. This can be useful in certain
+    /// situations, where a stream is logically required, but no events are
+    /// expected.
+    pub fn never() -> Stream<A> {
+        Stream {
+            source: Arc::new(RwLock::new(Source::new())),
+            keep_alive: Box::new(()) 
+        }
+    }
+
+    /// Map the stream to another stream using a function.
+    ///
+    /// `map` applies a function to every event fired in this stream to create a
+    /// new stream of type `B`.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink: Sink<i32> = Sink::new();
+    /// let mut events = sink.stream().map(|x| x + 4).events();
+    /// sink.send(3);
+    /// assert_eq!(events.next(), Some(7));
+    /// ```
+    pub fn map<B, F>(&self, f: F) -> Stream<B>
+        where B: Send + Sync + Clone + 'static,
+              F: Fn(A) -> B + Send + Sync + 'static,
+    {
+        commit(|| {
+            let src = Arc::new(RwLock::new(Source::new()));
+            let weak = src.downgrade();
+            self.source.write().unwrap()
+                .register(move |a| with_weak(&weak, |src| src.send(f(a))));
+            Stream {
+                source: src,
+                keep_alive: Box::new(self.clone()),
+            }
+        })
+    }
+
+    /// Filter a stream according to a predicate.
+    ///
+    /// `filter` creates a new stream that only fires those events from the
+    /// original stream that satisfy the predicate.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink: Sink<i32> = Sink::new();
+    /// let mut events = sink.stream()
+    ///     .filter(|&x| (x >= 4) && (x <= 10))
+    ///     .events();
+    /// sink.send(2); // won't arrive
+    /// sink.send(5); // will arrive
+    /// assert_eq!(events.next(), Some(5));
+    /// ```
+    pub fn filter<F>(&self, f: F) -> Stream<A>
+        where F: Fn(&A) -> bool + Send + Sync + 'static,
+    {
+        self.filter_map(move |a| if f(&a) { Some(a) } else { None })
+    }
+
+    /// Both filter and map a stream.
+    ///
+    /// This is equivalent to `.map(f).filter_some()`.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink = Sink::new();
+    /// let mut events = sink.stream()
+    ///     .filter_map(|i| if i > 3 { Some(i + 2) } else { None })
+    ///     .events();
+    /// sink.send(2);
+    /// sink.send(4);
+    /// assert_eq!(events.next(), Some(6));
+    /// ```
+    pub fn filter_map<B, F>(&self, f: F) -> Stream<B>
+        where B: Send + Sync + Clone + 'static,
+              F: Fn(A) -> Option<B> + Send + Sync + 'static,
+    {
+        self.map(f).filter_some()
+    }
+
+    /// Merge with another stream.
+    ///
+    /// `merge` takes two streams and creates a new stream that fires events
+    /// from both input streams.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink_1 = Sink::<i32>::new();
+    /// let sink_2 = Sink::<i32>::new();
+    /// let mut events = sink_1.stream().merge(&sink_2.stream()).events();
+    /// sink_1.send(2);
+    /// assert_eq!(events.next(), Some(2));
+    /// sink_2.send(4);
+    /// assert_eq!(events.next(), Some(4));
+    /// ```
+    pub fn merge(&self, other: &Stream<A>) -> Stream<A> {
+        commit(|| {
+            let src = Arc::new(RwLock::new(Source::new()));
+            for parent in [self, other].iter() {
+                let weak = src.downgrade();
+                parent.source.write().unwrap()
+                    .register(move |a| with_weak(&weak, |src| src.send(a)));
+            }
+            Stream {
+                source: src,
+                keep_alive: Box::new((self.clone(), other.clone())),
+            }
+        })
+    }
+
+    /// Coalesce multiple event firings within the same transaction into a
+    /// single event.
+    ///
+    /// The function should ideally commute, as the order of events within a
+    /// transaction is not well-defined.
+    pub fn coalesce<F>(&self, f: F) -> Stream<A>
+        where F: Fn(A, A) -> A + Send + Sync + 'static,
+    {
+        commit(|| {
+            let src = Arc::new(RwLock::new(Source::new()));
+            let weak = src.downgrade();
+            self.source.write().unwrap().register({
+                let mutex = Arc::new(Mutex::new(None));
+                move |a| {
+                    let mut inner = mutex.lock().unwrap();
+                    *inner = Some(match inner.take() {
+                        Some(b) => f(a, b),
+                        None => a,
+                    });
+                    // Send the updated value later
+                    later({
+                        let mutex = mutex.clone();
+                        let weak = weak.clone();
+                        move || {
+                            let mut inner = mutex.lock().unwrap();
+                            // Take it out and map, so that it does not happen twice
+                            inner.take().map(|value|
+                                with_weak(&weak, |src| src.send(value))
+                            );
+                        }
+                    });
+                    Ok(())
+                }
+            });
+            Stream { source: src, keep_alive: Box::new(self.clone()) }
+        })
+    }
+
+    /// Hold an event in a signal.
+    ///
+    /// The resulting signal `hold`s the value of the last event fired by the
+    /// stream.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink = Sink::new();
+    /// let signal = sink.stream().hold(0);
+    /// assert_eq!(signal.sample(), 0);
+    /// sink.send(2);
+    /// assert_eq!(signal.sample(), 2);
+    /// ```
+    pub fn hold(&self, initial: A) -> Signal<A> {
+        signal::hold(initial, self)
+    }
+
+    /// A blocking iterator over the stream.
+    pub fn events(&self) -> Events<A> { Events::new(self) }
+
+    /// Scan a stream and accumulate its event firings in a signal.
+    ///
+    /// Starting at some initial value, each new event changes the value of the
+    /// resulting signal as prescribed by the supplied function.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink = Sink::new();
+    /// let sum = sink.stream().scan(0, |a, b| a + b);
+    /// assert_eq!(sum.sample(), 0);
+    /// sink.send(2);
+    /// assert_eq!(sum.sample(), 2);
+    /// sink.send(4);
+    /// assert_eq!(sum.sample(), 6);
+    /// ```
+    pub fn scan<B, F>(&self, initial: B, f: F) -> Signal<B>
+        where B: Send + Sync + Clone + 'static,
+              F: Fn(B, A) -> B + Send + Sync + 'static,
+    {
+        Signal::cyclic(|scan| scan.snapshot(self, f).hold(initial))
+    }
+
+    /// Scan a stream and accumulate its event firings in some mutable state.
+    ///
+    /// Semantically this is equivalent to `scan`. However, it allows one to use
+    /// a non-Clone type as an accumulator and update it with efficient in-place
+    /// operations.
+    ///
+    /// The resulting `SignalMut` does have a slightly different API from a
+    /// regular `Signal` as it does not allow clones.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use carboxyl::{ Sink, Signal };
+    /// let sink: Sink<i32> = Sink::new();
+    /// let sum = sink.stream()
+    ///     .scan_mut(0, |sum, a| *sum += a)
+    ///     .combine(&Signal::new(()), |sum, ()| *sum);
+    /// assert_eq!(sum.sample(), 0);
+    /// sink.send(2);
+    /// assert_eq!(sum.sample(), 2);
+    /// sink.send(4);
+    /// assert_eq!(sum.sample(), 6);
+    /// ```
+    pub fn scan_mut<B, F>(&self, initial: B, f: F) -> SignalMut<B>
+        where B: Send + Sync + 'static,
+              F: Fn(&mut B, A) + Send + Sync + 'static,
+    {
+        signal::scan_mut(self, initial, f)
+    }
+}
+
+impl<A: Clone + Send + Sync + 'static> Stream<Option<A>> {
+    /// Filter a stream of options.
+    ///
+    /// `filter_some` creates a new stream that only fires the unwrapped
+    /// `Some(…)` events from the original stream omitting any `None` events.
+    ///
+    /// ```
+    /// # use carboxyl::Sink;
+    /// let sink = Sink::new();
+    /// let mut events = sink.stream().filter_some().events();
+    /// sink.send(None); // won't arrive
+    /// sink.send(Some(5)); // will arrive
+    /// assert_eq!(events.next(), Some(5));
+    /// ```
+    pub fn filter_some(&self) -> Stream<A> {
+        commit(|| {
+            let src = Arc::new(RwLock::new(Source::new()));
+            let weak = src.downgrade();
+            self.source.write().unwrap()
+                .register(move |a| a.map_or(
+                    Ok(()),
+                    |a| with_weak(&weak, |src| src.send(a))
+                ));
+            Stream {
+                source: src,
+                keep_alive: Box::new(self.clone())
+            }
+        })
+    }
+}
+
+impl<A: Send + Sync + Clone + 'static> Stream<Stream<A>> {
+    /// Switch between streams.
+    ///
+    /// This takes a stream of streams and maps it to a new stream, which fires
+    /// all events from the most recent stream fired into it.
+    ///
+    /// # Example
+    ///
+    /// ```
+    /// # use carboxyl::{ Sink, Stream };
+    /// // Create sinks
+    /// let stream_sink: Sink<Stream<i32>> = Sink::new();
+    /// let sink1: Sink<i32> = Sink::new();
+    /// let sink2: Sink<i32> = Sink::new();
+    ///
+    /// // Switch and listen
+    /// let switched = stream_sink.stream().switch();
+    /// let mut events = switched.events();
+    ///
+    /// // Should not receive events from either sink
+    /// sink1.send(1); sink2.send(2);
+    ///
+    /// // Now switch to sink 2
+    /// stream_sink.send(sink2.stream());
+    /// sink1.send(3); sink2.send(4);
+    /// assert_eq!(events.next(), Some(4));
+    ///
+    /// // And then to sink 1
+    /// stream_sink.send(sink1.stream());
+    /// sink1.send(5); sink2.send(6);
+    /// assert_eq!(events.next(), Some(5));
+    /// ```
+    pub fn switch(&self) -> Stream<A> {
+        fn rewire_callbacks<A>(new_stream: Stream<A>, source: Weak<RwLock<Source<A>>>,
+                               terminate: &mut Arc<()>)
+            -> CallbackResult
+            where A: Send + Sync + Clone + 'static,
+        {
+            *terminate = Arc::new(());
+            let weak = terminate.downgrade();
+            new_stream.source.write().unwrap().register(move |a|
+                weak.upgrade()
+                    .ok_or(CallbackError::Disappeared)
+                    .and_then(|_| with_weak(&source, |src| src.send(a)))
+            );
+            Ok(())
+        }
+        commit(|| {
+            let src = Arc::new(RwLock::new(Source::new()));
+            let weak = src.downgrade();
+            self.source.write().unwrap().register({
+                let mut terminate = Arc::new(());
+                move |stream| rewire_callbacks(stream, weak.clone(), &mut terminate)
+            });
+            Stream {
+                source: src,
+                keep_alive: Box::new(self.clone()),
+            }
+        })
+    }
+}
+
+
+/// Make a snapshot of a signal, whenever a stream fires an event.
+pub fn snapshot<A, B, C, F>(signal: &Signal<A>, stream: &Stream<B>, f: F) -> Stream<C>
+    where A: Clone + Send + Sync + 'static,
+          B: Clone + Send + Sync + 'static,
+          C: Clone + Send + Sync + 'static,
+          F: Fn(A, B) -> C + Send + Sync + 'static,
+{
+    commit(|| {
+        let src = Arc::new(RwLock::new(Source::new()));
+        let weak = src.downgrade();
+        stream.source.write().unwrap().register({
+            let signal = signal.clone();
+            move |b| with_weak(&weak, |src| src.send(f(sample_raw(&signal), b)))
+        });
+        Stream {
+            source: src,
+            keep_alive: Box::new((stream.clone(), signal.clone())),
+        }
+    })
+}
+
+
+/// A blocking iterator over events in a stream.
+pub struct Events<A> {
+    receiver: Receiver<A>,
+    #[allow(dead_code)]
+    keep_alive: Box<BoxClone>,
+}
+
+impl<A: Send + Sync + 'static> Events<A> {
+    /// Create a new events iterator.
+    fn new(stream: &Stream<A>) -> Events<A> {
+        commit(|| {
+            let (tx, rx) = channel();
+            let tx = Mutex::new(tx);
+            stream.source.write().unwrap().register(
+                move |a| tx.lock().unwrap().send(a).map_err(|_| CallbackError::Disappeared)
+            );
+            Events {
+                receiver: rx,
+                keep_alive: Box::new(stream.clone()),
+            }
+        })
+    }
+}
+
+impl<A: Send + Sync + 'static> Iterator for Events<A> {
+    type Item = A;
+    fn next(&mut self) -> Option<A> { self.receiver.recv().ok() }
+}
+
+
+#[cfg(test)]
+mod test {
+    use std::thread;
+    use quickcheck::quickcheck;
+
+    use testing::{ id, stream_eq };
+    use super::*;
+
+    #[test]
+    fn sink() {
+        let sink = Sink::new();
+        let mut events = sink.stream().events();
+        sink.send(1);
+        sink.send(2);
+        assert_eq!(events.next(), Some(1));
+        assert_eq!(events.next(), Some(2));
+    }
+
+    #[test]
+    fn map() {
+        let sink = Sink::new();
+        let triple = sink.stream().map(|x| 3 * x);
+        let mut events = triple.events();
+        sink.send(1);
+        assert_eq!(events.next(), Some(3));
+    }
+
+    #[test]
+    fn filter_some() {
+        let sink = Sink::new();
+        let small = sink.stream().filter_some();
+        let mut events = small.events();
+        sink.send(None);
+        sink.send(Some(9));
+        assert_eq!(events.next(), Some(9));
+    }
+
+    #[test]
+    fn chain_1() {
+        let sink: Sink<i32> = Sink::new();
+        let chain = sink.stream()
+            .map(|x| x / 2)
+            .filter(|&x| x < 3);
+        let mut events = chain.events();
+        sink.send(7);
+        sink.send(4);
+        assert_eq!(events.next(), Some(2));
+    }
+
+    #[test]
+    fn merge() {
+        let sink1 = Sink::new();
+        let sink2 = Sink::new();
+        let mut events = sink1.stream().merge(&sink2.stream()).events();
+        sink1.send(12);
+        sink2.send(9);
+        assert_eq!(events.next(), Some(12));
+        assert_eq!(events.next(), Some(9));
+    }
+
+    #[test]
+    fn chain_2() {
+        let sink1: Sink<i32> = Sink::new();
+        let sink2: Sink<i32> = Sink::new();
+        let mut events = sink1.stream().map(|x| x + 4)
+            .merge(
+                &sink2.stream()
+                .filter_map(|x| if x < 4 { Some(x) } else { None })
+                .map(|x| x * 5))
+            .events();
+        sink1.send(12);
+        sink2.send(3);
+        assert_eq!(events.next(), Some(16));
+        assert_eq!(events.next(), Some(15));
+    }
+
+    #[test]
+    fn move_closure() {
+        let sink = Sink::<i32>::new();
+        let x = 3;
+        sink.stream().map(move |y| y + x);
+    }
+
+    #[test]
+    fn scan_race_condition() {
+        let sink = Sink::new();
+        // Feed the sink in the background
+        sink.feed_async(0..100000);
+        // Try it multiple times to increase failure probability, when a data
+        // race can potentially happen.
+        for _ in 0..10 {
+            let _sum = sink.stream().scan(0, |a, b| a + b);
+        }
+    }
+
+    #[test]
+    fn sink_send_async() {
+        let sink = Sink::new();
+        let mut events = sink.stream().events();
+        sink.send_async(1);
+        assert_eq!(events.next(), Some(1));
+    }
+
+    #[test]
+    fn sink_feed() {
+        let sink = Sink::new();
+        let events = sink.stream().events();
+        sink.feed(0..10);
+        for (n, m) in events.take(10).enumerate() {
+            assert_eq!(n as i32, m);
+        }
+    }
+
+    #[test]
+    fn sink_feed_async() {
+        let sink = Sink::new();
+        let events = sink.stream().events();
+        sink.feed_async(0..10);
+        for (n, m) in events.take(10).enumerate() {
+            assert_eq!(n as i32, m);
+        }
+    }
+
+    #[test]
+    fn coalesce() {
+        let sink = Sink::new();
+        let stream = sink.stream()
+            .merge(&sink.stream())
+            .coalesce(|a, b| a + b);
+        let mut events = stream.events();
+
+        sink.send(1);
+        assert_eq!(events.next(), Some(2));
+    }
+
+    #[test]
+    fn monoid_left_identity() {
+        fn check(input: Vec<i32>) -> Result<bool, String> {
+            let sink = Sink::new();
+            let a = sink.stream();
+            let eq = stream_eq(&Stream::never().merge(&a), &a);
+            sink.feed(input.into_iter());
+            eq.sample()
+        }
+        quickcheck(check as fn(Vec<i32>) -> Result<bool, String>);
+    }
+
+    #[test]
+    fn monoid_right_identity() {
+        fn check(input: Vec<i32>) -> Result<bool, String> {
+            let sink = Sink::new();
+            let a = sink.stream();
+            let eq = stream_eq(&a.merge(&Stream::never()), &a);
+            sink.feed(input.into_iter());
+            eq.sample()
+        }
+        quickcheck(check as fn(Vec<i32>) -> Result<bool, String>);
+    }
+
+    #[test]
+    fn monoid_associative() {
+        fn check(input_a: Vec<i32>, input_b: Vec<i32>, input_c: Vec<i32>) -> Result<bool, String> {
+            let sink_a = Sink::new();
+            let sink_b = Sink::new();
+            let sink_c = Sink::new();
+            let a = sink_a.stream();
+            let b = sink_b.stream();
+            let c = sink_c.stream();
+            let eq = stream_eq(&a.merge(&b.merge(&c)), &a.merge(&b).merge(&c));
+            /* feed in parallel */ {
+                let _g1 = thread::scoped(|| sink_a.feed(input_a.into_iter()));
+                let _g2 = thread::scoped(|| sink_b.feed(input_b.into_iter()));
+                let _g3 = thread::scoped(|| sink_c.feed(input_c.into_iter()));
+            }
+            eq.sample()
+        }
+        quickcheck(check as fn(Vec<i32>, Vec<i32>, Vec<i32>) -> Result<bool, String>);
+    }
+
+    #[test]
+    fn functor_identity() {
+        fn check(input: Vec<i32>) -> Result<bool, String> {
+            let sink = Sink::new();
+            let a = sink.stream();
+            let eq = stream_eq(&a.map(id), &a);
+            sink.feed(input.into_iter());
+            eq.sample()
+        }
+        quickcheck(check as fn(Vec<i32>) -> Result<bool, String>);
+    }
+
+    #[test]
+    fn functor_composition() {
+        fn check(input: Vec<i32>) -> Result<bool, String> {
+            fn f(n: i32) -> i64 { (n + 3) as i64 }
+            fn g(n: i64) -> f64 { n as f64 / 2.5 }
+
+            let sink = Sink::new();
+            let a = sink.stream();
+            let eq = stream_eq(&a.map(f).map(g), &a.map(|n| g(f(n))));
+            sink.feed(input.into_iter());
+            eq.sample()
+        }
+        quickcheck(check as fn(Vec<i32>) -> Result<bool, String>);
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/carboxyl/transaction.rs.html b/src/carboxyl/transaction.rs.html new file mode 100644 index 0000000..e9cc7b6 --- /dev/null +++ b/src/carboxyl/transaction.rs.html @@ -0,0 +1,459 @@ + + + + + + + + + + transaction.rs.html -- source + + + + + + + + + + + + + + + +
  1
+  2
+  3
+  4
+  5
+  6
+  7
+  8
+  9
+ 10
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+ 21
+ 22
+ 23
+ 24
+ 25
+ 26
+ 27
+ 28
+ 29
+ 30
+ 31
+ 32
+ 33
+ 34
+ 35
+ 36
+ 37
+ 38
+ 39
+ 40
+ 41
+ 42
+ 43
+ 44
+ 45
+ 46
+ 47
+ 48
+ 49
+ 50
+ 51
+ 52
+ 53
+ 54
+ 55
+ 56
+ 57
+ 58
+ 59
+ 60
+ 61
+ 62
+ 63
+ 64
+ 65
+ 66
+ 67
+ 68
+ 69
+ 70
+ 71
+ 72
+ 73
+ 74
+ 75
+ 76
+ 77
+ 78
+ 79
+ 80
+ 81
+ 82
+ 83
+ 84
+ 85
+ 86
+ 87
+ 88
+ 89
+ 90
+ 91
+ 92
+ 93
+ 94
+ 95
+ 96
+ 97
+ 98
+ 99
+100
+101
+102
+103
+104
+105
+106
+107
+108
+109
+110
+111
+112
+113
+114
+115
+116
+117
+118
+119
+120
+121
+122
+123
+124
+125
+126
+127
+128
+129
+130
+131
+132
+133
+134
+135
+136
+137
+138
+139
+140
+141
+142
+143
+144
+145
+146
+147
+148
+149
+150
+151
+152
+153
+154
+155
+156
+157
+158
+159
+160
+161
+162
+163
+164
+165
+166
+167
+168
+169
+170
+171
+172
+173
+174
+175
+176
+177
+178
+179
+180
+181
+
+//! A trivial global lock transaction system.
+//!
+//! At the moment, this is really just a global static mutex, that needs to be
+//! locked, to ensure the atomicity of a transaction.
+
+use std::sync::Mutex;
+use std::cell::RefCell;
+use std::boxed::FnBox;
+
+
+/// The global transaction lock.
+///
+/// TODO: revert this to use a static mutex, as soon as that is stabilized in
+/// the standard library.
+lazy_static! {
+    static ref TRANSACTION_MUTEX: Mutex<()> = Mutex::new(());
+}
+
+/// Registry for callbacks to be executed at the end of a transaction.
+thread_local!(
+    static CURRENT_TRANSACTION: RefCell<Option<Transaction>> =
+        RefCell::new(None)
+);
+
+
+/// A callback.
+type Callback = Box<FnBox() + 'static>;
+
+
+/// A transaction.
+pub struct Transaction {
+    intermediate: Vec<Callback>,
+    finalizers: Vec<Callback>,
+}
+
+impl Transaction {
+    /// Create a new transaction
+    fn new() -> Transaction {
+        Transaction {
+            intermediate: vec![],
+            finalizers: vec![],
+        }
+    }
+
+    /// Add a callback that will be called, when the transaction is done
+    /// except for finalizers.
+    pub fn later<F: FnOnce() + 'static>(&mut self, callback: F) {
+        self.intermediate.push(Box::new(callback));
+    }
+
+    /// Add a finalizing callback. This should not have far reaching
+    /// side-effects, and in particular not commit by itself. Typical operations
+    /// for a finalizer are executing queued state updates.
+    pub fn end<F: FnOnce() + 'static>(&mut self, callback: F) {
+        self.finalizers.push(Box::new(callback));
+    }
+
+    /// Advance transactions by moving out intermediate stage callbacks.
+    fn advance(&mut self) -> Vec<Callback> {
+        use std::mem;
+        let mut intermediate = vec![];
+        mem::swap(&mut intermediate, &mut self.intermediate);
+        intermediate
+    }
+
+    /// Finalize the transaction
+    fn finalize(self) {
+        for finalizer in self.finalizers {
+            finalizer.call_box(());
+        }
+    }
+}
+
+
+/// Commit a transaction.
+///
+/// If the thread is not running any transactions currently, the global lock is
+/// acquired. Otherwise a new transaction begins, since given the interface of
+/// this module it is safely assumed that the lock is already held.
+pub fn commit<A, F: FnOnce() -> A>(body: F) -> A {
+    use std::mem;
+    // Begin a new transaction
+    let mut prev = CURRENT_TRANSACTION.with(|current| {
+        let mut prev = Some(Transaction::new());
+        mem::swap(&mut prev, &mut current.borrow_mut());
+        prev
+    });
+    // Acquire global lock if necessary
+    let _lock = match prev {
+        None => Some(TRANSACTION_MUTEX.lock().ok()
+                .expect("global transaction mutex poisoned")
+        ),
+        Some(_) => None,
+    };
+    // Perform the main body of the transaction
+    let result = body();
+    // Advance the transaction as long as necessary
+    loop {
+        let callbacks = with_current(Transaction::advance);
+        if callbacks.is_empty() { break }
+        for callback in callbacks {
+            callback.call_box(());
+        }
+    }
+    // Call all finalizers and drop the transaction
+    CURRENT_TRANSACTION.with(|current|
+        mem::swap(&mut prev, &mut current.borrow_mut())
+    );
+    prev.unwrap().finalize();
+    // Return
+    result
+}
+
+
+/// Register a callback during a transaction.
+pub fn with_current<A, F: FnOnce(&mut Transaction) -> A>(action: F) -> A {
+    CURRENT_TRANSACTION.with(|current|
+        match &mut *current.borrow_mut() {
+            &mut Some(ref mut trans) => action(trans),
+            _ => panic!("there is no active transaction to register a callback"),
+        }
+    )
+}
+
+pub fn later<F: FnOnce() + 'static>(action: F) {
+    with_current(|c| c.later(action))
+}
+
+pub fn end<F: FnOnce() + 'static>(action: F) {
+    with_current(|c| c.end(action))
+}
+
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn commit_single() {
+        let mut v = 3;
+        commit(|| v += 5);
+        assert_eq!(v, 8);
+    }
+
+    #[test]
+    fn commit_nested() {
+        let mut v = 3;
+        commit(|| {
+            commit(|| v *= 2);
+            v += 4;
+        });
+        assert_eq!(v, 10);
+    }
+
+    #[test]
+    fn commits_parallel() {
+        use std::sync::{Arc, Mutex};
+        use std::thread;
+
+        // Set up a ref-counted value
+        let v = Arc::new(Mutex::new(3));
+        // Spawn a couple of scoped threads performing atomic operations on it
+        let guards: Vec<_> = (0..3)
+            .map(|_| {
+                let v = v.clone();
+                thread::spawn(move || commit(move || {
+                    // Acquire locks independently, s.t. commit atomicity does
+                    // not rely on the local locks here
+                    *v.lock().unwrap() *= 2;
+                    // …and sleep for a bit
+                    thread::sleep_ms(1);
+                    *v.lock().unwrap() -= 1;
+                }))
+            })
+            .collect();
+        // Rejoin with all guards
+        for guard in guards { guard.join().ok().expect("thread failed"); }
+        // Check result
+        assert_eq!(&*v.lock().unwrap(), &17);
+    }
+}
+
+
+ + + + + + + + + + + + + + + \ No newline at end of file -- cgit v1.2.3