Skip to content

Instantly share code, notes, and snippets.

@FikretHassan
Last active March 14, 2025 23:52
Show Gist options
  • Save FikretHassan/6129eb12dfcf0c7bf5a246d600afab0d to your computer and use it in GitHub Desktop.
Save FikretHassan/6129eb12dfcf0c7bf5a246d600afab0d to your computer and use it in GitHub Desktop.
GDScript Pubsub Singleton Example
extends Node
var topics = {} # Stores event subscribers { name, token, callback, unsubscribeOnExecute }
var published_topics = [] # Ordered list of published topics
var execution_counts = {} # Tracks how many times each topic's subscribers have run
var next_token = 1 # Auto-incrementing ID for subscription tokens
# Subscribe to an event with a unique token and optional auto-unsubscribe
func subscribe(topic: String, func_ref: Callable, options: Dictionary) -> String:
var run_if_already_published = options.get("runIfAlreadyPublished", false)
var unsubscribe_on_execute = options.get("unsubscribeOnExecute", false) # Auto-unsub after first execution
var token = str(next_token) # Generate a unique token
next_token += 1
if not topics.has(topic):
topics[topic] = []
topics[topic].append({ "name": topic, "token": token, "callback": func_ref, "unsubscribeOnExecute": unsubscribe_on_execute })
print("\nPubsub: [Subscribe] Subscribed to:", topic, " | Token:", token, " | runIfAlreadyPublished:", run_if_already_published, " | unsubscribeOnExecute:", unsubscribe_on_execute)
# If topic was already published, call the function immediately with stored data
if run_if_already_published:
for past_topic in published_topics:
if past_topic == topic:
print("Pubsub: [Executing] Running immediately due to previous publish:", topic)
func_ref.call(topic)
_track_execution(topic, token)
# Auto-unsubscribe if `unsubscribeOnExecute` is enabled
if unsubscribe_on_execute:
unsubscribe(topic, token)
return token # Return the token so it can be used for unsubscribing
# Publish an event, optionally passing data
func publish(topic: String, _data = null):
published_topics.append(topic) # Store topic in order of publication
print("\nPubsub: [Publish] Published:", topic, " | Data:", _data)
if topics.has(topic):
var to_remove = [] # Track subscribers to remove (for unsubscribeOnExecute)
for subscriber in topics[topic]:
print("Pubsub: [Executing] Running subscription for:", topic, " | Token:", subscriber["token"], " | Data:", _data)
subscriber["callback"].call(_data) # Call each subscriber with the data
_track_execution(topic, subscriber["token"])
# Auto-unsubscribe if `unsubscribeOnExecute` is enabled
if subscriber["unsubscribeOnExecute"]:
to_remove.append(subscriber["token"])
# Remove "unsubscribeOnExecute" subscriptions after loop
for token in to_remove:
unsubscribe(topic, token)
# Unsubscribe from an event using a token
func unsubscribe(topic: String, token: String):
if topics.has(topic):
topics[topic] = topics[topic].filter(func(sub): return sub["token"] != token)
print("\nPubsub: [Unsubscribe] Unsubscribed from:", topic, " | Token:", token)
# Remove topic entry if no more subscribers
if topics[topic].size() == 0:
topics.erase(topic)
print("[Unsubscribe] No more subscribers for topic:", topic)
# Tracks how many times a topic's subscribers have been executed
func _track_execution(topic: String, token: String):
if not execution_counts.has(token):
execution_counts[token] = 0
execution_counts[token] += 1
print("Execution Count: Topic:", topic, " | Token:", token, " | Total Executions:", execution_counts[token])
# Auto-remove published topics if needed
func clear_published_topic(topic: String):
if topic in published_topics:
published_topics.erase(topic)
print("[Cleanup] Cleared published topic:", topic)
func _ready():
# Example Usage Below
# Publishing an event
pubsub.publish("game_started", "Game has begun!")
# Subscribe with auto-unsubscribe after execution (unsubscribeOnExecute = true)
pubsub.subscribe("game_started", func(_data):
print("\nI can only do this once\n")
, { "runIfAlreadyPublished": true, "unsubscribeOnExecute": true })
# Subscribe normally (persists across multiple publishes)
# Store the token if unsubscribeOnExecute is set to false
# but you wish to unsubscribe at some point, using
# pubsub.unsubscribe("game_running", _token1)
var _token1 = pubsub.subscribe("game_running", func(_data):
print("\nI will run as many times as game_running is published\n")
, { "runIfAlreadyPublished": true, "unsubscribeOnExecute": false })
# You can have as many subscriptions to a topic as you wish
pubsub.subscribe("game_running", func(_data):
print("\nSecond subscription to game_running - I will only run once\n")
, { "runIfAlreadyPublished": true, "unsubscribeOnExecute": true })
# You can publish multiple times to rerun the subscription(s) functions
# Publish with 3-second intervals
await get_tree().create_timer(3).timeout
# The below should not rerun 'I can do this once' because we've unsubscribed
pubsub.publish("game_started", "this shouldnt trigger 'I can only do this once' again")
await get_tree().create_timer(3).timeout
pubsub.publish("game_running", "First time publishing this.")
await get_tree().create_timer(3).timeout
pubsub.publish("game_running", "Second time publishing this.")
print('\nTopics:', topics, '\nPublished Topics (Ordered):', published_topics)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment