Created
March 16, 2025 21:46
-
-
Save BossBele/d80070e832687410e406a9c58bf313e8 to your computer and use it in GitHub Desktop.
Implementation of the RxStorage interface using React Native's AsyncStorage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { BehaviorSubject, Observable } from 'rxjs'; | |
import AsyncStorage from '@react-native-async-storage/async-storage'; | |
import type { | |
BulkWriteRow, | |
EventBulk, | |
RxDocumentData, | |
RxStorage, | |
RxStorageBulkWriteResponse, | |
RxStorageChangeEvent, | |
RxStorageCountResult, | |
RxStorageInstance, | |
RxStorageInstanceCreationParams, | |
RxStorageQueryResult, | |
PreparedQuery, | |
} from 'rxdb'; | |
import { ensureNotFalsy, getQueryMatcher, getPrimaryFieldOfPrimaryKey } from 'rxdb'; | |
import { filter, map, shareReplay } from 'rxjs/operators'; | |
/** | |
* Internals interface for the AsyncStorage implementation | |
*/ | |
interface AsyncStorageInternals { | |
/** | |
* BehaviorSubject containing all documents | |
* in the storage as a map by primary key | |
*/ | |
documents$: BehaviorSubject<Map<string, RxDocumentData<any>>>; | |
/** | |
* Observable for the change events | |
*/ | |
changes$: Observable<EventBulk<RxStorageChangeEvent<any>, string>>; | |
/** | |
* Prefix used for this storage instance | |
*/ | |
storagePrefix: string; | |
} | |
/** | |
* Options for creating a AsyncStorage instance | |
*/ | |
interface AsyncStorageInstanceCreationOptions { | |
/** | |
* Optional prefix to use for the AsyncStorage keys | |
* This allows you to use multiple databases in the same app | |
*/ | |
prefix?: string; | |
} | |
/** | |
* Our checkpoint type is just the document id | |
*/ | |
type AsyncStorageCheckpoint = string; | |
/** | |
* List of standard meta fields in RxDB documents | |
*/ | |
const RX_META_FIELDS = ['_rev', '_attachments', '_deleted', '_meta']; | |
/** | |
* Implementation of the RxStorage interface using React Native's AsyncStorage | |
*/ | |
export class AsyncStorageRxStorage implements RxStorage<AsyncStorageInternals, AsyncStorageInstanceCreationOptions> { | |
name: string = 'async-storage'; | |
rxdbVersion: string; | |
constructor(version: string) { | |
this.rxdbVersion = version; | |
} | |
async createStorageInstance<RxDocType>( | |
params: RxStorageInstanceCreationParams<RxDocType, AsyncStorageInstanceCreationOptions> | |
): Promise<RxStorageInstance<RxDocType, AsyncStorageInternals, AsyncStorageInstanceCreationOptions, AsyncStorageCheckpoint>> { | |
const { databaseName, collectionName, schema, options = {} } = params; | |
// Create storage prefix for the keys in AsyncStorage | |
const prefix = options.prefix || ''; | |
const storagePrefix = `${prefix}${databaseName}-${collectionName}-`; | |
// Create a BehaviorSubject to hold all documents in memory | |
const documents$ = new BehaviorSubject<Map<string, RxDocumentData<RxDocType>>>(new Map()); | |
// Create a subject for change events | |
const changeEventSubject = new BehaviorSubject<EventBulk<RxStorageChangeEvent<RxDocType>, AsyncStorageCheckpoint> | null>(null); | |
const changes$ = changeEventSubject.pipe( | |
filter(ev => !!ev), | |
map(ev => ensureNotFalsy(ev)), | |
shareReplay(1) | |
); | |
// Load initial data from AsyncStorage | |
await this.loadInitialData(storagePrefix, documents$); | |
// Instance internals | |
const internals: AsyncStorageInternals = { | |
documents$, | |
changes$, | |
storagePrefix | |
}; | |
// Primary key field of the collection | |
const primaryPath = getPrimaryFieldOfPrimaryKey(schema.primaryKey); | |
// Create and return the storage instance | |
const instance: RxStorageInstance<RxDocType, AsyncStorageInternals, AsyncStorageInstanceCreationOptions, AsyncStorageCheckpoint> = { | |
databaseName, | |
collectionName, | |
schema, | |
internals, | |
options, | |
// Write multiple documents | |
async bulkWrite( | |
documentWrites: BulkWriteRow<RxDocType>[], | |
context: string | |
): Promise<RxStorageBulkWriteResponse<RxDocType>> { | |
const response: RxStorageBulkWriteResponse<RxDocType> = { | |
error: [] | |
}; | |
// Get current documents map | |
const documentsMap = internals.documents$.getValue(); | |
const eventBulk: RxStorageChangeEvent<RxDocType>[] = []; | |
// Handle each write | |
for (const writeRow of documentWrites) { | |
const docId = (writeRow.document as any)[primaryPath]; | |
try { | |
const existingDoc = documentsMap.get(docId); | |
// Check for conflicts | |
if (existingDoc && writeRow.previous && existingDoc._rev !== writeRow.previous._rev) { | |
throw new Error(`Conflict: Document with id ${docId} has been modified`); | |
} | |
// Store document in AsyncStorage | |
await AsyncStorage.setItem( | |
`${internals.storagePrefix}${docId}`, | |
JSON.stringify(writeRow.document) | |
); | |
// Update in-memory map | |
documentsMap.set(docId, writeRow.document); | |
// Add to success | |
// response.success[docId] = writeRow.document; | |
// Create change event | |
eventBulk.push({ | |
documentId: docId, | |
operation: existingDoc ? 'UPDATE' : 'INSERT', | |
documentData: writeRow.document, | |
previousDocumentData: writeRow.previous || undefined | |
}); | |
} catch (err) { | |
console.error('Error writing document:', err); | |
// Add to error | |
(response.error as any)[docId] = { | |
status: 409, | |
isError: true, | |
documentId: docId, | |
writeRow, | |
}; | |
} | |
} | |
// Update documents$ subject | |
internals.documents$.next(documentsMap); | |
// Emit change events if any | |
if (eventBulk.length > 0) { | |
changeEventSubject.next({ | |
id: Date.now().toString(), | |
events: eventBulk, | |
context, | |
checkpoint: documentWrites[documentWrites.length - 1].document._rev | |
}); | |
} | |
return response; | |
}, | |
// Find documents by their IDs | |
async findDocumentsById( | |
ids: string[], | |
withDeleted: boolean | |
): Promise<RxDocumentData<RxDocType>[]> { | |
const documentsMap = internals.documents$.getValue(); | |
const docs: RxDocumentData<RxDocType>[] = []; | |
for (const id of ids) { | |
const doc = documentsMap.get(id); | |
if (doc && (withDeleted || !doc._deleted)) { | |
docs.push(doc); | |
} | |
} | |
return docs; | |
}, | |
// Query documents | |
async query( | |
preparedQuery: PreparedQuery<RxDocType> | |
): Promise<RxStorageQueryResult<RxDocType>> { | |
const { query } = preparedQuery; | |
const documentsMap = internals.documents$.getValue(); | |
// Convert map to array of documents | |
const allDocs = Array.from(documentsMap.values()) | |
.filter(doc => !doc._deleted); | |
// Create matcher function based on the query | |
const matcher = getQueryMatcher(schema, query); | |
// Apply selector | |
let filteredDocs = allDocs.filter(doc => matcher(doc)); | |
// Sort documents | |
if (query.sort && query.sort.length > 0) { | |
filteredDocs = sortDocuments(filteredDocs, query.sort); | |
} | |
// Apply skip and limit | |
const skip = query.skip || 0; | |
const limit = query.limit; | |
let resultDocs = filteredDocs.slice(skip); | |
if (typeof limit === 'number') { | |
resultDocs = resultDocs.slice(0, limit); | |
} | |
return { | |
documents: resultDocs | |
}; | |
}, | |
// Count documents | |
async count( | |
preparedQuery: PreparedQuery<RxDocType> | |
): Promise<RxStorageCountResult> { | |
const { query } = preparedQuery; | |
const documentsMap = internals.documents$.getValue(); | |
// Convert map to array of non-deleted documents | |
const allDocs = Array.from(documentsMap.values()) | |
.filter(doc => !doc._deleted); | |
// Create matcher function based on the query | |
const matcher = getQueryMatcher(schema, query); | |
// Count documents that match the selector | |
const count = allDocs.filter(doc => matcher(doc)).length; | |
return { | |
count, | |
mode: 'fast' | |
}; | |
}, | |
// Get attachment data | |
async getAttachmentData( | |
documentId: string, | |
attachmentId: string, | |
digest: string | |
): Promise<string> { | |
const attachmentKey = `${internals.storagePrefix}${documentId}_${attachmentId}`; | |
// Try to retrieve attachment from AsyncStorage | |
const data = await AsyncStorage.getItem(attachmentKey); | |
if (!data) { | |
throw new Error(`Attachment ${attachmentId} not found for document ${documentId}`); | |
} | |
return data; | |
}, | |
// Optional: Get changed documents since a checkpoint | |
async getChangedDocumentsSince( | |
limit: number, | |
checkpoint?: AsyncStorageCheckpoint | |
): Promise<{ | |
documents: RxDocumentData<RxDocType>[]; | |
checkpoint: AsyncStorageCheckpoint; | |
}> { | |
const documentsMap = internals.documents$.getValue(); | |
const docs = Array.from(documentsMap.values()); | |
// If no checkpoint, return the first 'limit' documents | |
if (!checkpoint) { | |
const limitedDocs = docs.slice(0, limit); | |
if (limitedDocs.length === 0) { | |
return { | |
documents: [], | |
checkpoint: '' | |
}; | |
} | |
return { | |
documents: limitedDocs, | |
checkpoint: limitedDocs[limitedDocs.length - 1]._id | |
}; | |
} | |
// Find the index of the document with the checkpoint id | |
const checkpointIndex = docs.findIndex(doc => doc._id === checkpoint); | |
if (checkpointIndex === -1) { | |
// If checkpoint not found, start from beginning | |
const limitedDocs = docs.slice(0, limit); | |
return { | |
documents: limitedDocs, | |
checkpoint: limitedDocs.length ? limitedDocs[limitedDocs.length - 1]._id : '' | |
}; | |
} | |
// Get documents after the checkpoint | |
const docsAfterCheckpoint = docs.slice(checkpointIndex + 1, checkpointIndex + 1 + limit); | |
return { | |
documents: docsAfterCheckpoint, | |
checkpoint: docsAfterCheckpoint.length ? | |
docsAfterCheckpoint[docsAfterCheckpoint.length - 1]._id : | |
checkpoint | |
}; | |
}, | |
// Observable of change events | |
changeStream() { | |
return internals.changes$; | |
}, | |
// Cleanup deleted documents | |
async cleanup(minimumDeletedTime: number): Promise<boolean> { | |
const documentsMap = internals.documents$.getValue(); | |
const now = Date.now(); | |
const toDelete: string[] = []; | |
// Find deleted documents older than minimumDeletedTime | |
for (const [id, doc] of documentsMap.entries()) { | |
if (doc._deleted && doc._meta?.deletedAt) { | |
const deletedAt = doc._meta.deletedAt; | |
if (now - deletedAt >= minimumDeletedTime) { | |
toDelete.push(id); | |
} | |
} | |
} | |
// Remove the documents | |
for (const id of toDelete) { | |
await AsyncStorage.removeItem(`${internals.storagePrefix}${id}`); | |
documentsMap.delete(id); | |
} | |
// Update documents$ subject if documents were deleted | |
if (toDelete.length > 0) { | |
internals.documents$.next(documentsMap); | |
} | |
// Return true if all deleted documents were cleaned up | |
return true; | |
}, | |
// Close the storage instance | |
async close(): Promise<void> { | |
// In AsyncStorage, we don't need to close connections | |
// Just remove all listeners | |
documents$.complete(); | |
changeEventSubject.complete(); | |
}, | |
// Remove the collection's data | |
async remove(): Promise<void> { | |
const documentsMap = internals.documents$.getValue(); | |
const allKeys = Array.from(documentsMap.keys()).map(id => `${internals.storagePrefix}${id}`); | |
// Remove all documents from AsyncStorage | |
await AsyncStorage.multiRemove(allKeys); | |
// Clear the in-memory map | |
documentsMap.clear(); | |
internals.documents$.next(documentsMap); | |
} | |
}; | |
return instance; | |
} | |
/** | |
* Load all documents from AsyncStorage for the given prefix | |
*/ | |
private async loadInitialData<RxDocType>( | |
prefix: string, | |
documents$: BehaviorSubject<Map<string, RxDocumentData<RxDocType>>> | |
): Promise<void> { | |
try { | |
// Get all keys | |
const allKeys = await AsyncStorage.getAllKeys(); | |
// Filter keys that belong to this collection | |
const collectionKeys = allKeys.filter(key => key.startsWith(prefix)); | |
// If no documents, return | |
if (collectionKeys.length === 0) { | |
return; | |
} | |
// Get all documents | |
const keyValues = await AsyncStorage.multiGet(collectionKeys); | |
const documentsMap = new Map<string, RxDocumentData<RxDocType>>(); | |
// Parse each document and add to the map | |
for (const [key, value] of keyValues) { | |
if (value) { | |
const docId = key.substring(prefix.length); | |
// Skip attachment keys (they contain underscore after docId) | |
if (docId.includes('_')) { | |
continue; | |
} | |
try { | |
const document = JSON.parse(value) as RxDocumentData<RxDocType>; | |
documentsMap.set(docId, document); | |
} catch (err) { | |
console.error(`Failed to parse document with id ${docId}:`, err); | |
} | |
} | |
} | |
// Update the BehaviorSubject | |
documents$.next(documentsMap); | |
} catch (err) { | |
console.error('Error loading initial data from AsyncStorage:', err); | |
throw err; | |
} | |
} | |
} | |
/** | |
* Sort documents based on the query sort parameters | |
*/ | |
function sortDocuments<RxDocType>( | |
docs: RxDocumentData<RxDocType>[], | |
sortParams: { [propPath: string]: 'asc' | 'desc' }[] | |
): RxDocumentData<RxDocType>[] { | |
return [...docs].sort((a, b) => { | |
for (const sort of sortParams) { | |
const [propPath, direction] = Object.entries(sort)[0]; | |
// Get values from documents | |
const aValue = getValueByPath(a, propPath); | |
const bValue = getValueByPath(b, propPath); | |
// Compare values | |
if (aValue !== bValue) { | |
const ascFactor = direction === 'asc' ? 1 : -1; | |
if (aValue === undefined) return 1 * ascFactor; | |
if (bValue === undefined) return -1 * ascFactor; | |
return aValue < bValue ? -1 * ascFactor : 1 * ascFactor; | |
} | |
} | |
return 0; | |
}); | |
} | |
/** | |
* Get nested property value using a path string (e.g., "user.address.city") | |
*/ | |
function getValueByPath<T>(obj: T, path: string): any { | |
const parts = path.split('.'); | |
let current: any = obj; | |
for (const part of parts) { | |
if (current === null || current === undefined) { | |
return undefined; | |
} | |
current = current[part]; | |
} | |
return current; | |
} | |
/** | |
* Create and return a new AsyncStorageRxStorage instance | |
*/ | |
export function getRxStorageAsyncStorage(version: string): RxStorage<AsyncStorageInternals, AsyncStorageInstanceCreationOptions> { | |
return new AsyncStorageRxStorage(version); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment