Last active
August 20, 2024 13:01
-
-
Save EitanBlumin/30f51ee8c59791334a8126dc8662f585 to your computer and use it in GitHub Desktop.
Asynchronous Triggers Using Service Broker (more info: https://eitanblumin.com/2018/10/31/advanced-service-broker-sample-asynchronous-triggers/ )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
USE AdventureWorks2008R2 | |
GO | |
SET ANSI_NULLS ON | |
GO | |
SET QUOTED_IDENTIFIER ON | |
GO | |
IF OBJECT_ID('Purchasing.usp_AT_uPurchaseOrderDetail', 'P') IS NOT NULL | |
DROP PROCEDURE Purchasing.usp_AT_uPurchaseOrderDetail; | |
GO | |
CREATE PROCEDURE Purchasing.usp_AT_uPurchaseOrderDetail | |
@inserted XML, | |
@deleted XML = NULL | |
AS | |
SET NOCOUNT ON; | |
BEGIN TRY | |
IF EXISTS ( SELECT NULL FROM @inserted.nodes('inserted/row') AS T(X) ) | |
BEGIN | |
-- Insert record into TransactionHistory | |
INSERT INTO [Production].[TransactionHistory] | |
([ProductID] | |
,[ReferenceOrderID] | |
,[ReferenceOrderLineID] | |
,[TransactionType] | |
,[TransactionDate] | |
,[Quantity] | |
,[ActualCost]) | |
SELECT | |
inserted.[ProductID] | |
,inserted.[PurchaseOrderID] | |
,inserted.[PurchaseOrderDetailID] | |
,'P' | |
,GETDATE() | |
,inserted.[OrderQty] | |
,inserted.[UnitPrice] | |
FROM | |
( | |
SELECT | |
X.query('.').value('(row/PurchaseOrderID)[1]', 'int') AS PurchaseOrderID | |
, X.query('.').value('(row/PurchaseOrderDetailID)[1]', 'int') AS PurchaseOrderDetailID | |
, X.query('.').value('(row/ProductID)[1]', 'int') AS ProductID | |
, X.query('.').value('(row/OrderQty)[1]', 'smallint') AS OrderQty | |
, X.query('.').value('(row/UnitPrice)[1]', 'money') AS UnitPrice | |
FROM @inserted.nodes('inserted/row') AS T(X) | |
) AS inserted | |
INNER JOIN [Purchasing].[PurchaseOrderDetail] | |
ON inserted.[PurchaseOrderID] = [Purchasing].[PurchaseOrderDetail].[PurchaseOrderID]; | |
-- Update SubTotal in PurchaseOrderHeader record. Note that this causes the | |
-- PurchaseOrderHeader trigger to fire which will update the RevisionNumber. | |
UPDATE [Purchasing].[PurchaseOrderHeader] | |
SET [Purchasing].[PurchaseOrderHeader].[SubTotal] = | |
(SELECT SUM([Purchasing].[PurchaseOrderDetail].[LineTotal]) | |
FROM [Purchasing].[PurchaseOrderDetail] | |
WHERE [Purchasing].[PurchaseOrderHeader].[PurchaseOrderID] | |
= [Purchasing].[PurchaseOrderDetail].[PurchaseOrderID]) | |
WHERE [Purchasing].[PurchaseOrderHeader].[PurchaseOrderID] | |
IN ( | |
SELECT inserted.[PurchaseOrderID] | |
FROM ( | |
SELECT | |
X.query('.').value('(row/PurchaseOrderID)[1]', 'int') AS PurchaseOrderID | |
FROM @inserted.nodes('inserted/row') AS T(X) | |
) AS inserted | |
); | |
UPDATE [Purchasing].[PurchaseOrderDetail] | |
SET [Purchasing].[PurchaseOrderDetail].[ModifiedDate] = GETDATE() | |
FROM ( | |
SELECT | |
X.query('.').value('(row/PurchaseOrderID)[1]', 'int') AS PurchaseOrderID | |
, X.query('.').value('(row/PurchaseOrderDetailID)[1]', 'int') AS PurchaseOrderDetailID | |
FROM @inserted.nodes('inserted/row') AS T(X) | |
) AS inserted | |
WHERE inserted.[PurchaseOrderID] = [Purchasing].[PurchaseOrderDetail].[PurchaseOrderID] | |
AND inserted.[PurchaseOrderDetailID] = [Purchasing].[PurchaseOrderDetail].[PurchaseOrderDetailID]; | |
END; | |
END TRY | |
BEGIN CATCH | |
-- Since we're in an Asynchronous Trigger, rolling back an update operation | |
-- is a lot more complicated than in a regular trigger. | |
-- For now, for this scenario we'll take the risk of having partial data. | |
EXECUTE [dbo].[uspLogError]; | |
END CATCH; | |
GO | |
ALTER TRIGGER [Purchasing].[uPurchaseOrderDetail] ON [Purchasing].[PurchaseOrderDetail] | |
AFTER UPDATE AS | |
BEGIN | |
DECLARE @Count int; | |
SET @Count = @@ROWCOUNT; | |
IF @Count = 0 | |
RETURN; | |
SET NOCOUNT ON; | |
BEGIN TRY | |
IF UPDATE([ProductID]) OR UPDATE([OrderQty]) OR UPDATE([UnitPrice]) | |
BEGIN | |
DECLARE | |
@inserted XML, | |
@deleted XML; | |
SELECT @inserted = | |
( SELECT * FROM inserted FOR XML PATH('row'), ROOT('inserted') ); | |
SELECT @deleted = | |
( SELECT * FROM deleted FOR XML PATH('row'), ROOT('deleted') ); | |
EXECUTE SB_AT_Fire_Trigger 'Purchasing.usp_AT_uPurchaseOrderDetail', @inserted, @deleted; | |
END; | |
END TRY | |
BEGIN CATCH | |
EXECUTE [dbo].[uspPrintError]; | |
-- Rollback any active or uncommittable transactions before | |
-- inserting information in the ErrorLog | |
IF @@TRANCOUNT > 0 | |
BEGIN | |
ROLLBACK TRANSACTION; | |
END | |
EXECUTE [dbo].[uspLogError]; | |
END CATCH; | |
END; | |
GO | |
/* | |
==================================================== | |
Test script | |
==================================================== | |
*/ | |
-- See the data before the update | |
SELECT * | |
FROM Purchasing.PurchaseOrderDetail | |
WHERE PurchaseOrderID = 8 | |
-- Update the data without actually performing any change | |
UPDATE | |
Purchasing.PurchaseOrderDetail | |
SET ProductID = ProductID | |
WHERE PurchaseOrderID = 8 | |
-- Wait 5 seconds | |
WAITFOR DELAY '00:00:05'; | |
-- See the updated data (ModifiedDate should be updated) | |
SELECT * | |
FROM Purchasing.PurchaseOrderDetail | |
WHERE PurchaseOrderID = 8 | |
GO | |
SELECT * | |
FROM SB_AT_ServiceBrokerLogs | |
SELECT * | |
FROM sys.conversation_endpoints | |
/* -- cleanup closed conversations (SQL Server eventually does this automatically) | |
declare @q uniqueidentifier; | |
select @q = conversation_handle from sys.conversation_endpoints where state='CD'; | |
end conversation @q with cleanup | |
*/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
=========================================================================== | |
Service Broker Sample 2: Asynchronous Triggers | |
=========================================================================== | |
This script creates a set of objects that allow | |
the easy implementation and usage of Asynchronous Triggers. | |
Usage: | |
1. Run this script on the database where you want the asynchronous trigger(s). | |
2. Create a stored procedure which will receive the following two parameters: | |
@inserted XML, | |
@deleted XML | |
This procedure will be responsible for parsing the inserted/deleted data | |
and executing the actual trigger logic based on that data. | |
3. Inside the actual table trigger, use the following code: | |
DECLARE | |
@inserted XML, | |
@deleted XML; | |
SELECT @inserted = | |
( SELECT * FROM inserted FOR XML PATH('row'), ROOT('inserted') ); | |
SELECT @deleted = | |
( SELECT * FROM deleted FOR XML PATH('row'), ROOT('deleted') ); | |
EXECUTE SB_AT_Fire_Trigger '{YourProcedureName}', @inserted, @deleted; | |
But replace {YourProcedureName} with the name of the procedure you've | |
created in step 2. | |
You may review script SB_AT_Sample for an AdventureWorks2008R2 example. | |
=========================================================================== | |
Copyright: Eitan Blumin (C) 2013 | |
Email: [email protected] | |
Source: www.madeira.co.il | |
Disclaimer: | |
The author is not responsible for any damage this | |
script or any of its variations may cause. | |
Do not execute it or any variations of it on production | |
environments without first verifying its validity | |
on controlled testing and/or QA environments. | |
You may use this script at your own risk and may change it | |
to your liking, as long as you leave this disclaimer header | |
fully intact and unchanged. | |
=========================================================================== | |
*/ | |
-- Creation of the table to hold SB logs | |
IF OBJECT_ID('SB_AT_ServiceBrokerLogs') IS NULL | |
BEGIN | |
CREATE TABLE SB_AT_ServiceBrokerLogs | |
( | |
LogID BIGINT IDENTITY(1,1) NOT NULL, | |
LogDate DATETIME NOT NULL DEFAULT (GETDATE()), | |
SPID INT NOT NULL DEFAULT (@@SPID), | |
ProgramName NVARCHAR(255) NOT NULL DEFAULT (APP_NAME()), | |
HostName NVARCHAR(255) NOT NULL DEFAULT (HOST_NAME()), | |
ErrorSeverity INT NOT NULL DEFAULT (0), | |
ErrorMessage NVARCHAR(MAX) NULL, | |
ErrorLine INT NULL, | |
ErrorProc SYSNAME NOT NULL DEFAULT (COALESCE(ERROR_PROCEDURE(),OBJECT_NAME(@@PROCID),'<unknown>')), | |
QueueMessage XML NULL, | |
PRIMARY KEY NONCLUSTERED (LogID) | |
); | |
CREATE CLUSTERED INDEX IX_SB_AT_ServiceBrokerLogs ON SB_AT_ServiceBrokerLogs (LogDate ASC) WITH FILLFACTOR=100; | |
PRINT 'Table SB_AT_ServiceBrokerLogs Created'; | |
END | |
ELSE | |
TRUNCATE TABLE SB_AT_ServiceBrokerLogs | |
GO | |
IF OBJECT_ID('SB_AT_HandleQueue') IS NOT NULL DROP PROCEDURE SB_AT_HandleQueue | |
RAISERROR(N'Creating SB_AT_HandleQueue...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure is activated to handle each item in the Request queue | |
CREATE PROCEDURE SB_AT_HandleQueue | |
AS | |
SET NOCOUNT ON; | |
SET ARITHABORT ON | |
DECLARE @msg XML | |
DECLARE @DlgId UNIQUEIDENTIFIER | |
DECLARE @Info nvarchar(max) | |
DECLARE @ErrorsCount int | |
SET @ErrorsCount = 0 | |
-- Set whether to log verbose status messages before and after each operation | |
DECLARE @Verbose BIT = 1 | |
-- Allow 10 retries in case of service broker errors | |
WHILE @ErrorsCount < 10 | |
BEGIN | |
BEGIN TRANSACTION | |
BEGIN TRY | |
-- Make sure queue is active | |
IF EXISTS (SELECT NULL FROM sys.service_queues | |
WHERE NAME = 'SB_AT_Request_Queue' | |
AND is_receive_enabled = 0) | |
ALTER QUEUE SB_AT_Request_Queue WITH STATUS = ON; | |
-- handle one message at a time | |
WAITFOR | |
( | |
RECEIVE TOP(1) | |
@msg = convert(xml,message_body), | |
@DlgId = conversation_handle | |
FROM dbo.SB_AT_Request_Queue | |
); | |
-- exit when waiting has been timed out | |
IF @@ROWCOUNT = 0 | |
BEGIN | |
IF @@TRANCOUNT > 0 | |
ROLLBACK TRANSACTION; | |
BREAK; | |
END | |
-- Retreive data from xml message | |
DECLARE | |
@ProcedureName VARCHAR(1000), | |
@inserted XML, | |
@deleted XML | |
SELECT | |
@ProcedureName = x.value('(/Request/ProcedureName)[1]','VARCHAR(1000)'), | |
@inserted = x.query('/Request/inserted/inserted'), | |
@deleted = x.query('/Request/deleted/deleted') | |
FROM @msg.nodes('/Request') AS T(x); | |
-- Log operation start | |
IF @Verbose = 1 | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,QueueMessage) | |
VALUES(0,'Starting Process',@msg); | |
-- Encapsulate execution in TRY..CATCH | |
-- to catch errors in the specific request | |
BEGIN TRY | |
-- Execute Request | |
EXEC @ProcedureName @inserted, @deleted; | |
END TRY | |
BEGIN CATCH | |
-- log operation fail | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc,QueueMessage) | |
VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),ERROR_PROCEDURE(),@msg); | |
END CATCH | |
-- commit | |
IF @@TRANCOUNT > 0 | |
COMMIT TRANSACTION; | |
-- Log operation end | |
IF @Verbose = 1 | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Finished Process',OBJECT_NAME(@@PROCID),@msg); | |
-- Close dialogue | |
END CONVERSATION @DlgId; | |
-- reset xml message | |
SET @msg = NULL; | |
END TRY | |
BEGIN CATCH | |
-- rollback transaction | |
-- this will also rollback the extraction of the message from the queue | |
IF @@TRANCOUNT > 0 | |
ROLLBACK TRANSACTION; | |
-- log operation fail | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc,QueueMessage) | |
VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),ERROR_PROCEDURE(),@msg); | |
-- increase error counter | |
SET @ErrorsCount = @ErrorsCount + 1; | |
-- wait 5 seconds before retrying | |
WAITFOR DELAY '00:00:05' | |
END CATCH | |
END | |
GO | |
IF OBJECT_ID('SB_AT_CloseDialogs') IS NOT NULL DROP PROCEDURE SB_AT_CloseDialogs | |
RAISERROR(N'Creating SB_AT_CloseDialogs...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure is activated to handle each item in the Response queue | |
CREATE PROCEDURE SB_AT_CloseDialogs | |
AS | |
SET NOCOUNT ON; | |
SET ARITHABORT ON | |
DECLARE @MsgType SYSNAME | |
DECLARE @msg XML | |
DECLARE @DlgId UNIQUEIDENTIFIER | |
DECLARE @Info nvarchar(max) | |
DECLARE @ErrorsCount int | |
SET @ErrorsCount = 0 | |
-- Set whether to log verbose status messages before and after each operation | |
DECLARE @Verbose BIT = 0 | |
-- Allow 10 retries in case of service broker errors | |
WHILE @ErrorsCount < 10 | |
BEGIN | |
BEGIN TRY | |
-- Make sure queue is active | |
IF EXISTS (SELECT NULL FROM sys.service_queues | |
WHERE NAME = 'SB_AT_Response_Queue' | |
AND is_receive_enabled = 0) | |
ALTER QUEUE SB_AT_Response_Queue WITH STATUS = ON; | |
-- handle one message at a time | |
WAITFOR | |
( | |
RECEIVE TOP(1) | |
@msg = CONVERT(xml, message_body), | |
@MsgType = message_type_name, | |
@DlgId = conversation_handle | |
FROM dbo.SB_AT_Response_Queue | |
); | |
-- exit when waiting has been timed out | |
IF @@ROWCOUNT = 0 | |
BREAK; | |
-- If message type is end dialog or error, end the conversation | |
IF (@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' OR | |
@MsgType = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error') | |
BEGIN | |
END CONVERSATION @DlgId; | |
IF @Verbose = 1 | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Ended Conversation ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@msg); | |
END | |
ELSE IF @Verbose = 1 | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorProc,QueueMessage) | |
VALUES(0,'Unknown Message from ' + CONVERT(nvarchar(max),@DlgId),OBJECT_NAME(@@PROCID),@msg); | |
-- reset variables | |
SET @MsgType = NULL; | |
SET @msg = NULL; | |
END TRY | |
BEGIN CATCH | |
-- log operation fail | |
INSERT INTO SB_AT_ServiceBrokerLogs(ErrorSeverity,ErrorMessage,ErrorLine,ErrorProc) | |
VALUES(ERROR_SEVERITY(),ERROR_MESSAGE(),ERROR_LINE(),ERROR_PROCEDURE()); | |
-- increase error counter | |
SET @ErrorsCount = @ErrorsCount + 1; | |
-- wait 5 seconds before retrying | |
WAITFOR DELAY '00:00:05' | |
END CATCH | |
END | |
GO | |
DECLARE @SQL nvarchar(max) | |
-- Enable service broker | |
IF EXISTS (SELECT * FROM sys.databases WHERE database_id = DB_ID() AND is_broker_enabled = 0) | |
BEGIN | |
SET @SQL = 'ALTER DATABASE [' + DB_NAME() + '] SET NEW_BROKER WITH ROLLBACK IMMEDIATE'; | |
EXEC(@SQL); | |
PRINT 'Enabled Service Broker for DB ' + DB_NAME(); | |
END | |
GO | |
-- Drop existing objects | |
IF EXISTS (SELECT NULL FROM sys.services WHERE NAME = '//SB_AT/ProcessReceivingService') | |
DROP SERVICE [//SB_AT/ProcessReceivingService]; | |
IF EXISTS (SELECT NULL FROM sys.services WHERE NAME = '//SB_AT/ProcessStartingService') | |
DROP SERVICE [//SB_AT/ProcessStartingService]; | |
IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_AT_Request_Queue') | |
DROP QUEUE dbo.SB_AT_Request_Queue; | |
IF EXISTS (SELECT NULL FROM sys.service_queues WHERE NAME = 'SB_AT_Response_Queue') | |
DROP QUEUE dbo.SB_AT_Response_Queue; | |
IF EXISTS (SELECT NULL FROM sys.service_contracts WHERE NAME = '//SB_AT/Contract') | |
DROP CONTRACT [//SB_AT/Contract]; | |
IF EXISTS (SELECT NULL FROM sys.service_message_types WHERE name='//SB_AT/Message') | |
DROP MESSAGE TYPE [//SB_AT/Message]; | |
GO | |
-- Create service broker objects | |
RAISERROR(N'Creating Message Type...',0,0) WITH NOWAIT; | |
CREATE MESSAGE TYPE [//SB_AT/Message] | |
VALIDATION = WELL_FORMED_XML; | |
RAISERROR(N'Creating Contract...',0,0) WITH NOWAIT; | |
CREATE CONTRACT [//SB_AT/Contract] | |
([//SB_AT/Message] SENT BY ANY); | |
RAISERROR(N'Creating Response Queue...',0,0) WITH NOWAIT; | |
CREATE QUEUE dbo.SB_AT_Response_Queue | |
WITH STATUS=ON, | |
ACTIVATION ( | |
PROCEDURE_NAME = SB_AT_CloseDialogs, -- sproc to run when queue receives message | |
MAX_QUEUE_READERS = 10, -- max concurrent instances | |
EXECUTE AS SELF | |
); | |
RAISERROR(N'Creating Request Queue...',0,0) WITH NOWAIT; | |
CREATE QUEUE dbo.SB_AT_Request_Queue | |
WITH STATUS=ON, | |
ACTIVATION ( | |
PROCEDURE_NAME = SB_AT_HandleQueue, -- sproc to run when queue receives message | |
MAX_QUEUE_READERS = 10, -- max concurrent instances | |
EXECUTE AS SELF | |
); | |
RAISERROR(N'Creating Recieving Service...',0,0) WITH NOWAIT; | |
CREATE SERVICE [//SB_AT/ProcessReceivingService] | |
AUTHORIZATION dbo | |
ON QUEUE dbo.SB_AT_Request_Queue ([//SB_AT/Contract]); | |
RAISERROR(N'Creating Sending Service...',0,0) WITH NOWAIT; | |
CREATE SERVICE [//SB_AT/ProcessStartingService] | |
AUTHORIZATION dbo | |
ON QUEUE dbo.SB_AT_Response_Queue ([//SB_AT/Contract]); | |
GO | |
IF OBJECT_ID('SB_AT_Fire_Trigger') IS NOT NULL DROP PROCEDURE SB_AT_Fire_Trigger; | |
RAISERROR(N'Creating SB_AT_Fire_Trigger...',0,0) WITH NOWAIT; | |
GO | |
-- This procedure sends items to the queue for asynchronous triggers | |
CREATE PROCEDURE SB_AT_Fire_Trigger | |
@ProcedureName VARCHAR(1000), | |
@inserted XML = NULL, | |
@deleted XML = NULL | |
AS | |
SET NOCOUNT ON; | |
DECLARE @msg XML | |
-- build the XML message | |
SET @msg = (SELECT | |
ProcedureName = @ProcedureName, | |
inserted = @inserted, | |
deleted = @deleted | |
FOR XML PATH('Request')) | |
DECLARE @DlgId UNIQUEIDENTIFIER | |
BEGIN DIALOG @DlgId | |
FROM SERVICE [//SB_AT/ProcessStartingService] | |
TO SERVICE '//SB_AT/ProcessReceivingService', | |
'CURRENT DATABASE' | |
ON CONTRACT [//SB_AT/Contract] | |
WITH ENCRYPTION = OFF; | |
-- send the message | |
SEND ON CONVERSATION @DlgId | |
MESSAGE TYPE [//SB_AT/Message] (@msg); | |
PRINT N'Started SB_AT process on dialogId ' + ISNULL(convert(varchar(100),@DlgId),'(null)'); | |
GO |
This worked: use the sproc below instead of END CONVERSATION @handle
in your sproc SB_AT_HandleQueue
. I'm on SQL SERVER 2014.
create procedure SB_AT_EndConversation (
@handle uniqueidentifier
)
As
begin
declare @sql varchar(8000) = 'end conversation ''' + convert(varchar(max),@handle) + '''';
exec (@sql)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi @EitanBlumin , sproc SB_AT_HandleQueue is erroring, not sure why? It has to do with that end conversation inside the try/catch.