Michael De Abreu

5-Operators and Composition in Our System - Building a Reactive System in TypeScript

To recap, we have built and optimized a reactive system that can store variables and react to updates and event emissions, update other states, and perform side-effect calls. The journey has been incredible, and I hope this has helped you to understand more about how reactive systems work under the hood, but all things must come to an end. This will be the final post of the series, where we will explore how to create operators for our system. This will also be a long post since everything is related to operators, and I didn't want to break the flow.

Defining an operator

If you are familiar with other reactive libraries, you have probably seen functions that let you transform the values in a data stream. These functions are called operators. In our system, an operator is a function that takes its configuration, such as a transformation function, and returns another function that receives the source stream, applies the transformation, and returns a new stream.

export type OperatorFn<T, R = T> = (source: Subscribable<T>) => Event<R>;

We could also apply the transformation directly without involving the stream system, but then the operators would be synchronous, and that wouldn't be much fun, would it? By keeping the stream, we can use it to create async operators easily; more on that later.
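
To make the two-step shape concrete, here is a minimal sketch. It does not use the code from the previous posts: miniEvent, MiniEvent, MiniSubscribable, and MiniOperatorFn are hypothetical stand-ins for our event, Event, Subscribable, and OperatorFn, and runningTotal is an illustrative operator that is not part of the series.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event(),
// renamed so the sketch runs on its own.
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

// Step 1: the outer function takes the operator's configuration.
function runningTotal(start: number): MiniOperatorFn<number> {
  // Step 2: the returned function receives the source and builds a new stream.
  return (source) => {
    const totals = miniEvent<number>();
    let total = start;
    source.subscribe((value) => {
      total += value;
      totals(total);
    });
    return totals;
  };
}

const amounts = miniEvent<number>();
const totals = runningTotal(0)(amounts);
const seen: number[] = [];
totals.subscribe((value) => seen.push(value));

amounts(1);
amounts(2);
amounts(3);
console.log(seen); // [1, 3, 6]
```

Because the operator returns a stream instead of a plain value, it can keep state between emissions, which a one-off synchronous transformation could not do.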

Coding our first operator

If I mention transforming a data stream, I bet you are thinking about the map method of an array, and you'd be right. The first operator we will create is a map operator. Let's start by defining the test:

describe("map", () => {
  it("should apply map correctly", () => {
    const source = event<number>();

    const mapped1 = map((x: number) => x * 2)(source);
    const mapped2 = map((x: number) => `${x}`)(mapped1);

    const result = mock();

    mapped2.subscribe(result);

    source(2);

    expect(result).toHaveBeenCalledWith("4");
  });
});

We want the operator to be composable. Calling map with a callback returns a function with the OperatorFn signature we defined, which we can then call with a source to create a new stream.

Let's start by creating the map function in its own file:

export function map<T, R>(fn: (value: T) => R): OperatorFn<T, R> {
}

Since we want to return an operator function, we need something like this:

return (source) => {
  const mappedEvent = event<R>();
  return mappedEvent;
};

We return a function that creates a new event and returns it, but it's not transforming the output yet. To do that, we need to use the fn callback we're passing to the operator.

source.subscribe((value) => mappedEvent(fn(value)));

With this, we subscribe to the input source of the operator and emit transformed values into the new event. Finally, this is what our first operator looks like:

export function map<T, R>(fn: (value: T) => R): OperatorFn<T, R> {
  return (source) => {
    const mappedEvent = event<R>();
    source.subscribe((value) => mappedEvent(fn(value)));
    return mappedEvent;
  };
}
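
If you want to try map without the rest of the series' code, the following sketch should run on its own. It assumes a simplified event: miniEvent, MiniEvent, MiniSubscribable, and MiniOperatorFn are hypothetical stand-ins for our real creator and types, and the map body is the same as the one above.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

// Same logic as the article's map, written against the stand-in types.
function map<T, R>(fn: (value: T) => R): MiniOperatorFn<T, R> {
  return (source) => {
    const mappedEvent = miniEvent<R>();
    source.subscribe((value) => mappedEvent(fn(value)));
    return mappedEvent;
  };
}

const numbers = miniEvent<number>();
// Each call wraps the previous stream, so the chain reads inside out.
const doubled = map((x: number) => x * 2)(numbers);
const labels = map((x: number) => `value: ${x}`)(doubled);

const received: string[] = [];
labels.subscribe((label) => received.push(label));

numbers(2);
numbers(5);
console.log(received); // ["value: 4", "value: 10"]
```

In this simplified stand-in, values emitted before a subscriber is attached are not replayed, so the subscription has to be in place before the source emits.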

This operator emits a new value each time the input is updated. Another useful method on the Array prototype is filter, which returns a new array with only the values that satisfy a predicate function.

The filter operator

To create our second operator, we'll start again by writing the tests first:

describe("filter", () => {
  it("should apply filter correctly", () => {
    const source = event<number>();
    const filtered = filter((x) => x > 5)(source);

    const result = mock();

    filtered.subscribe(result);

    source(3);
    expect(result).not.toHaveBeenCalled();
    source(7);
    expect(result).toHaveBeenCalledWith(7);
  });
});

This operator is very similar to map, so I'll skip the step-by-step explanation and show the complete implementation instead.

export function filter<T>(predicate: (value: T) => boolean): OperatorFn<T> {
  return (source) => {
    const filteredEvent = event<T>();
    source.subscribe((value) => {
      if (predicate(value)) {
        filteredEvent(value);
      }
    });
    return filteredEvent;
  };
}

The structure is the same as map; only the subscriber changes. Instead of transforming a value, this operator decides whether the value should be emitted.

With this, we now have two operators, map and filter, and the result of each can be used as the source for another. We can leverage that to create a way to chain operators more naturally.
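
Here is a small sketch of what wiring the two operators by hand looks like. As before, miniEvent and the Mini* types are hypothetical stand-ins for our event and its types so the example runs on its own; map and filter have the same bodies as above.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

function map<T, R>(fn: (value: T) => R): MiniOperatorFn<T, R> {
  return (source) => {
    const mappedEvent = miniEvent<R>();
    source.subscribe((value) => mappedEvent(fn(value)));
    return mappedEvent;
  };
}

function filter<T>(predicate: (value: T) => boolean): MiniOperatorFn<T> {
  return (source) => {
    const filteredEvent = miniEvent<T>();
    source.subscribe((value) => {
      if (predicate(value)) {
        filteredEvent(value);
      }
    });
    return filteredEvent;
  };
}

const readings = miniEvent<number>();
// Manual wiring: the first operator applied ends up innermost.
const largeDoubles = filter((x: number) => x > 5)(map((x: number) => x * 2)(readings));

const received: number[] = [];
largeDoubles.subscribe((value) => received.push(value));

readings(1); // doubled to 2, filtered out
readings(3); // doubled to 6, passes
readings(2); // doubled to 4, filtered out
readings(10); // doubled to 20, passes
console.log(received); // [6, 20]
```

It works, but the nesting reads in the opposite order to the data flow, and it only gets harder to follow as more operators are added.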

Composing with pipe

Composition is a fundamental part of any reactive system, and operator chaining is the most natural way to express transformations and logic over time. Instead of creating one-off streams and wiring them manually, we can define a clear and readable sequence of operations, just like we do with array methods. I think it would be better if we could chain them naturally, like this:

const result = source.pipe(
  map((x) => x * 2),
  filter((x) => x > 5)
);

This might seem like magic, but all we really need to do is apply the operators one after another, like we've been doing manually. We'll just take advantage of the fact that the output of one stream is the input of the next. It's not magic, but it's going to look a lot cleaner.

function pipe<R>(...operators: OperatorFn<any, any>[]): Event<R> {
  const pipeEvent = event<unknown>();
  subscribe(pipeEvent);
  const resultEvent = operators.reduce(
    (acc, op) => op(acc),
    pipeEvent,
  ) as Event<R>;
  return resultEvent;
}

Thanks to the composable API we’ve built, we can chain all the operators into a resulting event simply by creating a new event that subscribes to the source and using it as the initial value in the reducer. That result is what we return from the pipe function.
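
To see the reducer at work outside a creator, here is a standalone sketch. Everything named here is an assumption for illustration: pipeFrom is a hypothetical free-function variant of pipe that takes the source as its first argument, and miniEvent and the Mini* types stand in for our event and its types.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

function map<T, R>(fn: (value: T) => R): MiniOperatorFn<T, R> {
  return (source) => {
    const mappedEvent = miniEvent<R>();
    source.subscribe((value) => mappedEvent(fn(value)));
    return mappedEvent;
  };
}

function filter<T>(predicate: (value: T) => boolean): MiniOperatorFn<T> {
  return (source) => {
    const filteredEvent = miniEvent<T>();
    source.subscribe((value) => {
      if (predicate(value)) filteredEvent(value);
    });
    return filteredEvent;
  };
}

// Free-function variant of pipe: the method version closes over the creator's
// own subscribe, while this one receives the source explicitly.
function pipeFrom<T, R = unknown>(
  source: MiniSubscribable<T>,
  ...operators: MiniOperatorFn<any, any>[]
): MiniEvent<R> {
  const entry = miniEvent<T>();
  source.subscribe(entry); // forward every source value into the chain
  // Each operator receives the previous stream and returns the next one.
  return operators.reduce<MiniEvent<any>>((acc, op) => op(acc), entry);
}

const numbers = miniEvent<number>();
const bigDoubles = pipeFrom<number, number>(
  numbers,
  map((x: number) => x * 2),
  filter((x: number) => x > 5),
);

const received: number[] = [];
bigDoubles.subscribe((value) => received.push(value));

numbers(1); // doubled to 2, filtered out
numbers(4); // doubled to 8, passes
console.log(received); // [8]
```

With no operators, the reducer simply returns the entry event, so an empty pipe behaves like a plain copy of the source.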

To expose this new method on each creator, we add it to the object the creator returns. By default, it always returns an event, but you could tweak it per creator so that it returns the creator's own type. I'll leave that part up to you and show how to integrate it into event; you can add it to the other creators in the same way.

export function event<T = void>(): Event<T> {
  // Same code we had; we only need to update the return.
  return Object.assign(emit, { subscribe, toState, pipe }) as Event<T>;
}


Once pipe has been added to every creator, any state, event, or computed we create will have a pipe method that we can use to chain our operators. We've got two basic operators, but let’s build a few more to show what this system is capable of.

Other operators

Debounce

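This operator holds back each value until the specified delay has passed without a new emission from the source. Every new value restarts the timer, so only the latest one is emitted.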

export function debounce<T>(delay: number): OperatorFn<T> {
  return (source) => {
    const debouncedEvent = event<T>();
    let timeoutId: ReturnType<typeof setTimeout> | undefined;

    source.subscribe((value) => {
      if (timeoutId != null) {
        clearTimeout(timeoutId);
      }

      timeoutId = setTimeout(() => {
        debouncedEvent(value);
        timeoutId = void 0;
      }, delay);
    });

    return debouncedEvent;
  };
}

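
A quick way to watch debounce in action is to fire a burst of values and check what arrives. This is only a sketch: miniEvent and the Mini* types are hypothetical stand-ins for our event and its types, and the 20 ms delay and 50 ms wait are arbitrary values picked for the demo.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

// Same logic as the article's debounce, written against the stand-in types.
function debounce<T>(delay: number): MiniOperatorFn<T> {
  return (source) => {
    const debouncedEvent = miniEvent<T>();
    let timeoutId: ReturnType<typeof setTimeout> | undefined;
    source.subscribe((value) => {
      if (timeoutId != null) {
        clearTimeout(timeoutId); // a newer value cancels the pending emission
      }
      timeoutId = setTimeout(() => {
        debouncedEvent(value);
        timeoutId = undefined;
      }, delay);
    });
    return debouncedEvent;
  };
}

const keystrokes = miniEvent<string>();
const settled = debounce<string>(20)(keystrokes);

const received: string[] = [];
settled.subscribe((value) => received.push(value));

// Three emissions in the same tick: each one restarts the 20 ms timer.
keystrokes("h");
keystrokes("he");
keystrokes("hey");
console.log(received.length); // 0, nothing is emitted synchronously

// After the quiet period, only the last value of the burst should arrive.
setTimeout(() => console.log(received), 50);
```

The last log is expected to show only "hey", since the first two timers are cleared before they fire.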

Merge

This operator merges two sources into a single stream:

export function merge<S, O>(other: Subscribable<O>): OperatorFn<S, S | O> {
  return (source) => {
    const mergedEvent = event<S | O>();
    source.subscribe(mergedEvent);
    other.subscribe(mergedEvent);
    return mergedEvent;
  };
}
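
Here is a short sketch of merge combining two sources of different types. The clicks and keys events are made up for the example, and miniEvent and the Mini* types are hypothetical stand-ins for our event and its types.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

// Same logic as the article's merge, written against the stand-in types.
function merge<S, O>(other: MiniSubscribable<O>): MiniOperatorFn<S, S | O> {
  return (source) => {
    const mergedEvent = miniEvent<S | O>();
    source.subscribe(mergedEvent);
    other.subscribe(mergedEvent);
    return mergedEvent;
  };
}

const clicks = miniEvent<number>();
const keys = miniEvent<string>();
// The merged stream emits number | string, in the order the sources emit.
const inputs = merge<number, string>(keys)(clicks);

const received: (number | string)[] = [];
inputs.subscribe((value) => received.push(value));

clicks(1);
keys("a");
clicks(2);
console.log(received); // [1, "a", 2]
```

The union type in the result means subscribers have to narrow the value themselves if they care which source it came from.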

MapAsync

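This one is my personal favorite. It maps each source value to a promise and emits the loading state, the resolved data, or the error as the promise progresses. If the source emits again before a promise settles, the outdated result is ignored.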

export function mapAsync<T, R, E = unknown>(
  promiseFn: (value: T) => Promise<R>,
): OperatorFn<T, { data?: R; loading?: boolean; error?: E; }> {
  return (source) => {
    const out = event<{ data?: R; loading?: boolean; error?: E; }>();
    let currentFetchId = 0;
    source.subscribe(async (value) => {
      currentFetchId++;
      const fetchId = currentFetchId;
      out({ loading: true });
      try {
        const result = await promiseFn(value);
        if (fetchId === currentFetchId) {
          out({ data: result });
        }
      } catch (error: any) {
        if (fetchId === currentFetchId) {
          out({ error });
        }
      }
    });
    return out;
  };
}

So simple, yet so useful, and incredibly powerful.
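
To see why the fetchId check matters, here is a sketch with two overlapping requests where the first one resolves last. The delays table and the fake lookup are invented for the demo, data is typed with the promise's result type R, and miniEvent and the Mini* types are hypothetical stand-ins for our event and its types.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;
type AsyncState<R, E> = { data?: R; loading?: boolean; error?: E };

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

// Same logic as the article's mapAsync, written against the stand-in types.
function mapAsync<T, R, E = unknown>(
  promiseFn: (value: T) => Promise<R>,
): MiniOperatorFn<T, AsyncState<R, E>> {
  return (source) => {
    const out = miniEvent<AsyncState<R, E>>();
    let currentFetchId = 0;
    source.subscribe(async (value) => {
      currentFetchId++;
      const fetchId = currentFetchId;
      out({ loading: true });
      try {
        const result = await promiseFn(value);
        // Only the most recent request is allowed to report back.
        if (fetchId === currentFetchId) out({ data: result });
      } catch (error) {
        if (fetchId === currentFetchId) out({ error: error as E });
      }
    });
    return out;
  };
}

// Fake lookup: "slow" takes longer than "fast", so it resolves out of order.
const delays: Record<string, number> = { slow: 40, fast: 5 };
const lookup = (term: string): Promise<string> =>
  new Promise((resolve) => setTimeout(() => resolve(`result for ${term}`), delays[term]));

const query = miniEvent<string>();
const states: AsyncState<string, unknown>[] = [];
mapAsync(lookup)(query).subscribe((state) => states.push(state));

query("slow");
query("fast");
console.log(states.length); // 2, both loading states are emitted synchronously

// Once both promises have settled, only the "fast" result should have been emitted.
setTimeout(() => console.log(states), 80);
```

The final log is expected to contain the two loading states followed by the data for "fast"; the late "slow" result is dropped because its fetchId no longer matches.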

One more thing...

If you’ve been following along, you’ve probably noticed TypeScript complaining about the pipe types. We did not define any types for it, so of course, it will complain. However, typing a pipe method is complex because it requires a level of recursion that can even push TS to its limits. This is the most complex part of this series, and it's not even about code but about the shape of the types themselves. What I did is similar to what other libraries like RxJS have done: overloads. A lot of overloads. Each overload returns an event typed with the output of its last operator.

PipeFn types
export interface PipeFn<T, S extends Pipeable<any>> {
  (): Event<T>;
  <A>(op1: OperatorFn<NonNullable<T>, A>): Event<A>;
  <A, B>(op1: OperatorFn<NonNullable<T>, A>, op2: OperatorFn<A, B>): Event<B>;
  <A, B, C>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
  ): Event<C>;
  <A, B, C, D>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
  ): Event<D>;
  <A, B, C, D, E>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
  ): Event<E>;
  <A, B, C, D, E, F>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
  ): Event<F>;
  <A, B, C, D, E, F, G>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
  ): Event<G>;
  <A, B, C, D, E, F, G, H>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
    op8: OperatorFn<G, H>,
  ): Event<H>;
  <A, B, C, D, E, F, G, H, I>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
    op8: OperatorFn<G, H>,
    op9: OperatorFn<H, I>,
  ): Event<I>;
  <A, B, C, D, E, F, G, H, I, J>(
    op1: OperatorFn<NonNullable<T>, A>,
    op2: OperatorFn<A, B>,
    op3: OperatorFn<B, C>,
    op4: OperatorFn<C, D>,
    op5: OperatorFn<D, E>,
    op6: OperatorFn<E, F>,
    op7: OperatorFn<F, G>,
    op8: OperatorFn<G, H>,
    op9: OperatorFn<H, I>,
    op10: OperatorFn<I, J>,
  ): Event<J>;
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  <R>(...ops: OperatorFn<any, any>[]): Event<R>;
}

export interface Pipeable<T> {
  pipe: PipeFn<T, this>;
}
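
If the full list of overloads feels overwhelming, this reduced sketch shows the same idea with only three of them. It is not the series' pipe: pipeFrom is a hypothetical free function that takes the source as its first argument, and miniEvent and the Mini* types are stand-ins for our event and its types.

```typescript
// Hypothetical stand-ins for the series' Subscribable, Event, and event().
type Listener<T> = (value: T) => void;
interface MiniSubscribable<T> {
  subscribe(listener: Listener<T>): void;
}
type MiniEvent<T> = Listener<T> & MiniSubscribable<T>;
type MiniOperatorFn<T, R = T> = (source: MiniSubscribable<T>) => MiniEvent<R>;

function miniEvent<T>(): MiniEvent<T> {
  const listeners: Listener<T>[] = [];
  const emit = (value: T): void => listeners.forEach((listener) => listener(value));
  const subscribe = (listener: Listener<T>): void => {
    listeners.push(listener);
  };
  return Object.assign(emit, { subscribe });
}

function map<T, R>(fn: (value: T) => R): MiniOperatorFn<T, R> {
  return (source) => {
    const mappedEvent = miniEvent<R>();
    source.subscribe((value) => mappedEvent(fn(value)));
    return mappedEvent;
  };
}

// Each overload threads the output type of one operator into the input of the
// next, and the result is typed with the output of the last operator.
function pipeFrom<T>(source: MiniSubscribable<T>): MiniEvent<T>;
function pipeFrom<T, A>(
  source: MiniSubscribable<T>,
  op1: MiniOperatorFn<T, A>,
): MiniEvent<A>;
function pipeFrom<T, A, B>(
  source: MiniSubscribable<T>,
  op1: MiniOperatorFn<T, A>,
  op2: MiniOperatorFn<A, B>,
): MiniEvent<B>;
// The implementation signature is loose; callers only see the overloads above.
function pipeFrom(
  source: MiniSubscribable<any>,
  ...ops: MiniOperatorFn<any, any>[]
): MiniEvent<any> {
  const entry = miniEvent<any>();
  source.subscribe(entry);
  return ops.reduce<MiniEvent<any>>((acc, op) => op(acc), entry);
}

const numbers = miniEvent<number>();
// The annotation is only there to show the inferred result: MiniEvent<string>.
const labels: MiniEvent<string> = pipeFrom(
  numbers,
  map((x: number) => x * 2),
  map((x: number) => `#${x}`),
);

const received: string[] = [];
labels.subscribe((label) => received.push(label));
numbers(4);
console.log(received); // ["#8"]
```

With these signatures, passing an operator whose input type does not match the previous output should be rejected by the compiler, which is the whole point of writing the overloads by hand.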

That's all, folks!

And with that, we've reached the end of the series. We started with something as simple as a counter, and we ended up building a complete reactive system, with state, events, operators, async transformations and functional composition.

There's still plenty we could explore to continue the journey, including more advanced topics like batch updates, dev tools, dependency graph, performance optimizations, and integrations with other libraries. But that would go far beyond the scope and simplicity of what we've built so far.

Thank you so much for joining me. I hope you’ve enjoyed the process as much as I did. Now that you know how this works, you might even come up with better ideas than mine, and that would be awesome. I can only hope you'll share them. Until next time.
