RabbitMQ – Topic Exchange
Selamlar, bu makalede Topic Exchange üzerine konuşuyor olacağız. Bir önceki makalede Direct Exchange konusuna değinmiştik. Exchange yapılarının birbirleri arasındaki farkı anlamak çok önemli. Kullanım ihtiyacı duyulduğunda hangi exchange yapısının tercih edileceği geliştireceğiniz iş ve buna cevap veren exchange yapısı hangisi şeklinde belirlenmektedir. Direct Exchange ile Topic Exchange arasındaki temel farkı anlamak için Direct Exchange yazımı okumanızı tavsiye ederim.
Topic Exchange yine diğer exchange yapıları gibi mesajları alan ve ilgli kuyruklara parametre olarak verilen routingKey değerlerine göre dağıtımı gerçekleştiren yapılardır. Çalışma mantığı ve tanım olarak Direct Exchange yapısından hiç bir farkı yok gibi gözükse de aslında önemli fark routingKey parametresi geçilirken belli bir syntax’a sahip bir string geçebiliyor olmamız.
Direct Exchangede tek bir routing key belirleyebiliyorken Topic Exchange de oluşturmuş olduğumuz string sayesinde bize şablon routingKey geçebilmemizi sağlıyor.Bu avantajının yanında mesajların okunması esnasında “*” ,”#” gibi karakterler ile routingKey’i özelleştirebiliyoruz.
Örnek
Örneğin producer mesajı gönderdi mesajın routingKey’i Error.Warning.Critial olarak belirlendiğini düşünelim ve consumer tarafında routingKey’imizi özelleştirerek mesajı okumaya çalışalım
routingKey:“Error.*.Critial” Error ile başlasın.Farketmez.Critial ile bitenleri oku.
routingKey: “#” Tümünü oku.
routingKey: “#.Warning” Sonu Warning ile bitenleri oku.
Şimdi gelin kodlar üzerinde konuşarak konuyu daha iyi kavramaya çalışalım. Önceki makalede olduğu gibi log uygulamamız üzerinden ilerleyeceğiz.
Producer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public class ProducerService { public void Producer() { try { var logNames = Enum.GetValues(typeof(LogTypes)); ConnectionFactory factory = new ConnectionFactory(); string routeKey = string.Empty; factory.HostName = "localhost"; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "system-logs", durable: true, type: ExchangeType.Topic); IBasicProperties propertis = channel.CreateBasicProperties(); propertis.Persistent = true; for (int i = 0; i < 11; i++) { Random random = new Random(); LogTypes logType1 = (LogTypes)logNames.GetValue(random.Next(logNames.Length)); LogTypes logType2 = (LogTypes)logNames.GetValue(random.Next(logNames.Length)); LogTypes logType3 = (LogTypes)logNames.GetValue(random.Next(logNames.Length)); routeKey = $"{logType1}.{logType2}.{logType3}"; string message =String.Concat("log=Test",i.ToString()); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("system-logs", routingKey: routeKey, propertis, body: body); Console.WriteLine($"Mesaj=>{message} <=> Routing Key=>{routeKey}"); } Console.ReadLine(); } } } catch (Exception exception) { throw new Exception(exception.Message); } } } |
1 2 3 4 5 6 7 | public enum LogTypes { Ciritical = 1, Error = 2, Info = 3, Warning = 4 } |
Kodlarımızı inceleyecek olursak ConnectionFactory sınıfından bir nesne yaratılır. Yaratılan bu sınıf üzerinden HostName set edilir. ConnectionFactory sınıfı bizim RabbitMQ hostuna bağlanmamıza sağlayacak olan sınıftır. Daha sonra CreateConnection()methodu ile bağlantı gerçekleştirilir.
CreateModel() methodu ile yeni bir channel oluşturulur.Channel üzerinden ExchangeDeclare() methodu ile yeni bir exchange oluşturulur. Son olarak ilgili channel kullanılarak BasicPublish() methodu çağırılır ve oluşturulan routingkey parametre olarak geçilerek mesaj exchang’e gönderilir.
Consumer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | public class ConsumerService { public void Consumer() { try { var factory = new ConnectionFactory(); factory.HostName = "localhost"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "system-logs", durable: true, type: ExchangeType.Topic); string routingKey = string.Empty; var queueName = channel.QueueDeclare().QueueName; routingKey = $"#"; channel.QueueBind(queue: queueName, exchange: "system-logs", routingKey: routingKey); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queueName, false, consumer: consumer); consumer.Received += (render, argument) => { string message = Encoding.UTF8.GetString(argument.Body.ToArray()); Console.WriteLine(message); channel.BasicAck(deliveryTag: argument.DeliveryTag, false); }; Console.ReadLine(); } } } catch (Exception exception) { throw new Exception(exception.Message); } } } |
QueueDeclare() methodu ile yeni bir Queue oluşturulur. Oluşturulan Queue QueueBind() methodu kullanılarak ilgili exchange bind edilir. Bind işlemi sırasında geçilen routingKey parametresi ile dinlenecek olan kuyrukları belirtmektedir.Tüm kuyrukları dinleyebilmek için routingKey’i “#” olarak belirliyoruz. EventingBasicConsumer sınıfından bir nesne yaratılır. Oluşturulan channel parametre olarak geçilir. Üretilen nesne üzerinden Received event’i tetiklenir ve kuyruk dinlemeye alınır. Son olarak BasicConsume methodu ile ilgili kuyruktan mesaj alınır ve işlenir.
Test 1
İlk görselde sonu Warning ile biten routingKey’e sahip kuyrukları dinlemeye aldık. Consumer tarafından routingKey’i “#.Warning” olarak belirledik.
Test 2

İkinci testimizde ise tüm kuyrukları dinleyeme aldık. Tüm kuyrukları dinleyebilmek için routingKey’i “#” şeklinde belirledik.
Test 3

Son olarak ise error ile başlayıp warning ile biten routingKey’e sahip kuyrukları dinlemeye aldık. Dinlemeye alırken routingKey’i “Error.*.Critial” şeklinde belirledik. Burada “*” karakteri ile o kısıma gelecek olan key değerinin ne olacağının farketmeyeceğini belirttik.
Bu makalede RabbitMQ içerisinde bulunan exchange türlerinden biri olanTopic Exchange türünü ele aldık. Umarım faydalı bir yazı olmuştur. Benim bu makalede anlatacaklarım bu kadar. Bir sonraki makalede Header exchange konusu ile devam ediyor olacağız. Hoşçakalın 🙂
Source
https://www.rabbitmq.com/documentation.html
https://github.com/eaktassssss/RabbitMQExchange
Son Yorumlar