Broadcasting Messages

Many WebSocket applications broadcast messages to many clients at once, so in this chapter we'll learn how to do this asynchronously. Previously, we had to use an external dependency bus, but using the asynchronous approach, this is no longer necessary.

The example we build in this chapter will simply echo messages back to the client as well as broadcasting any messages typed into the server console to all connected clients. Furthermore, we'll broadcast a message whenever a client connects too.

Initialising the Project

As before, we need a new Humphrey WebSocket application. We don't need to handle the disconnection event, so we won't add a handler for it.

use humphrey_ws::async_app::{AsyncStream, AsyncWebsocketApp};
use humphrey_ws::message::Message;

use std::sync::Arc;

fn main() {
    let websocket_app: AsyncWebsocketApp<()> = AsyncWebsocketApp::new()
        .with_connect_handler(connect_handler)
        .with_message_handler(message_handler);

    websocket_app.run();
}

fn connect_handler(stream: AsyncStream, _: Arc<()>) {
    // TODO
}

fn message_handler(stream: AsyncStream, message: Message, _: Arc<()>) {
    stream.send(message);
}

Broadcasting Messages from Event Handlers

Our connection handler needs to broadcast a message to all connected clients when a new client connects. This message will also be sent to the new client. The AsyncStream provides functionality for this, but as we'll see later, this is not the only way to broadcast messages.

Let's add this to our connection handler.

// --snip--

fn connect_handler(stream: AsyncStream, _: Arc<()>) {
    let message = Message::new(format!("Welcome, {}!", stream.peer_addr()));
    stream.broadcast(message);
}

// --snip--

It's as simple as that! If we test this with websocat and connect from a few terminals, you'll see that each message is correctly echoed back to the client, and new connections are announced to everyone.

Sending Messages without an Event

Broadcasts can also be triggered without an event. This is useful for sending messages to all connected clients from a separate thread, or for responding to non-WebSocket events. In this example, we'll broadcast the standard input to all connected clients.

To do this, we'll use an AsyncSender, which allows us to send messages and broadcasts without waiting for an event. Let's get a new async sender from the app, and send it to a separate thread for handling user input.

// --snip--

use humphrey_ws::async_app::AsyncSender;

use std::thread::spawn;

fn main() {
    let websocket_app: AsyncWebsocketApp<()> = AsyncWebsocketApp::new()
        .with_connect_handler(connect_handler)
        .with_message_handler(message_handler);

    let sender = websocket_app.sender();
    spawn(move || user_input(sender));

    websocket_app.run();
}

fn user_input(sender: AsyncSender) {
  // TODO
}

// --snip--

You can create as many senders as you want from the app, but they can only be created from the main thread and must be created before the application is run.

Using the Sender

Now that we have a sender, we can use it to send messages to all connected clients. Let's use the same code from our synchronous example, but slightly modify it to work with a sender instead of the bus.

// --snip

use std::io::BufRead;

// --snip--

fn user_input(sender: AsyncSender) {
    let stdin = std::io::stdin();
    let handle = stdin.lock();

    for line in handle.lines().flatten() {
        sender.broadcast(Message::new(line));
    }
}

// --snip--

If we run this code now, every line we type in the server console will be broadcast to all connected clients.

Full Example

The full source code for this example should look like this.

use humphrey_ws::async_app::{AsyncStream, AsyncWebsocketApp, AsyncSender};
use humphrey_ws::message::Message;

use std::io::BufRead;
use std::sync::Arc;
use std::thread::spawn;

fn main() {
    let websocket_app: AsyncWebsocketApp<()> = AsyncWebsocketApp::new()
        .with_connect_handler(connect_handler)
        .with_message_handler(message_handler);

    let sender = websocket_app.sender();
    spawn(move || user_input(sender));

    websocket_app.run();
}

fn user_input(sender: AsyncSender) {
    let stdin = std::io::stdin();
    let handle = stdin.lock();

    for line in handle.lines().flatten() {
        sender.broadcast(Message::new(line));
    }
}

fn connect_handler(stream: AsyncStream, _: Arc<()>) {
    let message = Message::new(format!("Welcome, {}!", stream.peer_addr()));
    stream.broadcast(message);
}

fn message_handler(stream: AsyncStream, message: Message, _: Arc<()>) {
    stream.send(message);
}

Conclusion

In this chapter, we've learnt how to broadcast messages asynchronously. It's a lot easier than the synchronous approach, and also more flexible. In the next chapter, we'll learn how to integrate an asynchronous WebSocket application with an existing Humphrey application.