Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/statelyai/xstate/llms.txt

Use this file to discover all available pages before exploring further.

The fromObservable() function creates actor logic from an observable stream. This is useful for reactive programming patterns and is compatible with RxJS observables.

Signature

function fromObservable<
  TContext,
  TInput extends NonReducibleUnknown,
  TEmitted extends EventObject = EventObject
>(
  observableCreator: ({
    input,
    system,
    self,
    emit
  }: {
    input: TInput;
    system: AnyActorSystem;
    self: ObservableActorRef<TContext>;
    emit: (emitted: TEmitted) => void;
  }) => Subscribable<TContext>
): ObservableActorLogic<TContext, TInput, TEmitted>;

Parameters

observableCreator
function
required
A function that creates an observable. Receives an object with:
input
TInput
Data provided to the observable actor when created or invoked.
self
ObservableActorRef<TContext>
Reference to the observable actor itself.
system
AnyActorSystem
The actor system to which the observable actor belongs.
emit
(emitted: TEmitted) => void
Function to emit custom events to subscribers.
Should return a Subscribable (compatible with RxJS Observable).

Returns

ObservableActorLogic
ActorLogic
Actor logic that can be used with createActor() or invoked in a state machine.

Usage

Basic Example with RxJS

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';

const intervalLogic = fromObservable(() => interval(1000));

const actor = createActor(intervalLogic);

actor.subscribe((snapshot) => {
  console.log('Count:', snapshot.context);
});

actor.start();
// Count: 0
// Count: 1
// Count: 2
// ...

With RxJS Operators

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map, filter, take } from 'rxjs/operators';

const logic = fromObservable(() => 
  interval(1000).pipe(
    map(n => n * 2),
    filter(n => n % 4 === 0),
    take(5)
  )
);

const actor = createActor(logic);

actor.subscribe((snapshot) => {
  console.log('Value:', snapshot.context);
  if (snapshot.status === 'done') {
    console.log('Observable completed');
  }
});

actor.start();
// Value: 0
// Value: 4
// Value: 8
// Value: 12
// Value: 16
// Observable completed

With Input

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';
import { map } from 'rxjs/operators';

type Input = { 
  period: number;
  multiplier: number;
};

const logic = fromObservable<number, Input>(({ input }) => 
  interval(input.period).pipe(
    map(n => n * input.multiplier)
  )
);

const actor = createActor(logic, {
  input: { period: 500, multiplier: 10 }
});

actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});

actor.start();
// 0, 10, 20, 30, ...

Invoking in a Machine

import { setup, fromObservable } from 'xstate';
import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

const timerLogic = fromObservable<number, { duration: number }>(({ input }) =>
  interval(1000).pipe(take(input.duration))
);

const machine = setup({
  actors: {
    timer: timerLogic
  }
}).createMachine({
  initial: 'idle',
  states: {
    idle: {
      on: {
        START: 'timing'
      }
    },
    timing: {
      invoke: {
        src: 'timer',
        input: { duration: 5 },
        onDone: 'complete'
      },
      on: {
        CANCEL: 'idle'
      }
    },
    complete: {}
  }
});

Custom Observable Implementation

import { fromObservable, createActor } from 'xstate';
import type { Subscribable } from 'xstate';

function customObservable(interval: number): Subscribable<number> {
  return {
    subscribe(observer) {
      let count = 0;
      const id = setInterval(() => {
        observer.next?.(count++);
      }, interval);
      
      return {
        unsubscribe() {
          clearInterval(id);
        }
      };
    }
  };
}

const logic = fromObservable(() => customObservable(1000));

const actor = createActor(logic);
actor.subscribe((snapshot) => {
  console.log(snapshot.context);
});
actor.start();

Mouse Position Stream

import { fromObservable } from 'xstate';
import { fromEvent } from 'rxjs';
import { map, throttleTime } from 'rxjs/operators';

type Position = { x: number; y: number };

const mousePositionLogic = fromObservable<Position>(() =>
  fromEvent<MouseEvent>(document, 'mousemove').pipe(
    throttleTime(100),
    map(event => ({ x: event.clientX, y: event.clientY }))
  )
);

WebSocket Stream

import { fromObservable } from 'xstate';
import { Observable } from 'rxjs';

type Message = { type: string; data: unknown };
type Input = { url: string };

const websocketLogic = fromObservable<Message, Input>(({ input }) =>
  new Observable<Message>(subscriber => {
    const ws = new WebSocket(input.url);
    
    ws.addEventListener('message', (event) => {
      subscriber.next(JSON.parse(event.data));
    });
    
    ws.addEventListener('error', (error) => {
      subscriber.error(error);
    });
    
    ws.addEventListener('close', () => {
      subscriber.complete();
    });
    
    return () => {
      ws.close();
    };
  })
);

Combining Multiple Streams

import { fromObservable } from 'xstate';
import { combineLatest, interval } from 'rxjs';
import { map } from 'rxjs/operators';

type CombinedData = {
  counter: number;
  timestamp: number;
};

const combinedLogic = fromObservable<CombinedData>(() =>
  combineLatest([
    interval(1000),
    interval(500).pipe(map(() => Date.now()))
  ]).pipe(
    map(([counter, timestamp]) => ({ counter, timestamp }))
  )
);

Error Handling

import { fromObservable, createActor } from 'xstate';
import { Observable } from 'rxjs';

const riskyLogic = fromObservable(() =>
  new Observable(subscriber => {
    let count = 0;
    const id = setInterval(() => {
      if (count < 5) {
        subscriber.next(count++);
      } else {
        subscriber.error(new Error('Count exceeded!'));
      }
    }, 1000);
    
    return () => clearInterval(id);
  })
);

const actor = createActor(riskyLogic);

actor.subscribe({
  next: (snapshot) => {
    if (snapshot.status === 'error') {
      console.error('Error:', snapshot.error);
    } else {
      console.log('Value:', snapshot.context);
    }
  },
  error: (err) => {
    console.error('Observer error:', err);
  }
});

actor.start();

Emitting Custom Events

import { fromObservable, createActor } from 'xstate';
import { interval } from 'rxjs';

type TickEvent = { type: 'tick'; count: number };

const logic = fromObservable<number, void, TickEvent>(({ emit }) => {
  return interval(1000).pipe(
    map(n => {
      emit({ type: 'tick', count: n });
      return n;
    })
  );
});

const actor = createActor(logic);

actor.on('tick', (event) => {
  console.log('Tick:', event.count);
});

actor.start();

Event Observable

XState also provides fromEventObservable() for observables that emit events to the parent:
import { fromEventObservable, createActor } from 'xstate';
import { fromEvent } from 'rxjs';
import { map } from 'rxjs/operators';

const clickLogic = fromEventObservable(() =>
  fromEvent<MouseEvent>(document, 'click').pipe(
    map(event => ({
      type: 'CLICK',
      x: event.clientX,
      y: event.clientY
    }))
  )
);

Snapshot

The observable actor snapshot has the following structure:
interface ObservableSnapshot<TContext, TInput> {
  status: 'active' | 'done' | 'error' | 'stopped';
  context: TContext | undefined;
  output: undefined;
  error: unknown;
  input: TInput | undefined;
}

Status Values

  • 'active' - Observable is emitting values
  • 'done' - Observable completed successfully
  • 'error' - Observable encountered an error
  • 'stopped' - Actor was stopped before observable completed

Behavior

  • Context updates: Each emitted value updates the actor’s context
  • Completion: Observable completion transitions the actor to done status
  • Error handling: Observable errors transition the actor to error status
  • Cleanup: Unsubscribes from the observable when actor is stopped
  • No restart: Completed observables are not restarted

Type Parameters

TContext
type
The type of values emitted by the observable.
TInput
type
default:"NonReducibleUnknown"
The type of the input data.
TEmitted
EventObject
default:"EventObject"
The type of events that can be emitted.

Subscribable Interface

XState’s Subscribable interface is compatible with RxJS Observable:
interface Subscribable<T> {
  subscribe(observer: {
    next?: (value: T) => void;
    error?: (err: unknown) => void;
    complete?: () => void;
  }): Subscription;
}

interface Subscription {
  unsubscribe(): void;
}

See Also

Build docs developers (and LLMs) love