Struct carboxyl::Stream
pub struct Stream<A> { // some fields omitted }
A stream of events.
Conceptually, a stream can be thought of as a series of discrete events that occur at specific times. They are ordered by a transaction system. This means that firings of disjoint events cannot interfere with each other. The consequences of one event are atomically reflected in dependent quantities.
Streams provide a number of primitive operations. These can be used to compose streams and combine them with signals. For instance, streams can be mapped over with a function, merged with another stream of the same type or filtered by some predicate.
Algebraic laws
Furthermore, streams satisfy certain algebraic properties that are useful to reason about them.
Monoid
For one, streams of the same type form a monoid under merging. The neutral element in this context is Stream::never(). So the following laws always hold for streams a, b and c of the same type:
- Left identity: Stream::never().merge(&a) == a,
- Right identity: a.merge(&Stream::never()) == a,
- Associativity: a.merge(&b).merge(&c) == a.merge(&b.merge(&c)).
Note that equality in this context is not actually implemented as such, since comparing two (potentially infinite) streams is a prohibitive operation. Instead, the expressions above can be used interchangeably and behave identically.
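The monoid laws can be checked on a finite model. The following is a minimal sketch, not carboxyl's implementation: it assumes a stream can be modelled as a list of (time, value) pairs sorted by time. The names Timeline, never and merge are hypothetical stand-ins defined only for this illustration, and resolving ties in favour of the left operand is an assumption of the model.

```rust
/// Finite model of a stream: events as (time, value) pairs, sorted by time.
/// Hypothetical stand-in for illustration, not carboxyl's `Stream`.
type Timeline<A> = Vec<(u64, A)>;

/// Model of `Stream::never()`: a stream without any events.
fn never<A>() -> Timeline<A> {
    Vec::new()
}

/// Model of `merge`: interleave two sorted event lists by time.
/// Assumption of this model: on equal times the left operand's event comes first.
fn merge<A: Clone>(a: &Timeline<A>, b: &Timeline<A>) -> Timeline<A> {
    let mut out = Vec::with_capacity(a.len() + b.len());
    let (mut i, mut j) = (0, 0);
    while i < a.len() && j < b.len() {
        if a[i].0 <= b[j].0 {
            out.push(a[i].clone());
            i += 1;
        } else {
            out.push(b[j].clone());
            j += 1;
        }
    }
    // At most one of the two inputs still has events left.
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}
```

In this model, merging with never() on either side returns the other operand unchanged, and regrouping a three-way merge does not change the result.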
Functor
Under the mapping operation streams also become a functor. A functor is a generic type like Stream with some mapping operation that takes a function Fn(A) -> B to map a Stream<A> to a Stream<B>. Algebraically it satisfies the following laws:
- The identity function is preserved: a.map(|x| x) == a,
- Function composition is respected: a.map(f).map(g) == a.map(|x| g(f(x))).
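The functor laws can be illustrated on a finite model in the same spirit. This is a sketch under the assumption that a stream is modelled as a list of (time, value) pairs; Timeline and the free function map are hypothetical names for this illustration and not part of carboxyl.

```rust
/// Finite model of a stream: events as (time, value) pairs.
/// Hypothetical stand-in for illustration, not carboxyl's `Stream`.
type Timeline<A> = Vec<(u64, A)>;

/// Model of `map`: apply `f` to every event value and keep its time.
fn map<A: Clone, B, F: Fn(A) -> B>(s: &Timeline<A>, f: F) -> Timeline<B> {
    s.iter().map(|(t, x)| (*t, f(x.clone()))).collect()
}
```

Mapping the identity function returns an equal timeline, and mapping f and then g gives the same events as mapping their composition once.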
Methods
impl<A: Clone + Send + Sync + 'static> Stream<A>
fn never() -> Stream<A>
Create a stream that never fires. This can be useful where a stream is logically required but no events are expected.
fn map<B, F>(&self, f: F) -> Stream<B> where B: Send + Sync + Clone + 'static, F: Fn(A) -> B + Send + Sync + 'static
Map the stream to another stream using a function.
map applies a function to every event fired in this stream to create a new stream of type B.

    let sink: Sink<i32> = Sink::new();
    let mut events = sink.stream().map(|x| x + 4).events();
    sink.send(3);
    assert_eq!(events.next(), Some(7));
fn filter<F>(&self, f: F) -> Stream<A> where F: Fn(&A) -> bool + Send + Sync + 'static
Filter a stream according to a predicate.
filter creates a new stream that only fires those events from the original stream that satisfy the predicate.

    let sink: Sink<i32> = Sink::new();
    let mut events = sink.stream()
        .filter(|&x| (x >= 4) && (x <= 10))
        .events();
    sink.send(2); // won't arrive
    sink.send(5); // will arrive
    assert_eq!(events.next(), Some(5));
fn filter_map<B, F>(&self, f: F) -> Stream<B> where B: Send + Sync + Clone + 'static, F: Fn(A) -> Option<B> + Send + Sync + 'static
Both filter and map a stream.
This is equivalent to .map(f).filter_some().

    let sink = Sink::new();
    let mut events = sink.stream()
        .filter_map(|i| if i > 3 { Some(i + 2) } else { None })
        .events();
    sink.send(2);
    sink.send(4);
    assert_eq!(events.next(), Some(6));
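The stated equivalence has a direct analogue on standard-library iterators, which may help as an intuition. The sketch below uses only std and does not touch carboxyl; the helper bump_if_large and both wrapper functions are hypothetical names introduced for this illustration, and flatten() over an iterator of Option plays the role that filter_some plays for streams.

```rust
/// Hypothetical helper: the same closure as in the example above.
fn bump_if_large(i: i32) -> Option<i32> {
    if i > 3 { Some(i + 2) } else { None }
}

/// Filter and map in a single step.
fn via_filter_map(input: &[i32]) -> Vec<i32> {
    input.iter().copied().filter_map(bump_if_large).collect()
}

/// Map to `Option` first, then drop the `None`s and unwrap the `Some`s.
fn via_map_then_flatten(input: &[i32]) -> Vec<i32> {
    input.iter().copied().map(bump_if_large).flatten().collect()
}
```

Both functions return the same vector for any input slice.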
fn merge(&self, other: &Stream<A>) -> Stream<A>
Merge with another stream.
merge takes two streams and creates a new stream that fires events from both input streams.

    let sink_1 = Sink::<i32>::new();
    let sink_2 = Sink::<i32>::new();
    let mut events = sink_1.stream().merge(&sink_2.stream()).events();
    sink_1.send(2);
    assert_eq!(events.next(), Some(2));
    sink_2.send(4);
    assert_eq!(events.next(), Some(4));
fn coalesce<F>(&self, f: F) -> Stream<A> where F: Fn(A, A) -> A + Send + Sync + 'static
Coalesce multiple event firings within the same transaction into a single event.
The function should ideally commute, as the order of events within a transaction is not well-defined.
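To see why a commuting function is preferable, consider a finite model. This is a sketch and not carboxyl's implementation: it assumes events can be represented as (transaction id, value) pairs sorted by transaction id, and the free function coalesce below is a hypothetical stand-in that folds all values sharing a transaction id into one.

```rust
/// Model of `coalesce`: combine all events that share a transaction id.
/// Events are (transaction id, value) pairs, sorted by transaction id.
/// Hypothetical stand-in for illustration only.
fn coalesce<A: Clone, F: Fn(A, A) -> A>(events: &[(u64, A)], f: F) -> Vec<(u64, A)> {
    let mut out: Vec<(u64, A)> = Vec::new();
    for (tx, value) in events.iter().cloned() {
        let same_tx = out.last().map_or(false, |last| last.0 == tx);
        if same_tx {
            // Fold the new value into the event already emitted for this transaction.
            let (_, acc) = out.pop().expect("checked to be non-empty above");
            out.push((tx, f(acc, value)));
        } else {
            out.push((tx, value));
        }
    }
    out
}
```

With addition, the order of the values inside a transaction does not matter. With a function such as |a, _b| a, the result depends on which value happens to come first, which is exactly the situation the advice above warns about.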
fn hold(&self, initial: A) -> Signal<A>
Hold an event in a signal.
The resulting signal holds the value of the last event fired by the stream.

    let sink = Sink::new();
    let signal = sink.stream().hold(0);
    assert_eq!(signal.sample(), 0);
    sink.send(2);
    assert_eq!(signal.sample(), 2);
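The behaviour in the example can be mimicked by a small model. This is a sketch under assumptions, not carboxyl's implementation: events are (time, value) pairs sorted by time, and sampling at a given time is taken to happen after all events up to and including that time have been processed. The function hold_sample is a hypothetical name for this illustration.

```rust
/// Model of sampling a held signal: the value of the last event fired at or
/// before `at`, or `initial` if no event has fired yet.
/// `events` are (time, value) pairs sorted by time. Hypothetical stand-in.
fn hold_sample<A: Clone>(initial: A, events: &[(u64, A)], at: u64) -> A {
    events
        .iter()
        .take_while(|(t, _)| *t <= at)
        .last()
        .map(|(_, v)| v.clone())
        .unwrap_or(initial)
}
```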
fn events(&self) -> Events<A>
A blocking iterator over the stream.
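As an analogy for what a blocking iterator does, the standard library's channel receiver behaves similarly: its next() waits until a value is available. The sketch below uses only std::sync::mpsc and std::thread. It is not how carboxyl implements Events, and first_n_from_producer is a hypothetical name for this illustration.

```rust
use std::sync::mpsc;
use std::thread;

/// Collect up to `n` values from a blocking iterator fed by another thread.
fn first_n_from_producer(n: usize) -> Vec<i32> {
    let (tx, rx) = mpsc::channel();
    let producer = thread::spawn(move || {
        for value in 1..=5 {
            tx.send(value).expect("receiver is still alive");
        }
        // `tx` is dropped here, which ends the receiving iterator.
    });
    // `rx.iter()` blocks in `next()` until a value arrives or all senders are gone.
    let received: Vec<i32> = rx.iter().take(n).collect();
    producer.join().expect("producer thread panicked");
    received
}
```

The consumer never polls. It simply waits in next() until the producer has sent the next value.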
fn scan<B, F>(&self, initial: B, f: F) -> Signal<B> where B: Send + Sync + Clone + 'static, F: Fn(B, A) -> B + Send + Sync + 'static
Scan a stream and accumulate its event firings in a signal.
Starting at some initial value, each new event changes the value of the resulting signal as prescribed by the supplied function.
    let sink = Sink::new();
    let sum = sink.stream().scan(0, |a, b| a + b);
    assert_eq!(sum.sample(), 0);
    sink.send(2);
    assert_eq!(sum.sample(), 2);
    sink.send(4);
    assert_eq!(sum.sample(), 6);
fn scan_mut<B, F>(&self, initial: B, f: F) -> SignalMut<B> where B: Send + Sync + 'static, F: Fn(&mut B, A) + Send + Sync + 'static
Scan a stream and accumulate its event firings in some mutable state.
Semantically this is equivalent to scan. However, it allows one to use a non-Clone type as an accumulator and update it with efficient in-place operations.
The resulting SignalMut does have a slightly different API from a regular Signal as it does not allow clones.
Example
    let sink: Sink<i32> = Sink::new();
    let sum = sink.stream()
        .scan_mut(0, |sum, a| *sum += a)
        .combine(&Signal::new(()), |sum, ()| *sum);
    assert_eq!(sum.sample(), 0);
    sink.send(2);
    assert_eq!(sum.sample(), 2);
    sink.send(4);
    assert_eq!(sum.sample(), 6);
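The difference between the two accumulation styles can be sketched on plain iterators. This is an illustration under assumptions and does not use carboxyl: fold_by_value mirrors the Fn(B, A) -> B shape of scan, fold_mut mirrors the Fn(&mut B, A) shape of scan_mut, and Log is a hypothetical accumulator type that deliberately does not implement Clone.

```rust
/// A deliberately non-`Clone` accumulator (hypothetical, for illustration).
struct Log {
    entries: Vec<String>,
}

/// Accumulate by value, mirroring the shape of `scan`'s function.
fn fold_by_value<A, B, I, F>(initial: B, events: I, f: F) -> B
where
    I: IntoIterator<Item = A>,
    F: Fn(B, A) -> B,
{
    events.into_iter().fold(initial, f)
}

/// Accumulate in place, mirroring the shape of `scan_mut`'s function.
fn fold_mut<A, B, I, F>(initial: B, events: I, f: F) -> B
where
    I: IntoIterator<Item = A>,
    F: Fn(&mut B, A),
{
    let mut state = initial;
    for event in events {
        f(&mut state, event);
    }
    state
}
```

Both styles compute the same sum for integers, while fold_mut can also append to a Log in place without ever copying it.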
impl<A: Clone + Send + Sync + 'static> Stream<Option<A>>
fn filter_some(&self) -> Stream<A>
Filter a stream of options.
filter_some creates a new stream that only fires the unwrapped Some(…) events from the original stream omitting any None events.

    let sink = Sink::new();
    let mut events = sink.stream().filter_some().events();
    sink.send(None); // won't arrive
    sink.send(Some(5)); // will arrive
    assert_eq!(events.next(), Some(5));
impl<A: Send + Sync + Clone + 'static> Stream<Stream<A>>
fn switch(&self) -> Stream<A>
Switch between streams.
This takes a stream of streams and maps it to a new stream, which fires all events from the most recent stream fired into it.
Example
    // Create sinks
    let stream_sink: Sink<Stream<i32>> = Sink::new();
    let sink1: Sink<i32> = Sink::new();
    let sink2: Sink<i32> = Sink::new();

    // Switch and listen
    let switched = stream_sink.stream().switch();
    let mut events = switched.events();

    // Should not receive events from either sink
    sink1.send(1);
    sink2.send(2);

    // Now switch to sink 2
    stream_sink.send(sink2.stream());
    sink1.send(3);
    sink2.send(4);
    assert_eq!(events.next(), Some(4));

    // And then to sink 1
    stream_sink.send(sink1.stream());
    sink1.send(5);
    sink2.send(6);
    assert_eq!(events.next(), Some(5));