Skip to content

Instantly share code, notes, and snippets.

@cablehead
Created October 20, 2025 21:22
Show Gist options
  • Save cablehead/b853c7f6bea96fab49a66c0352f10574 to your computer and use it in GitHub Desktop.
Save cablehead/b853c7f6bea96fab49a66c0352f10574 to your computer and use it in GitHub Desktop.
xs.nu - Nushell wrapper for cross.stream using http builtins with Unix socket and TCP support
export const XS_CONTEXT_SYSTEM = "0000000000000000000000000"
def and-then [next: closure --else: closure] {
if ($in | is-not-empty) { do $next } else {
if $else != null { do $else }
}
}
def or-else [or_else: closure] {
if ($in | is-not-empty) { $in } else { do $or_else }
}
def conditional-pipe [
condition: bool
action: closure
] {
if $condition { do $action } else { }
}
export def xs-addr [] {
$env | get XS_ADDR? | or-else { try { open ~/.config/cross.stream/XS_ADDR | str trim | path expand } } | or-else { "~/.local/share/cross.stream/store" | path expand }
}
# Build HTTP request with proper socket/TCP handling
def http-request [
method: string
path: string
--body: any
--headers: record = {}
] {
let addr = xs-addr
# Determine if using Unix socket or TCP
let is_tcp = ($addr | str starts-with ":")
let url = if $is_tcp {
$"http://localhost($addr)($path)"
} else {
$"http://localhost($path)"
}
match [$method, $is_tcp, ($headers | is-empty)] {
# GET requests
["GET", true, true] => { http get $url }
["GET", true, false] => { http get --headers $headers $url }
["GET", false, true] => { http get --unix-socket ($addr | path join "sock") $url }
["GET", false, false] => { http get --unix-socket ($addr | path join "sock") --headers $headers $url }
# POST requests
["POST", true, true] => { http post $url ($body | default "") }
["POST", true, false] => { http post --headers $headers $url ($body | default "") }
["POST", false, true] => { http post --unix-socket ($addr | path join "sock") $url ($body | default "") }
["POST", false, false] => { http post --unix-socket ($addr | path join "sock") --headers $headers $url ($body | default "") }
# DELETE requests
["DELETE", true, true] => { http delete $url }
["DELETE", true, false] => { http delete --headers $headers $url }
["DELETE", false, true] => { http delete --unix-socket ($addr | path join "sock") $url }
["DELETE", false, false] => { http delete --unix-socket ($addr | path join "sock") --headers $headers $url }
}
}
export def xs-context-collect [] {
_cat {context: $XS_CONTEXT_SYSTEM} | reduce --fold {} {|frame acc|
match $frame.topic {
"xs.context" => ($acc | insert $frame.id $frame.meta?.name?)
"xs.annotate" => (
if $frame.meta?.updates? in $acc {
$acc | update $frame.meta.updates $frame.meta?.name?
} else {
$acc
}
)
_ => $acc
}
} | transpose id name | prepend {
id: $XS_CONTEXT_SYSTEM
name: "system"
}
}
export def xs-context [selected?: string span?] {
if $selected == null {
return ($env | get XS_CONTEXT?)
}
xs-context-collect | where id == $selected or name == $selected | try { first | get id } catch {
if $span != null {
error make {
msg: $"context not found: ($selected)"
label: {text: "provided span" span: $span}
}
} else {
error make -u {msg: $"context not found: ($selected)"}
}
}
}
def _cat [options: record] {
let query_params = [
(if ($options | get follow? | default false) { "follow=true" })
(if ($options | get tail? | default false) { "tail=true" })
(if ($options | get all? | default false) { "all=true" })
(if $options.last_id? != null { $"last-id=($options.last_id)" })
(if $options.limit? != null { $"limit=($options.limit)" })
(if $options.pulse? != null { $"pulse=($options.pulse)" })
(if $options.context? != null { $"context-id=($options.context)" })
(if $options.topic? != null { $"topic=($options.topic)" })
] | compact | str join "&"
let path = if ($query_params | is-empty) { "/" } else { $"/?($query_params)" }
let headers = if ($options | get follow? | default false) {
{"Accept": "text/event-stream"}
} else {
{}
}
http-request "GET" $path --headers $headers | lines | each {|x| $x | from json }
}
export def .cat [
--follow (-f) # long poll for new events
--pulse (-p): int # specifies the interval (in milliseconds) to receive a synthetic "xs.pulse" event
--tail (-t) # begin long after the end of the stream
--detail (-d) # include all frame fields in the output
--last-id (-l): string
--limit: int
--context (-c): string # the context to read from
--all (-a) # cat across all contexts
--topic (-T): string # filter by topic
] {
_cat {
follow: $follow
pulse: $pulse
tail: $tail
last_id: $last_id
limit: $limit
context: (if not $all { (xs-context $context (metadata $context).span) })
all: $all
topic: $topic
} | conditional-pipe (not ($detail or $all)) { each { reject context_id ttl } }
}
def read_hash [hash?: any] {
match ($hash | describe -d | get type) {
"string" => $hash
"record" => ($hash | get hash?)
_ => null
}
}
export def .cas [hash?: any] {
let alt = $in
let hash = read_hash (if $hash != null { $hash } else { $alt })
if $hash == null { return }
http-request "GET" $"/cas/($hash)"
}
export def .get [id: string] {
http-request "GET" $"/($id)"
}
export def .head [
topic: string
--follow (-f)
--context (-c): string
] {
let query_params = [
(if $follow { "follow=true" })
(xs-context $context (metadata $context).span | and-then { $"context=($in)" })
] | compact | str join "&"
let path = if ($query_params | is-empty) {
$"/head/($topic)"
} else {
$"/head/($topic)?($query_params)"
}
let headers = if $follow {
{"Accept": "text/event-stream"}
} else {
{}
}
if $follow {
http-request "GET" $path --headers $headers | lines | each {|x| $x | from json }
} else {
http-request "GET" $path --headers $headers
}
}
# Append an event to the stream
export def .append [
topic: string # The topic to append the event to
--meta: record # Optional metadata to include with the event, provided as a record
--context (-c): string # the context to append to
--ttl: string # Optional Time-To-Live for the event. Supported formats:
# - "forever": The event is kept indefinitely.
# - "ephemeral": The event is not stored; only active subscribers can see it.
# - "time:<milliseconds>": The event is kept for a custom duration in milliseconds.
# - "head:<n>": Retains only the last n events for the topic (n must be >= 1).
] {
let body = $in
let query_params = [
(if $ttl != null { $"ttl=($ttl)" })
(xs-context $context (metadata $context).span | and-then { $"context=($in)" })
] | compact | str join "&"
let path = if ($query_params | is-empty) {
$"/($topic)"
} else {
$"/($topic)?($query_params)"
}
let headers = if $meta != null {
{"xs-meta": ($meta | to json -r | encode base64)}
} else {
{}
}
http-request "POST" $path --body $body --headers $headers
}
export def .remove [id: string] {
http-request "DELETE" $"/($id)"
}
export alias .rm = .remove
export def ".ctx" [
--detail (-d) # return a record with id and name fields
] {
let id = xs-context | or-else { $XS_CONTEXT_SYSTEM }
let name = xs-context-collect | where id == $id | get name.0
if $detail {
{id: $id} | if $name != null { insert name $name } else { $in }
} else {
$name | default $id
}
}
export def ".ctx list" [] {
let active = .ctx -d | get id
xs-context-collect | insert active {
$in.id == $active
}
}
export alias ".ctx ls" = .ctx list
export def --env ".ctx switch" [id?: string] {
$env.XS_CONTEXT = $id | or-else { .ctx select }
.ctx --detail | get id
}
export def --env ".ctx new" [name: string] {
.append "xs.context" -c $XS_CONTEXT_SYSTEM --meta {name: $name} | .ctx switch $in.id
}
export def --env ".ctx rename" [id: string name: string] {
.append "xs.annotate" -c $XS_CONTEXT_SYSTEM --meta {
updates: (xs-context $id (metadata $id).span)
name: $name
}
}
export def --env ".ctx select" [] {
.ctx list | input list | get id
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment