Last active
December 11, 2022 14:46
-
-
Save bellbind/050f69714a968695f895d4996217dc7d to your computer and use it in GitHub Desktop.
[js-ipfs/libp2p] Dial a handler that sends events indefinitely, read a few of them, then close the read side
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Wrapper around a libp2p stream (mplex/stream), which exposes:
// - stream.source: AsyncIterable<Uint8Array>
// - stream.sink: (Iterable<Uint8Array> | AsyncIterable<Uint8Array>) => Promise<undefined>
// - stream.close, stream.closeRead, stream.closeWrite, stream.abort, stream.reset
/**
 * Creates an unbounded FIFO queue whose consumer side is an async iterator.
 *
 * Producers and consumers meet through two waiting lists: a consumer that
 * arrives first parks its resolver in `gets`; a producer that arrives first
 * parks its resolver in `polls`. Whoever arrives second completes the
 * hand-over, so items are delivered in the order they were pushed.
 *
 * @returns {object} an object with:
 *   - `next()`: resolves with the next `{value, done: false}` entry, or with
 *     `{done: true}` once a `close()` has been reached;
 *   - `push(value)`: enqueues `value`; the returned promise settles once a
 *     consumer has taken the entry;
 *   - `close()`: enqueues the end-of-queue marker;
 *   - `[Symbol.asyncIterator]()`: returns the queue itself.
 */
const newQueue = () => {
  // Resolvers of next() promises that are still waiting for an entry.
  const gets = [];
  // Resolvers of poll() promises that are still waiting for a consumer.
  const polls = [];

  // Consumer side: hand our resolver to the oldest waiting producer, or wait.
  const next = () => new Promise(get => {
    if (polls.length > 0) {
      polls.shift()(get);
    } else {
      gets.push(get);
    }
  });

  // Producer side: obtain the resolver of the oldest waiting consumer, or
  // wait until a consumer shows up.
  const poll = () => new Promise(resolve => {
    if (gets.length > 0) {
      resolve(gets.shift());
    } else {
      polls.push(resolve);
    }
  });

  const push = value => poll().then(get => get({value, done: false}));
  const close = () => poll().then(get => get({done: true}));

  return {[Symbol.asyncIterator]() {return this;}, next, push, close};
};
/**
 * Builds a wire frame: one leading type byte followed by a copy of `u8a`.
 *
 * @param {Uint8Array} u8a - bytes to carry in the frame.
 * @param {number} [type=0] - value stored in the first byte of the frame.
 * @returns {Uint8Array} a new array of length `u8a.length + 1`.
 */
const payload = (u8a, type = 0) => {
  const ret = new Uint8Array(u8a.length + 1);
  ret[0] = type;
  ret.set(u8a, 1);
  return ret;
};
/**
 * Wraps a libp2p stream so that each side can tell the other to stop, using a
 * one-byte type prefix on every chunk written to the underlying stream:
 *   - 0: data (the remaining bytes are the payload);
 *   - 1: "close"    - the sender stopped reading, the receiver should stop
 *                     writing;
 *   - 2: "finished" - the sender has nothing more to write.
 *
 * Both peers have to wrap their end of the stream with this function, because
 * the prefix byte is part of every chunk.
 *
 * @param {object} stream - underlying stream providing `source`, `sink`,
 *   `closeRead`, `closeWrite`, `reset` and `abort`.
 * @returns {EventTarget} an event target extended with `source`, `sink`,
 *   `closeRead`, `closeWrite`, `close`, `reset` and `abort`. Failures of the
 *   underlying read/write loops are dispatched as "error" events whose
 *   `detail` is the error.
 */
export const newClosableStream = stream => {
  const eventTarget = new EventTarget();
  // NOTE(review): relies on the global `CustomEvent`; confirm that the target
  // Node.js version exposes it without a flag.
  const reportError = error => {
    eventTarget.dispatchEvent(new CustomEvent("error", {detail: error}));
  };

  let sinkFinished = false;   // the "finished" marker has been queued
  let sourceFinished = false; // the "close" marker has been queued
  let remoteClosed = false;   // the remote sent its "close" marker

  // send to remote
  const writeQueue = newQueue();

  // Queue each marker at most once: the write loop only needs one of each,
  // and entries queued after that loop has ended would never be consumed.
  const sendClose = () => {
    if (sourceFinished) return;
    sourceFinished = true;
    writeQueue.push({type: "close"});
  };
  const sendFinished = () => {
    if (sinkFinished) return;
    sinkFinished = true;
    writeQueue.push({type: "finished"});
  };

  const writing = async () => {
    return stream.sink((async function* () {
      let closed = false, finished = false;
      // The underlying write side stays open until both markers were sent.
      while (!closed || !finished) {
        const {done, value: message} = await writeQueue.next();
        if (done) break; // queue closed: there is no message to inspect
        switch (message.type) {
          case "data":
            yield payload(message.value, 0);
            break;
          case "close":
            yield Uint8Array.from([1]);
            closed = true;
            break;
          case "finished":
            yield Uint8Array.from([2]);
            finished = true;
            break;
        }
      }
      stream.closeWrite();
    })());
  };
  writing().catch(reportError);

  // receive from remote
  const readQueue = newQueue();
  const reading = async () => {
    try {
      for await (const bl of stream.source) {
        if (sourceFinished) break;
        const u8a = bl.slice();
        if (u8a[0] === 0) readQueue.push({type: "data", value: u8a.slice(1)});
        if (u8a[0] === 1) remoteClosed = true;
        if (u8a[0] === 2) readQueue.push({type: "finished"});
      }
    } finally {
      // Also runs when the underlying source throws, so that a consumer of
      // the wrapped `source` is released instead of waiting forever.
      readQueue.push({type: "finished"});
    }
    stream.closeRead();
  };
  // (ipfs-0.65.0) may spawn `Error: Socket read timeout`
  reading().catch(reportError);

  // wrapped stream.source
  const source = (async function* () {
    try {
      for (;;) {
        const {done, value: message} = await readQueue.next();
        if (done || message.type === "finished") break;
        if (message.type === "data") yield message.value;
      }
    } finally {
      // Runs on normal completion and when the consumer leaves its
      // `for await` loop early; either way the remote is told to stop.
      sendClose();
    }
  })();

  // wrapped stream.sink
  const sink = async iter => {
    for await (const value of iter) {
      // Stop pulling once nobody will read the data: either the remote closed
      // its read side or closeWrite() already announced the end of the data.
      if (remoteClosed || sinkFinished) break;
      writeQueue.push({type: "data", value});
    }
    sendFinished();
  };

  // send close to read
  const closeRead = async () => {
    sendClose();
  };
  const closeWrite = async () => {
    sendFinished();
  };

  // wrapped stream
  return Object.assign(eventTarget, {
    source, sink, closeRead, closeWrite,
    close() {return Promise.all([closeRead(), closeWrite()]);},
    reset() {return stream.reset();},
    abort(...args) {return stream.abort(...args);},
  });
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as fs from "node:fs";
import * as IPFS from "ipfs-core";
import {newClosableStream} from "./closable-stream.js";

// Set up two IPFS nodes, each on a freshly removed repository directory.
const repo1 = "./test-repo1";
const repo2 = "./test-repo2";
fs.rmSync(repo1, {recursive: true, force: true});
fs.rmSync(repo2, {recursive: true, force: true});

// Both nodes share the same swarm configuration (TCP port 0 on every IPv4
// interface).
const config = {
  Addresses: {
    Swarm: [
      "/ip4/0.0.0.0/tcp/0",
    ],
  },
};

const node1 = await IPFS.create({
  repo: repo1,
  config,
});
const id1 = await node1.id();
console.info("[node1 id]", id1.id.toJSON());
console.info("[node1 address]", id1.addresses[0].toJSON());

const node2 = await IPFS.create({
  repo: repo2,
  config,
});
const id2 = await node2.id();
console.info("[node2 id]", id2.id.toJSON());
console.info("[node2 address]", id2.addresses[0].toJSON());

// connect and ping as p2p-circuit
await node2.swarm.connect(id1.addresses[0].toJSON());
await node2.libp2p.ping(`/p2p/${id2.id.toJSON()}/p2p-circuit/p2p/${id1.id.toJSON()}`);
// handler: serves a stream of counter messages. The loop is bounded to 20
// messages here; replace it with `while (true)` for a truly infinite stream.
const protocol = "/example-protocol/1.0";
{
  const encoder = new TextEncoder();
  const handler = ({stream}) => {
    const cStream = newClosableStream(stream);
    let count = 0;
    cStream.sink((async function* () {
      for (let i = 0; i < 20; i++) {
        yield encoder.encode(`count: ${++count}`);
        console.log(`[Served] ${count}`);
        // pace the messages: one every 50 ms
        await new Promise(f => setTimeout(f, 50));
      }
    })()).catch(error => {
      // The handler does not await the sink, so log a failure here instead
      // of leaving the rejection unhandled.
      console.log(error);
    });
  };
  await node1.libp2p.handle(protocol, handler);
}
// dialer: accept several messages, then stop
/**
 * Dials node1 on `protocol`, prints the first `limit` messages received and
 * then closes the wrapped stream.
 *
 * @param {number} limit - number of messages to print before stopping.
 * @returns {Promise<void>} settles once `stream.close()` has settled.
 */
const dialAndRead = async limit => {
  const stream = newClosableStream(await node2.libp2p.dialProtocol(`/p2p/${id1.id.toJSON()}`, protocol));
  stream.addEventListener("error", ev => {
    console.log(ev.detail);
  });
  const decoder = new TextDecoder();
  let received = 0;
  for await (const bl of stream.source) {
    console.log(decoder.decode(bl.slice()));
    if (++received === limit) break;
  }
  await stream.close();
};

await dialAndRead(10);

const sec = 45;
console.log(`[wait ${sec}sec for spawn Socket read timeout]`);
await new Promise(f => setTimeout(f, sec * 1000)); // > Socket read timeout

// reconnect after socket read timeout
console.log("[connect and ping again]");
await node2.swarm.connect(id1.addresses[0].toJSON());
await node2.libp2p.ping(`/p2p/${id2.id.toJSON()}/p2p-circuit/p2p/${id1.id.toJSON()}`);

// dial again
await dialAndRead(10);
// Shut both nodes down, one after the other.
console.log("[stop nodes]");
await node1.stop();
await node2.stop();
console.log("[stopped]");
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{
  "type": "module",
  "dependencies": {
    "ipfs": "^0.65.0"
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
result: