Last active
November 8, 2024 01:45
-
-
Save wmakeev/8a15f0b1dcb9c268b95c22fffcf4e101 to your computer and use it in GitHub Desktop.
[Yandex Query API wrapper] #yandex-cloud #yandex #api #yandex-query
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import '@formatjs/intl-durationformat/polyfill.js' | |
import assert from 'assert' | |
import jose from 'node-jose' | |
import { setTimeout as setTimeoutAsync } from 'node:timers/promises' | |
/** Endpoint that exchanges a signed JWT for an IAM token. */
const YA_IAM_TOKENS_URL = 'https://iam.api.cloud.yandex.net/iam/v1/tokens'

/** Base URL every Yandex Query API path is resolved against. */
const YA_QUERY_ENDPOINT = 'https://api.yandex-query.cloud.yandex.net'

/** Options for the elapsed-time formatter used in progress logging. */
const DURATION_FORMAT_OPTIONS = { style: 'digital', hoursDisplay: 'auto' }

/** Formats elapsed time for progress log lines. */
// @ts-ignore no typings
const durationFormat = new Intl.DurationFormat('en', DURATION_FORMAT_OPTIONS)
// https://yandex.cloud/en-ru/docs/query/api/methods/ | |
/**
 * Thin client for the Yandex Query HTTP API.
 *
 * Authenticates with a service-account authorized key: a signed JWT is
 * exchanged for an IAM token, which is cached and refreshed when it is close
 * to expiry.
 */
export class YandexQuery {
  /** Delay before the first status poll in `executeQuery`, in milliseconds. */
  static INITIAL_WAIT_TIME_MS = 500

  /** Factor the polling delay is multiplied by after every poll. */
  static WAIT_TIME_MULTIPLIER = 1.1

  /** Upper bound for a single polling delay, in milliseconds. */
  static MAX_WAIT_TIME_INTERVAL = 5 * 60 * 1000

  /**
   * Not consulted by any method of this class; kept so existing references
   * keep working. NOTE(review): presumably meant as a total wait budget —
   * callers that need a deadline should pass an `AbortSignal`.
   */
  static MAX_WAIT_TIME_MS = 5 * 60 * 1000

  #key

  /** @type {Yandex.IAM.Token | null} */
  #token = null

  #folderId

  /**
   * @param {Yandex.IAM.AuthKey} key Service-account authorized key
   * @param {string} folderId Sent as the `project` query parameter
   */
  constructor(key, folderId) {
    this.#key = key
    this.#folderId = folderId
  }

  /**
   * Sign a JWT with the authorized key, exchange it for an IAM token and
   * cache the result in `#token`.
   *
   * @throws {assert.AssertionError} When the response is not a token object
   */
  async #updateToken() {
    const {
      private_key: privateKey,
      service_account_id: iss,
      id: kid
    } = this.#key

    const now = Math.floor(Date.now() / 1000)

    const payload = {
      aud: YA_IAM_TOKENS_URL,
      iss,
      iat: now,
      exp: now + 3600
    }

    const key = await jose.JWK.asKey(privateKey, 'pem', { kid, alg: 'PS256' })

    const jwt = await jose.JWS.createSign({ format: 'compact' }, key)
      .update(JSON.stringify(payload))
      .final()

    const response = await fetch(YA_IAM_TOKENS_URL, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ jwt })
    })

    const token = await response.json()

    const failureMessage = `Unexpected IAM token response (HTTP ${response.status})`

    assert.ok(
      token != null &&
        typeof token === 'object' &&
        'expiresAt' in token &&
        'iamToken' in token,
      failureMessage
    )
    assert.ok(typeof token.iamToken === 'string', failureMessage)
    assert.ok(typeof token.expiresAt === 'string', failureMessage)

    this.#token = {
      iamToken: token.iamToken,
      expiresAt: new Date(token.expiresAt)
    }
  }

  /**
   * Return the value for the `Authorization` header, refreshing the cached
   * IAM token when it is missing or expires within the next hour.
   *
   * @returns {Promise<string>}
   */
  async #getAuthHeader() {
    const refreshBefore = Date.now() + 60 * 60 * 1000

    if (
      this.#token == null ||
      this.#token.expiresAt.getTime() < refreshBefore
    ) {
      await this.#updateToken()
    }

    assert.ok(this.#token)

    return this.#token.iamToken
  }

  /**
   * Resolve an API path against the Yandex Query endpoint.
   *
   * @param {string} path
   */
  #getBaseUrl(path) {
    return new URL(path, YA_QUERY_ENDPOINT)
  }

  /**
   * Perform an authorized JSON request.
   *
   * Error responses look like:
   *
   * ```json
   * {
   *   status: 400010,
   *   message: 'BAD_REQUEST',
   *   details: [
   *     {
   *       message: 'Result set index out of bound: 0 >= 0',
   *       issue_code: 1003,
   *       severity: 'ERROR',
   *       issues: []
   *     }
   *   ]
   * }
   * ```
   *
   * @param {RequestInit['method']} method
   * @param {URL} url
   * @param {unknown} [body]
   * @param {{ signal?: AbortSignal }} [options]
   * @returns {Promise<any>} Parsed JSON body, or `undefined` for an empty body
   * @throws {Error} On a non-2xx response; `cause` holds `{ status, body }`
   */
  async #fetchUrl(method, url, body, options = {}) {
    const { signal } = options

    const response = await fetch(url, {
      method,
      headers: {
        'Accept': 'application/json',
        'Content-Type': 'application/json',
        'Authorization': await this.#getAuthHeader()
      },
      // JSON.stringify(undefined) is undefined, so bodiless calls send nothing
      body: JSON.stringify(body),
      signal
    })

    // Read as text first: an empty or non-JSON body must not hide the status
    const text = await response.text()

    if (!response.ok) {
      let details = text
      try {
        details = JSON.parse(text)
      } catch {
        // Not JSON (or empty) — report the raw text instead
      }
      console.log(details)
      throw new Error('Ошибка запроса', {
        cause: { status: response.status, body: details }
      })
    }

    return text === '' ? undefined : JSON.parse(text)
  }

  /**
   * Get query by id
   *
   * @param {string} queryId
   * @param {{signal?: AbortSignal}} [options]
   */
  async getQuery(queryId, options) {
    const url = this.#getBaseUrl(`/api/fq/v1/queries/${queryId}`)
    url.searchParams.set('project', this.#folderId)

    /** @type {Yandex.Query.GetQueryResponse} */
    const response = await this.#fetchUrl('GET', url, undefined, options)

    return response
  }

  /**
   * Create new query
   *
   * @param {string} query Query text
   * @param {{name?: string; description?: string; signal?: AbortSignal}} [options]
   */
  async createQuery(query, options = {}) {
    const url = this.#getBaseUrl(`/api/fq/v1/queries`)
    url.searchParams.set('project', this.#folderId)

    /** @type {Yandex.Query.CreateQueryResponse} */
    const response = await this.#fetchUrl(
      'POST',
      url,
      {
        name: options.name ?? `query-${Date.now()}`,
        type: 'ANALYTICS',
        text: query,
        description: options.description ?? ''
      },
      { signal: options.signal }
    )

    return response
  }

  /**
   * Stop query
   *
   * @param {string} queryId
   * @param {{signal?: AbortSignal}} [options]
   */
  async stopQuery(queryId, options) {
    const url = this.#getBaseUrl(`/api/fq/v1/queries/${queryId}/stop`)
    url.searchParams.set('project', this.#folderId)

    await this.#fetchUrl('POST', url, undefined, options)
  }

  /**
   * Get query status
   *
   * @param {string} queryId
   * @param {{signal?: AbortSignal}} [options]
   */
  async getQueryStatus(queryId, options) {
    const url = this.#getBaseUrl(`/api/fq/v1/queries/${queryId}/status`)
    url.searchParams.set('project', this.#folderId)

    /** @type {Yandex.Query.GetQueryStatusResponse} */
    const response = await this.#fetchUrl('GET', url, undefined, options)

    return response
  }

  /**
   * Get one page of query results
   *
   * @param {string} queryId
   * @param {number} resultId Index of the result set
   * @param {{offset?: number; limit?: number; signal?: AbortSignal}} options
   *   `offset` defaults to 0 and `limit` to 100
   */
  async getQueryResults(queryId, resultId, options = {}) {
    const url = this.#getBaseUrl(
      `/api/fq/v1/queries/${queryId}/results/${resultId}`
    )
    url.searchParams.set('project', this.#folderId)
    url.searchParams.set('offset', String(options.offset ?? 0))
    url.searchParams.set('limit', String(options.limit ?? 100))

    /** @type {Yandex.Query.GetQueryResultsResponse} */
    const response = await this.#fetchUrl('GET', url, undefined, {
      signal: options.signal
    })

    return response
  }

  /**
   * Iterate over all pages of a result set. Yields each page as returned by
   * `getQueryResults` and stops after the first page shorter than the batch.
   *
   * @param {string} queryId
   * @param {number} resultId Index of the result set
   * @param {{ batchLimit?: number; signal?: AbortSignal }} [options]
   *   `batchLimit` is the page size (default 1000)
   */
  async *getQueryResultsIterator(queryId, resultId, options = {}) {
    const { batchLimit: limit = 1000, signal } = options

    // A non-positive page size could never satisfy the stop condition below
    assert.ok(
      Number.isInteger(limit) && limit > 0,
      'batchLimit must be a positive integer'
    )

    let offset = 0

    while (true) {
      const results = await this.getQueryResults(queryId, resultId, {
        offset,
        limit,
        signal
      })

      yield results

      if (results.rows.length < limit) break

      offset += limit
    }
  }

  /**
   * Create new query and wait until it completes.
   *
   * Polls the query status with a growing delay (capped at
   * `MAX_WAIT_TIME_INTERVAL`). If the wait is aborted through `signal`, a
   * best-effort stop request is sent before the abort error is rethrown.
   *
   * @param {string} query Query text
   * @param {{name?: string; description?: string; signal?: AbortSignal}} [options]
   * @returns {Promise<Yandex.Query.GetQueryResponse>} The finished query
   * @throws {Error} When the query ends in the `FAILED` status
   */
  async executeQuery(query, options = {}) {
    const { signal } = options

    const { id: queryId } = await this.createQuery(query, {
      name: options.name ?? `query-${Date.now()}`,
      description: options.description ?? '',
      signal
    })

    /** @type {Yandex.Query.GetQueryResponse['status'] | undefined} */
    let queryStatus

    try {
      const newQuery = await this.getQuery(queryId, { signal })

      queryStatus = newQuery.status

      let waitInterval = YandexQuery.INITIAL_WAIT_TIME_MS
      const pollingStartedAt = Date.now()

      while (queryStatus === 'RUNNING') {
        // timers/promises setTimeout takes (delay, value, options); the signal
        // has to be the third argument to make the sleep abortable
        await setTimeoutAsync(waitInterval, undefined, { signal })

        waitInterval = Math.min(
          waitInterval * YandexQuery.WAIT_TIME_MULTIPLIER,
          YandexQuery.MAX_WAIT_TIME_INTERVAL
        )

        queryStatus = (await this.getQueryStatus(queryId, { signal })).status

        console.info(
          newQuery.name,
          queryStatus,
          durationFormat.format({
            milliseconds: Date.now() - pollingStartedAt
          })
        )
      }

      const executedQuery = await this.getQuery(queryId, { signal })

      if (executedQuery.status === 'FAILED') {
        console.error(executedQuery)
        throw new Error('Ошибка запроса')
      }

      return executedQuery
    } catch (err) {
      const isAbort = err instanceof Error && err.name === 'AbortError'

      // An undefined status means the abort hit before the first status read;
      // the query has been created by then and may well be running
      const mayBeRunning = queryStatus === undefined || queryStatus === 'RUNNING'

      if (isAbort && mayBeRunning) {
        try {
          // No signal here: it is already aborted
          await this.stopQuery(queryId)
        } catch (stopErr) {
          // Best effort — report and rethrow the original abort error below
          console.error(stopErr instanceof Error ? stopErr.message : stopErr)
        }
      }

      throw err
    }
  }

  /**
   * Execute a query using an existing query as a template: its text, name and
   * description are copied into a new query, which is then run to completion.
   *
   * @param {string} templateQueryId
   * @param {{ signal?: AbortSignal }} [options]
   * @returns {Promise<Yandex.Query.GetQueryResponse>} The finished query
   */
  async executeQueryByTemplate(templateQueryId, options = {}) {
    const { signal } = options

    const templateQuery = await this.getQuery(templateQueryId, { signal })

    const result = await this.executeQuery(templateQuery.text, {
      name: templateQuery.name,
      description: templateQuery.description,
      signal
    })

    return result
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import assert from "node:assert/strict"; | |
import { readFile } from "node:fs/promises"; | |
import path from "node:path"; | |
import test from "node:test"; | |
import { env } from "../../env.js"; | |
import { YandexQuery } from "./index.js"; | |
import { setTimeout as setTimeoutAsync } from "node:timers/promises"; | |
const YQ_FOLDER_ID = env.YQ_FOLDER_ID;
const YQ_STOCK_PRICE_BY_STORE_QUERY = env.YQ_STOCK_PRICE_BY_STORE_QUERY;

// Identifier of the test query.
// Alternative id kept for reference: 'csqueq19shiv8k2vbtst'
// Current one: [datalens] moysklad-stock-price-by-store
const TEST_QUERY_ID = "csquepvg9it2sf4qkkrh";
// Manual end-to-end check (skipped by default): copies the sample query,
// polls it once per second until it leaves RUNNING, then reads every page of
// the first result set.
test.skip("YandexQuery", async () => {
  const keyFile = path.join(process.cwd(), "authorized_key.json");

  /** @type {Yandex.IAM.AuthKey} */
  const key = JSON.parse(await readFile(keyFile, "utf8"));
  assert.ok(key);

  const yq = new YandexQuery(key, YQ_FOLDER_ID);

  const sampleQuery = await yq.getQuery(TEST_QUERY_ID);
  assert.ok(sampleQuery);

  const created = await yq.createQuery(sampleQuery.text, {
    name: sampleQuery.name,
    description: sampleQuery.description,
  });
  const queryId = created.id;

  /** @type {Yandex.Query.GetQueryResponse['status']} */
  let queryStatus;
  do {
    const statusResponse = await yq.getQueryStatus(queryId);
    queryStatus = statusResponse.status;
    console.info(sampleQuery.name, queryStatus);
    await setTimeoutAsync(1000);
  } while (queryStatus === "RUNNING");

  const query = await yq.getQuery(queryId);

  if (query.status === "FAILED") {
    throw new Error("Ошибка запроса");
  }

  const firstResultSet = query.result_sets[0];
  if (firstResultSet === undefined) {
    throw new Error("Нет результатов запроса");
  }
  assert.ok(firstResultSet);

  // Collect every page produced by the iterator
  const queryResults = [];
  const pages = yq.getQueryResultsIterator(queryId, 0);
  for await (const page of pages) {
    queryResults.push(page);
  }

  assert.ok(queryResults.length);
});
// Runs the template query from the environment to completion and logs how
// long the service reports it took.
test("YandexQuery.executeQueryByTemplate", async () => {
  const keyFile = path.join(process.cwd(), "authorized_key.json");

  /** @type {Yandex.IAM.AuthKey} */
  const key = JSON.parse(await readFile(keyFile, "utf8"));
  assert.ok(key);

  const yq = new YandexQuery(key, YQ_FOLDER_ID);

  const resultQuery = await yq.executeQueryByTemplate(
    YQ_STOCK_PRICE_BY_STORE_QUERY
  );

  const { started_at: startedAtRaw, finished_at: finishedAtRaw } =
    resultQuery.meta;
  const startedAt = new Date(startedAtRaw);
  const finishedAt = new Date(finishedAtRaw);
  const durationMs = finishedAt.getTime() - startedAt.getTime();

  console.log(`"${resultQuery.name}" duration - ${durationMs}ms`);

  assert.ok(resultQuery.status === "COMPLETED");
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Shapes of the Yandex Cloud payloads used by the `YandexQuery` client. */
namespace Yandex {
  export namespace IAM {
    /** IAM token as cached by the client (expiry already parsed to a Date). */
    export interface Token {
      expiresAt: Date;
      iamToken: string;
    }

    /** Service-account authorized key, as read from the key JSON file. */
    export interface AuthKey {
      /** Key id; sent as the JWT `kid`. */
      id: string;
      /** Service account id; sent as the JWT `iss`. */
      service_account_id: string;
      created_at: string;
      key_algorithm: "RSA_2048";
      public_key: string;
      /** PEM-encoded private key used to sign the JWT. */
      private_key: string;
    }
  }

  export namespace Query {
    /** Lifecycle states of a query that this client distinguishes. */
    export type QueryStatus = "RUNNING" | "COMPLETED" | "FAILED";

    /** Response of "get query". */
    export interface GetQueryResponse {
      id: string;
      type: "ANALYTICS";
      name: string;
      description: string;
      status: QueryStatus;
      /** Query text. */
      text: string;
      meta: {
        // NOTE(review): timestamps are passed to `new Date(...)` by callers;
        // presumably ISO 8601 strings — confirm against the API reference.
        started_at: string;
        finished_at: string;
      };
      result_sets: {
        rows_count: number;
        truncated: boolean;
      }[];
    }

    /** Response of "get query status". */
    export interface GetQueryStatusResponse {
      status: QueryStatus;
    }

    /** Response of "create query". */
    export interface CreateQueryResponse {
      /** Id of the newly created query. */
      id: string;
    }

    /** One page of a result set. */
    export interface GetQueryResultsResponse {
      columns: {
        name: string;
        /** Column type name, e.g. "Int32". */
        type: string;
      }[];
      // NOTE(review): cells are typed as strings here; verify whether numeric
      // columns are actually returned as JSON numbers.
      rows: string[][];
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment