@rbuckton
Last active June 26, 2021 20:58
External Cancellation of Promises with CancellationTokenSource

Cancellation Signals

Introduction

The direction I've taken around Promise cancellation, given the API surface of the ES6 Promise, has been to use a separate object to handle cancellation. This approach is similar to the cooperative cancellation model used in .NET. There are several reasons I'm using this approach:

  • Separates the concerns of cancellation (CancellationTokenSource/CancellationToken) from asynchronous completion (Promise)
  • Distinguishes the relationship between the producer and consumer of cancellation and completion signals:
    • A function that performs an asynchronous operation produces the completion signal, while the caller receives the signal.
    • The caller produces the cancellation signal, while the function that performs the asynchronous operation receives the signal.
  • A Promise guarantees all continuations are asynchronous, and will be scheduled in a later turn. Cancellation signals must be delivered synchronously; otherwise the signal could be scheduled after the Promise has already resolved.
  • The producer of a cancellation signal maintains sole control over the ability to signal cancellation. Consumers can merely listen for the signal.
  • Cancellation signals only propagate up the chain to a Promise producer who intentionally listens for the signal.
  • Cancellation signals cannot be sent by downstream consumers of a Promise API that does not expose its cancellation source.
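
The ordering argument in the list above can be illustrated with a small sketch. The `listeners` set and the `signal` function are hypothetical stand-ins for a cancellation source, not part of the API proposed in this document; the point is only that a synchronous callback runs before `signal()` returns, while a Promise continuation waits for a later turn even when the Promise is already resolved.

```typescript
// Hypothetical stand-in for a cancellation source: a plain set of listeners
// that are invoked synchronously. This is not the API described in this document.
const log: string[] = [];
const listeners = new Set<() => void>();
const signal = (): void => listeners.forEach(listener => listener());

listeners.add(() => log.push("cancel callback"));

// A Promise continuation is always deferred to a later turn (a microtask),
// even though this Promise is already resolved.
const settled = Promise.resolve().then(() => log.push("promise continuation"));

signal();                    // the listener runs before this call returns
log.push("after signal()");

// prints "cancel callback -> after signal() -> promise continuation"
settled.then(() => console.log(log.join(" -> ")));
```

If the cancellation callback were scheduled like a Promise continuation, it could run after the operation had already completed, which is the race the synchronous design avoids.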

CancellationTokenSource

A CancellationTokenSource is an object capable of producing a synchronous cancellation signal to registered listeners.

class CancellationTokenSource {
  /**
   * Creates a new CancellationTokenSource, optionally linked to one or more upstream CancellationTokens.
   */
  constructor(linkedTokens?: CancellationToken[]);
  
  /**
   * Gets the CancellationToken linked to this CancellationTokenSource.
   */
  token: CancellationToken;
  
  /**
   * Sends the cancellation signal to the linked CancellationToken.
   * @param reason An optional reason for cancellation (default: new Error("Operation canceled."))
   */
  cancel(reason?: any): void;
  
  /**
   * Sends the cancellation signal to the linked CancellationToken after a timeout has elapsed.
   * @param delay The timeout to wait before sending the cancellation signal.
   * @param reason An optional reason for cancellation (default: new Error("Operation canceled."))
   */
  cancelAfter(delay: number, reason?: any): void;
  
  /**
   * Closes the CancellationTokenSource, releasing resources and preventing possible future cancellation.
   */
  close(): void;
}

CancellationToken

A CancellationToken is an object capable of consuming a cancellation signal.

// NOTE: this class cannot be created using a constructor. Instead you can use CancellationTokenSource#token or CancellationToken.default.
class CancellationToken {

  /**
    * Gets a CancellationToken that can never be canceled.
    */
  static default: CancellationToken;

  /**
    * Gets a value indicating whether the token was canceled.
    */
  canceled: boolean;
  
  /**
    * Gets the reason a CancellationTokenSource was canceled, if available.
    */
  reason: any;
  
  /**
    * Throws the reason from the linked CancellationTokenSource if the linked source was canceled.
    */
  throwIfCanceled(): void;
  
  /**
    * Registers a callback that is executed when the linked CancellationTokenSource sends its cancellation signal.
    * @param callback The callback that will be executed along with the reason for cancellation.
    */
  register(callback: (reason: any) => void): CancellationRegistration;
}

interface CancellationRegistration {
  /**
   * Unregisters a callback previously added using CancellationToken#register.
   */
  unregister(): void;
}
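
To make the declarations above more concrete, here is a minimal sketch of how they might be implemented. It is an illustration under stated assumptions, not the reference implementation: the public `callbacks` set, the writable `canceled` and `reason` fields, and the timer handling are shortcuts chosen for brevity, and a real implementation would hide them from consumers.

```typescript
// Illustrative sketch only. Names follow the declarations above, but the
// internals (the callback set, the timer handling) are assumptions.
type CancellationCallback = (reason: any) => void;

interface CancellationRegistration {
  unregister(): void;
}

class CancellationToken {
  // No source ever cancels this instance, so it can never be canceled.
  static readonly default = new CancellationToken();

  canceled = false;
  reason: any = undefined;
  // Assumption: public for brevity; a real implementation would hide this.
  readonly callbacks = new Set<CancellationCallback>();

  throwIfCanceled(): void {
    if (this.canceled) throw this.reason;
  }

  register(callback: CancellationCallback): CancellationRegistration {
    if (this.canceled) {
      callback(this.reason); // already canceled: notify synchronously
      return { unregister() {} };
    }
    this.callbacks.add(callback);
    return { unregister: () => { this.callbacks.delete(callback); } };
  }
}

class CancellationTokenSource {
  readonly token = new CancellationToken();
  private closed = false;
  private timer: ReturnType<typeof setTimeout> | undefined;
  private links: CancellationRegistration[] = [];

  constructor(linkedTokens: CancellationToken[] = []) {
    // Cancellation of any upstream token is forwarded to this source.
    this.links = linkedTokens.map(linked =>
      linked.register(reason => this.cancel(reason)));
  }

  cancel(reason: any = new Error("Operation canceled.")): void {
    if (this.closed || this.token.canceled) return;
    this.token.canceled = true;
    this.token.reason = reason;
    const pending = Array.from(this.token.callbacks);
    this.token.callbacks.clear();
    pending.forEach(callback => callback(reason)); // synchronous delivery
  }

  cancelAfter(delay: number, reason?: any): void {
    this.timer = setTimeout(() => this.cancel(reason), delay);
  }

  close(): void {
    this.closed = true;
    if (this.timer !== undefined) clearTimeout(this.timer);
    this.links.forEach(link => link.unregister());
  }
}

// Usage: a source linked to an upstream token is canceled along with it.
const upstream = new CancellationTokenSource();
const downstream = new CancellationTokenSource([upstream.token]);
const seen: string[] = [];

downstream.token.register(reason => seen.push(reason.message));
downstream.token.register(() => seen.push("unregistered")).unregister();

upstream.cancel(); // cancels upstream, then downstream through the link
console.log(downstream.token.canceled, seen);
```

In this sketch the link runs in one direction only: canceling `downstream` would not affect `upstream`, which matches the idea that only the producer of a signal controls it.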

Examples

The following examples demonstrate some of the key concepts of the CancellationTokenSource and CancellationToken:

Promise Producer, Cancellation Consumer

The fetchAsync method below produces a Promise, and can consume a cancellation signal:

function fetchAsync(url, cancellationToken = CancellationToken.default) {
  return new Promise((resolve, reject) => {
    // throw (reject) if cancellation has already been requested.
    cancellationToken.throwIfCanceled();

    // alternatively:
    //
    //   // if cancellation has been requested, reject the Promise with the reason for cancellation.
    //   if (cancellationToken.canceled) {
    //     reject(cancellationToken.reason);
    //     return; 
    //   }
    //
    // or even:
    //
    //   // register the reject handler of the Promise to receive the cancellation signal. If the source is already
    //   // canceled, this will call the reject handler immediately.
    //   cancellationToken.register(reject); 
    //   if (cancellationToken.canceled) {
    //     return;
    //   }

    var xhr = new XMLHttpRequest();

    // save a callback to abort the xhr when cancellation is requested
    var oncancel = (reason: any) => {
      // abort the request
      xhr.abort();
      
      // reject the promise
      reject(reason);
    }

    // wait for the remote resource
    xhr.onload = event => {
      // async operation completed, stop waiting for cancellation
      registration.unregister();

      // resolve the promise
      resolve(event);
    }

    xhr.onerror = event => {
      // async operation failed, stop waiting for cancellation
      registration.unregister();

      // reject the promise
      reject(event);
    }
    
    // register the callback to execute when cancellation is requested
    var registration = cancellationToken.register(oncancel);

    // begin the async operation
    xhr.open('GET', url, /*async*/ true);
    xhr.send(null);
  });
}
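
The same producer pattern can be tried outside a browser by swapping the XMLHttpRequest for a timer. The sketch below is an assumption-laden illustration: `TokenLike`, `RegistrationLike`, `createSource`, and `delayAsync` are hypothetical names introduced here, and the stand-in source covers only the members the example needs rather than the full API described above.

```typescript
// Minimal hypothetical stand-ins so the sketch runs on its own; they cover
// only the members used below, not the full API described above.
interface RegistrationLike { unregister(): void; }
interface TokenLike {
  canceled: boolean;
  reason: any;
  register(callback: (reason: any) => void): RegistrationLike;
}

function createSource(): { token: TokenLike; cancel(reason?: any): void } {
  const callbacks = new Set<(reason: any) => void>();
  const token: TokenLike = {
    canceled: false,
    reason: undefined,
    register(callback) {
      if (token.canceled) { callback(token.reason); return { unregister() {} }; }
      callbacks.add(callback);
      return { unregister: () => { callbacks.delete(callback); } };
    },
  };
  return {
    token,
    cancel(reason: any = new Error("Operation canceled.")) {
      if (token.canceled) return;
      token.canceled = true;
      token.reason = reason;
      callbacks.forEach(callback => callback(reason));
      callbacks.clear();
    },
  };
}

// Same shape as fetchAsync, with a timer standing in for the XMLHttpRequest.
function delayAsync(ms: number, token: TokenLike): Promise<string> {
  return new Promise<string>((resolve, reject) => {
    // reject immediately if cancellation was requested before we started
    if (token.canceled) { reject(token.reason); return; }
    const timer = setTimeout(() => {
      registration.unregister(); // completed: stop listening for cancellation
      resolve("done");
    }, ms);
    const registration = token.register(reason => {
      clearTimeout(timer);       // abort the pending work
      reject(reason);
    });
  });
}

const source = createSource();
const outcome = delayAsync(50, source.token).then(
  value => value,
  reason => "canceled: " + reason.message);

source.cancel(); // requested before the timer fires
outcome.then(result => console.log(result)); // prints "canceled: Operation canceled."
```

Unregistering on completion matters for long-lived tokens: without it, every finished operation would leave a callback behind on the token.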

Cancellation Producer, Promise Consumer

The fetchConsumer method below can produce a cancellation signal, and consumes a Promise.

function fetchConsumer(url) {
  var source = new CancellationTokenSource();
  source.cancelAfter(1000); // cancel after 1sec.
  return fetchAsync(url, source.token);
}

Promise Consumer, Cancellation Consumer

The fetchMiddle function below receives a CancellationToken from its caller, which it can choose to listen to and pass along, but cannot cancel the CancellationTokenSource of its upstream caller. In addition, this function will receive the Promise produced by fetchAsync and can listen to the result, but cannot resolve or reject the Promise of the downstream Promise producer.

function fetchMiddle(url, cancellationToken = CancellationToken.default) {
  document.querySelector("#loading").style.display = 'block';
  
  // Secondary consumer *can* listen for cancellation...
  var ondone = () => document.querySelector("#loading").style.display = 'none';
  var registration = cancellationToken.register(ondone);
  
  return fetchAsync(url, cancellationToken)
    .then(value => {
      registration.unregister();
      ondone();
      return value;
    }, reason => {
      registration.unregister();
      ondone();
      return Promise.reject(reason);
    });
}

Upstream Promise Consumer

Another benefit of this cancellation mechanism is that upstream consumers of a Promise from a library can only affect the canceled state of a downstream Promise producer if the API of the downstream library accepts a CancellationToken argument. Upstream consumers cannot directly affect the state of the downstream Promise. For example, consider a grid that performs UI virtualization:

class Grid {
  constructor(dataUrl) {
    // cancels all network traffic when the Grid is destroyed
    this._cleanupSource = new CancellationTokenSource();
    this._rows = new Array();
    this._dataUrl = dataUrl;
  }
  
  // ...
  
  // somewhat naive: we always fetch the rows in the background so that we can cache them in memory,
  // even when a new fetch is requested.
  _fetchRows(offset, count) {
    if (this._hasCachedRows(offset, count)) {
      return Promise.resolve();
    }
    
    return fetchAsync(this._dataUrl + "?offset=" + offset + "&count=" + count, this._cleanupSource.token).then(event => {
      var result = JSON.parse(event.target.responseText);
      for(var i = 0; i < result.length; i++) {
        this._rows[offset + i] = result[i];
      }
    });
  }
  
  // handles the "click" event of a next page button. prevPage would be similar.
  // an external caller can request a new page, but cannot cancel the underlying network operation.
  nextPage() {
    // cancel any previous page change
    if (this._pageSource) {
      this._pageSource.cancel();
    }
    
    // set the current page change, linking it to the cleanup source.
    var pageSource = new CancellationTokenSource([this._cleanupSource.token]);
    this._pageSource = pageSource;
    
    var page = this.page + 1;
    var count = this.pageSize;
    var offset = page * count;
    
    // fetch the rows (either from cache or from the remote store)
    return this._fetchRows(offset, count).then(() => {
      // if a new page was requested, stop processing.
      if (pageSource.token.canceled) {
        return;
      }
      
      pageSource.close();
      this._displayPage(page);
    });
  }
  
  // destroys the Grid and cancels any pending network activity
  destroy() {
    // this both cancels any pending network activity as well as cancels any linked page changes.
    this._cleanupSource.cancel();
  }
}

Reference Implementation

A reference implementation can be found in https://github.com/rbuckton/asyncjs.
