Skip to content

Instantly share code, notes, and snippets.

@BossBele
Created March 16, 2025 21:46
Show Gist options
  • Save BossBele/d80070e832687410e406a9c58bf313e8 to your computer and use it in GitHub Desktop.
Save BossBele/d80070e832687410e406a9c58bf313e8 to your computer and use it in GitHub Desktop.
Implementation of the RxStorage interface using React Native's AsyncStorage
import { BehaviorSubject, Observable } from 'rxjs';
import AsyncStorage from '@react-native-async-storage/async-storage';
import type {
BulkWriteRow,
EventBulk,
RxDocumentData,
RxStorage,
RxStorageBulkWriteResponse,
RxStorageChangeEvent,
RxStorageCountResult,
RxStorageInstance,
RxStorageInstanceCreationParams,
RxStorageQueryResult,
PreparedQuery,
} from 'rxdb';
import { ensureNotFalsy, getQueryMatcher, getPrimaryFieldOfPrimaryKey } from 'rxdb';
import { filter, map, shareReplay } from 'rxjs/operators';
/**
* State shared by all methods of one AsyncStorage-backed storage instance.
* It is created once per collection in `createStorageInstance` and exposed
* on the returned instance as `internals`.
*/
interface AsyncStorageInternals {
/**
* In-memory mirror of the collection: a map from the document's primary key
* value to the stored document data. Writers mutate the map in place and
* then re-emit the same map through this subject.
*/
documents$: BehaviorSubject<Map<string, RxDocumentData<any>>>;
/**
* Stream of change-event bulks; this is what `changeStream()` returns.
*/
changes$: Observable<EventBulk<RxStorageChangeEvent<any>, string>>;
/**
* String prepended to a document id to build its AsyncStorage key.
* It is built as `<options.prefix><databaseName>-<collectionName>-`.
*/
storagePrefix: string;
}
/**
* Options for creating an AsyncStorage-backed storage instance.
*/
interface AsyncStorageInstanceCreationOptions {
/**
* Optional string placed in front of every AsyncStorage key of the instance
* (before the database and collection name). Defaults to an empty string.
* Use it to keep several databases apart inside the same app.
*/
prefix?: string;
}
/**
* Checkpoints are plain strings. `getChangedDocumentsSince` matches the
* checkpoint against a document identifier and resumes after that document;
* an empty or missing checkpoint means "start from the beginning".
*/
type AsyncStorageCheckpoint = string;
/**
* Names of the standard meta fields found on RxDB document data
* (revision, attachments, deletion flag and meta object).
*/
const RX_META_FIELDS = ['_rev', '_attachments', '_deleted', '_meta'];
/**
 * RxStorage implementation backed by React Native's AsyncStorage.
 *
 * Every document is persisted as JSON under the key
 * `<options.prefix><databaseName>-<collectionName>-<documentId>` and mirrored
 * in an in-memory map, so reads and queries never touch AsyncStorage.
 */
export class AsyncStorageRxStorage implements RxStorage<AsyncStorageInternals, AsyncStorageInstanceCreationOptions> {
  name: string = 'async-storage';
  rxdbVersion: string;

  /**
   * @param version Value exposed as `rxdbVersion` on the storage.
   */
  constructor(version: string) {
    this.rxdbVersion = version;
  }

  /**
   * Creates the storage instance for one collection.
   *
   * All documents already persisted under the collection's key prefix are
   * loaded into memory before the instance is returned.
   *
   * @param params Database name, collection name, schema and options.
   * @returns The ready-to-use storage instance.
   * @throws Whatever AsyncStorage throws while the initial data is read.
   */
  async createStorageInstance<RxDocType>(
    params: RxStorageInstanceCreationParams<RxDocType, AsyncStorageInstanceCreationOptions>
  ): Promise<RxStorageInstance<RxDocType, AsyncStorageInternals, AsyncStorageInstanceCreationOptions, AsyncStorageCheckpoint>> {
    type WriteError = RxStorageBulkWriteResponse<RxDocType>['error'][number];
    type ChangeOperation = RxStorageChangeEvent<RxDocType>['operation'];

    const { databaseName, collectionName, schema, options = {} } = params;

    // NOTE(review): the prefix of a collection named "a" also matches the keys
    // of a collection named "a-b" in the same database. Changing the key
    // format would orphan existing data, so it is kept as is.
    const prefix = options.prefix ?? '';
    const storagePrefix = `${prefix}${databaseName}-${collectionName}-`;

    // In-memory mirror of the collection, keyed by primary key value.
    const documents$ = new BehaviorSubject<Map<string, RxDocumentData<RxDocType>>>(new Map());

    // Source of change events; the initial `null` is filtered out below.
    // NOTE(review): BehaviorSubject + shareReplay(1) hand the most recent bulk
    // to late subscribers as well - confirm that consumers tolerate the replay.
    const changeEventSubject = new BehaviorSubject<EventBulk<RxStorageChangeEvent<RxDocType>, AsyncStorageCheckpoint> | null>(null);
    const changes$ = changeEventSubject.pipe(
      filter(ev => !!ev),
      map(ev => ensureNotFalsy(ev)),
      shareReplay(1)
    );

    await this.loadInitialData(storagePrefix, documents$);

    const internals: AsyncStorageInternals = {
      documents$,
      changes$,
      storagePrefix
    };

    const primaryPath = getPrimaryFieldOfPrimaryKey(schema.primaryKey);

    /** Non-deleted documents that satisfy the selector of the prepared query. */
    const findMatches = (preparedQuery: PreparedQuery<RxDocType>): RxDocumentData<RxDocType>[] => {
      const matcher = getQueryMatcher(schema, preparedQuery.query);
      return Array.from(internals.documents$.getValue().values())
        .filter(doc => !doc._deleted && matcher(doc));
    };

    /** Event operation for a write that already passed the conflict check. */
    const operationFor = (
      existingDoc: RxDocumentData<RxDocType> | undefined,
      nextDoc: RxDocumentData<RxDocType>
    ): ChangeOperation => {
      if (!existingDoc || (existingDoc._deleted && !nextDoc._deleted)) {
        return 'INSERT';
      }
      if (!existingDoc._deleted && nextDoc._deleted) {
        return 'DELETE';
      }
      return 'UPDATE';
    };

    const instance: RxStorageInstance<RxDocType, AsyncStorageInternals, AsyncStorageInstanceCreationOptions, AsyncStorageCheckpoint> = {
      databaseName,
      collectionName,
      schema,
      internals,
      options,

      /**
       * Writes the given rows one after another.
       *
       * A row conflicts when a document with the same id is stored and the
       * row either has no `previous` or its `previous._rev` differs from the
       * stored revision. Conflicts are returned in `error` (status 409, with
       * `documentInDb`); they are never thrown.
       *
       * @param documentWrites Rows to write.
       * @param context Passed through to the emitted event bulk.
       * @returns The response whose `error` array lists every row that failed.
       */
      async bulkWrite(
        documentWrites: BulkWriteRow<RxDocType>[],
        context: string
      ): Promise<RxStorageBulkWriteResponse<RxDocType>> {
        const response: RxStorageBulkWriteResponse<RxDocType> = {
          error: []
        };
        const documentsMap = internals.documents$.getValue();
        const events: RxStorageChangeEvent<RxDocType>[] = [];
        let lastWrittenId: AsyncStorageCheckpoint = '';

        for (const writeRow of documentWrites) {
          const nextDoc = writeRow.document;
          const docId: string = (nextDoc as any)[primaryPath];
          const existingDoc = documentsMap.get(docId);

          const isConflict = !!existingDoc &&
            (!writeRow.previous || existingDoc._rev !== writeRow.previous._rev);
          if (existingDoc && isConflict) {
            response.error.push({
              status: 409,
              isError: true,
              documentId: docId,
              writeRow,
              documentInDb: existingDoc
            });
            continue;
          }

          try {
            await AsyncStorage.setItem(
              `${internals.storagePrefix}${docId}`,
              JSON.stringify(nextDoc)
            );
          } catch (err) {
            console.error('Error writing document:', err);
            // Persisting failed: report the row as failed (same 409 status the
            // previous implementation used) and leave the in-memory state alone.
            response.error.push({
              status: 409,
              isError: true,
              documentId: docId,
              writeRow,
              documentInDb: existingDoc
            } as WriteError);
            continue;
          }

          // Delete before set so a re-written document moves to the end of the
          // map; getChangedDocumentsSince relies on map order being write order.
          documentsMap.delete(docId);
          documentsMap.set(docId, nextDoc);
          lastWrittenId = docId;

          events.push({
            documentId: docId,
            operation: operationFor(existingDoc, nextDoc),
            documentData: nextDoc,
            previousDocumentData: writeRow.previous || undefined
          });
        }

        internals.documents$.next(documentsMap);

        if (events.length > 0) {
          changeEventSubject.next({
            id: Date.now().toString(),
            events,
            context,
            // Id of the last document written, matching the checkpoints that
            // getChangedDocumentsSince hands out.
            checkpoint: lastWrittenId
          });
        }
        return response;
      },

      /**
       * Looks documents up by primary key.
       *
       * @param ids Primary key values to look up.
       * @param withDeleted When false, documents flagged `_deleted` are left out.
       * @returns The documents found, in the order of `ids`; unknown ids are skipped.
       */
      async findDocumentsById(
        ids: string[],
        withDeleted: boolean
      ): Promise<RxDocumentData<RxDocType>[]> {
        const documentsMap = internals.documents$.getValue();
        const found: RxDocumentData<RxDocType>[] = [];
        for (const id of ids) {
          const doc = documentsMap.get(id);
          if (doc && (withDeleted || !doc._deleted)) {
            found.push(doc);
          }
        }
        return found;
      },

      /**
       * Runs a query against the in-memory documents: selector first, then
       * sort, then skip and limit. Deleted documents are never returned.
       */
      async query(
        preparedQuery: PreparedQuery<RxDocType>
      ): Promise<RxStorageQueryResult<RxDocType>> {
        const { query } = preparedQuery;
        let matches = findMatches(preparedQuery);
        if (query.sort && query.sort.length > 0) {
          matches = sortDocuments(matches, query.sort);
        }
        let page = matches.slice(query.skip || 0);
        if (typeof query.limit === 'number') {
          page = page.slice(0, query.limit);
        }
        return {
          documents: page
        };
      },

      /**
       * Counts the non-deleted documents matching the query's selector.
       * Skip and limit are not applied.
       */
      async count(
        preparedQuery: PreparedQuery<RxDocType>
      ): Promise<RxStorageCountResult> {
        return {
          count: findMatches(preparedQuery).length,
          mode: 'fast'
        };
      },

      /**
       * Reads the attachment stored under `<storagePrefix><documentId>_<attachmentId>`.
       *
       * @throws Error when no value is stored under that key.
       */
      async getAttachmentData(
        documentId: string,
        attachmentId: string,
        digest: string
      ): Promise<string> {
        const attachmentKey = `${internals.storagePrefix}${documentId}_${attachmentId}`;
        const data = await AsyncStorage.getItem(attachmentKey);
        // Only a missing key counts as "not found"; an empty string is valid data.
        if (data == null) {
          throw new Error(`Attachment ${attachmentId} not found for document ${documentId}`);
        }
        return data;
      },

      /**
       * Returns up to `limit` documents that follow the checkpoint document in
       * map order (deleted documents included).
       *
       * @param limit Maximum number of documents to return.
       * @param checkpoint Id of the last document already seen. When missing,
       *   empty or unknown, iteration starts at the beginning.
       * @returns The documents plus the id of the last one. When nothing is
       *   returned, the checkpoint is the given one if it was found, else `''`.
       */
      async getChangedDocumentsSince(
        limit: number,
        checkpoint?: AsyncStorageCheckpoint
      ): Promise<{
        documents: RxDocumentData<RxDocType>[];
        checkpoint: AsyncStorageCheckpoint;
      }> {
        // The map keys are the primary key values, so no id field has to be
        // read from the document data.
        const entries = Array.from(internals.documents$.getValue().entries());

        let startIndex = 0;
        let unchangedCheckpoint: AsyncStorageCheckpoint = '';
        if (checkpoint) {
          const checkpointIndex = entries.findIndex(([id]) => id === checkpoint);
          if (checkpointIndex !== -1) {
            startIndex = checkpointIndex + 1;
            unchangedCheckpoint = checkpoint;
          }
        }

        const page = entries.slice(startIndex, startIndex + limit);
        const lastEntry = page[page.length - 1];
        return {
          documents: page.map(([, doc]) => doc),
          checkpoint: lastEntry ? lastEntry[0] : unchangedCheckpoint
        };
      },

      /** Stream of change-event bulks produced by `bulkWrite`. */
      changeStream() {
        return internals.changes$;
      },

      /**
       * Purges deleted documents from memory and from AsyncStorage.
       *
       * A deleted document is purged once its deletion time is at least
       * `minimumDeletedTime` milliseconds in the past. The deletion time is
       * `_meta.deletedAt` when present, otherwise `_meta.lwt`.
       *
       * @returns Always `true`.
       */
      async cleanup(minimumDeletedTime: number): Promise<boolean> {
        const documentsMap = internals.documents$.getValue();
        const now = Date.now();
        const purgeIds: string[] = [];

        for (const [id, doc] of documentsMap.entries()) {
          if (!doc._deleted) {
            continue;
          }
          // Nothing in this storage writes `deletedAt`, so fall back to `lwt`.
          const deletedAt = doc._meta?.deletedAt ?? doc._meta?.lwt;
          if (typeof deletedAt === 'number' && now - deletedAt >= minimumDeletedTime) {
            purgeIds.push(id);
          }
        }

        if (purgeIds.length > 0) {
          await AsyncStorage.multiRemove(
            purgeIds.map(id => `${internals.storagePrefix}${id}`)
          );
          for (const id of purgeIds) {
            documentsMap.delete(id);
          }
          internals.documents$.next(documentsMap);
        }
        return true;
      },

      /**
       * Completes both subjects. AsyncStorage itself has nothing to close.
       */
      async close(): Promise<void> {
        documents$.complete();
        changeEventSubject.complete();
      },

      /**
       * Removes every document held in memory from AsyncStorage and empties
       * the in-memory map.
       */
      async remove(): Promise<void> {
        const documentsMap = internals.documents$.getValue();
        const storageKeys = Array.from(documentsMap.keys())
          .map(id => `${internals.storagePrefix}${id}`);
        await AsyncStorage.multiRemove(storageKeys);
        documentsMap.clear();
        internals.documents$.next(documentsMap);
      }
    };
    return instance;
  }

  /**
   * Loads every document stored under `prefix` and emits them through
   * `documents$` as one map keyed by document id (the key minus the prefix).
   *
   * An entry is treated as a document when its value parses to a JSON object
   * that carries all fields listed in `RX_META_FIELDS`. Everything else, such
   * as attachment payloads stored under `<docId>_<attachmentId>`, is skipped.
   * When no key matches the prefix, nothing is emitted.
   *
   * @param prefix Key prefix of the collection.
   * @param documents$ Subject that receives the loaded map.
   * @throws Re-throws any error raised by AsyncStorage after logging it.
   */
  private async loadInitialData<RxDocType>(
    prefix: string,
    documents$: BehaviorSubject<Map<string, RxDocumentData<RxDocType>>>
  ): Promise<void> {
    try {
      const allKeys = await AsyncStorage.getAllKeys();
      const collectionKeys = allKeys.filter(key => key.startsWith(prefix));
      if (collectionKeys.length === 0) {
        return;
      }

      const keyValues = await AsyncStorage.multiGet(collectionKeys);
      const documentsMap = new Map<string, RxDocumentData<RxDocType>>();

      for (const [key, value] of keyValues) {
        if (!value) {
          continue;
        }
        const docId = key.substring(prefix.length);

        let parsed: unknown;
        try {
          parsed = JSON.parse(value);
        } catch (err) {
          // Keys with an underscore may hold raw attachment data, which is not
          // JSON; only report values that were expected to be documents.
          if (!docId.includes('_')) {
            console.error(`Failed to parse document with id ${docId}:`, err);
          }
          continue;
        }

        // Decide by content, not by key: document ids may contain "_" too.
        const looksLikeDocument =
          typeof parsed === 'object' &&
          parsed !== null &&
          RX_META_FIELDS.every(field => field in (parsed as object));
        if (looksLikeDocument) {
          documentsMap.set(docId, parsed as RxDocumentData<RxDocType>);
        }
      }

      documents$.next(documentsMap);
    } catch (err) {
      console.error('Error loading initial data from AsyncStorage:', err);
      throw err;
    }
  }
}
/**
 * Returns a sorted copy of `docs`; the input array is left untouched.
 *
 * The entries of `sortParams` are applied in order, and the first one on
 * which two documents differ decides. Only the first key of each entry is
 * used. A document whose value is `undefined` sorts last for 'asc' and first
 * for 'desc'.
 */
function sortDocuments<RxDocType>(
  docs: RxDocumentData<RxDocType>[],
  sortParams: { [propPath: string]: 'asc' | 'desc' }[]
): RxDocumentData<RxDocType>[] {
  const compare = (left: RxDocumentData<RxDocType>, right: RxDocumentData<RxDocType>): number => {
    for (const criterion of sortParams) {
      const [fieldPath, order] = Object.entries(criterion)[0];
      const leftValue = getValueByPath(left, fieldPath);
      const rightValue = getValueByPath(right, fieldPath);
      if (leftValue === rightValue) {
        // Tie on this criterion: let the next one decide.
        continue;
      }
      const sign = order === 'asc' ? 1 : -1;
      if (leftValue === undefined) {
        return sign;
      }
      if (rightValue === undefined) {
        return -sign;
      }
      return leftValue < rightValue ? -sign : sign;
    }
    return 0;
  };

  const sorted = docs.slice();
  sorted.sort(compare);
  return sorted;
}
/**
 * Resolves a dot-separated path (e.g. "user.address.city") against `obj`.
 *
 * Yields `undefined` as soon as a step has to be taken on `null` or
 * `undefined`; otherwise yields whatever the last step produces.
 */
function getValueByPath<T>(obj: T, path: string): any {
  return path.split('.').reduce<any>(
    (node, key) => (node === null || node === undefined ? undefined : node[key]),
    obj
  );
}
/**
 * Factory for the AsyncStorage-backed RxStorage.
 *
 * @param version Forwarded unchanged to the `AsyncStorageRxStorage` constructor.
 * @returns A new storage object on every call.
 */
export function getRxStorageAsyncStorage(version: string): RxStorage<AsyncStorageInternals, AsyncStorageInstanceCreationOptions> {
  const storage = new AsyncStorageRxStorage(version);
  return storage;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment