Struct carboxyl::Stream
+
+ [−]
+
+ [src]
+pub struct Stream<A> { + // some fields omitted +}
A stream of events.
+ +Conceptually a stream can be thought of as a series of discrete events that +occur at specific times. They are ordered by a transaction system. This +means that firings of disjoint events can not interfere with each other. The +consequences of one event are atomically reflected in dependent quantities.
+ +Streams provide a number of primitive operations. These can be used to +compose streams and combine them with signals. For instance, streams can be +mapped over with a function, merged with another stream of the same type or +filtered by some predicate.
+ +Algebraic laws
+Furthermore, streams satisfy certain algebraic properties that are useful to +reason about them.
+ +Monoid
+For once, streams of the same type form a monoid under merging. The
+neutral element in this context is Stream::never()
. So the following laws
+always hold for streams a
, b
and c
of the same type:
-
+
- Left identity:
Stream::never().merge(&a) == a
,
+ - Right identity:
a.merge(&Stream::never()) == a
,
+ - Associativity:
a.merge(&b).merge(&c) == a.merge(&b.merge(&c))
.
+
Note that equality in this context is not actually implemented as such, +since comparing two (potentially infinite) streams is a prohibitive +operation. Instead, the expressions above can be used interchangably and +behave identically.
+ +Functor
+Under the mapping operation streams also become a functor. A functor is a
+generic type like Stream
with some mapping operation that takes a function
+Fn(A) -> B
to map a Stream<A>
to a Stream<B>
. Algebraically it
+satisfies the following laws:
-
+
- The identity function is preserved:
a.map(|x| x) == a
,
+ - Function composition is respected:
a.map(f).map(g) == a.map(|x| g(f(x)))
.
+
Methods
impl<A: Clone + Send + Sync + 'static> Stream<A>
fn never() -> Stream<A>
+Create a stream that never fires. This can be useful in certain +situations, where a stream is logically required, but no events are +expected.
+fn map<B, F>(&self, f: F) -> Stream<B> where B: Send + Sync + Clone + 'static, F: Fn(A) -> B + Send + Sync + 'static
+Map the stream to another stream using a function.
+ +map
applies a function to every event fired in this stream to create a
+new stream of type B
.
+let sink: Sink<i32> = Sink::new(); +let mut events = sink.stream().map(|x| x + 4).events(); +sink.send(3); +assert_eq!(events.next(), Some(7)); ++
fn filter<F>(&self, f: F) -> Stream<A> where F: Fn(&A) -> bool + Send + Sync + 'static
+Filter a stream according to a predicate.
+ +filter
creates a new stream that only fires those events from the
+original stream that satisfy the predicate.
+let sink: Sink<i32> = Sink::new(); +let mut events = sink.stream() + .filter(|&x| (x >= 4) && (x <= 10)) + .events(); +sink.send(2); // won't arrive +sink.send(5); // will arrive +assert_eq!(events.next(), Some(5)); ++
fn filter_map<B, F>(&self, f: F) -> Stream<B> where B: Send + Sync + Clone + 'static, F: Fn(A) -> Option<B> + Send + Sync + 'static
+Both filter and map a stream.
+ +This is equivalent to .map(f).filter_some()
.
+let sink = Sink::new(); +let mut events = sink.stream() + .filter_map(|i| if i > 3 { Some(i + 2) } else { None }) + .events(); +sink.send(2); +sink.send(4); +assert_eq!(events.next(), Some(6)); ++
fn merge(&self, other: &Stream<A>) -> Stream<A>
+Merge with another stream.
+ +merge
takes two streams and creates a new stream that fires events
+from both input streams.
+let sink_1 = Sink::<i32>::new(); +let sink_2 = Sink::<i32>::new(); +let mut events = sink_1.stream().merge(&sink_2.stream()).events(); +sink_1.send(2); +assert_eq!(events.next(), Some(2)); +sink_2.send(4); +assert_eq!(events.next(), Some(4)); ++
fn coalesce<F>(&self, f: F) -> Stream<A> where F: Fn(A, A) -> A + Send + Sync + 'static
+Coalesce multiple event firings within the same transaction into a +single event.
+ +The function should ideally commute, as the order of events within a +transaction is not well-defined.
+fn hold(&self, initial: A) -> Signal<A>
+Hold an event in a signal.
+ +The resulting signal hold
s the value of the last event fired by the
+stream.
+let sink = Sink::new(); +let signal = sink.stream().hold(0); +assert_eq!(signal.sample(), 0); +sink.send(2); +assert_eq!(signal.sample(), 2); ++
fn events(&self) -> Events<A>
+A blocking iterator over the stream.
+fn scan<B, F>(&self, initial: B, f: F) -> Signal<B> where B: Send + Sync + Clone + 'static, F: Fn(B, A) -> B + Send + Sync + 'static
+Scan a stream and accumulate its event firings in a signal.
+ +Starting at some initial value, each new event changes the value of the +resulting signal as prescribed by the supplied function.
++let sink = Sink::new(); +let sum = sink.stream().scan(0, |a, b| a + b); +assert_eq!(sum.sample(), 0); +sink.send(2); +assert_eq!(sum.sample(), 2); +sink.send(4); +assert_eq!(sum.sample(), 6); ++
fn scan_mut<B, F>(&self, initial: B, f: F) -> SignalMut<B> where B: Send + Sync + 'static, F: Fn(&mut B, A) + Send + Sync + 'static
+Scan a stream and accumulate its event firings in some mutable state.
+ +Semantically this is equivalent to scan
. However, it allows one to use
+a non-Clone type as an accumulator and update it with efficient in-place
+operations.
The resulting SignalMut
does have a slightly different API from a
+regular Signal
as it does not allow clones.
Example
+let sink: Sink<i32> = Sink::new(); +let sum = sink.stream() + .scan_mut(0, |sum, a| *sum += a) + .combine(&Signal::new(()), |sum, ()| *sum); +assert_eq!(sum.sample(), 0); +sink.send(2); +assert_eq!(sum.sample(), 2); +sink.send(4); +assert_eq!(sum.sample(), 6); ++
impl<A: Clone + Send + Sync + 'static> Stream<Option<A>>
fn filter_some(&self) -> Stream<A>
+Filter a stream of options.
+ +filter_some
creates a new stream that only fires the unwrapped
+Some(…)
events from the original stream omitting any None
events.
+let sink = Sink::new(); +let mut events = sink.stream().filter_some().events(); +sink.send(None); // won't arrive +sink.send(Some(5)); // will arrive +assert_eq!(events.next(), Some(5)); ++
impl<A: Send + Sync + Clone + 'static> Stream<Stream<A>>
fn switch(&self) -> Stream<A>
+Switch between streams.
+ +This takes a stream of streams and maps it to a new stream, which fires +all events from the most recent stream fired into it.
+ +Example
+// Create sinks +let stream_sink: Sink<Stream<i32>> = Sink::new(); +let sink1: Sink<i32> = Sink::new(); +let sink2: Sink<i32> = Sink::new(); + +// Switch and listen +let switched = stream_sink.stream().switch(); +let mut events = switched.events(); + +// Should not receive events from either sink +sink1.send(1); sink2.send(2); + +// Now switch to sink 2 +stream_sink.send(sink2.stream()); +sink1.send(3); sink2.send(4); +assert_eq!(events.next(), Some(4)); + +// And then to sink 1 +stream_sink.send(sink1.stream()); +sink1.send(5); sink2.send(6); +assert_eq!(events.next(), Some(5)); ++