SQL Server 2005 Enterprise Reporting Solutions leveraging Service Broker Synchronization

Hi!

This post outlines some high-level aspects of implementing Service Broker. It is aimed at people who are already familiar with the basics of Service Broker; if you need those first, check this tutorial:

http://msdn.microsoft.com/en-us/library/bb839483.aspx

Scenario:

You have a production system that is processing millions of records a month.
Reporting requires frequent complex reports to run on the metrics and also on the actual data stored.
The actual data payload is stored as XML data type. This means any reporting metrics on data payload must be done on a fact table of some sort.
Data must be archived for 5 years. (This will be around 500 million records)

Objectives:

Decouple the reporting from the production system
Ensure up to date synchronization from production to the reporting server
Running reports will not affect the production system

Solution:

  • We will use Service Broker to define ONE message contract for all tables that need to be synced. Synchronization is started by a scheduled SQL job, which kicks off the Service Broker process.
  • A message type is created per table.
  • Synchronization works in batch mode, supporting 40 to 50 simultaneous SQL processes (optimal for a quad-core processor = 48).
  • After the main transaction tables are synchronized, a job on the reporting server updates the FACTS table.
  • The initial queue on the sender side has an activation procedure configured. When the SQL job runs, it sends messages to this queue, which activates the procedure.
  • We will have a service to trigger the archiving process.
  • We will create a route to the destination system.
  • How you use Service Broker security is up to you. In this sample I keep it simple, with no encryption. You can use asymmetric encryption and so on, but that is out of scope for this article. In this example I use the SQL users MyUser and User; notice that the service and route user names are swapped on either server.

The fact table is used to extract information from the XML data. This allows the data to be indexed and reports to run smoothly.

Do you have a transaction database and need a custom reporting solution, without the complexity of log shipping or replication? Then look no further: Service Broker is an ideal tool to synchronize two databases, based on simple tables.

Source SQL Server

On this server you create a Service Broker database that triggers sending of the data contract's payload. For example, you can create a database called SSBSender; it will manage the starting and spawning of processes, the queues, and so on.

Contract:

USE [SSBSender]

GO

CREATE CONTRACT [ArchivingTables] AUTHORIZATION [dbo] ([CONF_AuditTrail] SENT BY ANY,

[Table_Error] SENT BY ANY,

[Table_Record] SENT BY ANY,

[Table_RecordData] SENT BY ANY,

[Table_Transaction] SENT BY ANY)

MessageTypes (create these before the contract, since the contract references them):

USE [SSBSender]

GO

CREATE MESSAGE TYPE [CONF_AuditTrail] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML

CREATE MESSAGE TYPE [Table_Error] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML

CREATE MESSAGE TYPE [Table_Record] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML

CREATE MESSAGE TYPE [Table_RecordData] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML

CREATE MESSAGE TYPE [Table_Transaction] AUTHORIZATION [dbo] VALIDATION = WELL_FORMED_XML

Sender Queue:

CREATE QUEUE [dbo].[InitialQueue] WITH STATUS = ON , RETENTION = OFF , ACTIVATION ( STATUS = ON , PROCEDURE_NAME = [dbo].[SSB_ReportTransmission] , MAX_QUEUE_READERS = 20 , EXECUTE AS OWNER ) ON [PRIMARY]

Service:

CREATE SERVICE [TriggerArchiving] AUTHORIZATION [MyUser] ON QUEUE [dbo].[InitialQueue] ([ArchivingTables])

Route

CREATE ROUTE [SSBReceiver] AUTHORIZATION [dbo] WITH SERVICE_NAME = N'ReportingReceivingService' , ADDRESS = N'TCP://DestinationServer:4022'
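
The route address above points at TCP port 4022, so each instance needs a Service Broker endpoint listening on that port, and the broker must be enabled in the database. The original setup does not show this step, so the following is only a minimal sketch: the endpoint name SsbEndpoint and the choice of Windows authentication are assumptions, not part of the original configuration.

```sql
-- Enable Service Broker in the sender database
-- (needs exclusive access to the database while it runs).
ALTER DATABASE [SSBSender] SET ENABLE_BROKER;
GO

USE [master];
GO

-- Hypothetical endpoint name; port 4022 matches the route address used in this post.
-- ENCRYPTION = DISABLED mirrors the "keep it simple" choice made for the dialogs.
CREATE ENDPOINT [SsbEndpoint]
    STATE = STARTED
    AS TCP (LISTENER_PORT = 4022)
    FOR SERVICE_BROKER (AUTHENTICATION = WINDOWS, ENCRYPTION = DISABLED);
GO
```

The same pair of statements would be needed on the destination server for SSBReceiver, since replies travel back over the route defined there.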

Remote Service Binding

CREATE REMOTE SERVICE BINDING [ReceivingServiceBinding] AUTHORIZATION [dbo] TO SERVICE N'ReportingReceivingService' WITH USER = [MyUserName] , ANONYMOUS = OFF

Activation Job

Schedule a job to activate the process.

declare @datetime datetime

select @datetime = getdate()

EXEC [SSBSender].[dbo].[Start_Report_Transmission] @batch_size = 2500, @archive_date_limit = @datetime

Sender Procedures

USE [SSBSender]

GO

CREATE PROCEDURE [dbo].[Start_Report_Transmission] @batch_size INT = NULL, @archive_date_limit DATETIME = NULL

AS

DECLARE @xmldata XML

DECLARE @InitDlgHandle UNIQUEIDENTIFIER

SET @batch_size = ISNULL(@batch_size, 100)

SET @archive_date_limit = ISNULL(@archive_date_limit, GETDATE())

-- @xmldata contains the batch size for every table

SET @xmldata = N'<batch>' + CAST(@batch_size AS VARCHAR(30)) + N'</batch><archive_date_limit>' + CONVERT(NVARCHAR(20), @archive_date_limit, 120) + N'</archive_date_limit>'

-- create a dialog from TriggerArchiving to itself

;BEGIN DIALOG @InitDlgHandle

FROM SERVICE TriggerArchiving

TO SERVICE N'TriggerArchiving'

ON CONTRACT ArchivingTables

WITH ENCRYPTION = OFF;

-- send as many messages as the tables that need to be sent to reporting

;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_Error](@xmldata);

;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_Transaction](@xmldata);

;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_Record](@xmldata);

;SEND ON CONVERSATION @InitDlgHandle MESSAGE TYPE [Table_RecordData](@xmldata);

END CONVERSATION @InitDlgHandle;

The procedure above will TRIGGER the queue activation proc below!

USE [SSBSender]

GO

/****** Object: StoredProcedure [dbo].[SSB_ReportTransmission] Script Date: 03/10/2010 11:08:23 ******/

SET ANSI_NULLS ON

GO

SET QUOTED_IDENTIFIER ON

GO

CREATE PROCEDURE [dbo].[SSB_ReportTransmission]

AS

DECLARE @batch INT;

DECLARE @archive_date_limit DATETIME;

DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;

DECLARE @RecvReqMsg XML;

DECLARE @RecvReqMsgName sysname;

DECLARE @ReportTransmissionHandle UNIQUEIDENTIFIER;

DECLARE @list XML;

DECLARE @far_service NVARCHAR(256)

WHILE 1=1

BEGIN

SELECT @RecvReqDlgHandle = NULL, @RecvReqMsg = NULL, @RecvReqMsgName = NULL, @far_service = NULL

WAITFOR

(

RECEIVE TOP(1)

@RecvReqDlgHandle = conversation_handle,

@RecvReqMsg = message_body,

@RecvReqMsgName = message_type_name

FROM InitialQueue

), TIMEOUT 10000

IF @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'

OR @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'

BEGIN

IF @RecvReqDlgHandle IS NOT NULL

BEGIN

END CONVERSATION @RecvReqDlgHandle;

END

RETURN 0

END

-- at this point there is a message.

-- check where the message is coming from (the far service)

SELECT @far_service = far_service

FROM sys.conversation_endpoints

WHERE conversation_handle = @RecvReqDlgHandle

-- based on the source of the message perform different actions

IF @far_service = N'ReportingReceivingService' -- that means this is an answer from the reporting server

BEGIN

-- set the status of records

EXEC dbo.SetReportingFlag @tabletype = @RecvReqMsgName, @list = @RecvReqMsg

CONTINUE -- to receiving loop

END

-- at this point the message came from a local call (triggering repeatable transmission)

-- get the batch size

SET @batch = @RecvReqMsg.value('(batch/text())[1]', 'INT')

SET @archive_date_limit = @RecvReqMsg.value('(archive_date_limit/text())[1]', 'DATETIME')

IF ISNULL(@batch, 0) = 0 OR @archive_date_limit IS NULL

BEGIN

RETURN 0

END

;BEGIN DIALOG @ReportTransmissionHandle

FROM SERVICE TriggerArchiving

TO SERVICE N’ReportingReceivingService’

ON CONTRACT ArchivingTables

WITH ENCRYPTION = OFF;

WHILE 1=1

BEGIN

SET @list = NULL

-- call procedure that updates the appropriate table

-- and returns a list of ids that need to be transferred

EXEC dbo.GetArchiveIdsforReporting @tabletype = @RecvReqMsgName, @batch = @batch, @archive_date_limit = @archive_date_limit, @list = @list OUTPUT

-- break the loop when list is null (no records to be transferred)

IF @list IS NULL

BEGIN

BREAK

END

-- transmit the list, with the initially received message type

;SEND ON CONVERSATION @ReportTransmissionHandle MESSAGE TYPE @RecvReqMsgName(@list);

END

-- end the conversation when no more records are to be transmitted

END CONVERSATION @ReportTransmissionHandle;

-- break receiving loop

BREAK

END -- of receiving loop

RETURN 0

USE [SSBSender]

GO

/****** Object: StoredProcedure [dbo].[GetArchiveIdsforReporting] Script Date: 03/10/2010 11:09:22 ******/

SET ANSI_NULLS ON

GO

SET QUOTED_IDENTIFIER ON

GO

CREATE PROCEDURE [dbo].[GetArchiveIdsforReporting] @tabletype SYSNAME = NULL, @batch INT = NULL, @archive_date_limit DATETIME = NULL, @list XML = NULL OUTPUT

AS

-- first NULLify the output variable

-- this is important for always returning the right value (when output param is NULL)

-- to the caller

SET @list = NULL

IF ISNULL(@tabletype, '') = '' OR ISNULL(@batch, 0) = 0 OR @archive_date_limit IS NULL

BEGIN

RETURN

END

DECLARE @tmp TABLE (archive_id BIGINT, tmpid INT IDENTITY(1,1) PRIMARY KEY CLUSTERED (archive_id, tmpid))

-- trigger update of table and sending of records

-- based on the message type

IF @tabletype = 'Table_Error'

BEGIN

UPDATE TOP (@batch) MyTransactionDatabase.dbo.tfe_TableError

SET sent_to_report = 1

OUTPUT inserted.tfe_id INTO @tmp(archive_id)

WHERE sent_to_report = 0

AND archive_date <= @archive_date_limit

END

ELSE IF @tabletype = 'Table_Transaction'

BEGIN

UPDATE TOP (@batch) MyTransactionDatabase.dbo.tft_TableTransaction

SET sent_to_report = 1

OUTPUT inserted.tft_id INTO @tmp(archive_id)

WHERE sent_to_report = 0

AND archive_date <= @archive_date_limit

END

ELSE IF @tabletype = 'Table_Record'

BEGIN

UPDATE TOP (@batch) MyTransactionDatabase.dbo.tfr_TableRecord

SET sent_to_report = 1

OUTPUT inserted.tfr_id INTO @tmp(archive_id)

WHERE sent_to_report = 0

AND archive_date <= @archive_date_limit

END

ELSE IF @tabletype = 'Table_RecordData'

BEGIN

UPDATE TOP (@batch) MyTransactionDatabase.dbo.trd_TableRecordData

SET sent_to_report = 1

OUTPUT inserted.trd_id INTO @tmp(archive_id)

WHERE sent_to_report = 0

AND archive_date <= @archive_date_limit

END

IF NOT EXISTS(SELECT * FROM @tmp)

BEGIN

RETURN

END

SET @list = NULL

-- create the list of tbl_id's

SET @list = (SELECT TOP 100 PERCENT archive_id AS id

FROM @tmp

ORDER BY archive_id

FOR XML PATH('')

)

USE [SSBSender]

GO

/****** Object: StoredProcedure [dbo].[SetReportingFlag] Script Date: 03/10/2010 11:11:55 ******/

SET ANSI_NULLS ON

GO

SET QUOTED_IDENTIFIER ON

GO

CREATE PROCEDURE [dbo].[SetReportingFlag] @tabletype sysname = NULL, @list XML = NULL

AS

DECLARE @TransferStatus TINYINT

IF @tabletype IS NULL OR @list IS NULL

BEGIN

RETURN 0

END

-- get the transmission flag value

SET @TransferStatus = @list.value('(TransferStatus/text())[1]', 'TINYINT')

IF @TransferStatus IS NULL

BEGIN

RETURN 0

END

-- shred the @list and get id values

DECLARE @tmp TABLE (archive_id BIGINT, tmpid INT IDENTITY(1,1) PRIMARY KEY CLUSTERED (archive_id, tmpid))

-- shred the @list of values

INSERT INTO @tmp(archive_id)

SELECT a.col.value('text()[1]', 'BIGINT')

FROM @list.nodes('id') a(col)

IF NOT EXISTS(SELECT TOP 1 * FROM @tmp)

BEGIN

RETURN 0

END

-- update proper table with @TransferStatus

IF @tabletype = 'Table_Error'

BEGIN

UPDATE archive_tbl

SET sent_to_report = @TransferStatus

FROM MyTransactionDatabase.dbo.tfe_TableError archive_tbl

JOIN @tmp tmp ON archive_tbl.tfe_id = tmp.archive_id

END

ELSE IF @tabletype = 'Table_Transaction'

BEGIN

UPDATE archive_tbl

SET sent_to_report = @TransferStatus

FROM MyTransactionDatabase.dbo.tft_TableTransaction archive_tbl

JOIN @tmp tmp ON archive_tbl.tft_id = tmp.archive_id

END

ELSE IF @tabletype = 'Table_Record'

BEGIN

UPDATE archive_tbl

SET sent_to_report = @TransferStatus

FROM MyTransactionDatabase.dbo.tfr_TableRecord archive_tbl

JOIN @tmp tmp ON archive_tbl.tfr_id = tmp.archive_id

END

ELSE IF @tabletype = 'Table_RecordData'

BEGIN

-- same pattern as the branches above, using the RecordData table and key

UPDATE archive_tbl

SET sent_to_report = @TransferStatus

FROM MyTransactionDatabase.dbo.trd_TableRecordData archive_tbl

JOIN @tmp tmp ON archive_tbl.trd_id = tmp.archive_id

END

RETURN 0

Destination SQL Server

This is the server that will receive the data and process the messages in the queue. The database that manages synchronization here is SSBReceiver; it handles receiving, queuing, and passing the message payload to the receiver procedure.

On the receiving database, you will have the SAME contract and message types.

Queue

CREATE QUEUE [dbo].[ReceivingQueue] WITH STATUS = ON , RETENTION = OFF , ACTIVATION ( STATUS = ON , PROCEDURE_NAME = [dbo].[SSBReceiveMessage] , MAX_QUEUE_READERS = 40 , EXECUTE AS OWNER ) ON [PRIMARY]

Service

CREATE SERVICE [ReportingReceivingService] AUTHORIZATION [User] ON QUEUE [dbo].[ReceivingQueue] ([ArchivingTables])

Route

CREATE ROUTE [TriggerArchiving] AUTHORIZATION [dbo] WITH SERVICE_NAME = N'TriggerArchiving' , ADDRESS = N'TCP://SourceServer:4022'

Remote Service Binding

CREATE REMOTE SERVICE BINDING [TriggerArchivingServiceBinding] AUTHORIZATION [dbo] TO SERVICE N'TriggerArchiving' WITH USER = [MyUserName] , ANONYMOUS = OFF

Now all you need to do is get the stored procedure that is activated on the receiving queue to manage the message payload. Easy as that!

USE [SSBReceiver]

GO

/****** Object: StoredProcedure [dbo].[SSBReceiveMessage] Script Date: 03/10/2010 11:20:17 ******/

SET ANSI_NULLS ON

GO

SET QUOTED_IDENTIFIER ON

GO

CREATE PROCEDURE [dbo].[SSBReceiveMessage]

AS

print 'SSBReceiveMessage started'

DECLARE @RecvReqDlgHandle UNIQUEIDENTIFIER;

DECLARE @RecvReqMsg XML;

DECLARE @RecvReqMsgName SYSNAME;

DECLARE @ReportAnswerHandle UNIQUEIDENTIFIER;

-- begin an answering dialogue with the TriggerArchiving service

;BEGIN DIALOG @ReportAnswerHandle

FROM SERVICE ReportingReceivingService

TO SERVICE N'TriggerArchiving'

ON CONTRACT ArchivingTables

WITH ENCRYPTION = OFF;

WHILE 1=1

BEGIN

SELECT @RecvReqDlgHandle = NULL, @RecvReqMsg = NULL, @RecvReqMsgName = NULL;

WAITFOR

(

RECEIVE TOP(1)

@RecvReqDlgHandle = conversation_handle,

@RecvReqMsg = message_body,

@RecvReqMsgName = message_type_name

FROM [ReceivingQueue]

), TIMEOUT 10000

IF @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'

OR @RecvReqMsgName = N'http://schemas.microsoft.com/SQL/ServiceBroker/Error'

BEGIN

IF @RecvReqDlgHandle IS NOT NULL

BEGIN

END CONVERSATION @RecvReqDlgHandle;

END

BREAK

END

-- in case the message is null, break looping

IF @RecvReqMsg IS NULL

BEGIN

BREAK

END

BEGIN TRY

-- execute procedure that pulls archived records

EXEC dbo.PullArchiveRecords @tabletype = @RecvReqMsgName, @list = @RecvReqMsg

-- apply Success Status element in @RecvReqMsg

SET @RecvReqMsg.modify('insert element TransferStatus {2} as first into .')

END TRY

BEGIN CATCH

-- at this point an error occurred

print 'Error occurred during PullArchiveRecords: ' + ERROR_MESSAGE()

-- apply Error Status element in @RecvReqMsg

SET @RecvReqMsg.modify('insert element TransferStatus {3} as first into .')

END CATCH

-- send the action answer to TriggerArchiving

;SEND ON CONVERSATION @ReportAnswerHandle MESSAGE TYPE @RecvReqMsgName(@RecvReqMsg);

END

-- end conversation with TriggerArchiving service

END CONVERSATION @ReportAnswerHandle

RETURN 0

From here, you write your own logic in the procedure called by the proc above (PullArchiveRecords) to transfer the batches to the reporting/archive tables.

USE [SSBReceiver]

GO

SET ANSI_NULLS ON

GO

SET QUOTED_IDENTIFIER ON

GO

CREATE PROCEDURE [dbo].[PullArchiveRecords] @tabletype SYSNAME = NULL, @list XML = NULL

AS

IF @tabletype IS NULL OR @list IS NULL

BEGIN

RETURN 0

END

DECLARE @tmp TABLE (archive_id BIGINT, tmpid INT IDENTITY(1,1) PRIMARY KEY CLUSTERED (archive_id, tmpid))

-- shred the @list of values

INSERT INTO @tmp(archive_id)

SELECT a.col.value('text()[1]', 'BIGINT')

FROM @list.nodes('id') a(col)

IF NOT EXISTS(SELECT TOP 1 * FROM @tmp)

BEGIN

RETURN 0

END

-- pull records from Archiving server

-- based on @tabletype

BEGIN TRY

IF @tabletype = 'Table_Error'

BEGIN

-- PUT LOGIC HERE to insert from payload joining on @tmp

END

ELSE IF @tabletype = 'Table_Transaction'

BEGIN

-- PUT LOGIC HERE to insert from payload joining on @tmp

END

ELSE IF @tabletype = 'Table_Record'

BEGIN

-- PUT LOGIC HERE to insert from payload joining on @tmp

END

ELSE IF @tabletype = 'Table_RecordData'

BEGIN

-- PUT LOGIC HERE to insert from payload joining on @tmp

END

END TRY

BEGIN CATCH

INSERT INTO dbo.myerrors(errormessage)

values(ERROR_MESSAGE())

END CATCH
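
One way to fill in the placeholders above is to pull the full rows from the source server through a linked server, joining on the shredded ids. This is only a sketch under assumptions: the linked server name [SourceServer], the reporting database ReportingDB, and identical column layouts on both sides are all hypothetical and not given in the original.

```sql
-- Sketch for the Table_Error branch.
-- @tmp is declared here only to keep the snippet self-contained;
-- inside PullArchiveRecords it already exists and holds the shredded ids.
DECLARE @tmp TABLE (archive_id BIGINT PRIMARY KEY);

-- Assumed: [SourceServer] is a linked server pointing at the production instance,
-- and ReportingDB.dbo.tfe_TableError has the same columns as the source table.
INSERT INTO ReportingDB.dbo.tfe_TableError
SELECT src.*
FROM [SourceServer].MyTransactionDatabase.dbo.tfe_TableError AS src
JOIN @tmp AS tmp ON src.tfe_id = tmp.archive_id
-- Skip rows that an earlier (retried) batch already copied.
WHERE NOT EXISTS (SELECT 1
                  FROM ReportingDB.dbo.tfe_TableError AS dst
                  WHERE dst.tfe_id = src.tfe_id);
```

The NOT EXISTS guard is a design choice for the sketch: it lets a batch be re-sent after a failure without producing duplicate rows on the reporting side.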

Reporting Server Facts Job

You can then create a FACT Table job to update the facts table with data that was just imported. You can use a logical flag to ensure data is flagged in the procedure above, where you implement your logic when inserting the new data. The fact job can then use a flag value to update facts, similar to a CUBE on an OLAP server.
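
As a rough illustration of such a fact job, the sketch below claims unprocessed rows with the same UPDATE ... OUTPUT pattern used by the sender procedures, then shreds their XML into plain columns. Every name in it (the fact_loaded flag, the payload column, dbo.FactRecordData, and the XPath expressions) is hypothetical and would need to match your own schema.

```sql
-- All table, column, and element names below are assumptions for illustration.
DECLARE @new TABLE (trd_id BIGINT PRIMARY KEY);

BEGIN TRANSACTION;

-- Claim the rows that have not been loaded into the fact table yet
-- and remember exactly which ids were claimed.
UPDATE dbo.trd_TableRecordData
SET fact_loaded = 1
OUTPUT inserted.trd_id INTO @new(trd_id)
WHERE fact_loaded = 0;

-- Shred the XML payload of the claimed rows into indexable fact columns.
INSERT INTO dbo.FactRecordData (trd_id, customer_code, amount)
SELECT rd.trd_id,
       rd.payload.value('(/record/customerCode/text())[1]', 'VARCHAR(20)'),
       rd.payload.value('(/record/amount/text())[1]', 'DECIMAL(18,2)')
FROM dbo.trd_TableRecordData AS rd
JOIN @new AS n ON n.trd_id = rd.trd_id;

COMMIT TRANSACTION;
```

Capturing the ids through OUTPUT means rows that arrive while the job is running are left for the next run instead of being flagged without being shredded.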

Conclusion

You can develop some super fast and streamlined synchronization mechanisms with Service Broker. Transferring millions of records is a breeze: since batch sizes are kept small and we spawn 48-50 parallel batching processes at once into the queue, your transaction log checkpoints stay efficient, especially on high processing load days. In my experience this is much more efficient than replication and log shipping, and MUCH easier to support. It also means we can always keep the queue size at a maximum of about 50 🙂 Easy to manage on large transaction days.

I have transferred around 300 million records and not had a hiccup since.
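
To keep an eye on the queue size mentioned above, a few catalog queries on the sender are usually enough. This is a minimal monitoring sketch using the object names from this post; how often to run it and what thresholds to alert on are left to you.

```sql
USE [SSBSender];
GO

-- Messages still waiting in the sender queue.
SELECT COUNT(*) AS queued_messages
FROM dbo.InitialQueue WITH (NOLOCK);

-- Messages that have not been delivered yet;
-- transmission_status holds the last delivery error, if any.
SELECT to_service_name, message_type_name, enqueue_time, transmission_status
FROM sys.transmission_queue
ORDER BY enqueue_time;

-- Open conversations grouped by remote service and state.
SELECT far_service, state_desc, COUNT(*) AS conversations
FROM sys.conversation_endpoints
GROUP BY far_service, state_desc;
```

A growing sys.transmission_queue with a non-empty transmission_status typically points at a route, endpoint, or security problem rather than at the batching logic.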
