samod 0.8.0

A Rust library for managing automerge documents, compatible with the JavaScript automerge-repo library.

Samod

samod is a library for building collaborative applications which work offline and don't require servers (though servers can certainly be useful). This is achieved by representing data as automerge documents. samod is wire compatible with the automerge-repo JavaScript library.

What does all that mean?

samod helps you manage automerge "documents", which are hierarchical data structures composed of maps, lists, text, and primitive values - a little like JSON. Every change you make to a document is recorded, and you can move back and forth through the history of a document - it's a bit like Git for JSON. samod takes care of storing changes for you and synchronizing them with connected peers. The interesting part is that, given this very detailed history, which is never discarded, we can merge documents containing changes that were made concurrently. This means we can build applications which allow multiple users to edit the same document without all changes having to go through a server.

How it all works

The library is structured around a [Repo], which talks to a [Storage] instance and which you can connect to other peers using Repo::connect. Once you have a [Repo] you can create documents using [Repo::create], or look up existing documents using [Repo::find]. In either case you will get back a [DocHandle] which you can use to interact with the document.

Typically then, your workflow will look like this:

  • Initialize a Repo at application startup, passing it a [RuntimeHandle] implementation and a [Storage] implementation
  • Whenever you have connections available (maybe you are connecting to a sync server, maybe you are receiving peer-to-peer connections) you call [Repo::connect] to add connections to the repo.
  • Create DocHandles using Repo::create and look up existing documents using Repo::find
  • Modify documents using DocHandle::with_document

Let's walk through each of those steps.

Initializing a [Repo]

To initialize a [Repo] you call [Repo::builder()] to obtain a [RepoBuilder] which you use to configure the repo before calling [RepoBuilder::load()] to actually load the repository. For example:

# #[cfg(feature="tokio")]
# tokio_test::block_on(async {
let repo = samod::Repo::builder(tokio::runtime::Handle::current())
    .with_storage(samod::storage::InMemoryStorage::new())
    .load()
    .await;
# })

The first argument to builder is an implementation of [RuntimeHandle]. Default implementations are provided for tokio and gio which can be conveniently used via [Repo::build_tokio] and [Repo::build_gio] respectively. The [RuntimeHandle] trait is straightforward to implement if you want to use some other async runtime.

By default samod uses an in-memory storage implementation. This is great for prototyping, but in most cases you will want to persist data somewhere. In that case you'll need an implementation of [Storage] to pass to [RepoBuilder::with_storage].

It is possible to use [Storage] and [AnnouncePolicy] implementations which do not produce Send futures. In this case you will also need a runtime which can spawn non-Send futures. See the runtimes section for more details.

Connecting to peers

Connections are managed via dialers and acceptors (listeners).

  • A dialer actively connects to a remote endpoint and automatically reconnects with exponential backoff. Create one with [Repo::dial()], which takes a [BackoffConfig] and an Arc<dyn Dialer>.
  • Repo::find will wait for any dialers which are in the process of connecting to either establish a connection or start retrying before marking a document as unavailable. This means that as long as you add the [Dialer] before calling Repo::find you won't have to coordinate the timing of your dialers and document lookups.
  • An acceptor passively accepts inbound connections. Create one with [Repo::make_acceptor()], then feed accepted transports via [AcceptorHandle::accept()].
// In this case we use the built in ChannelDialer which is useful for simple tests
// more useful implementations wrap actual network transports
# use samod::{BackoffConfig, Repo};
# use samod::transport::channel::ChannelDialer;
# use std::sync::Arc;
# async fn doit() {
let bob: Repo = todo!();
let alice: Repo = todo!();
let url = url::Url::parse("samod-channel://my-channel").unwrap();
let acceptor = bob.make_acceptor(url).unwrap();
let channel_dialer = ChannelDialer::new(acceptor);
let dialer_handle = alice.dial(BackoffConfig::default(), Arc::new(channel_dialer)).unwrap();
// Wait for the first successful connection + handshake
let peer_info = dialer_handle.established().await.unwrap();
# }

Dialer handles

The [DialerHandle] returned by [Repo::dial()] can be used to:

  • Wait for the first successful connection with [DialerHandle::established()]
  • Observe lifecycle events (connect, disconnect, retry, failure) with [DialerHandle::events()]
  • Check if connected with [DialerHandle::is_connected()]
  • Close the dialer with [DialerHandle::close()]

Acceptor handles

The [AcceptorHandle] returned by [Repo::make_acceptor()] can be used to:

  • Accept inbound connections with [AcceptorHandle::accept()]
  • Observe lifecycle events with [AcceptorHandle::events()]
  • Check connection count with [AcceptorHandle::connection_count()]
  • Close the acceptor with [AcceptorHandle::close()]

Managing Documents

Once you have a [Repo] you can use it to manage [DocHandle]s. A [DocHandle] represents an [automerge] document which the [Repo] is managing. "managing" here means a few things:

  • Any changes made to the document using [DocHandle::with_document] will be persisted to storage and synchronized with connected peers (subject to the [AnnouncePolicy]).
  • Any changes received from connected peers will be applied to the document and made visible to the application. You can listen for these changes using [DocHandle::changes].

To create a new document you use [Repo::create], which will return once the document has been persisted to storage. To look up an existing document you use [Repo::find]. This will first look in storage; if the document is not found there, it will request the document from all connected peers (again subject to the [AnnouncePolicy]). If any peer has the document, the future returned by [Repo::find] will resolve once we have synchronized with at least one remote peer which has it.

You can make changes to a document using [DocHandle::with_document].

Announce Policies

By default, samod will announce all the [DocHandle]s it is synchronizing to all connected peers and will also send requests to all connected peers when you call [Repo::find]. This is often not what you want. To customize this logic, pass an implementation of [AnnouncePolicy] to [RepoBuilder::with_announce_policy]. Note that [AnnouncePolicy] is implemented for closures taking a document ID and peer ID (as in the example below), so you can just pass a closure if you want.

# #[cfg(feature="tokio")]
# tokio_test::block_on(async{
let authorized_peer = samod::PeerId::from("alice");
let repo = samod::Repo::build_tokio().with_announce_policy(move |_doc_id, peer_id| {
   // Only announce documents to alice
   &peer_id == &authorized_peer
}).load().await;
# });

Runtimes

[RuntimeHandle] is a trait intended to abstract over the various async runtimes available in the Rust ecosystem. The most common runtime is tokio. tokio is a work-stealing runtime, which means that futures spawned on it must be [Send] so that they can be moved between threads. This means that [RuntimeHandle::spawn] requires [Send] futures, which in turn means that the futures returned by the [Storage] and [AnnouncePolicy] traits must also be [Send] so that they can be spawned onto the [RuntimeHandle].

In many cases, though, you may have a runtime which doesn't require [Send] futures, and storage and announce policy implementations which cannot produce [Send] futures; this is often the case in single-threaded runtimes, for example. In these cases you can instead implement [LocalRuntimeHandle], which doesn't require [Send] futures, and implement the [LocalStorage] and [LocalAnnouncePolicy] traits for your storage and announce policy implementations. You configure all of these via the [RepoBuilder] struct. Once you've configured local storage and announce policy implementations you can create a local [Repo] using [RepoBuilder::load_local].

Concurrency

Typically samod will be managing many documents: one for each [DocHandle] you retrieve via [Repo::create] or [Repo::find], but also one for each document about which sync messages are received from remote peers (e.g. a sync server would have no [DocHandle]s open but would still be running many document processes). By default document tasks are handled on the async runtime provided to the [RepoBuilder], but this can be undesirable: document operations can be compute intensive, so responsiveness may benefit from running them on a separate thread pool. This is the purpose of the [RepoBuilder::with_concurrency] method, which allows you to configure how document operations are processed. If you want to use the thread pool approach you will need to enable the threadpool feature.

Why not just Automerge?

automerge is a low-level library. It provides routines for manipulating documents in memory and an abstract data sync protocol, but it does not hook these up to any kind of network or storage. Most of the work involved in doing this plumbing is straightforward, but if every application does it itself, we don't end up with interoperable applications. In particular we don't end up with fungible sync servers. One of the core goals of this library is to allow application authors to be agnostic as to where the user synchronises data, by implementing a generic network and storage layer which all applications can use.

Example

Here's a fairly complete example of using samod to manage a todo list across two repos (representing two different devices):

# #[cfg(feature="tokio")]
# tokio_test::block_on(async {
use automerge::{ReadDoc, transaction::{Transactable as _}};
use futures::StreamExt as _;
use samod::{BackoffConfig, transport::channel::ChannelDialer};
use std::sync::Arc;

# let _ = tracing_subscriber::fmt().try_init();

// Create two repos, representing two different devices
let alice = samod::Repo::build_tokio().load().await;
let bob = samod::Repo::build_tokio().load().await;

// Create an initial skeleton for our todo list on alice
let mut initial_doc = automerge::Automerge::new();
initial_doc.transact::<_, _, automerge::AutomergeError>(|tx| {
    let _todos = tx.put_object(automerge::ROOT, "todos", automerge::ObjType::List)?;
    Ok(())
}).unwrap();

// Now create a `samod::DocHandle` on alice using `Repo::create`
let alice_handle = alice.create(initial_doc).await.unwrap();

// Bob registers an acceptor; alice dials bob via an in-process ChannelDialer
let url = url::Url::parse("channel://alice-to-bob").unwrap();
let acceptor = bob.make_acceptor(url).unwrap();
let channel_dialer = ChannelDialer::new(acceptor);
let dialer_handle = alice.dial(BackoffConfig::default(), Arc::new(channel_dialer)).unwrap();

// Wait for alice to be connected to bob
dialer_handle.established().await.unwrap();

// Bob can now fetch alice's document
let bob_handle = bob.find(alice_handle.document_id().clone()).await.unwrap().unwrap();

// Make a change on bob's side
bob_handle.with_document(|doc| {
    doc.transact(|tx| {
       let todos = tx.get(automerge::ROOT, "todos").unwrap()
          .expect("todos should exist").1;
       tx.insert(todos, 0, "Buy milk")?;
       Ok::<_, automerge::AutomergeError>(())
    }).unwrap();
});

// Wait for the change to be received on alice's side
alice_handle.changes().next().await.unwrap();

// Alice's document now reflects bob's change
alice_handle.with_document(|doc| {
    let todos = doc.get(automerge::ROOT, "todos").unwrap()
        .expect("todos should exist").1;
    let item = doc.get(todos, 0).unwrap().expect("item should exist").0;
    let automerge::Value::Scalar(val) = item else {
        panic!("item should be a scalar");
    };
    let automerge::ScalarValue::Str(s) = val.as_ref() else {
        panic!("item should be a string");
    };
    assert_eq!(s, "Buy milk");
    Ok::<_, automerge::AutomergeError>(())
}).unwrap();
# });