Overview

Controllers are implemented as Redux-Observable epics: functions that listen for specific Redux actions, run asynchronous operations through use cases, and dispatch new actions to update the store. They coordinate workflows that span multiple use cases and services.

Epic pattern

All epics follow this structure:
export const exampleEpic: Epic<RootAction, RootAction, RootState, Dependencies> = (
  action$,
  state$,
  dependencies
) =>
  action$.pipe(
    filter(someSlice.actions.someAction.match),
    // Transform, orchestrate use cases, handle errors
    mergeMap(() => of(someSlice.actions.nextAction()))
  );
  • action$ (Observable<RootAction>): Stream of all dispatched actions
  • state$ (StateObservable<RootState>): Observable of current Redux state
  • dependencies (Dependencies): Injected services (loader, parser, decryptor, fs)
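
The filter(someSlice.actions.someAction.match) step relies on each action creator exposing a match type guard, so later operators see a narrowed payload type. The sketch below shows the idea with a hand-rolled makeAction helper. It is a hypothetical stand-in for illustration, not Redux Toolkit's implementation, and the demo/... action types are invented.

```typescript
// Hypothetical stand-in for an action creator that carries a `match` guard.
interface PayloadAction<P> {
  type: string;
  payload: P;
}

interface ActionCreator<P> {
  (payload: P): PayloadAction<P>;
  type: string;
  // Type guard: narrows a generic action to this creator's payload type.
  match(action: { type: string }): action is PayloadAction<P>;
}

function makeAction<P>(type: string): ActionCreator<P> {
  const create = (payload: P): PayloadAction<P> => ({ type, payload });
  const match = (action: { type: string }): action is PayloadAction<P> =>
    action.type === type;
  // Attach `type` and `match` to the creator function itself.
  return Object.assign(create, { type, match });
}

// Invented action types, used only for this example.
const addPlaylistDemo = makeAction<{ id: string }>("demo/addPlaylist");
const removePlaylistDemo = makeAction<{ id: string }>("demo/removePlaylist");

const dispatched: { type: string }[] = [
  addPlaylistDemo({ id: "a" }),
  removePlaylistDemo({ id: "a" }),
  addPlaylistDemo({ id: "b" }),
];

// Array.prototype.filter accepts a type guard in the same way the RxJS
// filter operator does, so `payload.id` is typed after filtering.
const addedIds = dispatched
  .filter(addPlaylistDemo.match)
  .map((action) => action.payload.id);
```

Because the guard narrows the type, the map step can read action.payload.id without a cast.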

Root epic

createRootEpic

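Combines all individual epics into a single root epic for the Redux store. The combined epic is wrapped in a BehaviorSubject, a pattern commonly used so that the root epic can be replaced at runtime (for example, during hot module replacement).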
import { combineEpics, Epic } from "redux-observable";
import { BehaviorSubject } from "rxjs";
import { mergeMap } from "rxjs/operators";
import { RootState } from "../store/root-reducer";
import { Dependencies } from "../services";
import * as epics from ".";

export function createRootEpic() {
  const epicsArray = Object.values({ ...epics });

  const epic$ = new BehaviorSubject(combineEpics(...epicsArray));

  const rootEpic: Epic<any, any, RootState, Dependencies> = (
    action$,
    state$,
    deps
  ) => epic$.pipe(mergeMap((epic) => epic(action$, state$, deps)));
  return rootEpic;
}

Playlist epics

addPlaylistEpic

Listens for addPlaylist actions and, if the playlist is present in the store, requests its levels.
Triggers on: playlistsSlice.actions.addPlaylist
Dispatches: playlistsSlice.actions.fetchPlaylistLevels
export const addPlaylistEpic: Epic<
  RootAction,
  RootAction,
  RootState,
  Dependencies
> = (action$, state$) =>
  action$.pipe(
    filter(playlistsSlice.actions.addPlaylist.match),
    map((action) => action.payload),
    filter(({ id }) => Boolean(state$.value.playlists.playlists[id])),
    mergeMap(({ id }) =>
      of(
        playlistsSlice.actions.fetchPlaylistLevels({
          playlistID: id,
        })
      )
    )
  );

fetchPlaylistLevelsEpic

Fetches and parses the master playlist to extract the available quality levels and tracks.
Triggers on: playlistsSlice.actions.fetchPlaylistLevels
Dispatches:
  • playlistsSlice.actions.fetchPlaylistLevelsSuccess
  • playlistsSlice.actions.fetchPlaylistLevelsFailed
  • levelsSlice.actions.add
export const fetchPlaylistLevelsEpic: Epic<
  RootAction,
  RootAction,
  RootState,
  Dependencies
> = (action$, store$, { loader, parser }) =>
  action$.pipe(
    filter(playlistsSlice.actions.fetchPlaylistLevels.match),
    map((action) => action.payload.playlistID),
    map((playlistID) => store$.value.playlists.playlists[playlistID]!),
    mergeMap(({ uri, id }) =>
      from(
        getLevelsFactory(loader, parser)(uri, store$.value.config.fetchAttempts)
      ).pipe(
        map((levels) => ({
          levels,
          playlistID: id,
          ok: true,
        })),
        catchError(() =>
          of({
            playlistID: id,
            levels: [],
            ok: false,
          })
        )
      )
    ),
    mergeMap(({ playlistID, levels, ok }) => {
      if (!ok || levels.length === 0) {
        return of(
          playlistsSlice.actions.fetchPlaylistLevelsFailed({
            playlistID,
          })
        );
      }
      return of(
        playlistsSlice.actions.fetchPlaylistLevelsSuccess({
          playlistID,
        }),
        levelsSlice.actions.add({
          levels,
        })
      );
    })
  );
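
The final mergeMap above treats an empty level list the same as a failed request. That decision can be sketched as a pure function. The action type strings and the LevelStub shape are placeholders chosen for illustration, not the real slice output.

```typescript
// Simplified, hypothetical shapes for illustration only.
interface LevelStub {
  id: string;
  playlistID: string;
}

interface FetchResult {
  playlistID: string;
  levels: LevelStub[];
  ok: boolean;
}

type DemoAction =
  | { type: "demo/fetchPlaylistLevelsFailed"; payload: { playlistID: string } }
  | { type: "demo/fetchPlaylistLevelsSuccess"; payload: { playlistID: string } }
  | { type: "demo/levelsAdd"; payload: { levels: LevelStub[] } };

// Mirrors the branch in the epic: a failed request and an empty result
// both produce a single "failed" action; otherwise two actions are emitted.
function toLevelActions({ playlistID, levels, ok }: FetchResult): DemoAction[] {
  if (!ok || levels.length === 0) {
    return [{ type: "demo/fetchPlaylistLevelsFailed", payload: { playlistID } }];
  }
  return [
    { type: "demo/fetchPlaylistLevelsSuccess", payload: { playlistID } },
    { type: "demo/levelsAdd", payload: { levels } },
  ];
}

const emptyResultActions = toLevelActions({ playlistID: "p1", levels: [], ok: true });
const successActions = toLevelActions({
  playlistID: "p1",
  levels: [{ id: "l1", playlistID: "p1" }],
  ok: true,
});
```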

removePlaylistEpic

Removes a playlist and cleans up associated levels and jobs.
Triggers on: playlistsSlice.actions.removePlaylist

Download job epics

downloadJobEpic

Orchestrates the complete download workflow: creates a bucket, downloads the fragments, decrypts them, and writes them to storage.
Triggers on: jobsSlice.actions.download
Dispatches:
  • jobsSlice.actions.incDownloadStatus (for each fragment)
  • jobsSlice.actions.downloadFailed (on error)
export const downloadJobEpic: Epic<
  RootAction,
  RootAction,
  RootState,
  Dependencies
> = (action$, store$, { fs, loader, decryptor }) =>
  action$.pipe(
    filter(jobsSlice.actions.download.match),
    map((action) => action.payload.jobId),
    mergeMap((jobId) => {
      const job = store$.value.jobs.jobs[jobId];
      if (!job) {
        return EMPTY;
      }
      const { videoFragments, audioFragments } = job;
      const fragments = videoFragments.concat(
        audioFragments.map((fragment) => ({
          ...fragment,
          index: fragment.index + videoFragments.length,
        }))
      );
      return from(
        createBucketFactory(fs)(
          jobId,
          videoFragments.length,
          audioFragments.length
        ).then(() => ({
          fragments,
          jobId,
        }))
      );
    }),
    mergeMap(({ fragments, jobId }) =>
      from(fragments).pipe(
        mergeMap(
          (fragment) =>
            from(
              downloadSingleFactory(loader)(
                fragment,
                store$.value.config.fetchAttempts
              ).then((data) => ({
                fragment,
                data,
                jobId,
              }))
            ),
          store$.value.config.concurrency
        ),
        mergeMap(({ data, fragment, jobId }) =>
          decryptSingleFragmentFactory(loader, decryptor)(
            fragment.key,
            data,
            store$.value.config.fetchAttempts
          ).then((data) => ({
            fragment,
            data,
            jobId,
          }))
        ),
        mergeMap(({ data, jobId, fragment }) =>
          writeToBucketFactory(fs)(jobId, fragment.index, data).then(() => ({
            jobId,
          }))
        ),
        mergeMap(({ jobId }) =>
          of(
            jobsSlice.actions.incDownloadStatus({
              jobId,
            })
          )
        ),
        takeUntil(
          action$.pipe(
            filter(jobsSlice.actions.cancel.match),
            filter((action) => action.payload.jobId === jobId)
          )
        ),
        catchError((error: unknown) =>
          of(
            jobsSlice.actions.downloadFailed({
              jobId,
              message:
                (error as Error)?.message ||
                "Download failed during fragment processing",
            })
          )
        )
      )
    )
  );
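The fragment download step passes config.concurrency as the concurrency argument to mergeMap, which caps how many fragments download at the same time. A cancel action for the same job stops the fragment pipeline through takeUntil.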
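
Before any download starts, the epic appends the audio fragments to the video fragments and shifts each audio index by the number of video fragments, presumably so that every fragment gets a unique slot in the bucket. A minimal sketch of that step in isolation, assuming indices start at 0 within each list. FragmentStub is a simplified, hypothetical shape.

```typescript
// Simplified, hypothetical fragment shape for illustration only.
interface FragmentStub {
  index: number;
  uri: string;
}

// Returns video fragments followed by audio fragments, with audio indices
// offset by the video count. Neither input array is mutated.
function mergeFragments(
  video: FragmentStub[],
  audio: FragmentStub[]
): FragmentStub[] {
  return video.concat(
    audio.map((fragment) => ({
      ...fragment,
      index: fragment.index + video.length,
    }))
  );
}

const mergedDemo = mergeFragments(
  [
    { index: 0, uri: "v0.ts" },
    { index: 1, uri: "v1.ts" },
  ],
  [{ index: 0, uri: "a0.ts" }]
);
// The single audio fragment ends up at index 2, after both video fragments.
```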

addDownloadJobEpic

Creates a new download job with fragment details.
Triggers on: Custom action for adding download jobs

saveAsJobEpic

Saves a completed download job to the user’s file system.
Triggers on: jobsSlice.actions.saveAs
Dispatches: jobsSlice.actions.saveAsSuccess

deleteJobEpic

Deletes a job and cleans up associated bucket storage.
Triggers on: jobsSlice.actions.delete
Dispatches: jobsSlice.actions.deleteSuccess

cancelJobDeleteJobEpic

Cancels an in-progress download and deletes the job.
Triggers on: jobsSlice.actions.cancel

downloadQueueEpic

Manages a queue of download jobs, ensuring that at most maxActiveDownloads run concurrently.
Triggers on: jobsSlice.actions.queue
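
One way to picture the queue rule is a function that, given the queued job IDs and the number of running downloads, returns the jobs that may start now. This is a sketch of the idea only: pickJobsToStart and its inputs are hypothetical, first-in-first-out ordering is an assumption, and the real epic may track state differently.

```typescript
// Hypothetical helper: picks the queued jobs that fit into the free slots.
// Assumes FIFO order; `maxActiveDownloads` plays the role of the config value.
function pickJobsToStart(
  queuedJobIds: string[],
  activeCount: number,
  maxActiveDownloads: number
): string[] {
  // Never go negative if more jobs are active than the limit allows.
  const freeSlots = Math.max(0, maxActiveDownloads - activeCount);
  return queuedJobIds.slice(0, freeSlots);
}

// One download is running and the limit is 3, so two queued jobs may start.
const startableDemo = pickJobsToStart(["job-a", "job-b", "job-c"], 1, 3);
// No free slots: nothing starts.
const blockedDemo = pickJobsToStart(["job-a"], 5, 3);
```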

incDownloadStatusEpic

Updates download progress and triggers completion when all fragments are done.
Triggers on: jobsSlice.actions.incDownloadStatus
Dispatches: jobsSlice.actions.finishDownload (when complete)
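
The completion rule can be sketched as a pure function that bumps a counter and reports whether the job is finished. The DownloadProgress shape with done and total fields is an assumption made for this example. The actual job state may be structured differently.

```typescript
// Hypothetical progress shape: fragments written so far vs. fragments expected.
interface DownloadProgress {
  done: number;
  total: number;
}

// Increments `done` (capped at `total`) and reports whether every fragment
// has been processed, which is when a finish action would be dispatched.
function incrementProgress(progress: DownloadProgress): {
  progress: DownloadProgress;
  finished: boolean;
} {
  const done = Math.min(progress.done + 1, progress.total);
  return { progress: { ...progress, done }, finished: done === progress.total };
}

const midwayDemo = incrementProgress({ done: 0, total: 2 });
const lastFragmentDemo = incrementProgress({ done: 1, total: 2 });
```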

Subtitle epics

downloadSubtitleEpic

Downloads a subtitle track and saves it as a WebVTT file.
Triggers on: Custom subtitle download action
Dispatches: Success/failure actions

Inspection epics

inspectLevelEpic

Inspects a quality level to detect encryption and fragment details.
Triggers on: levelInspectionsSlice.actions.inspect
Dispatches:
  • levelInspectionsSlice.actions.inspectSuccess
  • levelInspectionsSlice.actions.inspectFailed
export const inspectLevelEpic: Epic<
  RootAction,
  RootAction,
  RootState,
  Dependencies
> = (action$, state$, { loader, parser }) =>
  action$.pipe(
    filter(levelInspectionsSlice.actions.inspect.match),
    map((action) => action.payload.levelId),
    mergeMap((levelId) => {
      const level = state$.value.levels.levels[levelId];
      if (!level) {
        return of(
          levelInspectionsSlice.actions.inspectFailed({
            levelId,
            message: "Level not found",
          })
        );
      }

      return from(
        inspectLevelEncryptionFactory(loader, parser)(
          level,
          state$.value.config.fetchAttempts,
          {
            baseUri: level.playlistID,
          }
        )
      ).pipe(
        map((inspection) =>
          levelInspectionsSlice.actions.inspectSuccess({ inspection })
        ),
        catchError((error: unknown) =>
          of(
            levelInspectionsSlice.actions.inspectFailed({
              levelId,
              message:
                (error as Error)?.message ||
                "Unable to inspect encryption for level",
            })
          )
        )
      );
    })
  );

Initialization epic

onInit

Runs initialization logic when the store starts.
Triggers on: "init/start" action
Dispatches: "init/done" action

Complete epic list

All epics exported from the controllers module:
  • downloadJobEpic: Download all fragments for a job
  • addDownloadJobEpic: Add new download job
  • saveAsJobEpic: Save completed job to file
  • incDownloadStatusEpic: Update download progress
  • downloadQueueEpic: Manage download queue
  • fetchPlaylistLevelsEpic: Fetch playlist quality levels
  • addPlaylistEpic: Add playlist and trigger fetch
  • deleteJobEpic: Delete job and cleanup
  • onInit: Initialize store
  • cancelJobDeleteJobEpic: Cancel and delete job
  • downloadSubtitleEpic: Download subtitle track
  • inspectLevelEpic: Inspect level encryption
  • removePlaylistEpic: Remove playlist and cleanup

Error handling

Epics use RxJS operators for error handling:
.pipe(
  mergeMap((payload) =>
    from(useCase(payload)).pipe(
      // Catch inside the inner stream so one failure does not end the epic
      catchError((error: unknown) =>
        of(someSlice.actions.failed({
          message: (error as Error)?.message || 'Operation failed'
        }))
      )
    )
  )
)
Errors are caught and converted to failed actions that update the store’s error state.
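
The message fallback used in these handlers can also be written as a small helper that checks the error type at runtime instead of casting. This is a slightly stricter alternative to (error as Error)?.message, offered as a sketch. toErrorMessage is a hypothetical name and is not part of the controllers module.

```typescript
// Hypothetical helper: extracts a message from an unknown thrown value,
// falling back to a default when the value is not an Error or has no message.
function toErrorMessage(error: unknown, fallback: string): string {
  if (error instanceof Error && error.message) {
    return error.message;
  }
  return fallback;
}

const fromErrorDemo = toErrorMessage(new Error("Network timeout"), "Operation failed");
const fromStringDemo = toErrorMessage("not an Error object", "Operation failed");
const fromEmptyDemo = toErrorMessage(new Error(""), "Operation failed");
```

Unlike the cast, this version ignores plain objects that happen to carry a message property, which may or may not be the desired behavior.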