Hi!
This post outlines some high-level aspects of implementing Service Broker. It is aimed at people who are already familiar with the basics of Service Broker; if you need the basics, check this tutorial:
http://msdn.microsoft.com/en-us/library/bb839483.aspx
Scenario:
You have a production system that is processing millions of records a month.
Reporting requires frequent complex reports to run on the metrics and also on the actual data stored.
The actual data payload is stored as the XML data type. This means any reporting metrics on the data payload must be computed from a fact table of some sort.
Data must be archived for 5 years. (This will be around 500 million records)
Objectives:
Decouple the reporting from the production system
Ensure up to date synchronization from production to the reporting server
Running reports will not affect the production system
Solution:
- We will use service broker to define ONE message contract for all tables that need to be synced. Synchronization will be activated by a job which activates the SQL service broker process
- A message type per table is created
- Synchronization will work in batch mode, supporting 40 to 50 simultaneous SQL processes (optimal for a quad-core processor = 48)
- After synchronization of the main transaction tables a job on the reporting server will update the FACTS table.
- The initial queue on the sender side will have an activation PROC configured on the queue. When the SQL job runs, it sends messages to this queue, which activates the procedure.
- We will have a service to Trigger the archiving process
- We will create a Route to the destination system
- It is up to you how to use Service Broker security. In this sample I keep it simple, with no encryption. You can use asymmetric encryption/security etc., but that is not in scope of this article! In this example I use the SQL users MyUser and User; notice the service and route user names are swapped on either server.
The fact table is used to extract information from the XML data. This allows the data to be indexed and reports to run smoothly.
Do you have a transaction database and need a custom reporting solution, without the complexity of log shipping or replication? Then look no further, SQL Service broker is the ideal tool to synchronize two databases, based on simple tables.
Source SQL Server
On this server you create a Service Broker database that triggers sending of the data contract's payload. For example, you can create a database called SSBSender; it will manage the starting and spawning of processes, queues, etc.
Contract:
USE [SSBSender]
GO
--the message types listed here must already exist (see MessageTypes below)
CREATE CONTRACT [ArchivingTables] AUTHORIZATION [dbo] ([CONF_AuditTrail] SENT BY ANY,
[Table_Error] SENT BY ANY,
[Table_Record] SENT BY ANY,
[Table_RecordData] SENT BY ANY,
[Table_Transaction] SENT BY ANY)
MessageTypes:
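USE [SSBSender]
GO
--CONF_AuditTrail is referenced by the ArchivingTables contract, so it needs a message type too
CREATE MESSAGE TYPE [CONF_AuditTrail] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML
CREATE MESSAGE TYPE [Table_Error] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML
CREATE MESSAGE TYPE [Table_Record] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML
CREATE MESSAGE TYPE [Table_RecordData] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML
CREATE MESSAGE TYPE [Table_Transaction] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML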
Sender Queue:
CREATE QUEUE [dbo].[InitialQueue] WITH STATUS = ON , RETENTION = OFF , ACTIVATION ( STATUS = ON , PROCEDURE_NAME = [dbo].[SSB_ReportTransmission] , MAX_QUEUE_READERS = 20 , EXECUTE AS OWNER ) ON [PRIMARY]
Service:
CREATE SERVICE [TriggerArchiving] AUTHORIZATION [MyUser] ON QUEUE [dbo].[InitialQueue] ([ArchivingTables])
Route
CREATE ROUTE [SSBReceiver] AUTHORIZATION [dbo] WITH SERVICE_NAME = N'ReportingReceivingService' , ADDRESS = N'TCP://DestinationServer:4022'
Remote Service Binding
CREATE REMOTE SERVICE BINDING [ReceivingServiceBinding] AUTHORIZATION [dbo] TO SERVICE N'ReportingReceivingService' WITH USER = [MyUserName] , ANONYMOUS = OFF
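The route above points at TCP port 4022, which only works if a Service Broker endpoint is listening on that port and the broker is enabled in the database. Below is a minimal sketch of that setup, assuming no endpoint exists yet on the instance. The endpoint name SSBEndpoint and the choice of Windows authentication are my assumptions, not part of the original setup; adjust them to your environment, and repeat the same on the destination server for SSBReceiver.

```sql
USE [master]
GO
--hypothetical endpoint name; the port matches the ADDRESS used in the route
CREATE ENDPOINT [SSBEndpoint]
    STATE = STARTED
    AS TCP (LISTENER_PORT = 4022)
    FOR SERVICE_BROKER (AUTHENTICATION = WINDOWS, ENCRYPTION = DISABLED)
GO
--make sure the broker is enabled in the sender database
ALTER DATABASE [SSBSender] SET ENABLE_BROKER WITH ROLLBACK IMMEDIATE
GO
--verify: is_broker_enabled should be 1
SELECT name, is_broker_enabled
FROM sys.databases
WHERE name = N'SSBSender'
```

The login used by the other server also needs CONNECT permission on the endpoint before messages can flow.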
Activation Job
Schedule a job to activate the process.
declare @datetime datetime
select @datetime = getdate()
EXEC [SSBSender].[dbo].[Start_Report_Transmission] @batch_size = 2500, @archive_date_limit = @datetime
Sender Procedures
USE [SSBSender]
GO
CREATE PROCEDURE [dbo].[Start_Report_Transmission] @batch_size INT = NULL, @archive_date_limit DATETIME = NULL
AS
DECLARE @xmldata XML
DECLARE @InitDlgHandle UNIQUEIDENTIFIER
SET @batch_size = ISNULL(@batch_size, 100)
SET @archive_date_limit = ISNULL(@archive_date_limit, GETDATE())
--@xmldata contains the batch size for every table
SET @xmldata = N'<batch>' + CAST(@batch_size AS VARCHAR(30)) + N'</batch><archive_date_limit>' + CONVERT(NVARCHAR(20), @archive_date_limit, 120)+ N'</archive_date_limit>'
--create a dialog from TriggerArchiving to itself
;BEGIN DIALOG @InitDlgHandle
FROM SERVICE TriggerArchiving
TO SERVICE N'TriggerArchiving'
ON CONTRACT ArchivingTables
WITH ENCRYPTION = OFF;
--send as many messages as the tables that need to be sent to reporting
;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_Error](@xmldata);
;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_Transaction](@xmldata);
;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_Record](@xmldata);
;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_RecordData](@xmldata);
END CONVERSATION @InitDlgHandle;
The procedure above will TRIGGER the queue activation proc below!
USE [SSBSender]
GO
/****** Object: StoredProcedure [dbo].[SSB_ReportTransmission] Script Date: 03/10/2010 11:08:23 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[SSB_ReportTransmission]
AS
DECLARE @batch INT;
DECLARE @archive_date_limit DATETIME;
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg XML;
DECLARE @RecvReqMsgName sysname;
DECLARE @ReportTransmissionHandle UNIQUEIDENTIFIER;
DECLARE @list XML;
DECLARE @far_service NVARCHAR(256)
WHILE 1=1
BEGIN
SELECT @RecvReqDlgHandle = NULL, @RecvReqMsg = NULL, @RecvReqMsgName = NULL, @far_service = NULL
WAITFOR
(
RECEIVE TOP(1)
@RecvReqDlgHandle = conversation_handle,
@RecvReqMsg = message_body,
@RecvReqMsgName = message_type_name
FROM InitialQueue
), TIMEOUT 10000
IF @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
OR @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
IF @RecvReqDlgHandle IS NOT NULL
BEGIN
END CONVERSATION @RecvReqDlgHandle;
END
RETURN 0
END
--at this point there is a message.
--check where the message is coming from (the far service)
SELECT @far_service = far_service
FROM sys.conversation_endpoints
WHERE conversation_handle = @RecvReqDlgHandle
--based on the source of the message perform different actions
IF @far_service = N'ReportingReceivingService' --that means this is an answer from the reporting server
BEGIN
--set the status of records
EXEC dbo.SetReportingFlag @tabletype = @RecvReqMsgName, @list = @RecvReqMsg
CONTINUE --to receiving loop
END
--at this point the message came from a local call (triggering repeatable transmission)
--get the batch size
SET @batch = @RecvReqMsg.value('(batch/text())[1]', 'INT')
SET @archive_date_limit = @RecvReqMsg.value('(archive_date_limit/text())[1]', 'DATETIME')
IF ISNULL(@batch, 0) = 0 OR @archive_date_limit IS NULL
BEGIN
RETURN 0
END
;BEGIN DIALOG @ReportTransmissionHandle
FROM SERVICE TriggerArchiving
TO SERVICE N'ReportingReceivingService'
ON CONTRACT ArchivingTables
WITH ENCRYPTION = OFF;
WHILE 1=1
BEGIN
SET @list = NULL
--call procedure that updates the appropriate table
--and returns a list of ids that need to be transferred
EXEC dbo.GetArchiveIdsforReporting @tabletype = @RecvReqMsgName, @batch = @batch, @archive_date_limit = @archive_date_limit, @list = @list OUTPUT
--break the loop when list is null (no records to be transferred)
IF @list IS NULL
BEGIN
BREAK
END
--transmit the list, with the initially received message type
;SEND ON CONVERSATION @ReportTransmissionHandle MESSAGE TYPE @RecvReqMsgName(@list);
END
--end the conversation when no more records are to be transmitted
END CONVERSATION @ReportTransmissionHandle;
--break receiving loop
BREAK
END --of receiving loop
RETURN 0
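The activation procedure above reads the batch size and the date limit back out of the trigger message that Start_Report_Transmission built. A small standalone sketch of that round trip is below; the literal values are made-up sample data, and the script can be run in any database without touching the queues.

```sql
--sample values only; in the real flow these come from the activation job
DECLARE @batch_size INT
DECLARE @archive_date_limit DATETIME
DECLARE @xmldata XML
SET @batch_size = 2500
SET @archive_date_limit = GETDATE()

--build the payload the same way Start_Report_Transmission does
SET @xmldata = N'<batch>' + CAST(@batch_size AS VARCHAR(30)) + N'</batch><archive_date_limit>'
    + CONVERT(NVARCHAR(20), @archive_date_limit, 120) + N'</archive_date_limit>'

--inspect the raw message body
SELECT @xmldata AS trigger_message

--read it back the same way SSB_ReportTransmission does
SELECT @xmldata.value('(batch/text())[1]', 'INT') AS batch,
       @xmldata.value('(archive_date_limit/text())[1]', 'DATETIME') AS archive_date_limit
```

Note that the payload is an XML fragment with two top-level elements, which the XML data type accepts.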
USE [SSBSender]
GO
/****** Object: StoredProcedure [dbo].[GetArchiveIdsforReporting] Script Date: 03/10/2010 11:09:22 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[GetArchiveIdsforReporting] @tabletype SYSNAME = NULL, @batch INT = NULL, @archive_date_limit DATETIME = NULL, @list XML = NULL OUTPUT
AS
--first NULLify the output variable
--this is important for always returning the right value (when output param is NULL)
--to the caller
SET @list = NULL
IF ISNULL(@tabletype, '') = '' OR ISNULL(@batch, 0) = 0 OR @archive_date_limit IS NULL
BEGIN
RETURN
END
DECLARE @tmp TABLE (archive_id BIGINT, tmpid INT IDENTITY(1,1) PRIMARY KEY CLUSTERED (archive_id, tmpid))
--trigger update of table and sending of records
--based on the message type
IF @tabletype = 'Table_Error'
BEGIN
UPDATE TOP (@batch) MyTransactionDatabase.dbo.tfe_TableError
SET sent_to_report = 1
OUTPUT inserted.tfe_id INTO @tmp(archive_id)
WHERE sent_to_report = 0
AND archive_date <= @archive_date_limit
END
ELSE IF @tabletype = 'Table_Transaction'
BEGIN
UPDATE TOP (@batch) MyTransactionDatabase.dbo.tft_TableTransaction
SET sent_to_report = 1
OUTPUT inserted.tft_id INTO @tmp(archive_id)
WHERE sent_to_report = 0
AND archive_date <= @archive_date_limit
END
ELSE IF @tabletype = 'Table_Record'
BEGIN
UPDATE TOP (@batch) MyTransactionDatabase.dbo.tfr_TableRecord
SET sent_to_report = 1
OUTPUT inserted.tfr_id INTO @tmp(archive_id)
WHERE sent_to_report = 0
AND archive_date <= @archive_date_limit
END
ELSE IF @tabletype = 'Table_RecordData'
BEGIN
UPDATE TOP (@batch) MyTransactionDatabase.dbo.trd_TableRecordData
SET sent_to_report = 1
OUTPUT inserted.trd_id INTO @tmp(archive_id)
WHERE sent_to_report = 0
AND archive_date <= @archive_date_limit
END
IF NOT EXISTS(SELECT * FROM @tmp)
BEGIN
RETURN
END
SET @list = NULL
--create the list of tbl_id's
SET @list = (SELECT TOP 100 PERCENT archive_id AS id
FROM @tmp
ORDER BY archive_id
FOR XML PATH('')
)
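To make the message body concrete, here is a small standalone sketch of what the id list looks like and how it is shredded back into rows on the other side. The three ids are made-up sample values; everything else mirrors the FOR XML PATH('') and nodes() calls used in the procedures in this post.

```sql
--sample ids standing in for the rows flagged by the UPDATE ... OUTPUT above
DECLARE @tmp TABLE (archive_id BIGINT PRIMARY KEY)
INSERT INTO @tmp(archive_id) VALUES (101)
INSERT INTO @tmp(archive_id) VALUES (102)
INSERT INTO @tmp(archive_id) VALUES (103)

--build the list exactly as GetArchiveIdsforReporting does
DECLARE @list XML
SET @list = (SELECT archive_id AS id
             FROM @tmp
             ORDER BY archive_id
             FOR XML PATH(''))

--the message body: <id>101</id><id>102</id><id>103</id>
SELECT @list AS list_payload

--shred the list back into rows, one row per id element
SELECT a.col.value('text()[1]', 'BIGINT') AS archive_id
FROM @list.nodes('id') a(col)
```

Only the ids travel in the message, so messages stay small no matter how large the XML payload of each record is.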
USE [SSBSender]
GO
/****** Object: StoredProcedure [dbo].[SetReportingFlag] Script Date: 03/10/2010 11:11:55 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[SetReportingFlag] @tabletype sysname = NULL, @list XML = NULL
AS
DECLARE @TransferStatus TINYINT
IF @tabletype IS NULL OR @list IS NULL
BEGIN
RETURN 0
END
--get the transmission flag value
SET @TransferStatus = @list.value('(TransferStatus/text())[1]', 'TINYINT')
IF @TransferStatus IS NULL
BEGIN
RETURN 0
END
--shred the @list and get id values
DECLARE @tmp TABLE (archive_id BIGINT, tmpid INT IDENTITY(1,1) PRIMARY KEY CLUSTERED (archive_id, tmpid))
--shred the @list of values
INSERT INTO @tmp(archive_id)
SELECT a.col.value('text()[1]', 'BIGINT')
FROM @list.nodes('id') a(col)
IF NOT EXISTS(SELECT TOP 1 * FROM @tmp)
BEGIN
RETURN 0
END
--update proper table with @TransferStatus
IF @tabletype = 'Table_Error'
BEGIN
UPDATE archive_tbl
SET sent_to_report = @TransferStatus
FROM MyTransactionDatabase.dbo.tfe_TableError archive_tbl
JOIN @tmp tmp ON archive_tbl.tfe_id = tmp.archive_id
END
ELSE IF @tabletype = 'Table_Transaction'
BEGIN
UPDATE archive_tbl
SET sent_to_report = @TransferStatus
FROM MyTransactionDatabase.dbo.tft_TableTransaction archive_tbl
JOIN @tmp tmp ON archive_tbl.tft_id = tmp.archive_id
END
ELSE IF @tabletype = 'Table_Record'
BEGIN
UPDATE archive_tbl
SET sent_to_report = @TransferStatus
FROM MyTransactionDatabase.dbo.tfr_tableRecord archive_tbl
JOIN @tmp tmp ON archive_tbl.tfr_id = tmp.archive_id
END
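ELSE IF @tabletype = 'Table_RecordData'
BEGIN
UPDATE archive_tbl
SET sent_to_report = @TransferStatus
FROM MyTransactionDatabase.dbo.trd_TableRecordData archive_tbl
JOIN @tmp tmp ON archive_tbl.trd_id = tmp.archive_id
END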
RETURN 0
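Putting the procedures together, sent_to_report ends up as a small status code: 0 means not yet sent, 1 means picked up by GetArchiveIdsforReporting, and 2 or 3 is the TransferStatus the reporting server sends back (success or error). A quick way to see where a table stands is sketched below. The retry statement at the end is my own hypothetical addition, not part of the original process, so treat it as an idea rather than a recommendation.

```sql
USE [MyTransactionDatabase]
GO
--count records per transfer status for one of the synchronized tables
SELECT sent_to_report, COUNT(*) AS records
FROM dbo.tfe_TableError
GROUP BY sent_to_report
ORDER BY sent_to_report

--hypothetical retry: put failed records (status 3) back in line for the next run
UPDATE dbo.tfe_TableError
SET sent_to_report = 0
WHERE sent_to_report = 3
```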
Destination SQL Server
This is the server that receives the data and processes the messages in the queue. The database that manages synchronization here is SSBReceiver; it manages the receiving queue and passes the message payload to the receiver procedure.
On the receiving database, you will have the SAME contract and message types.
Queue
CREATE QUEUE [dbo].[ReceivingQueue] WITH STATUS = ON , RETENTION = OFF , ACTIVATION ( STATUS = ON , PROCEDURE_NAME = [dbo].[SSBReceiveMessage] , MAX_QUEUE_READERS = 40 , EXECUTE AS OWNER ) ON [PRIMARY]
Service
CREATE SERVICE [ReportingReceivingService] AUTHORIZATION [User] ON QUEUE [dbo].[ReceivingQueue] ([ArchivingTables])
Route
CREATE ROUTE [TriggerArchiving] AUTHORIZATION [dbo] WITH SERVICE_NAME = N'TriggerArchiving' , ADDRESS = N'TCP://SourceServer:4022'
Remote Service Binding
CREATE REMOTE SERVICE BINDING [TriggerArchivingServiceBinding] AUTHORIZATION [dbo] TO SERVICE N'TriggerArchiving' WITH USER = [MyUserName] , ANONYMOUS = OFF
Now all you need to do is get the stored procedure that is activated on the receiving queue to manage the message payload. Easy as that!
USE [SSBReceiver]
GO
/****** Object: StoredProcedure [dbo].[SSBReceiveMessage] Script Date: 03/10/2010 11:20:17 ******/
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[SSBReceiveMessage]
AS
print 'SSBReceiveMessage started'
DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;
DECLARE @RecvReqMsg XML;
DECLARE @RecvReqMsgName SYSNAME;
DECLARE @ReportAnswerHandle UNIQUEIDENTIFIER;
--begin an answering dialogue with the TriggerArchiving service
;BEGIN DIALOG @ReportAnswerHandle
FROM SERVICE ReportingReceivingService
TO SERVICE N'TriggerArchiving'
ON CONTRACT ArchivingTables
WITH ENCRYPTION = OFF;
WHILE 1=1
BEGIN
SELECT @RecvReqDlgHandle = NULL, @RecvReqMsg = NULL, @RecvReqMsgName = NULL;
WAITFOR
(
RECEIVE TOP(1)
@RecvReqDlgHandle = conversation_handle,
@RecvReqMsg = message_body,
@RecvReqMsgName = message_type_name
FROM [ReceivingQueue]
), TIMEOUT 10000
IF @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
OR @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'
BEGIN
IF @RecvReqDlgHandle IS NOT NULL
BEGIN
END CONVERSATION @RecvReqDlgHandle;
END
BREAK
END
--in case the message is null, break looping
IF @RecvReqMsg IS NULL
BEGIN
BREAK
END
BEGIN TRY
--execute procedure that pulls archived records
EXEC dbo.PullArchiveRecords @tabletype = @RecvReqMsgName, @list = @RecvReqMsg
--apply Success Status element in @RecvReqMsg
SET @RecvReqMsg.modify('insert element TransferStatus {2} as first into .')
END TRY
BEGIN CATCH
--at this point an error occurred
print 'Error occurred during PullArchiveRecords: ' + ERROR_MESSAGE()
--apply Error Status element in @RecvReqMsg
SET @RecvReqMsg.modify('insert element TransferStatus {3} as first into .')
END CATCH
--send the action answer to TriggerArchiving
;SEND ON CONVERSATION @ReportAnswerHandle MESSAGE TYPE @RecvReqMsgName(@RecvReqMsg);
END
--end conversation with TriggerArchiving service
END CONVERSATION @ReportAnswerHandle
RETURN 0
From here, you CREATE your own logic in the procedure called by the proc above, to transfer the batches to the reporting/archive tables.
USE [SSBReceiver]
GO
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE PROCEDURE [dbo].[PullArchiveRecords] @tabletype SYSNAME = NULL, @list XML = NULL
AS
IF @tabletype IS NULL OR @list IS NULL
BEGIN
RETURN 0
END
DECLARE @tmp TABLE (archive_id BIGINT, tmpid INT IDENTITY(1,1) PRIMARY KEY CLUSTERED (archive_id, tmpid))
--shred the @list of values
INSERT INTO @tmp(archive_id)
SELECT a.col.value('text()[1]', 'BIGINT')
FROM @list.nodes('id') a(col)
IF NOT EXISTS(SELECT TOP 1 * FROM @tmp)
BEGIN
RETURN 0
END
--pull records from Archiving server
--based on @tabletype
BEGIN TRY
IF @tabletype = 'Table_Error'
BEGIN
--PUT LOGIC HERE to insert from payload joining on @tmp
END
ELSE IF @tabletype = 'Table_Transaction'
BEGIN
--PUT LOGIC HERE to insert from payload joining on @tmp
END
ELSE IF @tabletype = 'Table_Record'
BEGIN
--PUT LOGIC HERE to insert from payload joining on @tmp
END
ELSE IF @tabletype = 'Table_RecordData'
BEGIN
--PUT LOGIC HERE to insert from payload joining on @tmp
END
END TRY
BEGIN CATCH
INSERT INTO dbo.myerrors(errormessage)
values(ERROR_MESSAGE())
END CATCH
Reporting Server Facts Job
You can then create a FACT table job to update the facts table with the data that was just imported. In the procedure above, where you implement your insert logic, you can set a logical flag on the new rows. The fact job can then use that flag value to decide which rows to load into the facts, similar to processing a CUBE on an OLAP server.
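As a rough illustration of such a fact job, the sketch below flags the unprocessed rows, captures their ids, and shreds the XML payload into indexed columns. Everything specific in it is an assumption for the sake of the example: the temp tables, the fact_loaded flag, and the Record/CustomerCode/Amount XML layout are hypothetical and need to be replaced with your own reporting tables and payload structure.

```sql
--hypothetical stand-ins for a reporting table and its fact table
CREATE TABLE #RecordData (trd_id BIGINT PRIMARY KEY, payload XML, fact_loaded BIT NOT NULL DEFAULT 0)
CREATE TABLE #FactRecordData (trd_id BIGINT PRIMARY KEY, customer_code VARCHAR(20), amount DECIMAL(18,2))

--one sample row, as if it had just been synchronized
INSERT INTO #RecordData (trd_id, payload)
VALUES (1, N'<Record><CustomerCode>C001</CustomerCode><Amount>125.50</Amount></Record>')

--flag the new rows and remember which ones were picked up
DECLARE @loaded TABLE (trd_id BIGINT PRIMARY KEY)
UPDATE #RecordData
SET fact_loaded = 1
OUTPUT inserted.trd_id INTO @loaded(trd_id)
WHERE fact_loaded = 0

--shred the XML payload of those rows into the fact table
INSERT INTO #FactRecordData (trd_id, customer_code, amount)
SELECT r.trd_id,
       r.payload.value('(/Record/CustomerCode/text())[1]', 'VARCHAR(20)'),
       r.payload.value('(/Record/Amount/text())[1]', 'DECIMAL(18,2)')
FROM #RecordData r
JOIN @loaded l ON l.trd_id = r.trd_id

SELECT trd_id, customer_code, amount FROM #FactRecordData

DROP TABLE #FactRecordData
DROP TABLE #RecordData
```

Flagging with UPDATE ... OUTPUT follows the same pattern the sender uses for sent_to_report, so a row is only picked up by one run of the job.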
Conclusion
You can develop some super fast and streamlined synchronization mechanisms with Service Broker. Transferring millions of records is a breeze: since batch sizes are kept small and we spawn 48-50 parallel batching processes at once into the queue, your transaction log checkpoints stay efficient, especially on high processing load days. This also means we can always keep the queue size at a max of around 50, which is easy to manage on large transaction days. In my experience it is much more efficient than replication and log shipping, and MUCH easier to support.
I have transferred around 300 million records and not had a hiccup since.
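If you want to keep an eye on the queue size yourself, a few catalog views and DMVs are enough. The queries below are a minimal monitoring sketch for the sender side, assuming the object names used in this post; run the equivalent against SSBReceiver and ReceivingQueue on the destination server.

```sql
USE [SSBSender]
GO
--messages waiting in the local queue
SELECT COUNT(*) AS pending_messages
FROM dbo.InitialQueue WITH (NOLOCK)

--messages that have not yet reached the other server, with the reason if delivery is failing
SELECT to_service_name, transmission_status, COUNT(*) AS unsent_messages
FROM sys.transmission_queue
GROUP BY to_service_name, transmission_status

--activation procedures currently running against the queues
SELECT spid, database_id, queue_id, procedure_name
FROM sys.dm_broker_activated_tasks

--open conversations by state
SELECT far_service, state_desc, COUNT(*) AS conversations
FROM sys.conversation_endpoints
GROUP BY far_service, state_desc
```

A transmission_status that is not empty usually points at a route, endpoint, or permission problem rather than at the procedures themselves.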