1. Introduction
This section is non-normative.
2. Core infrastructure
2.1.
The
Subscriber
interface
[Exposed=*]interface {
Subscriber undefined next (any );
value undefined error (any );
error undefined complete ();undefined addTeardown (VoidFunction ); // True after the Subscriber is created, up until either // complete()/error() are invoked, or the subscriber unsubscribes. Inside // complete()/error(), this attribute is true.
teardown readonly attribute boolean active ;readonly attribute AbortSignal signal ; };
Each
Subscriber
has
a
next
algorithm
,
which
is
a
next
steps
.
Each
Subscriber
has
a
error
algorithm
,
which
is
an
error
steps
.
Each
Subscriber
has
a
complete
algorithm
,
which
is
a
complete
steps
.
Each
Subscriber
has
a
teardown
callbacks
,
which
is
a
list
of
VoidFunction
s,
initially
empty.
Each
Subscriber
has
a
subscription
controller
,
which
is
an
AbortController
.
Each
Subscriber
has
a
active
boolean,
initially
true.
Note:
This
is
a
bookkeeping
variable
to
ensure
that
a
Subscriber
never
calls
any
of
the
callbacks
it
owns
after
it
has
been
closed
.
The
active
getter
steps
are
to
return
this
's
’s
active
boolean.
The
signal
getter
steps
are
to
return
this
's
’s
subscription
controller
's
’s
signal
.
next(
value
)
method
steps
are:
-
If this
's’s relevant global object is aWindow
object, and its associated Document is not fully active , then return. -
Run this
's’s next algorithm given value .Assert : No exception was thrown .
Note: No exception can be thrown here because in the case where next algorithm is just a wrapper around a script-provided callback, the process observer steps take care to wrap these callbacks in logic that, when invoking them, catches any exceptions, and reports them to the global.
When the next algorithm is a spec algorithm, those steps take care to not throw any exceptions outside of itself, to appease this assert.
error(
error
)
method
steps
are:
-
If this
's’s active is false, report the exception error , then return. -
If this
's’s relevant global object is aWindow
object, and its associated Document is not fully active , then return. -
Run this
's’s error algorithm given error .Assert : No exception was thrown .
Note: See the documentation in
next()
for details on why this is true.
complete()
method
steps
are:
-
If this
's’s relevant global object is aWindow
object, and its associated Document is not fully active , then return. -
Run this
's’s complete algorithm .Assert : No exception was thrown .
Note: See the documentation in
next()
for details on why this is true.
addTeardown(
teardown
)
method
steps
are:
-
If this
's’s relevant global object is aWindow
object, and its associated Document is not fully active , then return. -
If this
's’s active is true, then append teardown to this's’s teardown callbacks list. -
Otherwise, invoke teardown .
If an exception E was thrown , then report the exception E .
Subscriber
subscriber
,
and
an
optional
any
reason
,
run
these
steps:
-
If subscriber ’s active is false, then return.
This guards against re-entrant invocation, which can happen in the "producer-initiated" unsubscription case. Consider the following example:
const outerController= new AbortController(); const observable= new Observable( subscriber=> { subscriber. addTeardown(() => { // 2.) This teardown executes inside the "Close" algorithm, while it’s // running. Aborting the downstream signal run its abort algorithms, // one of which is the currently-running "Close" algorithm. outerController. abort(); }); // 1.) This immediately invokes the "Close" algorithm, which // sets subscriber.active to false. subscriber. complete(); }); observable. subscribe({}, { signal: outerController. signal}); -
Set subscriber ’s active boolean to false.
-
Signal abort subscriber ’s subscription controller with reason , if it is given.
-
For each teardown of subscriber ’s teardown callbacks sorted in reverse insertion order:
-
If subscriber ’s relevant global object is a
Window
object, and its associated Document is not fully active , then abort these steps.Note: This step runs repeatedly because each teardown could result in the above
Document
becoming inactive. -
Invoke teardown .
If an exception E was thrown , then report the exception E .
-
2.2.
The
Observable
interface
// SubscribeCallback is where the Observable "creator's" code lives. It's // called when subscribe() is called, to set up a new subscription.callback =
SubscribeCallback undefined (Subscriber );
subscriber callback =
ObservableSubscriptionCallback undefined (any );
value dictionary {
SubscriptionObserver ObservableSubscriptionCallback ;
next ObservableSubscriptionCallback ;
error VoidFunction ; };
complete callback =
ObservableInspectorAbortHandler undefined (any );
value dictionary {
ObservableInspector ObservableSubscriptionCallback ;
next ObservableSubscriptionCallback ;
error VoidFunction ;
complete VoidFunction ;
subscribe ObservableInspectorAbortHandler ; };
abort typedef (ObservableSubscriptionCallback or SubscriptionObserver );
ObserverUnion typedef (ObservableSubscriptionCallback or ObservableInspector );
ObservableInspectorUnion dictionary {
SubscribeOptions AbortSignal ; };
signal callback =
Predicate boolean (any ,
value unsigned long long );
index callback =
Reducer any (any ,
accumulator any );
currentValue callback =
Mapper any (any ,
value unsigned long long ); // Differs from Mapper only in return type, since this callback is exclusively // used to visit each element in a sequence, not transform it.
index callback =
Visitor undefined (any ,
value unsigned long long ); // This callback returns an `any` that must convert into an `Observable`, via // the `Observable` conversion semantics.
index callback =
CatchCallback any (any ); [Exposed=*]
value interface {
Observable constructor (SubscribeCallback );
callback undefined subscribe (optional ObserverUnion = {},
observer optional SubscribeOptions = {}); // Constructs a native Observable from value if it's any of the following: // - Observable // - AsyncIterable // - Iterable // - Promise
options );static Observable from (any ); // Observable-returning operators. See "Operators" section in the spec. // // takeUntil() can consume promises, iterables, async iterables, and other // observables.
value );Observable takeUntil (any );
value Observable map (Mapper );
mapper Observable filter (Predicate );
predicate Observable take (unsigned long long );
amount Observable drop (unsigned long long );
amount Observable flatMap (Mapper );
mapper Observable switchMap (Mapper );
mapper Observable inspect (optional ObservableInspectorUnion = {});
inspect_observer Observable catch (CatchCallback );
callback Observable finally (VoidFunction ); // Promise-returning operators.
callback Promise <sequence <any >>toArray (optional SubscribeOptions = {});
options Promise <undefined >forEach (Visitor ,
callback optional SubscribeOptions = {});
options Promise <boolean >every (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >first (optional SubscribeOptions = {});
options Promise <any >last (optional SubscribeOptions = {});
options Promise <any >find (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <boolean >some (Predicate ,
predicate optional SubscribeOptions = {});
options Promise <any >reduce (Reducer ,
reducer optional any ,
initialValue optional SubscribeOptions = {}); };
options
Each
Observable
has
a
subscribe
callback
,
which
is
a
SubscribeCallback
or
a
set
of
steps
that
take
in
a
Subscriber
.
Note:
The
"union"
of
these
types
is
to
support
both
Observable
s
created
by
JavaScript
(that
are
always
constructed
with
a
SubscribeCallback
),
and
natively-constructed
Observable
objects
(whose
subscribe
callback
could
be
an
arbitrary
set
of
native
steps,
not
a
JavaScript
callback).
The
return
value
of
when()
is
an
example
of
the
latter.
new
Observable(
callback
)
constructor
steps
are:
-
Set this
's’s subscribe callback to callback .Note: This callback will get invoked later when
subscribe()
is called.
subscribe(
observer
,
options
)
method
steps
are:
2.2.1. Supporting concepts
any
error
,
and
runs
these
steps:
-
Report the exception error .
Note:
We
pull
this
default
out
separately
so
that
every
place
in
this
specification
that
natively
subscribes
to
an
Observable
(i.e.,
subscribes
from
spec
prose,
not
going
through
the
subscribe()
method)
doesn’t
have
to
redundantly
define
these
steps.
An internal observer is a struct with the following items :
- next steps
-
An algorithm that takes a single parameter of type
any
. Initially, these steps do nothing. - error steps
-
An algorithm that takes a single parameter of type
any
. Initially, the default error algorithm . - complete steps
-
An algorithm with no parameters. Initially, these steps do nothing.
The
internal
observer
struct
is
used
to
mirror
the
next
,
error
,
and
complete
callback
functions
.
For
any
Observable
that
is
subscribed
by
JavaScript
via
the
subscribe()
method,
these
algorithm
"steps"
will
just
be
a
wrapper
around
invoking
the
corresponding
next
,
error
,
and
complete
callback
functions
provided
by
script.
But
when
internal
spec
prose
(not
user
script)
subscribes
to
an
Observable
,
these
"steps"
are
arbitrary
spec
algorithms
that
are
not
provided
via
an
ObserverUnion
packed
with
Web
IDL
callback
functions
.
See
the
§ 2.3.3
Promise-returning
operators
that
make
use
of
this,
for
example.
any
value
,
run
these
steps:
Note:
We
split
this
algorithm
out
from
the
Web
IDL
from()
method,
so
that
spec
prose
can
convert
values
to
without
going
through
the
Web
IDL
bindings.
If Type ( value ) is not Object , throw a
TypeError
.Note: This prevents primitive types from being coerced into iterables (e.g., String). See discussion in WICG/observable#125 .
From Observable : If value ’s specific type is an
Observable
, then return value .Spec the From async iterable conversion steps which take place before the iterable conversion steps.
From iterable : Let iteratorMethod be ? GetMethod ( value ,
%Symbol.iterator%
).If iteratorMethod is undefined, then jump to the step labeled From Promise .
Otherwise, return a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:Let iteratorRecordCompletion be GetIterator ( value , sync).
If iteratorRecordCompletion is a throw completion , then run subscriber ’s
error()
method, given iteratorRecordCompletion ’s [[Value]], and abort these steps.Let iteratorRecord be ! iteratorRecordCompletion .
While true:
Let next be IteratorStepValue ( iteratorRecord ).
If next is a throw completion , then run subscriber ’s
error()
method, given next ’s [[Value]], and break .Set next to ! to next .
If next is done, then:
Assert : iteratorRecord ’s [[Done]] is true.
Run subscriber ’s
complete()
.Return.
Run subscriber ’s
next()
given next .
From Promise : If IsPromise ( value ) is true, then:
Return a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:React to value :
If value was fulfilled with value v , then:
Run subscriber ’s
next()
method, given v .Run subscriber ’s
complete()
method.
If value was rejected with reason r , then run subscriber ’s
error()
method, given r .
Observable
given
an
ObserverUnion
-or-
internal
observer
observer
,
and
a
SubscribeOptions
options
,
run
these
steps:
Note:
We
split
this
algorithm
out
from
the
Web
IDL
subscribe()
method,
so
that
spec
prose
can
subscribe
to
an
Observable
without
going
through
the
Web
IDL
bindings.
See
w3c/IntersectionObserver#464
for
similar
context,
where
"internal"
prose
must
not
go
through
Web
IDL
bindings
on
objects
whose
properties
could
be
mutated
by
JavaScript.
See
§ 2.3.3
Promise-returning
operators
for
usage
of
this.
-
If this
's’s relevant global object is aWindow
object, and its associated Document is not fully active , then return. -
Let internal observer be a new internal observer .
-
Process observer as follows:
-
-
If
observer
is
an
ObservableSubscriptionCallback
-
Set
internal
observer
’s
next
steps
to
these
steps
that
take
an
any
value :-
Invoke observer with value .
If an exception E was thrown , then report the exception E .
-
-
If
observer
is
a
SubscriptionObserver
-
-
If observer ’s
next
exists , then set internal observer ’s next steps to these steps that take anany
value :-
Invoke observer ’s
next
with value .If an exception E was thrown , then report the exception E .
-
-
If observer ’s
error
exists , then set internal observer ’s error steps to these steps that take anany
error :-
Invoke observer ’s
error
with error .If an exception E was thrown , then report the exception E .
-
-
If observer ’s
complete
exists , then set internal observer ’s complete steps to these steps:-
If an exception E was thrown , then report the exception E .
-
-
- If observer is an internal observer
- Set internal observer to observer .
-
If
observer
is
an
-
-
Assert : internal observer ’s error steps is either the default error algorithm , or an algorithm that invokes the provided
error
callback function . -
Let subscriber be a new
Subscriber
, initialized as:- next algorithm
-
internal observer ’s next steps
- error algorithm
-
internal observer ’s error steps
- complete algorithm
-
internal observer ’s complete steps
-
If options ’s
signal
exists , then:-
If options ’s
signal
is aborted , then close subscriber given options ’ssignal
abort reason . -
Otherwise, add the following abort algorithm to options ’s
signal
:-
Close subscriber with options ’s
signal
abort reason .
-
-
-
If this
's’s subscribe callback is aSubscribeCallback
, invoke it with subscriber .If an exception E was thrown , call subscriber ’s
error()
method with E . -
Otherwise, run the steps given by this
's’s subscribe callback , given subscriber .
Tests
2.3. Operators
For now, see https://github.com/wicg/observable#operators .
2.3.1.
from()
from(
value
)
method
steps
are:
Return the
exact semanticsresult of converting value to anfrom()Observableconversion.
2.3.2.
Observable
-returning
operators
takeUntil(
notifier
value
)
method
steps
are:
-
Let sourceObservable be this .
-
Let notifier be the result of converting value to an Observable.
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:Note that this method involves Subscribing to twoObservable
s: (1) notifier , and (2) sourceObservable . We "unsubscribe" from both of them in the following situations:-
notifier starts emitting values (either "next" or "error"). In this case, we unsubscribe from notifier since we got all we need from it, and no longer need it to keep producing values. We also unsubscribe from sourceObservable , because it no longer needs to produce values that get plumbed through this method’s returned observable , because we’re manually ending the subscription to observable , since notifier finally produced a value.
-
sourceObservable either
error()
s orcomplete()
s itself. In this case, we unsubscribe from notifier since we no longer need to listen for values it emits in order to determine when observable can stop mirroring values from sourceObservable (since sourceObservable ran to completion by itself). Unsubscribing from sourceObservable isn’t necessary, since its subscription has been exhausted by itself.
-
Let notifierObserver be a new internal observer , initialized as follows:
- next steps
-
Run subscriber ’s
complete()
method.Note: This will "unsubscribe" from sourceObservable , if it has been subscribed to by this point. This is because sourceObservable is subscribed to with the "outer" subscriber ’s subscription controller
's’s signal as an input signal, and that signal will get aborted when the "outer" subscriber ’scomplete()
is called above (and below). - error steps
-
Run subscriber ’s
complete()
method.
Note: We do not specify complete steps , because if the notifier
Observable
completes itself, we do not need to complete the subscriber associated with the observable returned from this method. Rather, the observable will continue to mirror sourceObservable uninterrupted. -
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to notifier given notifierObserver and options .
-
If subscriber ’s active is false, then return.
Note: This means that sourceObservable ’s subscribe callback will not even get invoked once, if notifier synchronously emits a value. If notifier only "completes" synchronously though (without emitting a "next" or "error" value), then subscriber ’s active will still be true, and we proceed to subscribe to sourceObservable , which observable will mirror uninterrupted.
-
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
Run subscriber ’s
next()
method, given the passed in value . - error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
Run subscriber ’s
complete()
method.
Note: sourceObserver is mostly a pass-through, mirroring everything that sourceObservable emits, with the exception of having the ability to unsubscribe from the notifier
Observable
in the case where sourceObservable is exhausted before notifier emits anything. -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
map(
mapper
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let idx be an
unsigned long long
, initially 0. -
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
-
Invoke mapper with the passed in value , and idx , and let mappedValue be the returned value.
If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps. -
Increment idx .
-
Run subscriber ’s
next()
method, given mappedValue .
-
- error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
Run subscriber ’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
filter(
predicate
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let idx be an
unsigned long long
, initially 0. -
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
-
Invoke predicate with the passed in value and idx , and let matches be the returned value.
If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps. -
Set idx to idx + 1.
-
If matches is true, then run subscriber ’s
next()
method, given value .
-
- error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
Run subscriber ’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
take(
amount
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let remaining be amount .
-
If remaining is 0, then run subscriber ’s
complete()
method and abort these steps. -
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
-
Run subscriber ’s
next()
method with the passed in value . -
Decrement remaining .
-
If remaining is 0, then run subscriber ’s
complete()
method.
-
- error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
Run subscriber ’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
drop(
amount
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let remaining be amount .
-
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
- error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
Run subscriber ’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
flatMap(
mapper
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let idx be an
unsigned long long
, initially 0. -
Let outerSubscriptionHasCompleted to a boolean , initially false.
-
Let queue be a new list of
any
values, initially empty.Note: This queue is used to store any
Observable
s emitted by sourceObservable , while observable is currently subscribed to anObservable
emitted earlier by sourceObservable that has not yet been exhausted. -
Let activeInnerSubscription be a boolean , initially false.
-
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
-
If activeInnerSubscription is true, then:
-
Append value to queue .
Note: This value will eventually be processed once the
Observable
that is currently subscribed-to (as indicated by activeInnerSubscription ) is exhausted.
-
-
Otherwise:
-
Set activeInnerSubscription to true.
-
Run the flatmap process next value steps with value , subscriber , mapper , and references to all of the following: queue , activeInnerSubscription , outerSubscriptionHasCompleted , and idx .
Note: This flatmap process next value steps will subscribe to the
Observable
derived from value (if one such can be derived) and keep processing values from it until its subscription becomes inactive (either by error or completion). If this "inner"Observable
completes, then the processing steps will recursively invoke themselves with the nextany
in queue .If no such value exists , then the processing steps will terminate, unsetting activeInnerSubscription , so that future values emitted from sourceObservable are processed correctly.
-
-
- error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
-
Set outerSubscriptionHasCompleted to true.
Note: If activeInnerSubscription is true, then the below step will not complete subscriber . In that case, the flatmap process next value steps will be responsible for completing subscriber when queue is empty , after the "inner" subscription becomes inactive.
-
If activeInnerSubscription is false and queue is empty , run subscriber ’s
complete()
method.
-
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
any
value
,
a
Subscriber
subscriber
,
a
Mapper
mapper
,
and
references
to
all
of
the
following:
a
list
of
any
values
queue
,
a
boolean
activeInnerSubscription
,
a
boolean
outerSubscriptionHasCompleted
,
and
an
unsigned
long
long
idx
:
-
Let mappedResult be the result of invoking mapper with value and idx .
If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps. -
Set idx to idx + 1.
-
Let innerObservable be the result of calling
from()
with mappedResult .If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps.We shouldn’t invoke
from()
directly. Rather, we should call some internal algorithm that passes-back the exceptions for us to handle properly here, since we want to pipe them to subscriber . -
Let innerObserver be a new internal observer , initialized as follows:
- next steps
-
Run subscriber ’s
next()
method, given the passed in value . - error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
-
If queue is not empty, then:
-
Let nextValue be the first item in queue ; remove remove this item from queue .
-
Run flatmap process next value steps given nextValue , subscriber , mapper , and references to queue and activeInnerSubscription .
-
-
Otherwise:
-
Set activeInnerSubscription to false.
Note: Because activeInnerSubscription is a reference, this has the effect of ensuring that all subsequent values emitted from the "outer"
Observable
(called sourceObservable . -
If outerSubscriptionHasCompleted is true, run subscriber ’s
complete()
method.Note: This means the "outer"
Observable
has already completed, but did not proceed to complete subscriber yet because there was at least one more pending "inner"Observable
(i.e., innerObservable ) that had already been queued and had not yet completed. Until right now!
-
-
-
Let innerOptions be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to innerObservable given innerObserver and innerOptions .
switchMap(
mapper
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let idx be an
unsigned long long
, initially 0. -
Let outerSubscriptionHasCompleted be a boolean , initially false.
-
Let activeInnerAbortController be an
AbortController
-or-null, initially null.Note: This
AbortController
is assigned to a newAbortController
only by this algorithm’s next steps (below), and only assigned to null by the switchmap process next value steps , when the "inner"Observable
either completes or errors. This variable is used as a marker for whether there is currently an active "inner" subscription. The complete steps below care about this, because if sourceObservable completes while there is an active "inner" subscription, we do not immediately complete subscriber . In that case, subscriber ’s completion becomes blocked on the "inner" subscription’s completion. -
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
-
If activeInnerAbortController is not null, then signal abort activeInnerAbortController .
Note: This "unsubscribes" from the "inner"
Observable
that was derived from the value that was last pushed from sourceObservable . Then we immediately subscribe to the newObservable
that we’re about to derive from value , i.e., the most-recently pushed value from sourceObservable . -
Set activeInnerAbortController to a new
AbortController
. -
Run the switchmap process next value steps with value , subscriber , mapper , and references to all of the following: activeInnerAbortController , outerSubscriptionHasCompleted , and idx .
Note: The switchmap process next value steps will subscribe to the
Observable
derived from value (if one such can be derived) and keep processing values from it until either (1) its subscription becomes inactive (either by error or completion), or (2) activeInnerAbortController gets aborted , due to sourceObservable pushing another newer value that will replace the current "inner" subscription.
-
- error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
-
Set outerSubscriptionHasCompleted to true.
Note: If activeInnerAbortController is not null, then we don’t immediately complete subscriber . Instead, the switchmap process next value steps will complete subscriber when the inner subscription finally completes itself.
-
If activeInnerAbortController is null, run subscriber ’s
complete()
method.
-
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
any
value
,
a
Subscriber
subscriber
,
a
Mapper
mapper
,
and
references
to
all
of
the
following:
an
AbortController
activeInnerAbortController
,
a
boolean
outerSubscriptionHasCompleted
,
and
an
unsigned
long
long
idx
are
to
run
these
steps:
-
Let mappedResult be the result of invoking mapper with value and idx .
If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps. -
Set idx to idx + 1.
-
Let innerObservable be the result of calling
from()
with mappedResult .If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps. -
Let innerObserver be a new internal observer , initialized as follows:
- next steps
-
Run subscriber ’s
next()
method, given the passed in value . - error steps
-
Run subscriber ’s
error()
method, given the passed in error .Note: We don’t have to set activeInnerAbortController to null here, to signal to the
switchMap()
method steps above that the inner "subscription" has been canceled. That’s because calling subscriber ’serror()
method already unsubscribes from the "outer" source Observable, so it will not be able to push any more values to theswitchMap()
internal observer. - complete steps
-
-
If outerSubscriptionHasCompleted is true, run subscriber ’s
complete()
method. -
Otherwise, set activeInnerAbortController to null.
Note: Because this variable is a reference, it signals to the switchMap complete steps that there is no active inner subscription.
-
-
Let innerOptions be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list « activeInnerAbortController ’s signal , subscriber ’s subscription controller's’s signal », usingAbortSignal
, and the current realm . -
Subscribe to innerObservable given innerObserver and innerOptions .
inspect(
inspector_union
)
method
steps
are:
-
Let subscribe callback be a
VoidFunction
-or-null, initially null. -
Let next callback be a
ObservableSubscriptionCallback
-or-null, initially null. -
Let error callback be a
ObservableSubscriptionCallback
-or-null, initially null. -
Let complete callback be a
VoidFunction
-or-null, initially null. -
Let abort callback be a
ObservableInspectorAbortHandler
-or-null, initially null. -
Process inspector_union as follows:
-
If
inspector_union
is
an
ObservableSubscriptionCallback
-
-
Set next callback to inspector_union .
-
-
If
inspector_union
is
an
ObservableInspector
-
-
If
subscribe
exists in inspector_union , then set subscribe callback to it. -
If
next
exists in inspector_union , then set next callback to it. -
If
error
exists in inspector_union , then set error callback to it. -
If
complete
exists in inspector_union , then set complete callback to it. -
If
abort
exists in inspector_union , then set abort callback to it.
-
-
If
inspector_union
is
an
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
If subscribe callback is not null, then invoke it.
If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps.Note: The result of this is that sourceObservable is never subscribed to.
-
If abort callback is not null, then add the following abort algorithm to subscriber ’s subscription controller
's’s signal :-
Invoke abort callback with subscriber ’s subscription controller
's’s signal's’s abort reason .If an exception E was thrown , then report the exception E .
-
-
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
-
If next callback is not null, then invoke next callback with the passed in value .
If an exception E was thrown , then:
-
Remove abort callback from subscriber ’s subscription controller
's’s signal .Note: This step is important, because the abort callback is only meant to be called for consumer-initiated unsubscriptions. When the producer terminates the subscription (via subscriber ’s
error()
orcomplete()
methods) like below, we have to ensure that abort callback is not run.This matches Chromium’s implementation, but consider holding a reference to the originally-passed-in
SubscribeOptions
's’ssignal
and just invoking abort callback when it aborts. The result is likely the same, but needs investigation. -
Run subscriber ’s
error()
method, given E , and return.
-
-
Run subscriber ’s
next()
method with the passed in value .
-
- error steps
-
Remove abort callback from subscriber ’s subscription controller
's’s signal , and run subscriber ’serror()
method, given the passed in error . - complete steps
-
Remove abort callback from subscriber ’s subscription controller
's’s signal , and run subscriber ’scomplete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
catch(
callback
)
method
steps
are:
-
Let sourceObservable be this .
-
Let observable be a new
Observable
whose subscribe callback is an algorithm that takes aSubscriber
subscriber and does the following:-
Let sourceObserver be a new internal observer , initialized as follows:
- next steps
-
Run subscriber ’s
next()
method, given the passed in value . - error steps
-
-
Invoke callback with the passed in error . Let result be the returned value.
If an exception E was thrown , then run subscriber ’s
error()
with E , and abort these steps. -
Let innerObservable be the result of calling
from()
with result .If an exception E was thrown , then run subscriber ’s
error()
method, given E , and abort these steps.We shouldn’t invoke
from()
directly. Rather, we should call some internal algorithm that passes-back the exceptions for us to handle properly here, since we want to pipe them to subscriber . -
Let innerObserver be a new internal observer , initialized as follows:
- next steps
-
Run subscriber ’s
next()
method, given the passed in value . - error steps
-
Run subscriber ’s
error()
method, given the passed in error . - complete steps
-
Run subscriber ’s
complete()
method.
-
Let innerOptions be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to innerObservable given innerObserver and innerOptions .
Note: We’re free to subscribe to innerObservable here without first "unsubscribing" from sourceObservable , and without fear that sourceObservable will keep emitting values, because all of this is happening inside of the error steps associated with sourceObservable . This means sourceObservable has already completed its subscription and will no longer produce any values, and we are free to safely switch our source of values to innerObservable .
-
- complete steps
-
Run subscriber ’s
complete()
method.
-
Let options be a new
SubscribeOptions
whosesignal
is subscriber ’s subscription controller's’s signal . -
Subscribe to sourceObservable given sourceObserver and options .
-
-
Return observable .
finally(
callback
)
method
steps
are:
-
TODO: Spec this and use callback .
2.3.3.
Promise
-returning
operators
toArray(
options
)
method
steps
are:
-
Let p a new promise .
-
If options ’s
signal
is not null:-
If options ’s
signal
is aborted , then:-
Reject p with options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to options ’s
signal
:-
Reject p with options ’s
signal
's’s abort reason .
Note: All we have to do here is reject p . Note that the subscription to this
Observable
will also be closed automatically, since the "inner" Subscriber gets closed in response to options ’ssignal
getting signal abort . -
-
-
Let values be a new list .
-
Let observer be a new internal observer , initialized as follows:
- next steps
-
Append the passed in value to values .
- error steps
-
Reject p with the passed in error .
- complete steps
-
Resolve p with values .
-
Return p .
forEach(
callback
,
options
)
method
steps
are:
-
Let p a new promise .
-
Let visitor callback controller be a new
AbortController
. -
Let internal options be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list « visitor callback controller ’s signal , options ’ssignal
if non-null», usingAbortSignal
, and the current realm .Many trivial internal observers act as pass-throughs, and do not control the subscription to the
Observable
that they represent; that is, their error steps and complete steps are called when the subscription is terminated, and their next steps simply pass some version of the given value along the chain.For this operator, however, the below observer ’s next steps are responsible for actually aborting the underlying subscription to this , in the event that callback throws an exception. In that case, the
SubscribeOptions
's’ssignal
we pass through to " Subscribe to anObservable
", needs to be a dependent signal derived from options ’ssignal
, and theAbortSignal
of anAbortController
that the next steps below has access to, and can signal abort when needed. -
If internal options ’s
signal
is aborted , then:-
Reject p with internal options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to internal options ’s
signal
:-
Reject p with internal options ’s
signal
's’s abort reason .Note: The fact that rejection of p is tied to internal options ’s
signal
, and not options ’ssignal
means, that any microtasks queued during the firing of options ’ssignal
's’sabort
event will run before p ’s rejection handler runs.
-
-
Let idx be an
unsigned long long
, initially 0. -
Let observer be a new internal observer , initialized as follows:
- next steps
-
-
Invoke callback with the passed in value , and idx .
If an exception E was thrown , then reject p with E , and signal abort visitor callback controller with E .
-
Increment idx .
-
- error steps
-
Reject p with the passed in error .
- complete steps
-
Return p .
every(
predicate
,
options
)
method
steps
are:
-
Let p a new promise .
-
Let controller be a new
AbortController
. -
Let internal options be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list « controller ’s signal , options ’ssignal
if non-null», usingAbortSignal
, and the current realm . -
If internal options ’s
signal
is aborted , then:-
Reject p with internal options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to internal options ’s
signal
:-
Reject p with internal options ’s
signal
's’s abort reason .
-
-
Let idx be an
unsigned long long
, initially 0. -
Let observer be a new internal observer , initialized as follows:
- next steps
-
-
Invoke predicate with the passed in value and idx , and let passed be the returned value.
If an exception E was thrown , then reject p with E , and signal abort controller with E .
-
Set idx to idx + 1.
-
If passed is false, then resolve p with false, and signal abort controller .
-
- error steps
-
Reject p with the passed in error .
- complete steps
-
Resolve p with true.
-
Return p .
first(
options
)
method
steps
are:
-
Let p a new promise .
-
Let controller be a new
AbortController
. -
Let internal options be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list « controller ’s signal , options ’ssignal
if non-null», usingAbortSignal
, and the current realm . -
If internal options ’s
signal
is aborted , then:-
Reject p with internal options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to internal options ’s
signal
:-
Reject p with internal options ’s
signal
's’s abort reason .
-
-
Let internal observer be a new internal observer , initialized as follows:
- next steps
-
-
Resolve p with the passed in value .
-
Signal abort controller .
-
- error steps
-
Reject p with the passed in error .
- complete steps
-
Note: This is only reached when the source
Observable
completes before it emits a single value; in this case, resolving withundefined
is harmless but makes it difficult to distinguish between the first value trule beingundefined
and premature completion. See #132 for discussion on this.
-
Subscribe to this given internal observer and internal options .
-
Return p .
last(
options
)
method
steps
are:
-
Let p a new promise .
-
If options ’s
signal
is not null:-
If options ’s
signal
is aborted , then:-
Reject p with options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to options ’s
signal
:-
Reject p with options ’s
signal
's’s abort reason .
-
-
-
Let lastValue be an
any
-or-null, initially null. -
Let hasLastValue be a boolean , initially false.
-
Let observer be a new internal observer , initialized as follows:
- next steps
-
-
Set hasLastValue to true.
-
Set lastValue to the passed in value .
-
- error steps
-
Reject p with the passed in error .
- complete steps
-
Return p .
find(
predicate
,
options
)
method
steps
are:
-
Let p a new promise .
-
Let controller be a new
AbortController
. -
Let internal options be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list « controller ’s signal , options ’ssignal
if non-null», usingAbortSignal
, and the current realm . -
If internal options ’s
signal
is aborted , then:-
Reject p with internal options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to internal options ’s
signal
:-
Reject p with internal options ’s
signal
's’s abort reason .
-
-
Let idx be an
unsigned long long
, initially 0. -
Let observer be a new internal observer , initialized as follows:
- next steps
-
-
Invoke predicate with the passed in value an idx , and let passed be the returned value.
If an exception E was thrown , then reject p with E , and signal abort controller with E .
-
Set idx to idx + 1.
-
If passed is true, then resolve p with value , and signal abort controller .
-
- error steps
-
Reject p with the passed in error .
- complete steps
-
Return p .
some(
predicate
,
options
)
method
steps
are:
-
Let p a new promise .
-
Let controller be a new
AbortController
. -
Let internal options be a new
SubscribeOptions
whosesignal
is the result of creating a dependent abort signal from the list « controller ’s signal , options ’ssignal
if non-null», usingAbortSignal
, and the current realm . -
If internal options ’s
signal
is aborted , then:-
Reject p with internal options ’s
signal
's’s abort reason . -
Return p .
-
-
Add the following abort algorithm to internal options ’s
signal
:-
Reject p with internal options ’s
signal
's’s abort reason .
-
-
Let idx be an
unsigned long long
, initially 0. -
Let observer be a new internal observer , initialized as follows:
- next steps
-
-
Invoke predicate with the passed in value and idx , and let passed be the returned value.
If an exception E was thrown , then reject p with E , and signal abort controller with E .
-
Set idx to idx + 1.
-
If passed is true, then resolve p with true, and signal abort controller .
-
- error steps
-
Reject p with the passed in error .
- complete steps
-
Resolve p with false.
-
Return p .
reduce(
reducer
,
initialValue
,
options
)
method
steps
are:
-
TODO: Spec this and use reducer , initialValue , and options .
3.
EventTarget
integration
dictionary {
ObservableEventListenerOptions boolean =
capture false ;boolean ; };
passive partial interface EventTarget {= {});Observable when (DOMString ,
type optional ObservableEventListenerOptions = {}); };
options
when(
type
,
options
)
method
steps
are:
-
If this
's’s relevant global object is aWindow
object, and its associated Document is not fully active , then return. -
Let event target be this .
-
Let observable be a new
Observable
, initialized as follows:- subscribe callback
-
An algorithm that takes a
Subscriber
subscriber and runs these steps:-
If event target is null, abort these steps.
Note: This is meant to capture the fact that event target can be garbage collected by the time this algorithm runs upon subscription.
-
If subscriber ’s subscription controller
's’s signal is aborted , abort these steps. -
Add an event listener with event target and an event listener defined as follows:
- type
-
type
- callback
-
The result of creating a new Web IDL
EventListener
instance representing a reference to a function of one argument of typeEvent
event . This function executes the observable event listener invoke algorithm given subscriber and event . - capture
-
options ’s
capture
- passive
-
options ’s
passive
- once
-
false
- signal
-
null
Note: The
AbortSignal
for event listeners added bywhen()
is managed by theObservable
itself. Seesubscribe()
andSubscribeOptions
.
-
-
Return observable .
Subscriber
subscriber
and
an
Event
event
,
and
runs
these
steps:
-
Run subscriber ’s
next()
method with event .
Tests
4. Security & Privacy Considerations
This material is being upstreamed from our explainer into this specification, and in the meantime you can consult the following resources:
5. Acknowledgements
A
special
thanks
to
Ben
Lesh
for
much
of
the
design
input
for
the
Observable
API,
and
his
many
years
of
work
maintaining
userland
Observable
code
that
made
this
contribution
to
the
web
platform
possible.