Created
April 8, 2022 21:42
-
-
Save evantahler/71b22b63963c965cabf860f8a707a9f1 to your computer and use it in GitHub Desktop.
Airbyte Source in TS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Image for the stock-ticker Airbyte source connector.
FROM node:alpine
LABEL maintainer="[email protected]"

WORKDIR /airbyte/integration_code

# Install dependencies from the manifests alone first, so this layer is
# reused by the build cache until package*.json changes.
COPY package*.json ./
RUN npm install

# Copy the sources afterwards and run the package's build script.
# NOTE(review): add a .dockerignore excluding node_modules and dist so a local
# install is not copied over the one created above.
COPY . .
RUN npm run build

ENTRYPOINT ["node", "/airbyte/integration_code/dist/source.js"]
ENV AIRBYTE_ENTRYPOINT="node /airbyte/integration_code/dist/source.js"

LABEL io.airbyte.name=airbyte/source-stock-ticker-api
LABEL io.airbyte.version=0.1.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"name": "source-stock-ticker-api", | |
"version": "1.0.0", | |
"description": "This is the repository for the Stock Ticker Api source connector. For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/stock-ticker-api).", | |
"scripts": { | |
"build": "tsc", | |
"watch": "tsc --watch", | |
"test": "echo \"Error: no test specified\" && exit 1" | |
}, | |
"author": "", | |
"license": "ISC", | |
"dependencies": { | |
"axios": "0.26.1", | |
"commander": "9.1.0" | |
}, | |
"devDependencies": { | |
"@types/node": "17.0.23", | |
"typescript": "4.6.3" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node | |
import path from "path"; | |
import { Command } from "commander"; | |
import axios from "axios"; | |
import { | |
readJsonFile, | |
emit, | |
dateString, | |
log, | |
AirbyteConnectionStatus, | |
} from "./utils.js"; | |
import { config } from "process"; | |
/** Shape of the JSON file passed to the connector via `--config`. */
export type SourceStockTickerConfig = {
  /** Ticker symbol inserted into the API request path. */
  stock_ticker: string;
  /** Key sent to the API as the `apiKey` query parameter. */
  api_key: string;
};

/** Sync modes a stream can be configured with. */
export type SyncMode = "full_refresh" | "incremental";

/** Shape of the JSON file passed to the connector via `--catalog`. */
export type SourceStockTickerConfiguredCatalog = {
  streams: Array<{
    /** Mode selected for this stream. */
    sync_mode: SyncMode;
    stream: {
      name: string;
      supported_sync_modes: Array<SyncMode>;
      json_schema: {
        // NOTE(review): the double-nested Record looks unusual for a JSON
        // schema `properties` map — confirm against data/catalog.json.
        properties: Record<string, Record<string, { type: string }>>;
      };
    };
  }>;
};
/**
 * Airbyte source connector that reads one week of daily aggregates for a
 * single stock ticker from the Polygon.io API.
 *
 * Each public method implements one CLI command and writes its result to
 * stdout through `emit` / `log`.
 */
class SourceStockTicker {
  /** Emits a SPEC message whose payload is read from `data/spec.json`. */
  async spec() {
    const specification = readJsonFile(
      path.join(__dirname, "..", "data", "spec.json")
    );
    const airbyteMessage = { type: "SPEC" as const, spec: specification };
    emit(airbyteMessage);
  }

  /** Emits a CATALOG message whose payload is read from `data/catalog.json`. */
  async discover() {
    const catalog = readJsonFile(
      path.join(__dirname, "..", "data", "catalog.json")
    );
    const airbyteMessage = { type: "CATALOG" as const, catalog };
    emit(airbyteMessage);
  }

  /**
   * Calls the API once with the configured credentials and emits a
   * CONNECTION_STATUS message describing the outcome.
   *
   * @param args Parsed CLI options; `args.config` is the config file path.
   */
  async check(args: Record<string, any>) {
    let connectionStatus: AirbyteConnectionStatus;
    try {
      const { api_key, stock_ticker } = this.loadConfig(args);
      const response = await this._callApi(api_key, stock_ticker);
      if (response.status === 200) {
        connectionStatus = { status: "SUCCEEDED" };
      } else if (response.status === 403) {
        connectionStatus = {
          status: "FAILED",
          message: "API Key is incorrect.",
        };
      } else {
        connectionStatus = {
          status: "FAILED",
          message:
            "Input configuration is incorrect. Please verify the input stock ticker and API key.",
        };
      }
    } catch (error) {
      connectionStatus = {
        status: "FAILED",
        message: String(error),
      };
    }
    const output = { type: "CONNECTION_STATUS" as const, connectionStatus };
    emit(output);
  }

  /**
   * Emits one RECORD message per API result for the `stock_prices` stream.
   *
   * Logs and returns without reading when the stream is not in the configured
   * catalog; logs fatally when the config is incomplete, when a sync mode
   * other than `full_refresh` is requested, or when the API call fails.
   *
   * @param args Parsed CLI options; `args.config` and `args.catalog` are the
   *   paths of the config and configured-catalog files.
   */
  async read(args: Record<string, any>) {
    const { api_key, stock_ticker } = this.loadConfig(args);
    if (!api_key || !stock_ticker) {
      // Either field may be the missing one, so name both.
      return log("api_key and stock_ticker are required in config", true);
    }

    const configuredCatalog = this.loadConfiguredCatalog(args);
    // `streams` comes from an unvalidated JSON file, hence the optional chain.
    const stockPricesStream = configuredCatalog.streams?.find(
      (s) => s?.stream?.name === "stock_prices"
    );
    if (!stockPricesStream) {
      log("no streams selected");
      return;
    }
    if (stockPricesStream.sync_mode !== "full_refresh") {
      return log("only full_refresh supported (for now)", true);
    }

    try {
      const response = await this._callApi(api_key, stock_ticker);
      // _callApi never rejects on HTTP errors, so the status is checked here
      // instead of failing later on a missing `results` array.
      if (response.status !== 200) {
        return log(`API request failed with status ${response.status}`, true);
      }

      // A response without `results` is treated as "no rows" rather than
      // crashing the loop.
      const results: Array<{ t: number; c: number }> =
        response.data?.results ?? [];
      for (const result of results) {
        const message = {
          type: "RECORD" as const,
          record: {
            stream: "stock_prices",
            data: {
              // `t` is fed to `new Date`, `c` is reported as the price
              // (presumably Polygon's timestamp and close fields — confirm).
              date: dateString(new Date(result.t)),
              stock_ticker,
              price: result.c,
            },
            emitted_at: Date.now(),
          },
        };
        emit(message);
      }
    } catch (error) {
      log(String(error), true);
    }
  }

  /**
   * Requests daily aggregates covering the last seven days up to today.
   * Resolves for every HTTP status (`validateStatus` always returns true), so
   * callers must inspect `response.status` themselves.
   */
  private async _callApi(apiKey: string, stock_ticker: string) {
    const end = dateString();
    const start = dateString(new Date(Date.now() - 7 * 24 * 60 * 60 * 1000));
    // Both values come from user config; encode them so they cannot alter
    // the path or query string.
    const ticker = encodeURIComponent(stock_ticker);
    const key = encodeURIComponent(apiKey);
    const url = `https://api.polygon.io/v2/aggs/ticker/${ticker}/range/1/day/${start}/${end}?sort=asc&limit=120&apiKey=${key}`;
    return axios.get(url, { validateStatus: () => true });
  }

  /** Reads the config file named by `args.config`; fatal when it is missing. */
  private loadConfig(args: { config?: string }): SourceStockTickerConfig {
    if (!args.config) {
      log("config is required", true);
      // Only reached if the fatal log did not end the process; keeps the
      // return type free of `undefined`.
      throw new Error("config is required");
    }
    return readJsonFile(args.config);
  }

  /** Reads the catalog file named by `args.catalog`; fatal when it is missing. */
  private loadConfiguredCatalog(args: {
    catalog?: string;
  }): SourceStockTickerConfiguredCatalog {
    if (!args.catalog) {
      log("catalog is required", true);
      // Only reached if the fatal log did not end the process.
      throw new Error("catalog is required");
    }
    return readJsonFile(args.catalog);
  }
}
// The interface for the CLI commands
const program = new Command();
const packageJSON = readJsonFile(path.join(__dirname, "..", "package.json"));
const source = new SourceStockTicker();

program
  .name(packageJSON.name)
  .description(packageJSON.description)
  .version(packageJSON.version);

program
  .command("spec")
  .description("display the Airbyte spec")
  .action(() => source.spec());

program
  .command("discover")
  .description("display the Airbyte catalog")
  .option("-c, --config <file>", "path to config file")
  .action(() => source.discover());

// `<file>` (not `[file]`) makes the option value mandatory, so a bare
// `--config` is rejected by commander instead of arriving as `true` and
// being used as a file path.
program
  .command("check")
  .description("check the Airbyte against a config")
  .option("-c, --config <file>", "path to config file")
  .action((args) => source.check(args));

program
  .command("read")
  .description("get the data")
  .option("-c, --config <file>", "path to config file")
  .option("-a, --catalog <file>", "path to catalog file")
  .action((args) => source.read(args));

// The actions are async; parseAsync awaits them so a rejection is reported
// through the connector's own fatal log instead of an unhandled rejection.
program.parseAsync().catch((error) => log(String(error), true));
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs from "fs"; | |
/** Values allowed in the `type` field of an emitted message. */
export type AirbyteMessageType =
  | "LOG"
  | "SPEC"
  | "CATALOG"
  | "CONNECTION_STATUS"
  | "RECORD";

/** Payload of a CONNECTION_STATUS message. */
export type AirbyteConnectionStatus = {
  status: string;
  message?: string;
  // NOTE(review): `catalog` and `spec` look misplaced on a connection status;
  // kept so existing users of this type still compile.
  catalog?: string;
  spec?: Record<string, any>;
};

/**
 * One line of connector output. `type` says which of the optional payload
 * fields is expected to be populated.
 */
export type AirbyteMessage = {
  type: AirbyteMessageType;
  /** Text of a LOG message. */
  log?: string;
  /** Payload of a SPEC message. */
  spec?: unknown;
  /** Payload of a CATALOG message. */
  catalog?: unknown;
  /** Payload of a CONNECTION_STATUS message. */
  connectionStatus?: AirbyteConnectionStatus;
  /** Payload of a RECORD message. */
  record?: Record<string, any>;
};
/**
 * Reads `filename` synchronously and returns its parsed JSON content.
 *
 * A read or parse failure is reported through a fatal `log`, which exits the
 * process.
 *
 * @param filename Path of the JSON file to load.
 * @returns The parsed value (untyped, as produced by `JSON.parse`).
 */
export const readJsonFile = (filename: string) => {
  try {
    const body = fs.readFileSync(filename, "utf8");
    return JSON.parse(body);
  } catch (error) {
    // The caught value is `unknown`; stringify it for the log message.
    log(String(error), true);
    // Only reached if the fatal log did not end the process; avoids silently
    // returning undefined to callers that expect parsed data.
    throw error;
  }
};
/**
 * Writes one message to stdout as a single line of JSON.
 *
 * @param line Message to serialize.
 */
export const emit = (line: AirbyteMessage): void => {
  console.log(JSON.stringify(line));
};
/**
 * Emits a LOG message and, when `fatal` is set, exits the process with
 * status 1.
 *
 * @param message Text to log. Non-string values such as caught errors are
 *   converted with `String()`, so callers can pass a `catch` variable directly.
 * @param fatal When true, terminate the process after emitting the message.
 */
export const log = (message: unknown, fatal = false): void => {
  emit({ type: "LOG", log: String(message) });
  if (fatal) process.exit(1);
};
/**
 * Formats a date as `YYYY-MM-DD`, taken from the first ten characters of its
 * ISO-8601 string (which `toISOString` renders in UTC).
 *
 * @param d Date to format; defaults to the current time.
 * @returns The calendar-date part of `d.toISOString()`.
 */
export const dateString = (d = new Date()): string => {
  return d.toISOString().slice(0, 10);
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment