Skip to content

Instantly share code, notes, and snippets.

@afruzan
Last active April 23, 2023 15:44
Show Gist options
  • Save afruzan/a705d7d6264550849a7ee980b7d231fc to your computer and use it in GitHub Desktop.
Save afruzan/a705d7d6264550849a7ee980b7d231fc to your computer and use it in GitHub Desktop.
typescript signalr client hub base class that provides auth token and rxjs observable pattern.
import { HubConnection, HubConnectionBuilder, IHttpConnectionOptions, LogLevel, MessageHeaders } from '@microsoft/signalr';
import { Observable } from 'rxjs';
import { Subject } from 'rxjs';
import { map } from 'rxjs/operators';
import { AuthService } from '../auth.service';
import { RetryCanceledError, RetryPolicy } from './retry-policy';
export abstract class SignalRHubBase {
constructor(private authService: AuthService) {
}
public abstract get hubUrl(): string;
private get hubConnectionOptions(): IHttpConnectionOptions {
// NOTE: The auth token must be updated for each request. So using headers option is not true.
// Also for websockets and some other protocols signalr cannot set auth headers.
// See https://docs.microsoft.com/en-us/aspnet/core/signalr/authn-and-authz?view=aspnetcore-5.0#bearer-token-authentication
//const headers: MessageHeaders = {
// Authorization: 'Bearer ' + this.authService.accessToken,
// 'Content-Type': 'application/json',
// Accept: 'application/json, text/plain, */*'
//};
return {
/*headers,*/
accessTokenFactory: () => {
return this.authService.refreshLogin()
.pipe(map(_ => this.authService.accessToken)).toPromise();
}
};
// NOTE:
// The access token function you provide is called before every HTTP request made by SignalR. If you need to renew the token in order to keep the connection active (because it may expire during the connection), do so from within this function and return the updated token.
// In standard web APIs, bearer tokens are sent in an HTTP header. However, SignalR is unable to set these headers in browsers when using some transports. When using WebSockets and Server - Sent Events, the token is transmitted as a query string parameter.
}
private hubConnection: HubConnection;
private hubConnectionStartAction: RetryPolicy<void>;
private retryDelays = [0, 10000, 30000, 120000, 120000, 360000, 360000, 360000, 360000, 720000, 720000, 720000];
private _isConnectionStarted: boolean = false;
public get isConnectionStarted(): boolean {
return this._isConnectionStarted;
}
public set isConnectionStarted(value: boolean) {
this._isConnectionStarted = value;
}
private _onConnectionStarted: Subject<void> = new Subject<void>();
public get onConnectionStarted(): Subject<void> {
return this._onConnectionStarted;
}
private _onConnectionClosed: Subject<void> = new Subject<void>();
public get onConnectionClosed(): Subject<void> {
return this._onConnectionClosed;
}
public async startConnection(): Promise<void> {
if (this.hubConnection) {
console.error('stop hub connection before starting a new again. ' + this.hubUrl);
return Promise.reject('stop hub connection before starting a new again.');
}
this.hubConnection = new HubConnectionBuilder()
.withUrl(this.hubUrl, this.hubConnectionOptions)
.configureLogging(LogLevel.Information)
// automatic reconnect (start failures need to be handled manually)
.withAutomaticReconnect(this.retryDelays)
.build();
this.hubConnection.onclose(() => {
console.log('hub connection closed ' + this.hubUrl);
this.onConnectionClosed.next();
});
this.hubConnectionStartAction = new RetryPolicy(
() => this.hubConnection.start(),
this.retryDelays);
try {
await this.hubConnectionStartAction.run();
console.log('hub connection started ' + this.hubUrl);
this._isConnectionStarted = true;
this.onConnectionStarted.next();
} catch (error) {
if (error instanceof RetryCanceledError) {
console.warn('retry for starting hub connection canceled ' + this.hubUrl, error.error);
}
else {
console.error('error while starting hub connection ' + this.hubUrl, error);
this.hubConnectionStartAction?.cancel();
}
this.hubConnection = null;
this.hubConnectionStartAction = null;
throw error;
}
}
// stop cause completing all current observables.
public stopConnection(): Promise<void> {
if (this.hubConnection) {
this.hubConnectionStartAction?.cancel();
return this.hubConnection
.stop()
.then(() => {
this.hubConnection = null;
this.hubConnectionStartAction = null;
console.log('hub connection stoped ' + this.hubUrl);
})
.catch(error => {
console.error('error while stopping hub connection ' + this.hubUrl, error);
throw error;
});
}
return Promise.resolve();
}
protected createSubjectFactory<T, K extends keyof T>(methodName: string, paramNamesInOrder: K[], onSubjectCreated: (observable: Observable<T>) => void) {
if (this._isConnectionStarted) {
// it is started
console.debug(methodName + ': it is started')
onSubjectCreated(this.createSubject(methodName, paramNamesInOrder));
}
this.onConnectionStarted.subscribe(() => {
// new connection is started. previous subject is completed before; so create new:
console.debug(methodName + ': new connection is started')
onSubjectCreated(this.createSubject(methodName, paramNamesInOrder));
});
}
protected createSubject<T, K extends keyof T>(methodName: string, paramNamesInOrder: K[]): Observable<T> {
const subject: Subject<T> = new Subject<T>();
this.hubConnection.on(methodName, (...args: any[]) => {
// raise the event
var obj = {} as T;
for (var i = 0; i < args.length; i++) {
obj[paramNamesInOrder[i]] = args[i];
}
subject.next(obj);
});
this.hubConnection.onclose((err?: Error) => {
if (err) {
// an error occurred
// (The basic concept of RxJS is that any error or complete-call will basically "kill" a stream.)
subject.error(err);
} else {
// no more events to be sent
// (in order avoid mem. leaks, as calling complete on the source stream will remove the references to all the subscribed observers)
subject.complete();
}
});
return subject.asObservable();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment