Last active
March 4, 2024 09:38
-
-
Save osaxma/141d6be2b522f8bfe8673af14eb20bd1 to your computer and use it in GitHub Desktop.
a workaround for keeping haura_connect subscriptions alive when using JWT
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import 'dart:async'; | |
import 'package:meta/meta.dart'; | |
import 'package:hasura_connect/hasura_connect.dart'; | |
import 'package:http/http.dart' as http; | |
// problem: | |
// when subscribing to`HasuraConnect.subscription`, a websocket connection is created on the first subscription | |
// call. The websocket connection is created only once with the initial subscription and the provided TokenInterceptor | |
// or headers. When the token expires during an active subscription, HasuraConnect doesn't stop the subscription, or try | |
// to reconnect with the latest token in in the TokenInterceptor nor does it throw an error. It'll keep calling `onRequest` | |
// method indefinitely for some reason as it was observed in the TokenInterceptor. | |
// solution: | |
// When all active subscription are closed (ie Snapshot.close()), HasuraConnect will terminate the websocket connection | |
// The following implementation does exactly that and then reconnect again to create a websocket connection with the latest | |
// token interceptor. ALl this while keeping subscription alive. Here's how: | |
// - For each subscription, return a SnapshotX object and keep a reference to the object (currentSnapshots). | |
// - The SnapshotX object is fed by a source Snapshot that's updatable. | |
// - listen to an auth stream provided by the user. | |
// - when the auth stream sends an event, all source snapshots are closed (hence HasuraConnect closes websocket connection) | |
// - recreate all source snapshots and update each subscription (SnapshotX) with a new source snapshot | |
// - when recreating the new snapshots, hasura will use the latest Token available in the TokenInterceptor | |
// and recreate a new websocket connection. | |
// | |
// this way the user stream/subscription stays alive and all this process happens in the background as shown in the code below. | |
/// usage: | |
/// ```dart | |
/// // import this file | |
/// import '../hasura_connect_ex.dart'; | |
/// | |
/// final hasuraClient = HasuraConnectX( | |
/// url, | |
/// // use your own TokenInterceptor per hasura_connect docs | |
/// interceptors: [TokenInterceptor(tokenProvider: tokenProvider)], | |
/// // you can pass firebase stream onAuthStateChanged here, or any stream that trigers when token is updated. | |
/// onAuthStateChanged: tokenProvider(), | |
/// ); | |
/// | |
/// | |
/// final snapshot = await hasuraClient.subscriptionX(docSubscription); // this will return SnapshotX | |
/// | |
/// // you can use it the same way as hasuraClient.subscription: | |
/// // map and listen | |
/// snapshot.map((event) {....} )listen((event) { ... }); | |
/// | |
/// // change variables | |
/// snapshot.changeVariables(variables); | |
/// | |
/// // close snapshot | |
/// snapshot.close(); | |
/// ``` | |
// an extended HasuraConnect that updates subscription based on an auth stream. | |
// make sure you've an TokenInterceptor in place in order for this implementation to work. | |
class HasuraConnectX extends HasuraConnect { | |
HasuraConnectX( | |
String url, { | |
@required Stream onAuthStateChanged, | |
int reconnectionAttemp, | |
List<Interceptor> interceptors, | |
Map<String, String> headers, | |
http.Client httpClient, | |
}) : super( | |
url, | |
reconnectionAttemp: reconnectionAttemp, | |
interceptors: interceptors, | |
headers: headers, | |
httpClient: httpClient, | |
) { | |
// skip the first event since it'll not indicate an actual change in the Auth State | |
authStateSubscription = onAuthStateChanged.skip(1).listen((event) { | |
// to avoid unncessary connections/disconnetions | |
// another option is to pause authStateSubscription when currentSnapshots.isEmpty | |
if (currentSnapshots.isNotEmpty) { | |
//print('refreshing ${currentSnapshots.length} snapshot(s) from HasuraConnectX'); | |
refreshSubscriptions(); | |
} | |
}); | |
} | |
// this can be any stream that sends events whenever auth state changed (the evnets are not read) | |
// make sure the TokenInterceptor always has an up to date token because HasuraConnect will use that create a the new | |
// websocket connection | |
StreamSubscription authStateSubscription; | |
// a map for the current snapshots that to be updated when Auth State Changed | |
final Map<String, SnapshotX<Object>> currentSnapshots = {}; | |
// a wrapper around HasuraConnect subscription | |
Future<SnapshotX> subscriptionX(String document, {String key, Map<String, dynamic> variables}) async { | |
final snapshot = await super.subscription(document, key: key, variables: variables); | |
// if the key is not provided, use the one created by hasuraClient.subscription | |
key ??= snapshot.query.key; | |
currentSnapshots[key] = SnapshotX( | |
sourceSnapshot: snapshot, | |
onClose: () => currentSnapshots.remove(key), | |
onMap: (newSnapshotX) => currentSnapshots[key] = newSnapshotX, | |
); | |
return currentSnapshots[key]; | |
} | |
void refreshSubscriptions() async { | |
// when disconnect is called, hasuraClient will close all the active snapshots. | |
// the disconnection process includes futures + a 300 ms Future.delayed. | |
await disconnect(); | |
_recreateSnapshots(); | |
} | |
// recreate all the snapshots that are still active based on currentSnapshots map | |
void _recreateSnapshots() { | |
// print('recreating connections....'); | |
for (var key in currentSnapshots.keys) { | |
final intermediateSnapshot = currentSnapshots[key]; | |
subscription( | |
intermediateSnapshot.document, | |
key: intermediateSnapshot.key, | |
variables: intermediateSnapshot.variables, | |
).then((newSnapshot) => intermediateSnapshot.updateSourceSnapshot(newSnapshot)); | |
} | |
} | |
@override | |
Future<void> dispose() async { | |
// no need to call disconnect, super.dispose() already does | |
await authStateSubscription?.cancel(); | |
return super.dispose(); | |
} | |
} | |
// note: I tried initially to extend the Snapshot class but that didn't go well since Snapshot has a constructor body | |
// that does its own shenanigans | |
// this class mainly keeps the sourceSnapshot and pipe its event to a stream that can be used by the user. | |
class SnapshotX<T> { | |
// keep dynamic without generic types otherwise it'll throw an error when updating (because snapshots comes as dynamic) | |
Snapshot sourceSnapshot; | |
final void Function() onClose; | |
final void Function(SnapshotX newSnapshotX) onMap; | |
// controller should be dynamic because when the snapshot is mapped, the Type will change. | |
// rootStream will take care of mapping the stream of the controller into the new types without changing the controller | |
// controller and rootstream are mutable because they are assigned based on two cases: | |
// - in a new snapshot case, will initialize a streamcontroller and pass its stream to rootStream | |
// - in a mapping case, controller and roostream are passed from the previous snapshot (see `map` method) | |
StreamController controller; | |
Stream<T> rootStream; | |
// used to update the snapshot with the same query. | |
final String key; | |
final String document; | |
Map<String, dynamic> variables; | |
SnapshotX({ | |
@required this.sourceSnapshot, | |
@required this.onClose, | |
@required this.onMap, | |
StreamController controller, | |
Stream<T> rootStream, | |
}) : key = sourceSnapshot.query.key, | |
document = sourceSnapshot.query.document, | |
variables = sourceSnapshot.query.variables { | |
// TODO: should convert to a broadcast stream instead? | |
this.controller = controller ?? StreamController(onListen: _pipe, onCancel: _onCancel); | |
this.rootStream = rootStream ?? this.controller.stream; | |
} | |
Future<void> changeVariables(Map<String, dynamic> variables) async { | |
this.variables = variables; | |
// to keep the latest variables up to date on currentSnapshots map | |
onMap(this); | |
// HasuraConnect will take care of changing the variables and updating the sourceSnapshot internally. | |
await sourceSnapshot.changeVariables(variables); | |
} | |
void updateSourceSnapshot(Snapshot snapshot) { | |
// update the source snapshot with the new snapshot | |
sourceSnapshot = snapshot; | |
// cancel the previous subscription | |
_sourceSnapshotSubscription?.cancel(); | |
// pipe the new snapshot to create a new subscription | |
_pipe(); | |
} | |
// a subscription to the source snapshot | |
StreamSubscription _sourceSnapshotSubscription; | |
// to keep track if data changed between connecting and disconnecting | |
dynamic latestEvent; | |
// pipe events from sourceSnapshot to this Snapshot. | |
// the following approach is naive but does the job and avoids closing/pausing the stream or creating a complex one. | |
void _pipe() { | |
_sourceSnapshotSubscription = sourceSnapshot.listen((event) { | |
// check if the last event before disconnecting is still the same after reconnecting | |
// note: (latestEvent == event) always returns false. | |
// comment it out if not desired. | |
if (latestEvent.toString() == event.toString()) return; | |
latestEvent = event; | |
controller.add(event); | |
}); | |
_sourceSnapshotSubscription.onError(controller.addError); | |
} | |
// when a user maps the stream, return a new SnapshotX instance | |
// we return SnapshotX instead of a stream mainly to expose `changeVariables` method | |
SnapshotX<S> map<S>(S Function(T event) convert) { | |
final newSnapshotX = SnapshotX<S>( | |
sourceSnapshot: sourceSnapshot.map<S>(convert), | |
rootStream: rootStream.map(convert), | |
onClose: onClose, | |
onMap: onMap, | |
controller: controller, | |
); | |
// update the map of currentSnapshots with the new mapped instance | |
onMap?.call(newSnapshotX); | |
return newSnapshotX; | |
} | |
// when a user stops listening, cancel the subscription to the source snapshot. | |
void _onCancel() { | |
_sourceSnapshotSubscription?.cancel(); | |
} | |
// the user must close the snapshot after consuming it. | |
void close() { | |
_onCancel(); | |
sourceSnapshot.close(); | |
controller.close(); | |
onClose?.call(); | |
} | |
// listen to the rootStream which has the latest mapping (don't listen to stream controller directly) | |
StreamSubscription<T> listen(void Function(T event) onData, | |
{Function onError, void Function() onDone, bool cancelOnError}) { | |
return rootStream.listen(onData, onError: onError, onDone: onDone, cancelOnError: cancelOnError); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment