Created
October 20, 2025 21:22
-
-
Save cablehead/b853c7f6bea96fab49a66c0352f10574 to your computer and use it in GitHub Desktop.
xs.nu - Nushell wrapper for cross.stream using http builtins with Unix socket and TCP support
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| export const XS_CONTEXT_SYSTEM = "0000000000000000000000000" | |
| def and-then [next: closure --else: closure] { | |
| if ($in | is-not-empty) { do $next } else { | |
| if $else != null { do $else } | |
| } | |
| } | |
| def or-else [or_else: closure] { | |
| if ($in | is-not-empty) { $in } else { do $or_else } | |
| } | |
| def conditional-pipe [ | |
| condition: bool | |
| action: closure | |
| ] { | |
| if $condition { do $action } else { } | |
| } | |
| export def xs-addr [] { | |
| $env | get XS_ADDR? | or-else { try { open ~/.config/cross.stream/XS_ADDR | str trim | path expand } } | or-else { "~/.local/share/cross.stream/store" | path expand } | |
| } | |
| # Build HTTP request with proper socket/TCP handling | |
| def http-request [ | |
| method: string | |
| path: string | |
| --body: any | |
| --headers: record = {} | |
| ] { | |
| let addr = xs-addr | |
| # Determine if using Unix socket or TCP | |
| let is_tcp = ($addr | str starts-with ":") | |
| let url = if $is_tcp { | |
| $"http://localhost($addr)($path)" | |
| } else { | |
| $"http://localhost($path)" | |
| } | |
| match [$method, $is_tcp, ($headers | is-empty)] { | |
| # GET requests | |
| ["GET", true, true] => { http get $url } | |
| ["GET", true, false] => { http get --headers $headers $url } | |
| ["GET", false, true] => { http get --unix-socket ($addr | path join "sock") $url } | |
| ["GET", false, false] => { http get --unix-socket ($addr | path join "sock") --headers $headers $url } | |
| # POST requests | |
| ["POST", true, true] => { http post $url ($body | default "") } | |
| ["POST", true, false] => { http post --headers $headers $url ($body | default "") } | |
| ["POST", false, true] => { http post --unix-socket ($addr | path join "sock") $url ($body | default "") } | |
| ["POST", false, false] => { http post --unix-socket ($addr | path join "sock") --headers $headers $url ($body | default "") } | |
| # DELETE requests | |
| ["DELETE", true, true] => { http delete $url } | |
| ["DELETE", true, false] => { http delete --headers $headers $url } | |
| ["DELETE", false, true] => { http delete --unix-socket ($addr | path join "sock") $url } | |
| ["DELETE", false, false] => { http delete --unix-socket ($addr | path join "sock") --headers $headers $url } | |
| } | |
| } | |
| export def xs-context-collect [] { | |
| _cat {context: $XS_CONTEXT_SYSTEM} | reduce --fold {} {|frame acc| | |
| match $frame.topic { | |
| "xs.context" => ($acc | insert $frame.id $frame.meta?.name?) | |
| "xs.annotate" => ( | |
| if $frame.meta?.updates? in $acc { | |
| $acc | update $frame.meta.updates $frame.meta?.name? | |
| } else { | |
| $acc | |
| } | |
| ) | |
| _ => $acc | |
| } | |
| } | transpose id name | prepend { | |
| id: $XS_CONTEXT_SYSTEM | |
| name: "system" | |
| } | |
| } | |
| export def xs-context [selected?: string span?] { | |
| if $selected == null { | |
| return ($env | get XS_CONTEXT?) | |
| } | |
| xs-context-collect | where id == $selected or name == $selected | try { first | get id } catch { | |
| if $span != null { | |
| error make { | |
| msg: $"context not found: ($selected)" | |
| label: {text: "provided span" span: $span} | |
| } | |
| } else { | |
| error make -u {msg: $"context not found: ($selected)"} | |
| } | |
| } | |
| } | |
| def _cat [options: record] { | |
| let query_params = [ | |
| (if ($options | get follow? | default false) { "follow=true" }) | |
| (if ($options | get tail? | default false) { "tail=true" }) | |
| (if ($options | get all? | default false) { "all=true" }) | |
| (if $options.last_id? != null { $"last-id=($options.last_id)" }) | |
| (if $options.limit? != null { $"limit=($options.limit)" }) | |
| (if $options.pulse? != null { $"pulse=($options.pulse)" }) | |
| (if $options.context? != null { $"context-id=($options.context)" }) | |
| (if $options.topic? != null { $"topic=($options.topic)" }) | |
| ] | compact | str join "&" | |
| let path = if ($query_params | is-empty) { "/" } else { $"/?($query_params)" } | |
| let headers = if ($options | get follow? | default false) { | |
| {"Accept": "text/event-stream"} | |
| } else { | |
| {} | |
| } | |
| http-request "GET" $path --headers $headers | lines | each {|x| $x | from json } | |
| } | |
| export def .cat [ | |
| --follow (-f) # long poll for new events | |
| --pulse (-p): int # specifies the interval (in milliseconds) to receive a synthetic "xs.pulse" event | |
| --tail (-t) # begin long after the end of the stream | |
| --detail (-d) # include all frame fields in the output | |
| --last-id (-l): string | |
| --limit: int | |
| --context (-c): string # the context to read from | |
| --all (-a) # cat across all contexts | |
| --topic (-T): string # filter by topic | |
| ] { | |
| _cat { | |
| follow: $follow | |
| pulse: $pulse | |
| tail: $tail | |
| last_id: $last_id | |
| limit: $limit | |
| context: (if not $all { (xs-context $context (metadata $context).span) }) | |
| all: $all | |
| topic: $topic | |
| } | conditional-pipe (not ($detail or $all)) { each { reject context_id ttl } } | |
| } | |
| def read_hash [hash?: any] { | |
| match ($hash | describe -d | get type) { | |
| "string" => $hash | |
| "record" => ($hash | get hash?) | |
| _ => null | |
| } | |
| } | |
| export def .cas [hash?: any] { | |
| let alt = $in | |
| let hash = read_hash (if $hash != null { $hash } else { $alt }) | |
| if $hash == null { return } | |
| http-request "GET" $"/cas/($hash)" | |
| } | |
| export def .get [id: string] { | |
| http-request "GET" $"/($id)" | |
| } | |
| export def .head [ | |
| topic: string | |
| --follow (-f) | |
| --context (-c): string | |
| ] { | |
| let query_params = [ | |
| (if $follow { "follow=true" }) | |
| (xs-context $context (metadata $context).span | and-then { $"context=($in)" }) | |
| ] | compact | str join "&" | |
| let path = if ($query_params | is-empty) { | |
| $"/head/($topic)" | |
| } else { | |
| $"/head/($topic)?($query_params)" | |
| } | |
| let headers = if $follow { | |
| {"Accept": "text/event-stream"} | |
| } else { | |
| {} | |
| } | |
| if $follow { | |
| http-request "GET" $path --headers $headers | lines | each {|x| $x | from json } | |
| } else { | |
| http-request "GET" $path --headers $headers | |
| } | |
| } | |
| # Append an event to the stream | |
| export def .append [ | |
| topic: string # The topic to append the event to | |
| --meta: record # Optional metadata to include with the event, provided as a record | |
| --context (-c): string # the context to append to | |
| --ttl: string # Optional Time-To-Live for the event. Supported formats: | |
| # - "forever": The event is kept indefinitely. | |
| # - "ephemeral": The event is not stored; only active subscribers can see it. | |
| # - "time:<milliseconds>": The event is kept for a custom duration in milliseconds. | |
| # - "head:<n>": Retains only the last n events for the topic (n must be >= 1). | |
| ] { | |
| let body = $in | |
| let query_params = [ | |
| (if $ttl != null { $"ttl=($ttl)" }) | |
| (xs-context $context (metadata $context).span | and-then { $"context=($in)" }) | |
| ] | compact | str join "&" | |
| let path = if ($query_params | is-empty) { | |
| $"/($topic)" | |
| } else { | |
| $"/($topic)?($query_params)" | |
| } | |
| let headers = if $meta != null { | |
| {"xs-meta": ($meta | to json -r | encode base64)} | |
| } else { | |
| {} | |
| } | |
| http-request "POST" $path --body $body --headers $headers | |
| } | |
| export def .remove [id: string] { | |
| http-request "DELETE" $"/($id)" | |
| } | |
| export alias .rm = .remove | |
| export def ".ctx" [ | |
| --detail (-d) # return a record with id and name fields | |
| ] { | |
| let id = xs-context | or-else { $XS_CONTEXT_SYSTEM } | |
| let name = xs-context-collect | where id == $id | get name.0 | |
| if $detail { | |
| {id: $id} | if $name != null { insert name $name } else { $in } | |
| } else { | |
| $name | default $id | |
| } | |
| } | |
| export def ".ctx list" [] { | |
| let active = .ctx -d | get id | |
| xs-context-collect | insert active { | |
| $in.id == $active | |
| } | |
| } | |
| export alias ".ctx ls" = .ctx list | |
| export def --env ".ctx switch" [id?: string] { | |
| $env.XS_CONTEXT = $id | or-else { .ctx select } | |
| .ctx --detail | get id | |
| } | |
| export def --env ".ctx new" [name: string] { | |
| .append "xs.context" -c $XS_CONTEXT_SYSTEM --meta {name: $name} | .ctx switch $in.id | |
| } | |
| export def --env ".ctx rename" [id: string name: string] { | |
| .append "xs.annotate" -c $XS_CONTEXT_SYSTEM --meta { | |
| updates: (xs-context $id (metadata $id).span) | |
| name: $name | |
| } | |
| } | |
| export def --env ".ctx select" [] { | |
| .ctx list | input list | get id | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment