Struct carboxyl::Stream [] [src]

pub struct Stream<A> {
    // some fields omitted
}

A stream of events.

Conceptually a stream can be thought of as a series of discrete events that occur at specific times. They are ordered by a transaction system. This means that firings of disjoint events can not interfere with each other. The consequences of one event are atomically reflected in dependent quantities.

Streams provide a number of primitive operations. These can be used to compose streams and combine them with signals. For instance, streams can be mapped over with a function, merged with another stream of the same type or filtered by some predicate.

Algebraic laws

Furthermore, streams satisfy certain algebraic properties that are useful to reason about them.

Monoid

For once, streams of the same type form a monoid under merging. The neutral element in this context is Stream::never(). So the following laws always hold for streams a, b and c of the same type:

Note that equality in this context is not actually implemented as such, since comparing two (potentially infinite) streams is a prohibitive operation. Instead, the expressions above can be used interchangably and behave identically.

Functor

Under the mapping operation streams also become a functor. A functor is a generic type like Stream with some mapping operation that takes a function Fn(A) -> B to map a Stream<A> to a Stream<B>. Algebraically it satisfies the following laws:

Methods

impl<A: Clone + Send + Sync + 'static> Stream<A>

fn never() -> Stream<A>

Create a stream that never fires. This can be useful in certain situations, where a stream is logically required, but no events are expected.

fn map<B, F>(&self, f: F) -> Stream<B> where B: Send + Sync + Clone + 'static, F: Fn(A) -> B + Send + Sync + 'static

Map the stream to another stream using a function.

map applies a function to every event fired in this stream to create a new stream of type B.

let sink: Sink<i32> = Sink::new();
let mut events = sink.stream().map(|x| x + 4).events();
sink.send(3);
assert_eq!(events.next(), Some(7));

fn filter<F>(&self, f: F) -> Stream<A> where F: Fn(&A) -> bool + Send + Sync + 'static

Filter a stream according to a predicate.

filter creates a new stream that only fires those events from the original stream that satisfy the predicate.

let sink: Sink<i32> = Sink::new();
let mut events = sink.stream()
    .filter(|&x| (x >= 4) && (x <= 10))
    .events();
sink.send(2); // won't arrive
sink.send(5); // will arrive
assert_eq!(events.next(), Some(5));

fn filter_map<B, F>(&self, f: F) -> Stream<B> where B: Send + Sync + Clone + 'static, F: Fn(A) -> Option<B> + Send + Sync + 'static

Both filter and map a stream.

This is equivalent to .map(f).filter_some().

let sink = Sink::new();
let mut events = sink.stream()
    .filter_map(|i| if i > 3 { Some(i + 2) } else { None })
    .events();
sink.send(2);
sink.send(4);
assert_eq!(events.next(), Some(6));

fn merge(&self, other: &Stream<A>) -> Stream<A>

Merge with another stream.

merge takes two streams and creates a new stream that fires events from both input streams.

let sink_1 = Sink::<i32>::new();
let sink_2 = Sink::<i32>::new();
let mut events = sink_1.stream().merge(&sink_2.stream()).events();
sink_1.send(2);
assert_eq!(events.next(), Some(2));
sink_2.send(4);
assert_eq!(events.next(), Some(4));

fn coalesce<F>(&self, f: F) -> Stream<A> where F: Fn(A, A) -> A + Send + Sync + 'static

Coalesce multiple event firings within the same transaction into a single event.

The function should ideally commute, as the order of events within a transaction is not well-defined.

fn hold(&self, initial: A) -> Signal<A>

Hold an event in a signal.

The resulting signal holds the value of the last event fired by the stream.

let sink = Sink::new();
let signal = sink.stream().hold(0);
assert_eq!(signal.sample(), 0);
sink.send(2);
assert_eq!(signal.sample(), 2);

fn events(&self) -> Events<A>

A blocking iterator over the stream.

fn scan<B, F>(&self, initial: B, f: F) -> Signal<B> where B: Send + Sync + Clone + 'static, F: Fn(B, A) -> B + Send + Sync + 'static

Scan a stream and accumulate its event firings in a signal.

Starting at some initial value, each new event changes the value of the resulting signal as prescribed by the supplied function.

let sink = Sink::new();
let sum = sink.stream().scan(0, |a, b| a + b);
assert_eq!(sum.sample(), 0);
sink.send(2);
assert_eq!(sum.sample(), 2);
sink.send(4);
assert_eq!(sum.sample(), 6);

fn scan_mut<B, F>(&self, initial: B, f: F) -> SignalMut<B> where B: Send + Sync + 'static, F: Fn(&mut B, A) + Send + Sync + 'static

Scan a stream and accumulate its event firings in some mutable state.

Semantically this is equivalent to scan. However, it allows one to use a non-Clone type as an accumulator and update it with efficient in-place operations.

The resulting SignalMut does have a slightly different API from a regular Signal as it does not allow clones.

Example

let sink: Sink<i32> = Sink::new();
let sum = sink.stream()
    .scan_mut(0, |sum, a| *sum += a)
    .combine(&Signal::new(()), |sum, ()| *sum);
assert_eq!(sum.sample(), 0);
sink.send(2);
assert_eq!(sum.sample(), 2);
sink.send(4);
assert_eq!(sum.sample(), 6);

impl<A: Clone + Send + Sync + 'static> Stream<Option<A>>

fn filter_some(&self) -> Stream<A>

Filter a stream of options.

filter_some creates a new stream that only fires the unwrapped Some(…) events from the original stream omitting any None events.

let sink = Sink::new();
let mut events = sink.stream().filter_some().events();
sink.send(None); // won't arrive
sink.send(Some(5)); // will arrive
assert_eq!(events.next(), Some(5));

impl<A: Send + Sync + Clone + 'static> Stream<Stream<A>>

fn switch(&self) -> Stream<A>

Switch between streams.

This takes a stream of streams and maps it to a new stream, which fires all events from the most recent stream fired into it.

Example

// Create sinks
let stream_sink: Sink<Stream<i32>> = Sink::new();
let sink1: Sink<i32> = Sink::new();
let sink2: Sink<i32> = Sink::new();

// Switch and listen
let switched = stream_sink.stream().switch();
let mut events = switched.events();

// Should not receive events from either sink
sink1.send(1); sink2.send(2);

// Now switch to sink 2
stream_sink.send(sink2.stream());
sink1.send(3); sink2.send(4);
assert_eq!(events.next(), Some(4));

// And then to sink 1
stream_sink.send(sink1.stream());
sink1.send(5); sink2.send(6);
assert_eq!(events.next(), Some(5));

Trait Implementations

impl<A> Clone for Stream<A>

fn clone(&self) -> Stream<A>

fn clone_from(&mut self, source: &Self)