Last active
January 29, 2023 15:20
-
-
Save tuespetre/0bf3a4c64cc92724416b8753282b79b2 to your computer and use it in GitHub Desktop.
An attempt at mimicking PostgreSQL's LISTEN/NOTIFY functionality in SQL Server, using Service Broker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Worker process: subscribe to the channel and wait (here with a 5000 timeout,
-- which Listen passes to WAITFOR ... TIMEOUT) for a notification to arrive.
EXEC [notifications].[Listen] N'something-happened', 5000;

-- Some other process: send a notification (empty payload by default) to every
-- listener whose registered channel pattern matches this channel name.
EXEC [notifications].[Notify] N'something-happened';
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Schema that owns every object of the LISTEN/NOTIFY emulation.
CREATE SCHEMA [notifications];
GO

-- Message types exchanged on a listener <-> notifier conversation.
-- Bodies are not validated by Service Broker.
CREATE MESSAGE TYPE [notifications/Listen] VALIDATION = NONE;
CREATE MESSAGE TYPE [notifications/Notify] VALIDATION = NONE;

-- The listener (initiator) registers with Listen; the notifier (target)
-- answers on the same conversation with Notify.
CREATE CONTRACT [notifications/Contract]
(
    [notifications/Listen] SENT BY INITIATOR,
    [notifications/Notify] SENT BY TARGET
);

-- Queue read directly by [notifications].[Listen]; poison message handling is off.
CREATE QUEUE [notifications].[ListenerQueue]
WITH
    STATUS = ON,
    RETENTION = OFF,
    POISON_MESSAGE_HANDLING ( STATUS = OFF );

-- Queue drained by the activation procedure; poison message handling is on.
CREATE QUEUE [notifications].[NotifierQueue]
WITH
    STATUS = ON,
    RETENTION = OFF,
    POISON_MESSAGE_HANDLING ( STATUS = ON );

-- The listener service only initiates dialogs, so it names no contract;
-- the notifier service accepts dialogs on [notifications/Contract].
CREATE SERVICE [notifications/ListenerService]
    ON QUEUE [notifications].[ListenerQueue];

CREATE SERVICE [notifications/NotifierService]
    ON QUEUE [notifications].[NotifierQueue] ([notifications/Contract]);

-- One row per (connection, channel) subscription.
CREATE TABLE [notifications].[Listeners]
(
    [ListenerId]   int IDENTITY(1, 1) NOT NULL,
    [ConnectionId] uniqueidentifier   NOT NULL,  -- sys.dm_exec_connections.connection_id of the subscriber
    [Channel]      sysname            NOT NULL,  -- used as the LIKE pattern in [notifications].[Notify]
    CONSTRAINT PK_Listeners PRIMARY KEY ([ListenerId]),
    CONSTRAINT AK_Listeners UNIQUE ([ConnectionId], [Channel])
);

-- Initiator-side conversation handle of each subscription (written by Listen).
CREATE TABLE [notifications].[ListenerHandles]
(
    [ListenerId]     int              NOT NULL,
    [ListenerHandle] uniqueidentifier NOT NULL,
    CONSTRAINT PK_ListenerHandles PRIMARY KEY ([ListenerId]),
    CONSTRAINT AK_ListenerHandles UNIQUE ([ListenerHandle])
);

-- Target-side conversation handle of each subscription (written by the
-- activation procedure when the Listen message arrives).
CREATE TABLE [notifications].[NotifierHandles]
(
    [ListenerId]     int              NOT NULL,
    [NotifierHandle] uniqueidentifier NOT NULL,
    CONSTRAINT PK_NotifierHandles PRIMARY KEY ([ListenerId]),
    CONSTRAINT AK_NotifierHandles UNIQUE ([NotifierHandle])
);
GO
CREATE PROCEDURE [notifications].[ActivationProcedure]
WITH EXECUTE AS OWNER
AS
BEGIN
    -- Processes at most one message from [notifications].[NotifierQueue] per call:
    --   * notifications/Listen : store the target-side handle for the listener id
    --                            carried in the body and arm a 3600 conversation timer.
    --   * DialogTimer          : if the listener's connection is still present in
    --                            sys.dm_exec_connections re-arm the timer; otherwise
    --                            end both sides of the dialog and drop the registration.
    --   * Error / EndDialog    : end the target side and drop the registration.
    -- Any error rolls the transaction back (returning the message to the queue)
    -- and is otherwise swallowed.
    DECLARE @msgType           sysname;
    DECLARE @msgBody           varbinary(max);
    DECLARE @targetHandle      uniqueidentifier;  -- conversation handle on the notifier side
    DECLARE @initiatorHandle   uniqueidentifier;  -- matching handle on the listener side, if known
    DECLARE @registrationId    int;
    DECLARE @purgeRegistration bit = 0;           -- set when the registration rows must be removed

    BEGIN TRY
        BEGIN TRANSACTION;

        RECEIVE TOP (1)
            @msgType      = [message_type_name],
            @msgBody      = [message_body],
            @targetHandle = [conversation_handle]
        FROM [notifications].[NotifierQueue];

        IF @msgType = N'notifications/Listen'
        BEGIN
            -- The body is the int ListenerId sent by [notifications].[Listen].
            INSERT INTO [notifications].[NotifierHandles] ([ListenerId], [NotifierHandle])
            VALUES (CONVERT(int, @msgBody), @targetHandle);

            BEGIN CONVERSATION TIMER (@targetHandle) TIMEOUT = 3600;
        END
        ELSE IF @msgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/DialogTimer'
        BEGIN
            SET @registrationId =
            (
                SELECT nh.[ListenerId]
                FROM [notifications].[NotifierHandles] AS nh
                WHERE nh.[NotifierHandle] = @targetHandle
            );

            IF EXISTS
            (
                SELECT 1
                FROM [sys].[dm_exec_connections] AS c
                INNER JOIN [notifications].[Listeners] AS l
                    ON l.[ConnectionId] = c.[connection_id]
                WHERE l.[ListenerId] = @registrationId
            )
            BEGIN
                -- Subscriber is still connected: check again after the next timeout.
                BEGIN CONVERSATION TIMER (@targetHandle) TIMEOUT = 3600;
            END
            ELSE
            BEGIN
                -- Subscriber is gone: close both conversation endpoints.
                END CONVERSATION @targetHandle;

                SET @initiatorHandle =
                (
                    SELECT lh.[ListenerHandle]
                    FROM [notifications].[ListenerHandles] AS lh
                    WHERE lh.[ListenerId] = @registrationId
                );

                IF (@initiatorHandle IS NOT NULL)
                BEGIN
                    END CONVERSATION @initiatorHandle;
                END

                SET @purgeRegistration = 1;
            END
        END
        ELSE IF @msgType IN
        (
            N'http://schemas.microsoft.com/SQL/ServiceBroker/Error',
            N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
        )
        BEGIN
            END CONVERSATION @targetHandle;

            SET @registrationId =
            (
                SELECT nh.[ListenerId]
                FROM [notifications].[NotifierHandles] AS nh
                WHERE nh.[NotifierHandle] = @targetHandle
            );

            SET @purgeRegistration = 1;
        END

        IF (@purgeRegistration = 1)
        BEGIN
            DELETE [notifications].[Listeners]       WHERE [ListenerId] = @registrationId;
            DELETE [notifications].[ListenerHandles] WHERE [ListenerId] = @registrationId;
            DELETE [notifications].[NotifierHandles] WHERE [ListenerId] = @registrationId;
        END

        COMMIT TRANSACTION
    END TRY
    BEGIN CATCH
        -- Only roll back when a transaction is actually open (committable or not).
        IF (XACT_STATE() <> 0)
        BEGIN
            ROLLBACK TRANSACTION
        END
    END CATCH
END
GO
-- Attach the activation procedure to the notifier queue. A single queue
-- reader is allowed, and the procedure runs as the queue owner.
ALTER QUEUE [notifications].[NotifierQueue]
WITH
    STATUS = ON,
    ACTIVATION
    (
        STATUS = ON,
        MAX_QUEUE_READERS = 1,
        PROCEDURE_NAME = [notifications].[ActivationProcedure],
        EXECUTE AS OWNER
    );
GO
CREATE PROCEDURE [notifications].[Notify]
    @channel sysname,
    @payload nvarchar(max) = ''
WITH EXECUTE AS OWNER
AS
BEGIN
    -- Sends @payload as a [notifications/Notify] message on the notifier-side
    -- conversation of every listener whose registered [Channel] matches @channel.
    -- The listener's [Channel] is the LIKE pattern, so LIKE wildcards stored in
    -- it are honoured. Does nothing when no listener matches.
    DECLARE @handleList nvarchar(max);
    DECLARE @statement  nvarchar(max);

    -- Build "'<guid>', '<guid>', ..." by prefixing each item with a separator
    -- and stripping the first one; yields NULL when there are no matches.
    SET @handleList = STUFF(
        (
            SELECT DISTINCT N', ''' + CAST(nh.[NotifierHandle] AS nvarchar(36)) + N''''
            FROM [notifications].[Listeners] AS l WITH (READUNCOMMITTED)
            INNER JOIN [notifications].[NotifierHandles] AS nh
                ON nh.[ListenerId] = l.[ListenerId]
            WHERE @channel LIKE l.[Channel]
            FOR XML PATH('')
        ),
        1, 2, N'');

    IF (@handleList IS NULL)
    BEGIN
        RETURN;
    END

    -- Only handles read from our own table are concatenated into the statement;
    -- the caller-supplied payload is passed as a parameter.
    SET @statement = N'SEND ON CONVERSATION (' + @handleList + N') MESSAGE TYPE [notifications/Notify] (@payload)';

    EXEC sp_executesql @statement, N'@payload nvarchar(max)', @payload;
END
GO
CREATE PROCEDURE [notifications].[Listen]
    @channel sysname,
    @timeout int = -1
WITH EXECUTE AS OWNER
AS
BEGIN
    -- Subscribes the calling connection to @channel (once per connection and
    -- channel) and then waits for notifications on that subscription.
    --
    -- Parameters:
    --   @channel  Channel to listen on; [notifications].[Notify] uses it as a LIKE pattern.
    --   @timeout  Passed to WAITFOR ... TIMEOUT; the default of -1 waits indefinitely.
    --
    -- Result set: one row per received message with a single column [Payload]
    -- (the message body converted to nvarchar(max)).
    --
    -- Errors: a failure while registering the subscription is rolled back and
    -- re-raised to the caller.

    -- Identify the caller's connection; the activation procedure later checks
    -- this id against sys.dm_exec_connections to detect dropped listeners.
    DECLARE @connectionId uniqueidentifier =
    (
        SELECT TOP(1) [connection_id]
        FROM [sys].[dm_exec_connections]
        WHERE [session_id] = @@SPID
    );

    -- Reuse the conversation of an existing subscription, if any.
    DECLARE @listenerHandle uniqueidentifier =
    (
        SELECT TOP(1) lh.[ListenerHandle]
        FROM [notifications].[Listeners] l
        JOIN [notifications].[ListenerHandles] lh
            ON lh.[ListenerId] = l.[ListenerId]
        WHERE l.[ConnectionId] = @connectionId
          AND l.[Channel] = @channel
    );

    IF (@listenerHandle IS NULL)
    BEGIN
        BEGIN TRY
            BEGIN TRANSACTION;

            BEGIN DIALOG CONVERSATION @listenerHandle
                FROM SERVICE [notifications/ListenerService]
                TO SERVICE N'notifications/NotifierService'
                ON CONTRACT [notifications/Contract]
                WITH ENCRYPTION = OFF;

            INSERT INTO [notifications].[Listeners] (ConnectionId, Channel)
            VALUES (@connectionId, @channel);

            DECLARE @listenerId int = SCOPE_IDENTITY();

            INSERT INTO [notifications].[ListenerHandles] (ListenerId, ListenerHandle)
            VALUES (@listenerId, @listenerHandle);

            -- Tell the notifier side which listener this conversation belongs to.
            SEND ON CONVERSATION @listenerHandle MESSAGE TYPE [notifications/Listen] (@listenerId);

            COMMIT TRANSACTION;
        END TRY
        BEGIN CATCH
            -- Roll back only when a transaction is actually open, matching the
            -- guard used by [notifications].[ActivationProcedure]; an
            -- unconditional ROLLBACK would itself fail if none is active.
            IF (XACT_STATE() <> 0)
            BEGIN
                ROLLBACK TRANSACTION;
            END;

            -- Surface the registration failure instead of returning silently.
            THROW;
        END CATCH
    END

    WAITFOR
    (
        RECEIVE CONVERT(nvarchar(max), [message_body]) AS Payload
        FROM [notifications].[ListenerQueue]
        WHERE conversation_handle = @listenerHandle
    ),
    TIMEOUT @timeout;
END
GO
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment