Skip to content

Instantly share code, notes, and snippets.

@wmakeev
Last active November 8, 2024 01:45
Show Gist options
  • Save wmakeev/8a15f0b1dcb9c268b95c22fffcf4e101 to your computer and use it in GitHub Desktop.
Save wmakeev/8a15f0b1dcb9c268b95c22fffcf4e101 to your computer and use it in GitHub Desktop.
[Yandex Query API wrapper] #yandex-cloud #yandex #api #yandex-query
import '@formatjs/intl-durationformat/polyfill.js'
import assert from 'assert'
import jose from 'node-jose'
import { setTimeout as setTimeoutAsync } from 'node:timers/promises'
const YA_IAM_TOKENS_URL = 'https://iam.api.cloud.yandex.net/iam/v1/tokens'
const YA_QUERY_ENDPOINT = 'https://api.yandex-query.cloud.yandex.net'
// @ts-ignore no typings
const durationFormat = new Intl.DurationFormat('en', {
style: 'digital',
hoursDisplay: 'auto'
})
// https://yandex.cloud/en-ru/docs/query/api/methods/
export class YandexQuery {
static INITIAL_WAIT_TIME_MS = 500
static WAIT_TIME_MULTIPLIER = 1.1
static MAX_WAIT_TIME_INTERVAL = 5 * 60 * 1000
static MAX_WAIT_TIME_MS = 5 * 60 * 1000
#key
/** @type {Yandex.IAM.Token | null} */
#token = null
#folderId
/**
* @param {Yandex.IAM.AuthKey} key
* @param {string} folderId
*/
constructor(key, folderId) {
this.#key = key
this.#folderId = folderId
}
async #updateToken() {
const {
private_key: privateKey,
service_account_id: iss,
id: kid
} = this.#key
const now = Math.floor(new Date().getTime() / 1000)
const payload = {
aud: YA_IAM_TOKENS_URL,
iss,
iat: now,
exp: now + 3600
}
const key = await jose.JWK.asKey(privateKey, 'pem', { kid, alg: 'PS256' })
const jwt = await jose.JWS.createSign({ format: 'compact' }, key)
.update(JSON.stringify(payload))
.final()
const response = await fetch(YA_IAM_TOKENS_URL, {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ jwt })
})
const token = await response.json()
assert.ok(
token != null &&
typeof token === 'object' &&
'expiresAt' in token &&
'iamToken' in token
)
assert.ok(typeof token.iamToken === 'string')
assert.ok(typeof token.expiresAt === 'string')
this.#token = {
iamToken: token.iamToken,
expiresAt: new Date(token.expiresAt)
}
}
async #getAuthHeader() {
if (this.#token == null) {
await this.#updateToken()
}
assert.ok(this.#token)
if (this.#token.expiresAt.getTime() < Date.now() + 60 * 60 * 1000) {
await this.#updateToken()
}
return this.#token.iamToken
}
/**
* @param {string} path
*/
#getBaseUrl(path) {
return new URL(path, YA_QUERY_ENDPOINT)
}
/**
* @param {RequestInit['method']} method
* @param {URL} url
* @param {unknown} [body]
* @param {{ signal?: AbortSignal }} [options]
* @returns {Promise<any>}
*/
async #fetchUrl(method, url, body, options = {}) {
const { signal } = options
const response = await fetch(url, {
method,
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': await this.#getAuthHeader()
},
body: JSON.stringify(body),
signal
})
/**
* ```json
* {
* status: 400010,
* message: 'BAD_REQUEST',
* details: [
* {
* message: 'Result set index out of bound: 0 >= 0',
* issue_code: 1003,
* severity: 'ERROR',
* issues: []
* }
* ]
* }
* ```
*/
const responseBody = await response.json()
if (response.status !== 200) {
console.log(responseBody)
throw new Error('Ошибка запроса')
}
return responseBody
}
/**
* Get query by id
*
* @param {string} queryId
* @param {{signal?: AbortSignal}} [options]
*/
async getQuery(queryId, options) {
const url = this.#getBaseUrl(`/api/fq/v1/queries/${queryId}`)
url.searchParams.set('project', this.#folderId)
/** @type {Yandex.Query.GetQueryResponse} */
const response = await this.#fetchUrl('GET', url, undefined, options)
return response
}
/**
* Create new query
*
* @param {string} query
* @param {{name?: string; description?: string; signal?: AbortSignal}} [options]
*/
async createQuery(query, options = {}) {
const url = this.#getBaseUrl(`/api/fq/v1/queries`)
url.searchParams.set('project', this.#folderId)
/** @type {Yandex.Query.CreateQueryResponse} */
const response = await this.#fetchUrl(
'POST',
url,
{
name: options.name ?? `query-${Date.now()}`,
type: 'ANALYTICS',
text: query,
description: options.description ?? ''
},
{ signal: options.signal }
)
return response
}
/**
* Stop query
*
* @param {string} queryId
* @param {{signal?: AbortSignal}} [options]
*/
async stopQuery(queryId, options) {
const url = this.#getBaseUrl(`/api/fq/v1/queries/${queryId}/stop`)
url.searchParams.set('project', this.#folderId)
await this.#fetchUrl('POST', url, undefined, options)
}
/**
* Get query stauts
*
* @param {string} queryId
* @param {{signal?: AbortSignal}} [options]
*/
async getQueryStatus(queryId, options) {
const url = this.#getBaseUrl(`/api/fq/v1/queries/${queryId}/status`)
url.searchParams.set('project', this.#folderId)
/** @type {Yandex.Query.GetQueryStatusResponse} */
const response = await this.#fetchUrl('GET', url, undefined, options)
return response
}
/**
* Get query results
*
* @param {string} queryId
* @param {number} resultId
* @param {{offset?: number; limit?: number; signal?: AbortSignal}} options
*/
async getQueryResults(queryId, resultId, options = {}) {
const url = this.#getBaseUrl(
`/api/fq/v1/queries/${queryId}/results/${resultId}`
)
url.searchParams.set('project', this.#folderId)
url.searchParams.set('offset', String(options.offset ?? 0))
url.searchParams.set('limit', String(options.limit ?? 100))
/** @type {Yandex.Query.GetQueryResultsResponse} */
const response = await this.#fetchUrl('GET', url, undefined, {
signal: options.signal
})
return response
}
/**
* Get query results
*
* @param {string} queryId
* @param {number} resultId
* @param {{ batchLimit?: number; signal?: AbortSignal }} [options]
*/
async *getQueryResultsIterator(queryId, resultId, options = {}) {
const { batchLimit: limit = 1000, signal } = options
let offset = 0
while (true) {
const results = await this.getQueryResults(queryId, resultId, {
offset: 0,
limit: 1000,
signal
})
yield results
if (results.rows.length < limit) break
offset += limit
}
}
/**
* Create new query, and wait for it completes
*
* @param {string} query
* @param {{name?: string; description?: string; signal?: AbortSignal}} [options]
*/
async executeQuery(query, options = {}) {
const { signal } = options
const { id: queryId } = await this.createQuery(query, {
name: options.name ?? `query-${Date.now()}`,
description: options.description ?? '',
signal
})
/** @type {Yandex.Query.GetQueryResponse['status'] | undefined} */
let queryStatus
try {
const newQuery = await this.getQuery(queryId, { signal })
queryStatus = newQuery.status
let waitInterval = YandexQuery.INITIAL_WAIT_TIME_MS
let waitTime = Date.now()
while (queryStatus === 'RUNNING') {
queryStatus = (await this.getQueryStatus(queryId, { signal })).status
console.info(
newQuery.name,
queryStatus,
durationFormat.format({
milliseconds: Date.now() - waitTime
})
)
assert.ok(
waitInterval >= YandexQuery.INITIAL_WAIT_TIME_MS &&
waitInterval < YandexQuery.MAX_WAIT_TIME_INTERVAL
)
await setTimeoutAsync(waitInterval, { signal })
waitInterval *= YandexQuery.WAIT_TIME_MULTIPLIER
}
const executedQuery = await this.getQuery(queryId, { signal })
if (executedQuery.status === 'FAILED') {
console.error(executedQuery)
throw new Error('Ошибка запроса')
}
return executedQuery
} catch (err) {
if (
err instanceof Error &&
err.name === 'AbortError' &&
queryStatus === 'RUNNING'
) {
try {
await this.stopQuery(queryId)
} catch (err) {
assert.ok(err instanceof Error)
console.error(err.message)
}
}
throw err
}
}
/**
* Execute query by template. Wait for query complete. And return result
* query object.
*
* @param {string} templateQueryId
* @param {{ signal?: AbortSignal }} [options]
*/
async executeQueryByTemplate(templateQueryId, options = {}) {
const { signal } = options
const templateQuery = await this.getQuery(templateQueryId, { signal })
const queryText = templateQuery.text
const result = await this.executeQuery(queryText, {
name: templateQuery.name,
description: templateQuery.description,
signal
})
return result
}
}
import assert from "node:assert/strict";
import { readFile } from "node:fs/promises";
import path from "node:path";
import test from "node:test";
import { env } from "../../env.js";
import { YandexQuery } from "./index.js";
import { setTimeout as setTimeoutAsync } from "node:timers/promises";
const { YQ_FOLDER_ID, YQ_STOCK_PRICE_BY_STORE_QUERY } = env;
// Идентификатор тестового запроса
// const TEST_QUERY_ID = 'csqueq19shiv8k2vbtst'
// [datalens] moysklad-stock-price-by-store
const TEST_QUERY_ID = "csquepvg9it2sf4qkkrh";
test.skip("YandexQuery", async () => {
/** @type {Yandex.IAM.AuthKey} */
const key = JSON.parse(
await readFile(path.join(process.cwd(), "authorized_key.json"), "utf8")
);
assert.ok(key);
const yq = new YandexQuery(key, YQ_FOLDER_ID);
const sampleQuery = await yq.getQuery(TEST_QUERY_ID);
assert.ok(sampleQuery);
const { id: queryId } = await yq.createQuery(sampleQuery.text, {
name: sampleQuery.name,
description: sampleQuery.description,
});
/** @type {Yandex.Query.GetQueryResponse['status']} */
let queryStatus = "RUNNING";
while (queryStatus === "RUNNING") {
queryStatus = (await yq.getQueryStatus(queryId)).status;
console.info(sampleQuery.name, queryStatus);
await setTimeoutAsync(1000);
}
const query = await yq.getQuery(queryId);
if (query.status === "FAILED") {
throw new Error("Ошибка запроса");
}
if (query.result_sets[0] === undefined) {
throw new Error("Нет результатов запроса");
}
assert.ok(query.result_sets[0]);
const queryResults = [];
for await (const results of yq.getQueryResultsIterator(queryId, 0)) {
queryResults.push(results);
}
assert.ok(queryResults.length);
});
test("YandexQuery.executeQueryByTemplate", async () => {
/** @type {Yandex.IAM.AuthKey} */
const key = JSON.parse(
await readFile(path.join(process.cwd(), "authorized_key.json"), "utf8")
);
assert.ok(key);
const yq = new YandexQuery(key, YQ_FOLDER_ID);
const resultQuery = await yq.executeQueryByTemplate(
YQ_STOCK_PRICE_BY_STORE_QUERY
);
const startedAt = new Date(resultQuery.meta.started_at);
const finishedAt = new Date(resultQuery.meta.finished_at);
console.log(
`"${resultQuery.name}" duration - ${
finishedAt.getTime() - startedAt.getTime()
}ms`
);
assert.ok(resultQuery.status === "COMPLETED");
});
namespace Yandex {
export namespace IAM {
export interface Token {
expiresAt: Date;
iamToken: string;
}
export interface AuthKey {
id: string;
service_account_id: string;
created_at: string;
key_algorithm: "RSA_2048";
public_key: string;
private_key: string;
}
}
export namespace Query {
export interface GetQueryResponse {
id: string;
type: "ANALYTICS";
name: string;
description: string;
status: "RUNNING" | "COMPLETED" | "FAILED";
text: string;
meta: {
started_at: string;
finished_at: string;
};
result_sets: {
rows_count: number;
truncated: boolean;
}[];
}
export interface GetQueryStatusResponse {
status: "RUNNING" | "COMPLETED" | "FAILED";
}
export interface CreateQueryResponse {
id: string;
}
export interface GetQueryResultsResponse {
columns: {
name: string;
type: "Int32";
}[];
rows: string[][];
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment