Azure Service Bus Topics

Introduzione



Nel precedente articolo ho presentato l’infrastruttura di Service Bus che Azure ci mette a disposizione ed i relativi meccanismi di comunicazione. Tra questi mi sono soffermato a descrivere quello delle Queues che prevedono una comunicazione punto a punto monodirezionale. Il presente articolo illustra un altro meccanismo di comunicazione: i Topics.

Topics 

Il modello di messaggistica previsto è di tipo publish/subscribe. I componenti di un’applicazione distribuita si scambiano i messaggi pubblicandoli sul topic che funge da intermediario. A differenza delle queues, dove un singolo client riceve e gestisce il messaggio, i topics permettono a più client di scodare il medesimo messaggio proprio in accordo con il pattern publish/subscribe. 


TopicConcepts

Ogni client può applicare dei filtri/restrizioni per evitare di ricevere quei messaggi che presentano un certo valore all'interno delle proprietà del messaggio stesso. 

Esempio 

Immaginando di utilizzare lo stesso namespace dell'articolo precedente, Azure Service Bus, e di conseguenza la medesima stringa di connessione (da inserire nell'app.config), scriviamo il codice necessario ad inviare un messaggio su un Topic. 

Innanzitutto creiamo il Topic se questo non esiste ancora: 
 
const string topicName = "BloggerTopic";
 
string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
 
TopicDescription td = new TopicDescription(topicName);
td.MaxSizeInMegabytes = 5120;
td.DefaultMessageTimeToLive = new TimeSpan(0, 15, 0);
 
if (!namespaceManager.TopicExists(topicName))
namespaceManager.CreateTopic(td); 
 
La creazione è molto semplice e permette di configurare la dimensione massima del contenitore dei messaggi (anche per i topics il singolo messaggio ha una dimensione massima per il contenuto di 256KB e per gli headers di 64KB), il tempo massimo di conservazione dei messaggi, ed altri parametri (vedi link per approfondimenti) per mezzo dell'oggetto TopicDescription che è la contro parte del QueueDescription del precedente articolo.

 

Invio di un messaggio


TopicClient client = TopicClient.CreateFromConnectionString(connectionString, topicName);
var message = new BrokeredMessage(new Tweet{Title="Azure Bus Topic", Author="Isacco Spaccamonte"});
message.Properties["Sender"] = "ConsoleApp";
client.Send(message);

L'invio di un messaggio è altrattanto semplice e prevede l'utilizzo dell'oggetto BrokeredMessage come nel caso delle Queues; al messaggio ho aggiunto anche una proprietà all'interno del Dictionary delle Properties. L'oggetto passato al costruttore della classe BrokeredMessage deve essere ovviamente serializzabile. La proprietà Sender inserita può essere utilizzata in fase di filtro dei messaggi come vedremo tra breve.

 

Ricezione del messaggio 

La ricezione dei messaggi sulle Topic viene gestito per mezzo delle Subscriptions. Il client crea la sottoscrizione (se non è ancora stata creata in precedenza) specificando il Topic da cui desidera ricevere i messaggi ed il nome della sottoscrizione (nell'esempio che segue "NoFilter").

const string topicName = "BloggerTopic";
string connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString");
var namespaceManager = NamespaceManager.CreateFromConnectionString(connectionString);
if (!namespaceManager.SubscriptionExists(topicName, "
NoFilter"))
    namespaceManager.CreateSubscription(topicName, "NoFilter");

In fase di sottoscrizione possono essere definiti anche eventuali filtri da applicare ai messaggi in arrivo. Nell'esempio considero la tipologia di filtro "SqlFilter"; quest'ultimo, tramite la classica sintassi sql, permette di applicare delle regole di filtraggio che esaminano i valori contenuti nel Dictionary delle Properties.
SqlFilter consoleAppFilter = new SqlFilter("Sender = 'ConsoleApp'");
namespaceManager.CreateSubscription(topicName , "ConsoleAppMessages", consoleAppFilter);

Il codice crea una sottoscrizione chiamata "ConsoleAppMessages" con un filtro che prevede lo scodamento dei soli messaggi che nella proprietà "Sender" presentino il valore "ConsoleApp". Ora possiamo inizializzare l'oggetto SubscriptionClient passando al costruttore nell'ordine:  la stringa di connessione, il nome del topic e quello della sottoscrizione.

var subscriptionName = "NoFilter";
SubscriptionClient client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionName, ReceiveMode.PeekLock));

OnMessageOptions options = new OnMessageOptions
{
    AutoComplete = false,
    AutoRenewTimeout = TimeSpan.FromMinutes(1)
};
client.OnMessage(OnMessageReceived, options); 

private static void OnMessageReceived(BrokeredMessage message)
{
            var tweet = message.GetBody<Tweet>();
            Console.WriteLine(tweet.Title);
            message.Complete();
 }

Una volta creato il client e registrato il delegate da richiamare, siamo pronti a ricevere i messaggi dal Topic. 
Vorrei spendere ancora due parole circa la modalità di ricezione del messaggio. Nell'esempio abbiamo creato il SubscriptionClient specificando come ReceiveMode la tipologia PeekLock. Questa metodologia (che è quella standard) prevede che ogni client riceva il messaggio e lo elabori correttamente. Pertanto è responsabilità di ciascun client chiamare il metodo Complete sul messaggio ricevuto per segnalare all'engine di Azure Service Bus che l'elaborazione è andata a buon fine (evitando in questo modo ulteriori ricezioni del medesimo messaggio). Nella modalità ReceiveAndDelete il messaggio viene inoltrato al client e poi considerato come letto a prescindere dall'esito dell'elaborazione.



Commenti

Post popolari in questo blog

R – regressione lineare semplice

Polly.NET