Streams and SSE

Dioxus Fullstack provides an easy way to send and receive streaming data from a server. This is useful for features such as streaming LLM responses, file downloads, and server-sent events (SSE).

Unlike WebSockets, which allow two-way communication, streams are unidirectional. Browsers usually cannot have both a streaming input and a streaming output at the same time, so you should stick to using streams for things like text or byte responses and file transfers.

Streaming Text

Dioxus Fullstack provides the TextStream type to easily send chunks of text between the client and the server. We can use this type as the input or output of a server function:

// The output is a `TextStream`
#[get("/api/test_stream?start")]
async fn text_stream(start: Option<i32>) -> Result<TextStream> {
    let mut count = start.unwrap_or(0);

    // We can create a new text stream with `spawn`
    Ok(TextStream::spawn(move |tx| async move {

        // Send a message with `unbounded_send`
        while tx.unbounded_send(format!("Hello, world! {}", count)).is_ok() {
            count += 1;

            // and then wait a bit
            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        }
    }))
}

You can create a new stream with TextStream::spawn, which gives you an UnboundedSender object, or with TextStream::new(), which takes an existing type that implements the Stream trait:

// the `rx` here implements `Stream` which can be used in `new()`
let (tx, rx) = futures::channel::mpsc::unbounded();

tokio::spawn(async move {
    let mut count = start.unwrap_or(0);
    loop {
        let message = format!("Hello, world! {}", count);
        if tx.unbounded_send(message).is_err() {
            break;
        }

        count += 1;
        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
    }
});

Ok(TextStream::new(rx))
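Both snippets above stop producing as soon as `unbounded_send` returns an error. The following is a minimal, std-only sketch of that shutdown pattern, assuming the error is reported once the receiving side has gone away. It uses `std::sync::mpsc` and a thread as stand-ins for the `futures` channel and the async task, and `collect_greetings` is a hypothetical helper, not part of Dioxus.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Runs a producer that sends numbered greetings until the receiving side
/// goes away, while the consumer reads `take` messages and then disconnects.
/// Returns the messages the consumer saw.
fn collect_greetings(start: i32, take: usize) -> Vec<String> {
    // std's unbounded channel stands in for `futures::channel::mpsc::unbounded()`.
    let (tx, rx) = mpsc::channel::<String>();

    let producer = thread::spawn(move || {
        let mut count = start;
        // `send` starts returning `Err` once the receiver has been dropped,
        // which is what ends this loop.
        while tx.send(format!("Hello, world! {}", count)).is_ok() {
            count += 1;
            thread::sleep(Duration::from_millis(1));
        }
    });

    // Read a fixed number of messages, then drop the receiver to simulate
    // the other side closing the connection.
    let received: Vec<String> = rx.iter().take(take).collect();
    drop(rx);

    // The producer notices the closed channel and exits, so `join` returns.
    producer.join().expect("producer thread panicked");
    received
}
```

Calling `collect_greetings(3, 2)` yields the first two messages starting at 3. The point of the sketch is that the producer needs no separate cancellation signal: a failed send is the signal.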

Streaming Bytes

The TextStream type is closely related to the ByteStream type: both carry bytes, but TextStream additionally requires them to be valid UTF-8. To send raw bytes between the client and the server, use ByteStream in the same way as TextStream, but with Bytes as the item type:

#[post("/api/upload_as_bytestream")]
async fn upload_as_bytestream(mut stream: ByteStream) -> Result<()> {
    while let Some(chunk) = stream.next().await {
        // ... handle chunk
    }

    Ok(())
}

Note that in this example, we are consuming the byte stream using .next(). Streams in Dioxus implement the standard Stream trait, which has a number of useful extensions, such as the combinators on the futures crate's StreamExt trait.

The Generic Streaming<T, E> type

Both the TextStream and ByteStream types are implemented as specific variations of the generic Streaming<T, E> type. Under the hood, both stream types are simply streams of Vec<u8>. The ByteStream type wraps the incoming bytes in the Bytes type, while TextStream ensures they are valid UTF-8 text.
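To make the difference concrete, here is a small std-only sketch of the two views of the same raw chunk. It is an illustration of the idea only, not Dioxus's implementation; `as_byte_chunk` and `as_text_chunk` are hypothetical names.

```rust
/// Byte view: every chunk is accepted exactly as it arrived.
fn as_byte_chunk(raw: Vec<u8>) -> Vec<u8> {
    raw
}

/// Text view: a chunk is only accepted if it is valid UTF-8.
fn as_text_chunk(raw: Vec<u8>) -> Option<String> {
    String::from_utf8(raw).ok()
}
```

In this sketch a chunk that ends in the middle of a multi-byte character (for example the lone byte `0xC3`, the first half of `é`) is rejected by the text view but passes through the byte view unchanged. Whether a real text stream buffers such partial characters is outside what this sketch shows.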

You can use any encoding, provided it implements the Encoding trait:

pub trait Encoding {
    fn content_type() -> &'static str;
    fn stream_content_type() -> &'static str;
    fn to_bytes(data: impl Serialize) -> Option<Bytes>;
    fn from_bytes<O: DeserializeOwned>(bytes: Bytes) -> Option<O>;
}

Dioxus provides a number of built-in encodings:

  • JsonEncoding: String-encoded JSON text
  • CborEncoding: Binary-encoded data in the CBOR format
  • PostcardEncoding: A binary encoding built on Postcard meant for use in no_std apps
  • MsgPackEncoding: A compact binary encoding in a "JSON but small" format

Each element is serialized with the encoding's to_bytes implementation before it is sent. As each element arrives on the other side, it is chunked appropriately and then deserialized with the encoding's from_bytes implementation.

This means we can stream arbitrary data - even custom structures!

#[derive(Serialize, Deserialize, Debug)]
struct Dog {
    name: String,
    age: u8,
}

/// A custom `Streaming<T, E>` endpoint that streams JSON-encoded `Dog` structs to the client.
///
/// Dioxus provides the `JsonEncoding` type which can be used to encode and decode JSON data.
#[get("/api/json_stream")]
async fn json_stream() -> Result<Streaming<Dog, JsonEncoding>> {
    Ok(Streaming::spawn(|tx| async move {
        for count in 0..10 {
            let dog = Dog {
                name: format!("Dog {}", count),
                age: (count % 10) as u8,
            };

            if tx.unbounded_send(dog).is_err() {
                break;
            }

            tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
        }
    }))
}
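The per-item round trip through an encoding can be sketched without any dependencies. The trait below is a deliberately simplified stand-in for `Encoding`: it is tied to one concrete item type instead of serde's `Serialize` and `DeserializeOwned`, and `SketchEncoding`, `DogLineEncoding`, and `round_trip` are hypothetical names used only for illustration.

```rust
/// Simplified stand-in for the `Encoding` trait: one concrete item type
/// instead of being generic over serde's traits.
trait SketchEncoding {
    type Item;
    fn content_type() -> &'static str;
    fn to_bytes(item: &Self::Item) -> Option<Vec<u8>>;
    fn from_bytes(bytes: &[u8]) -> Option<Self::Item>;
}

#[derive(Debug, PartialEq)]
struct Dog {
    name: String,
    age: u8,
}

/// Hypothetical text encoding that writes a dog as `<age>:<name>` in UTF-8.
struct DogLineEncoding;

impl SketchEncoding for DogLineEncoding {
    type Item = Dog;

    fn content_type() -> &'static str {
        "text/plain"
    }

    fn to_bytes(item: &Dog) -> Option<Vec<u8>> {
        Some(format!("{}:{}", item.age, item.name).into_bytes())
    }

    fn from_bytes(bytes: &[u8]) -> Option<Dog> {
        let text = std::str::from_utf8(bytes).ok()?;
        // Split on the first ':' only, so names may contain ':' themselves.
        let (age, name) = text.split_once(':')?;
        Some(Dog {
            name: name.to_string(),
            age: age.parse().ok()?,
        })
    }
}

/// Sends every item through `to_bytes` and back through `from_bytes`.
/// Returns `None` if any item fails to encode or decode.
fn round_trip<E: SketchEncoding>(items: &[E::Item]) -> Option<Vec<E::Item>> {
    items
        .iter()
        .map(|item| E::from_bytes(&E::to_bytes(item)?))
        .collect()
}
```

Both directions return `Option`, mirroring the trait shown earlier, so a malformed chunk surfaces as a failed item rather than a panic.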

File Streams

The final stream type, FileStream, is a special stream type not built on Streaming<T, E>. File streams use platform-native optimizations to efficiently stream files without buffering the entire file into memory.
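The idea of streaming without buffering the whole file can be illustrated with plain `std::io`. This is only a sketch of chunked reading, not the platform-native path that FileStream takes, and `stream_in_chunks` is a hypothetical helper.

```rust
use std::io::{self, Read};

/// Reads `source` in chunks of at most `chunk_size` bytes and hands each
/// chunk to `on_chunk`. Only one chunk is held in memory at a time.
/// Returns the total number of bytes read.
fn stream_in_chunks<R: Read>(
    mut source: R,
    chunk_size: usize,
    mut on_chunk: impl FnMut(&[u8]),
) -> io::Result<u64> {
    let mut buffer = vec![0u8; chunk_size];
    let mut total = 0u64;
    loop {
        let read = match source.read(&mut buffer) {
            Ok(0) => break, // end of input (or a zero-sized buffer)
            Ok(n) => n,
            // Retry if the read was interrupted by a signal.
            Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
            Err(e) => return Err(e),
        };
        on_chunk(&buffer[..read]);
        total += read as u64;
    }
    Ok(total)
}
```

Passing a `std::fs::File` as `source` keeps memory use bounded by `chunk_size` regardless of the file's size, which is the property the paragraph above describes.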

We can create the FileStream object in a variety of ways. For example, we can use from_path to efficiently stream arbitrary files from the server's file system:

/// This endpoint uses `file!()` to get the path of the current source file and streams that file
#[get("/api/download_as_filestream")]
async fn download_as_filestream() -> Result<FileStream> {
    Ok(FileStream::from_path(file!()).await?)
}

The FileStream type can be created from the FileData type from dioxus-html. This makes it easy to add streaming file uploads to your app from the HTML <input /> and <form /> elements:

// Our client component calls the endpoint with `file.into()`
fn app() -> Element {
    rsx! {
        h3 { "Upload as FileUpload" }
        div {
            ondragover: move |evt| evt.prevent_default(),
            ondrop: move |evt| async move {
                evt.prevent_default();
                // Assumes `evt.files()` returns the dropped files as `FileData`
                for file in evt.files() {
                    _ = upload_file_as_filestream(file.into()).await;
                }
            },
            "Drop files here"
        }
    }
}

// Our server endpoint accepts `FileStream`
#[post("/api/upload_as_file_stream")]
async fn upload_file_as_filestream(mut upload: FileStream) -> Result<()> {
    // ... consume `upload` here
    Ok(())
}

The FileStream type also sets extra headers like Content-Disposition and X-Content-Size, which give the server endpoint more information to handle the upload efficiently.
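As a rough illustration of what a receiver can learn from such headers, the sketch below parses the two values by hand. It assumes Content-Disposition carries a `filename` parameter in the usual quoted or bare form and that X-Content-Size is a decimal byte count; both are assumptions about the header values, and the function names are hypothetical.

```rust
/// Extracts the `filename` parameter from a Content-Disposition value such
/// as `attachment; filename="dog.png"`. Handles quoted and bare values only;
/// the extended `filename*=` form is ignored, and file names containing ';'
/// are not supported by this simple split.
fn filename_from_content_disposition(value: &str) -> Option<String> {
    value.split(';').skip(1).find_map(|param| {
        let (key, val) = param.trim().split_once('=')?;
        if !key.trim().eq_ignore_ascii_case("filename") {
            return None;
        }
        let val = val.trim();
        // Strip one pair of surrounding quotes if present.
        let unquoted = val
            .strip_prefix('"')
            .and_then(|v| v.strip_suffix('"'))
            .unwrap_or(val);
        Some(unquoted.to_string())
    })
}

/// Parses a size header value, assumed here to be a decimal byte count.
fn content_size(value: &str) -> Option<u64> {
    value.trim().parse().ok()
}
```

Knowing the size up front lets a handler reject oversized uploads or preallocate storage before reading any of the body.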