The fromObservable() function creates actor logic from an observable stream. This is useful for reactive programming patterns and is compatible with RxJS observables.
Signature
function fromObservable<
  TContext,
  TInput extends NonReducibleUnknown,
  TEmitted extends EventObject = EventObject
>(
  observableCreator: ({
    input,
    system,
    self,
    emit
  }: {
    input: TInput;
    system: AnyActorSystem;
    self: ObservableActorRef<TContext>;
    emit: (emitted: TEmitted) => void;
  }) => Subscribable<TContext>
): ObservableActorLogic<TContext, TInput, TEmitted>;
Parameters
observableCreator
A function that creates an observable. Receives an object with:
input
TInput
Data provided to the observable actor when created or invoked.
self
ObservableActorRef<TContext>
Reference to the observable actor itself.
system
AnyActorSystem
The actor system to which the observable actor belongs.
emit
(emitted: TEmitted) => void
Function to emit custom events to subscribers.
Should return a Subscribable (compatible with RxJS Observable).
Returns
Actor logic that can be used with createActor() or invoked in a state machine.
Usage
Basic Example with RxJS
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
const intervalLogic = fromObservable(() => interval(1000));
const actor = createActor(intervalLogic);
actor.subscribe((snapshot) => {
  console.log('Count:', snapshot.context);
});
actor.start();
// Count: 0
// Count: 1
// Count: 2
// ...
With RxJS Operators
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';
const logic = fromObservable(() =>
  interval(1000).pipe(
    map(n => n * 2),
    filter(n => n % 4 === 0),
    take(5)
  )
);
const actor = createActor(logic);
actor.subscribe((snapshot) => {
  console.log('Value:', snapshot.context);
  if (snapshot.status === 'done') {
    console.log('Observable completed');
  }
});
actor.start();
// Value: 0
// Value: 4
// Value: 8
// Value: 12
// Value: 16
// Observable completed
With Input
import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';
type Input = {
  period: number;
  multiplier: number;
};
const logic = fromObservable<number, Input>(({ input }) =>
  interval(input.period).pipe(
    map(n => n * input.multiplier)
  )
);
const actor = createActor(logic, {
  input: { period: 500, multiplier: 10 }
});
actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});
actor.start();
// 0, 10, 20, 30, ...
Invoking in a Machine
import { setup, fromObservable } from 'xstate';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';
const timerLogic = fromObservable<number, { duration: number }>(({ input }) =>
  interval(1000).pipe(take(input.duration))
);
const machine = setup({
  actors: {
    timer: timerLogic
  }
}).createMachine({
  initial: 'idle',
  states: {
    idle: {
      on: {
        START: 'timing'
      }
    },
    timing: {
      invoke: {
        src: 'timer',
        input: { duration: 5 },
        onDone: 'complete'
      },
      on: {
        CANCEL: 'idle'
      }
    },
    complete: {}
  }
});
Custom Observable Implementation
import { fromObservable, createActor } from 'xstate';
import type { Subscribable } from 'xstate';
function customObservable(interval: number): Subscribable<number> {
  return {
    subscribe(observer) {
      let count = 0;
      const id = setInterval(() => {
        observer.next?.(count++);
      }, interval);
      return {
        unsubscribe() {
          clearInterval(id);
        }
      };
    }
  };
}
const logic = fromObservable(() => customObservable(1000));
const actor = createActor(logic);
actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});
actor.start();
Mouse Position Stream
import { fromObservable } from 'xstate';
import { fromEvent } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';
type Position = { x: number; y: number };
const mousePositionLogic = fromObservable<Position>(() =>
  fromEvent<MouseEvent>(document, 'mousemove').pipe(
    throttleTime(100),
    map(event => ({ x: event.clientX, y: event.clientY }))
  )
);
WebSocket Stream
import { fromObservable } from 'xstate';
import { Observable } from 'rxjs';
type Message = { type: string; data: unknown };
type Input = { url: string };
const websocketLogic = fromObservable<Message, Input>(({ input }) =>
  new Observable<Message>(subscriber => {
    const ws = new WebSocket(input.url);
    ws.addEventListener('message', (event) => {
      subscriber.next(JSON.parse(event.data));
    });
    ws.addEventListener('error', (error) => {
      subscriber.error(error);
    });
    ws.addEventListener('close', () => {
      subscriber.complete();
    });
    return () => {
      ws.close();
    };
  })
);
Combining Multiple Streams
import { fromObservable } from 'xstate';
import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';
type CombinedData = {
  counter: number;
  timestamp: number;
};
const combinedLogic = fromObservable<CombinedData>(() =>
  combineLatest([
    interval(1000),
    interval(500).pipe(map(() => Date.now()))
  ]).pipe(
    map(([counter, timestamp]) => ({ counter, timestamp }))
  )
);
Error Handling
import { fromObservable, createActor } from 'xstate';
import { Observable } from 'rxjs';
const riskyLogic = fromObservable(() =>
  new Observable(subscriber => {
    let count = 0;
    const id = setInterval(() => {
      if (count < 5) {
        subscriber.next(count++);
      } else {
        subscriber.error(new Error('Count exceeded!'));
      }
    }, 1000);
    return () => clearInterval(id);
  })
);
const actor = createActor(riskyLogic);
actor.subscribe({
  next: (snapshot) => {
    if (snapshot.status === 'error') {
      console.error('Error:', snapshot.error);
    } else {
      console.log('Value:', snapshot.context);
    }
  },
  error: (err) => {
    console.error('Observer error:', err);
  }
});
actor.start();
Emitting Custom Events
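import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
// map is used in the pipe below, so it must be imported
import { map } from 'rxjs/operators';
type TickEvent = { type: 'tick'; count: number };
const logic = fromObservable<number, void, TickEvent>(({ emit }) => {
  return interval(1000).pipe(
    map(n => {
      // Emit a custom event alongside each value, then pass the value through
      emit({ type: 'tick', count: n });
      return n;
    })
  );
});
const actor = createActor(logic);
// Listen for the emitted 'tick' events (separate from snapshot updates)
actor.on('tick', (event) => {
  console.log('Tick:', event.count);
});
actor.start();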
Event Observable
XState also provides fromEventObservable() for observables whose values are event objects; each emitted event is sent to the parent actor:
import { fromEventObservable, createActor } from 'xstate';
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';
const clickLogic = fromEventObservable(() =>
  fromEvent<MouseEvent>(document, 'click').pipe(
    map(event => ({
      type: 'CLICK',
      x: event.clientX,
      y: event.clientY
    }))
  )
);
Snapshot
The observable actor snapshot has the following structure:
interface ObservableSnapshot<TContext, TInput> {
  status: 'active' | 'done' | 'error' | 'stopped';
  context: TContext | undefined;
  output: undefined;
  error: unknown;
  input: TInput | undefined;
}
Status Values
'active' - Observable is emitting values
'done' - Observable completed successfully
'error' - Observable encountered an error
'stopped' - Actor was stopped before observable completed
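One way to branch on these status values is sketched below. `SnapshotLike` is a local stand-in that mirrors only the fields used here, so the snippet runs without importing XState, and `describeSnapshot` is a hypothetical helper rather than part of the library.

```typescript
// Local stand-in for the documented snapshot structure (assumption: only the
// fields needed for this example are modelled).
type SnapshotLike<TContext> = {
  status: 'active' | 'done' | 'error' | 'stopped';
  context: TContext | undefined;
  error: unknown;
};

// Hypothetical helper: turns a snapshot into a human-readable line.
function describeSnapshot<TContext>(snapshot: SnapshotLike<TContext>): string {
  switch (snapshot.status) {
    case 'active':
      // context stays undefined until the observable emits its first value
      return snapshot.context === undefined
        ? 'waiting for first value'
        : `latest value: ${String(snapshot.context)}`;
    case 'done':
      return 'observable completed';
    case 'error':
      return `observable failed: ${String(snapshot.error)}`;
    case 'stopped':
      return 'actor stopped before completion';
  }
}

console.log(describeSnapshot({ status: 'active', context: 4, error: undefined }));
// latest value: 4
```

Because the switch covers every member of the status union, the TypeScript compiler should flag this helper if a new status were ever added to the type.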
Behavior
- Context updates: Each emitted value updates the actor’s context
- Completion: Observable completion transitions the actor to done status
- Error handling: Observable errors transition the actor to error status
- Cleanup: Unsubscribes from the observable when actor is stopped
- No restart: Completed observables are not restarted
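The rules above can be illustrated with a small stand-alone model. This is a simplified sketch, not XState's implementation: `runModel`, `createManualSource`, and the local `Observer`, `Subscribable`, and `ModelSnapshot` types are hypothetical names defined here so the example runs without XState or RxJS.

```typescript
// Local shapes mirroring the Subscribable contract (assumption: simplified).
interface Observer<T> {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
}
interface Subscribable<T> {
  subscribe(observer: Observer<T>): { unsubscribe(): void };
}
interface ModelSnapshot<T> {
  status: 'active' | 'done' | 'error' | 'stopped';
  context: T | undefined;
  error: unknown;
}

// Hypothetical model of the behavior list; it only changes state while active,
// so a finished run is never restarted.
function runModel<T>(source: Subscribable<T>) {
  let snapshot: ModelSnapshot<T> = { status: 'active', context: undefined, error: undefined };
  const subscription = source.subscribe({
    // Context updates: each value replaces the context
    next: (value) => {
      if (snapshot.status === 'active') snapshot = { ...snapshot, context: value };
    },
    // Completion: move to 'done'
    complete: () => {
      if (snapshot.status === 'active') snapshot = { ...snapshot, status: 'done' };
    },
    // Error handling: move to 'error' and keep the error
    error: (err) => {
      if (snapshot.status === 'active') snapshot = { ...snapshot, status: 'error', error: err };
    }
  });
  return {
    getSnapshot: () => snapshot,
    // Cleanup: stopping unsubscribes from the source
    stop: () => {
      subscription.unsubscribe();
      if (snapshot.status === 'active') snapshot = { ...snapshot, status: 'stopped' };
    }
  };
}

// A hand-driven source so the example is deterministic.
function createManualSource<T>() {
  let observer: Observer<T> | undefined;
  let unsubscribed = false;
  const source: Subscribable<T> = {
    subscribe(o) {
      observer = o;
      return {
        unsubscribe() {
          unsubscribed = true;
          observer = undefined;
        }
      };
    }
  };
  return {
    source,
    push: (value: T) => observer?.next?.(value),
    complete: () => observer?.complete?.(),
    isUnsubscribed: () => unsubscribed
  };
}

const manual = createManualSource<number>();
const model = runModel(manual.source);
manual.push(1);
manual.push(2);
console.log(model.getSnapshot().context); // 2
model.stop();
console.log(model.getSnapshot().status); // stopped
```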
Type Parameters
TContext
The type of values emitted by the observable.
TInput
extends NonReducibleUnknown
The type of the input data.
TEmitted
extends EventObject
default:"EventObject"
The type of events that can be emitted.
Subscribable Interface
XState’s Subscribable interface is compatible with RxJS Observable:
interface Subscribable<T> {
  subscribe(observer: {
    next?: (value: T) => void;
    error?: (err: unknown) => void;
    complete?: () => void;
  }): Subscription;
}
interface Subscription {
  unsubscribe(): void;
}
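As a minimal sketch of what satisfies this interface, the snippet below builds a Subscribable from an array. The interfaces are redeclared locally so the code runs on its own, and `fromArray` is a hypothetical helper, not an XState or RxJS export.

```typescript
// Local copies of the interfaces shown above, so no imports are needed.
interface Subscription {
  unsubscribe(): void;
}
interface Subscribable<T> {
  subscribe(observer: {
    next?: (value: T) => void;
    error?: (err: unknown) => void;
    complete?: () => void;
  }): Subscription;
}

// Hypothetical helper: emits each array item synchronously, then completes.
function fromArray<T>(items: readonly T[]): Subscribable<T> {
  return {
    subscribe(observer) {
      // Observer callbacks are optional, hence the optional calls
      for (const item of items) observer.next?.(item);
      observer.complete?.();
      // Everything was delivered synchronously, so there is nothing to tear down
      return { unsubscribe() {} };
    }
  };
}

const received: string[] = [];
fromArray(['a', 'b']).subscribe({
  next: (value) => received.push(value),
  complete: () => received.push('done')
});
console.log(received); // [ 'a', 'b', 'done' ]
```

An object of this shape could be returned from the creator function passed to fromObservable(), in the same way as the custom observable example earlier on this page.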