Streams are a powerful mechanism for managing large collections in LiveView without keeping the entire dataset in server memory. They provide a fire-and-forget approach to rendering dynamic lists, where items are sent to the client and immediately freed from the socket state.
Overview
Traditional assigns require keeping all data in memory on the server. With streams, you can:
- Reduce memory usage: Items are freed immediately after rendering
- Improve performance: Only send changes to the client, not entire collections
- Handle large datasets: Efficiently manage thousands of items
- Implement pagination: Use limits for infinite scrolling patterns
Basic Usage
Creating a Stream
Define a stream in your mount or handle callback:
def mount(_params, _session, socket) do
songs = Music.list_songs()
{:ok, stream(socket, :songs, songs)}
end
Rendering Streams
Streams are accessed via @streams and return tuples of {dom_id, item}:
<table>
<tbody id="songs" phx-update="stream">
<tr
:for={{dom_id, song} <- @streams.songs}
id={dom_id}
>
<td>{song.title}</td>
<td>{song.duration}</td>
</tr>
</tbody>
</table>
The parent container must have:
- A
phx-update="stream" attribute
- A unique DOM id
Each stream item must use the generated dom_id in its id attribute.
Stream Operations
Inserting Items
Add individual items with stream_insert/4:
def handle_event("add_song", %{"song" => song_params}, socket) do
song = Music.create_song!(song_params)
{:noreply, stream_insert(socket, :songs, song)}
end
Prepend items with at: 0:
stream_insert(socket, :songs, song, at: 0)
Updating Items
Update an existing item by inserting it again with the same ID:
def handle_event("update_song", %{"id" => id}, socket) do
song = Music.get_song!(id)
updated_song = Music.update_song!(song, %{plays: song.plays + 1})
{:noreply, stream_insert(socket, :songs, updated_song)}
end
When updating, the item remains in its current position. To move and update, delete then insert:socket
|> stream_delete(:songs, song)
|> stream_insert(:songs, updated_song, at: 0)
Deleting Items
Remove items with stream_delete/3:
def handle_event("delete_song", %{"id" => id}, socket) do
song = Music.get_song!(id)
Music.delete_song!(song)
{:noreply, stream_delete(socket, :songs, song)}
end
Delete by DOM id:
stream_delete_by_dom_id(socket, :songs, "songs-123")
Resetting Streams
Replace the entire stream:
# Clear the stream
stream(socket, :songs, [], reset: true)
# Replace with new items
stream(socket, :songs, new_songs, reset: true)
Advanced Features
Limiting Streams
Limit the number of items displayed for infinite scrolling:
# Keep last 10 items (appending)
stream(socket, :songs, songs, at: -1, limit: -10)
# Keep first 10 items (prepending)
stream(socket, :songs, songs, at: 0, limit: 10)
Limits must be passed to stream_insert/4 as well:
stream_insert(socket, :songs, song, limit: -10)
Limits are not enforced on the initial mount (static render). Only load the desired number of items initially.
Custom DOM IDs
Configure custom ID generation with stream_configure/3:
def mount(socket) do
socket = stream_configure(socket, :songs, dom_id: &"song-#{&1.id}")
{:ok, stream(socket, :songs, songs)}
end
Configure streams before inserting items, typically in mount/1 for LiveComponents.
Handling Empty States
Use CSS :only-child selector for empty messages:
<tbody id="songs" phx-update="stream">
<tr id="songs-empty" class="only:table-row hidden">
<td colspan="2">No songs found</td>
</tr>
<tr :for={{dom_id, song} <- @streams.songs} id={dom_id}>
<td>{song.title}</td>
<td>{song.duration}</td>
</tr>
</tbody>
Memory Usage
Streams are freed immediately after rendering:
# Source: lib/phoenix_live_view/live_stream.ex
defstruct name: nil,
dom_id: nil,
ref: nil,
inserts: [], # Cleared after render
deletes: [], # Cleared after render
reset?: false,
consumable?: false
The prune/1 function clears pending operations:
def prune(%LiveStream{} = stream) do
%{stream | inserts: [], deletes: [], reset?: false}
end
Wire Protocol
Streams send minimal diffs to the client:
// Insert operation
{"stream": [ref, [["songs-1", -1, null, false]], []]}
// Delete operation
{"stream": [ref, [], ["songs-1"]]}
// Reset operation
{"stream": [ref, inserts, deletes, true]}
Common Patterns
def mount(_params, _session, socket) do
{:ok,
socket
|> assign(page: 1, per_page: 20)
|> stream(:songs, Music.list_songs(page: 1, per_page: 20))}
end
def handle_event("load_more", _, socket) do
page = socket.assigns.page + 1
songs = Music.list_songs(page: page, per_page: socket.assigns.per_page)
{:noreply,
socket
|> assign(page: page)
|> stream(:songs, songs, at: -1)}
end
Real-time Updates
def handle_info({:song_created, song}, socket) do
{:noreply, stream_insert(socket, :songs, song, at: 0)}
end
def handle_info({:song_updated, song}, socket) do
{:noreply, stream_insert(socket, :songs, song)}
end
def handle_info({:song_deleted, song}, socket) do
{:noreply, stream_delete(socket, :songs, song)}
end
Batch Operations
def handle_event("bulk_delete", %{"ids" => ids}, socket) do
socket =
Enum.reduce(ids, socket, fn id, acc ->
song = Music.get_song!(id)
Music.delete_song!(song)
stream_delete(acc, :songs, song)
end)
{:noreply, socket}
end
Best Practices
Do:
- Use streams for lists with 50+ items
- Free memory by avoiding assigns for large collections
- Implement limits for infinite scrolling
- Configure DOM IDs once in
mount/1
Don’t:
- Use
Enum.map/2 or similar on @streams (use regular assigns instead)
- Alter generated DOM IDs (breaks client tracking)
- Forget
phx-update="stream" on parent containers
- Re-configure streams after items are inserted
Comparison: Streams vs Assigns
| Feature | Assigns | Streams |
|---|
| Memory | Kept in socket | Freed after render |
| Updates | Re-render entire list | Surgical DOM updates |
| Large lists | Can cause memory issues | Designed for scale |
| Sorting | Easy with Enum.sort/2 | Requires reset |
| Filtering | Easy with Enum.filter/2 | Requires reset |
| Best for | Less than 100 items, frequent sorting | 100+ items, append/delete |
Implementation Details
From lib/phoenix_live_view/live_stream.ex:14-39:
def new(name, ref, items, opts) when is_list(opts) do
dom_prefix = to_string(name)
dom_id = Keyword.get_lazy(opts, :dom_id, fn -> &default_id(dom_prefix, &1) end)
if not is_function(dom_id, 1) do
raise ArgumentError,
"stream :dom_id must return a function which accepts each item"
end
items_list =
for item <- items, reduce: [] do
items -> [{dom_id.(item), -1, item, opts[:limit], opts[:update_only]} | items]
end
%LiveStream{
ref: ref,
name: name,
dom_id: dom_id,
inserts: items_list,
deletes: [],
reset?: false
}
end
Each insert is a 5-tuple: {dom_id, at, item, limit, update_only}