Created
October 31, 2023 00:04
-
-
Save tsibley/911b641e245fadc1f2eff8de1bab4fa0 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/resourceIndexer/coreStagingS3.js b/resourceIndexer/coreStagingS3.js | |
index ad1cce9d..78cc2284 100644 | |
--- a/resourceIndexer/coreStagingS3.js | |
+++ b/resourceIndexer/coreStagingS3.js | |
@@ -1,6 +1,7 @@ | |
import {logger} from './logger.js'; | |
import { SOURCE, VALID_AUSPICE_PATTERNS, INVALID_AUSPICE_PATTERNS, | |
DATESTAMP_REGEX, MAIN_DATASET_JSON } from './constants.js'; | |
+import { fetchInventoryRemote, fetchInventoryLocal, parseInventory } from './inventory.js'; | |
/** | |
* The inventory of buckets (especially the core bucket) is in some ways a | |
@@ -273,20 +274,56 @@ function validIntermediate(id, date, objects) { | |
} | |
+/** | |
+ * XXX FIXME doc | |
+ */ | |
+async function collect({name, sourceBucket, local = false} = {}) { | |
+ return await parseInventory(await fetchInventory({ name, sourceBucket, local })); | |
+} | |
+ | |
+ | |
+/** | |
+ * Fetch the raw inventory for `sourceBucket`: from ./devData/${name}.manifest.json + ./devData/${name}.inventory.csv.gz when `local`, otherwise from the manifests under s3://nextstrain-inventories/${sourceBucket}/config-v1/. | |
+ */ | |
+async function fetchInventory({name, sourceBucket, local = false} = {}) { | |
+  if (local) { | |
+    return await fetchInventoryLocal({ | |
+      manifestPath: `./devData/${name}.manifest.json`, | |
+      inventoryPath: `./devData/${name}.inventory.csv.gz`, | |
+      name, | |
+    }); | |
+  } else { | |
+    return await fetchInventoryRemote({ | |
+      bucket: 'nextstrain-inventories', | |
+      prefix: `${sourceBucket}/config-v1/`, | |
+      name, | |
+    }); | |
+  } | |
+} | |
+ | |
+ | |
 export const coreS3Data = { | |
-  type: 's3Inventory', | |
-  bucket: 'nextstrain-inventories', | |
-  prefix: 'nextstrain-data/config-v1/', | |
   name: 'core', | |
+  async collect({local = false} = {}) { /* Reads this.name: invoke as coreS3Data.collect(...), not as a detached function. */ | |
+    return await collect({ | |
+      name: this.name, | |
+      sourceBucket: "nextstrain-data", | |
+      local, | |
+    }); | |
+  }, | |
   categorise: (item) => categoriseCoreObjects(item, false), | |
   createResource: createVersionedResources | |
 }; | |
 export const stagingS3Data = { | |
-  type: 's3Inventory', | |
-  bucket: 'nextstrain-inventories', | |
-  prefix: 'nextstrain-staging/config-v1/', | |
   name: 'staging', | |
+  async collect({local = false} = {}) { /* Reads this.name: invoke as stagingS3Data.collect(...), not as a detached function. */ | |
+    return await collect({ | |
+      name: this.name, | |
+      sourceBucket: "nextstrain-staging", | |
+      local, | |
+    }); | |
+  }, | |
   categorise: (item) => categoriseCoreObjects(item, true), | |
   createResource: createVersionedResources | |
 }; | |
diff --git a/resourceIndexer/inventory.js b/resourceIndexer/inventory.js | |
index a6b49630..488a89bc 100644 | |
--- a/resourceIndexer/inventory.js | |
+++ b/resourceIndexer/inventory.js | |
@@ -19,7 +19,7 @@ const gunzip = promisify(zlib.gunzip) | |
* | |
* @returns {object[]} list of entries in the inventory, using the schema to define keys | |
*/ | |
-const fetchInventoryRemote = async ({bucket, prefix, name}) => { | |
+export const fetchInventoryRemote = async ({bucket, prefix, name}) => { | |
const S3 = new AWS.S3(); | |
const manifestKeyPattern = new RegExp(`^${prefix}\\d{4}-\\d{2}-\\d{2}T\\d{2}-\\d{2}Z/manifest\\.json$`); | |
const manifestKey = await new Promise((resolve, reject) => { | |
@@ -52,15 +52,11 @@ const fetchInventoryRemote = async ({bucket, prefix, name}) => { | |
} | |
/** | |
- * Parse an on-disk inventory: | |
- * - `./devData/${name}.manifest.json` | |
- * - `./devData/${name}.inventory.csv.gz` | |
+ * Parse an on-disk inventory. | |
* | |
* @returns {object[]} list of entries in the inventory, using the schema to define keys | |
*/ | |
-const fetchInventoryLocal = async ({name}) => { | |
- const manifestPath = `./devData/${name}.manifest.json`; | |
- const inventoryPath = `./devData/${name}.inventory.csv.gz`; | |
+export const fetchInventoryLocal = async ({manifestPath, inventoryPath, name}) => { | |
logger.info(`inventory for ${name} -- reading S3 inventories from ${manifestPath} and ${inventoryPath}`); | |
const manifestHandle = fs.open(manifestPath, 'r'); | |
const manifest = JSON.parse( | |
@@ -87,22 +83,8 @@ const fetchInventoryLocal = async ({name}) => { | |
* This includes all versions past before and after the delete marker. Internally we haven't | |
* explicitly considered what it means to "delete" a resource, so this is the most conservative | |
* implementation. | |
- * | |
- * The option exists to source the inventory files locally (useful for dev purposes to avoid constant | |
- * downloads from S3) | |
*/ | |
-export const parseInventory = async ({bucket, prefix, name, local}) => { | |
- let objects; | |
- try { | |
- if (local) { | |
- objects = await fetchInventoryLocal({bucket, prefix, name}); | |
- } else { | |
- objects = await fetchInventoryRemote({bucket, prefix, name}); | |
- } | |
- } catch (e) { | |
- logger.error(`Error while fetching s3 inventory for ${name}: ${e.message}`) | |
- return []; | |
- } | |
+export const parseInventory = async (objects) => { | |
/* Scan the objects to find delete markers and filter out both the markers | |
themselves and all other keys which are identical to a delete marker. We may | |
want to relax this in the future, e.g. if a v1 dataset was deleted but a v2 | |
diff --git a/resourceIndexer/main.js b/resourceIndexer/main.js | |
index 15f17e26..f7e22c34 100644 | |
--- a/resourceIndexer/main.js | |
+++ b/resourceIndexer/main.js | |
@@ -67,13 +67,7 @@ async function main(args) { | |
continue | |
} | |
- if (collection.type !== 's3Inventory') { | |
- logger.error(`Data source provided for a type we don't yet handle (${collection.type})`) | |
- } | |
- | |
- const groupedObjects = (await parseInventory( | |
- {bucket: collection.bucket, prefix: collection.prefix, name: collection.name, local: args.local} | |
- )) | |
+ const groupedObjects = (await collection.collect({local: args.local})) | |
.map(collection.categorise) | |
.filter((item) => !!item) | |
// Collect together all items ("files") based on their assigned resourceType & resourcePath | |
@@ -104,4 +98,4 @@ async function main(args) { | |
output = await gzip(output) | |
} | |
fs.writeFileSync(args.output, output); | |
-} | |
\ No newline at end of file | |
+} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment