首页 > Web开发 > 详细

NET下RabbitMQ实践[WCF发布篇]

时间:2014-01-24 19:28:51      阅读:471      评论:0      收藏:0      [点我收藏+]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
  在之前的两篇文章中,主要介绍了RabbitMQ环境配置,简单示例的编写。今天将会介绍如何使用WCF将RabbitMQ列队以服务的方式进行发布。
      
     注:因为RabbitMQ的官方.net客户端中包括了WCF的SAMPLE代码演示,很适合初学者,所以我就偷了个懒,直接对照它的SAMPLE来说明了,算是借花献佛吧,呵呵。
 
     首先我们下载相应源码(基于.NET 3.0),本文主要对该源码包中的代码进行讲解,链接如下:
     
    Binary, compiled for .NET 3.0 and newer (zip) - includes example code, the WCF binding and WCF examples
         
    当然官方还提供了基本.NET 2.0 版本的示例版本,但其中只是一些简单的示例,并不包括WCF部分,这里只发个链接,感兴趣的朋友可自行研究。
     
    Binary, compiled for .NET 2.0 (zip) - includes example code
         
     
    下载基于.NET 3.0的版本源码之后,解压其中的projects\examples\wcf目录,可看到如下的项目:
     
     
       
    几个文件夹分别对应如下应用场景:
    OneWay: 单向通信(无返回值)
    TwoWay: 双向通信(请求/响应)
    Session:会话方式
    Duplex: 双向通信(可以指定一个Callback回调函数)
    
    
    下面逐一进行介绍:
 
   
 
   OneWay  
    在OneWayTest示例中,演示了插入日志数据,因为日志操作一般只是单纯的写入操作,不考虑返回值,所以使用OneWay方式。
    
   下面是其WCF接口声明和实例代码,如下:  
     
 
 
    [ServiceContract]
    public interface ILogServiceContract
    {
        [OperationContract(IsOneWay=true)]
        void Log(LogData entry);
    }
    
    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
    public class LogService : ILogServiceContract
    {
        public int m_i;
        public void Log(LogData entry)
        {
            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] {3} [{0,-6}] {1, 12}: {2}", entry.Level, entry.TimeStamp, entry.Message, m_i++);
        }
    }
 
 
 
     
    其只包含一个方法:Log(LogData entry) ---用于添加日志记录,可以看出它与我们以往写WCF代码没什么两样。
     
    不过这里要说明一下,在类属性InstanceContextMode枚举类型中,使用了“Single”模式,而该枚举提供了如下三种情况:
        
    Single - 为所有客户端调用分配一个服务实例。
    PerCall – 为每个客户端调用分配一个服务实例。
    PerSession – 为每个客户端会话分配一个服务实例。每个Session内多线程操作实例的话会有并发问题。 
       
    InstanceContextMode 的默认设置为 PerSession
     
    这三个值通常是要与并发模式(ConcurrencyMode)搭配使用,以解决并发效率,共享资源等复杂场景下的问题的。下面是并发模式的说明:
    ConcurrencyMode 控制一次允许多少个线程进入服务。ConcurrencyMode 可以设置为以下值之一:
     
    Single - 一次可以有一个线程进入服务。
    Reentrant - 一次可以有一个线程进入服务,但允许回调。
    Multiple - 一次可以有多个线程进入服务。   
     
    ConcurrencyMode 的默认设置为 Single。
     
    InstanceContextMode 和 ConcurrencyMode 设置会相互影响,因此为了提升并发效能,必须协调这两项设置。
     
    例如,将 InstanceContextMode 设置为 PerCall 时,会忽略 ConcurrencyMode 设置。这是因为,每个客户端调用都将路由到新的服务实例,因此一次只会有一个线程在服务实例中运行。对于PerCall的实例模型,每个客户端请求都会与服务端的一个独立的服务实例进行交互,就不会出现多个客户端请求争用一个服务实例的情况,也就不会出现并发冲突,不会影响吞吐量的问题。但对于实例内部的共享变量(static)还是会可能出现冲突。
     
    但对于当前Single设置,原因很多,可能包括:
    1. 创建服务实例需要大量的处理工作。当多个客户端访问服务时,仅允许创建一个服务实例可以降低所需处理量。
    2. 可以降低垃圾回收成本,因为不必为每个调用创建和销毁服务创建的对象。
    3. 可以在多个客户端之间共享服务实例。
    4. 避免对static静态属性的访问冲突。
     
    但如果使用Single,问题也就出来了---就是性能,因为如果 ConcurrencyMode也同时设置成Single时,当前示例中的(唯一)服务实例不会同时处理多个(单线程客户端)请求。因为服务在处理请求时会对当前服务加锁,如果再有其它请求需要该服务处理的时候,需要排队等候。如果有大量客户端访问,这可能会导致较大的瓶颈。
 
    当然如果考虑到多线程客户端使用的情况,可能问题会更严重。
     
    聊了这些,无非就是要结合具体应用场景来灵活搭配ConcurrencyMode,InstanceContextMode这两个枚举值。
     
    下面言归正传,来看一下如何将该服务与RabbitMQ进行绑定,以实现以WCF方式访问RabbitMQ服务的效果。这里暂且略过LogData数据结构信息类,直接看一下如果绑定服务代码(位于OneWayTest.cs):
     
 
 
private ServiceHost m_host;
     
public void StartService(Binding binding)
{
 
    m_host = new ServiceHost(typeof(LogService), new Uri("soap.amqp:///"));
    ((RabbitMQBinding)binding).OneWayOnly = true;   
    m_host.AddServiceEndpoint(typeof(ILogServiceContract), binding, "LogService");
    m_host.Open();
    m_serviceStarted = true
}
 
 
  
    StartService方法的主体与我们平时启动WCF服务的方法差不多,只不过是将其中的URL协议部分换成了“soap.amqp”形式,而其中的传入参数binding则是RabbitMQBinding类型,该类型是rabbitmq客户端类库提供的用于对应Binding类的RabbitMQBinding实现。下面就是其类实始化代码:
     
 
     return new RabbitMQBinding(System.Configuration.ConfigurationManager.AppSettings["manual-test-broker-uri"],RabbitMQ.Client.Protocols.FromConfiguration("manual-test-broker-protocol"));
                                        
    其包括两个参数,一个是rabbitmq服务地址,一个是所用的协议,其对应示例app.config文件中的如下结点:   
  
<add key="manual-test-broker-uri" value="amqp://10.0.4.79:5672/"/><!--本系列第一篇中的环境设置-->
<add key="manual-test-broker-protocol" value="AMQP_0_8"/>
 
 
     
    这样,我们就完成了初始化服务实例工作。
     
    接着来构造客户端代码,如下:
     
 
 
private ChannelFactory<ILogServiceContract> m_factory;
 
private ILogServiceContract m_client;                         
     
public ILogServiceContract GetClient(Binding binding)
{
    ((RabbitMQBinding)binding).OneWayOnly = true;
    m_factory = new ChannelFactory<ILogServiceContract>(binding, "soap.amqp:///LogService");
    m_factory.Open();
    return m_factory.CreateChannel();
}
 
 
     
    与平时写的代码相似,但传入参数就是上面提到的那个RabbitMQBinding实例,这样通过下面代码访问WCF中的LOG方法:
    m_client = GetClient(Program.GetBinding());
    m_client.Log(new LogData(LogLevel.High, "Hello Rabbit"));
    m_client.Log(new LogData(LogLevel.Medium, "Hello Rabbit"));
    ....
 
     
    到这里,我们可以看出,它的实现还是很简单的。我们只要把10.0.4.79:5672上的rabbitmq的环境跑起来,就可以看出最终的效果了。
     
    之后我将C#的服务端(startservice)与客户端(getclient)分开布署到不同IP的主机上,也实现了示例中的结果。
     
     
    TwoWay   
    下面介绍一下 TwoWay双向通信示例,首先是WCF接口声明和实现:   
    
 
 
    [ServiceContract]
    public interface ICalculator
    {
        [OperationContract]
        int Add(int x, int y);
         
        [OperationContract]
        int Subtract(int x, int y);
    }
     
   [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerCall)] /*为每个客户端调用分配一个服务实例*/
    public sealed class Calculator : ICalculator
    {
        public int Add(int x, int y)
        {
            return x + y;
        }
 
        public int Subtract(int x, int y)
        {
            return x - y;
        }
    }
 
 
 
         
      因为其服务的启动startservice和客户端实例构造与oneway方法类似,为了节约篇幅,这时就略过了,下面是其最终调用代码(位于TwoWayTest.cs):   
  
 
 
  public void Run()
 {
     StartService(Program.GetBinding());
 
     ICalculator calc = GetClient(Program.GetBinding());
 
     int result = 0, x = 3, y = 4;
     Util.WriteLine(ConsoleColor.Magenta, "  {0} + {1} = {2}", x, y, result = calc.Add(x, y));
     if (result != x + y)
         throw new Exception("Test Failed");
 
    ......
 }  
 
 
 
  
    与普通的WCF TWOWAY 返回调用方式相同,就不多说了。
     
     
    Session   
    下面是基于Session会话方式的代码,WCF接口声明和实现:
    
 
 
    [ServiceContract(SessionMode= SessionMode.Required)]
    public interface ICart
    {
        [OperationContract]
        void Add(CartItem item);
 
        [OperationContract]
        double GetTotal();
         
        Guid Id { [OperationContract]get; }
    }
     
    [ServiceBehavior(InstanceContextMode=InstanceContextMode.PerSession)]
    public class Cart : ICart
    {
        public Cart()
        {
            Items = new List<CartItem>();
            m_id = Guid.NewGuid();
        }
         
        private Guid m_id;
        private List<CartItem> m_items;
 
        private List<CartItem> Items {
            get { return m_items; }
            set { m_items = value; }
        }
 
        public void Add(CartItem item)
        {
            Items.Add(item);
        }
         
        public double GetTotal()
        {
            double total = 0;
            foreach (CartItem i in Items)
                total += i.Price;
 
            return total;
        }
 
        public Guid Id { get { return m_id; } }
    }
 
 
 
     
    该接口实现一个购物车功能,可以添加商品并计算总价,考虑到并发场景,这里将其实例为PerSession枚举类型,即为每个客户端会话分配一个服务实例。这样就可以在用户点击购买一件商品里,为其购物车商品列表List<CartItem>添加一条信息,而不会与其它用户的购物车商品列表相冲突。
     
    其最终的调用方法如下:
     
 
 
public void Run()
{
    StartService(Program.GetBinding());
 
    ICart cart = GetClient(Program.GetBinding());
 
    AddToCart(cart, "Beans", 0.49);//添加商品到购物车
    AddToCart(cart, "Bread", 0.89);
    AddToCart(cart, "Toaster", 4.99);
 
    double total = cart.GetTotal();//计算总价
    if (total != (0.49 + 0.89 + 4.99))
        throw new Exception("Incorrect Total");
    ......
}
 
    
     
     
    Duplex   
    最后,再介绍一下如何基于Duplex双向通信模式进行开发,DuplexTest这是个“PIZZA订单”的场景,用户下单之后,等待服务端将PIZZA加工完毕,然后服务端用callback方法通知客户端PIZZA已做好,相应WCF接口声明和实现如下:
    
 
 
   [ServiceContract(CallbackContract=typeof(IPizzaCallback))] /*绑定回调接口*/
    public interface IPizzaService
    {
        [OperationContract(IsOneWay=true)]
        void PlaceOrder(Order order);
    }
 
    [ServiceContract]
    public interface IPizzaCallback
    {
        [OperationContract(IsOneWay=true)]
        void OrderReady(Guid id); /*用于通知客户端*/
    }   
     
    public class PizzaService : IPizzaService
    {
        public void PlaceOrder(Order order)
        {
            foreach (Pizza p in order.Items)
            {
                Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Cooking a {0} {1} Pizza...", p.Base, p.Toppings);
            }
            Util.WriteLine(ConsoleColor.Magenta, "  [SVC] Order {0} is Ready!", order.Id);
 
            Callback.OrderReady(order.Id);
        }
 
        IPizzaCallback Callback
        {
            get { return OperationContext.Current.GetCallbackChannel<IPizzaCallback>(); } //当前上下文中调用客户端绑定的回调方法
        }
    }
 
 
 
         
    这里要说明的是IPizzaCallback接口的OrderReady方法被绑定了IsOneWay=true属性,主要是因为如果使用“请求-响应”模式,客户端必须等服务端“响应”完成上一次“请求”后才能发出下一步“请求”。因此虽然客户端可以使用多线程方式来调用服务,但最后的执行结果仍然表现出顺序处理(效率低)。要想使服务端能够并行处理客户端请求的话,那我们就不能使用“请求-响应”的调用模式,所以这里使用One-Way的方式来调用服务。
     
    下面是客户端回调接口实现:   
     
 
 
    public class PizzaClient : DuplexClientBase<IPizzaService>, IPizzaService
    {
        public PizzaClient(InstanceContext context, Binding binding, EndpointAddress remoteAddress)
            : base(context, binding, remoteAddress) { }
 
        public void PlaceOrder(Order order)
        {
            Channel.PlaceOrder(order);
        }
    }
 
 
 
     
    最终客户端实例化(startservice)略过,因与之前示例类似。
     
 
 
    public IPizzaService GetClient(Binding binding)
    {
        PizzaClient client = new PizzaClient(new InstanceContext(this), binding, new EndpointAddress(serverUri.ToString()));
        client.Open();
        return client;
    }
 
 
 
    
    
     上面的方法中将当前客户端实例this(实现了IServiceTest<IPizzaService>, IPizzaCallback接口)注册到上下文中,目的是为了将其方法的回传调用传递到服务端(还记得服务端的这行代码吗?=>Callback.OrderReady(order.Id))
public void OrderReady(Guid id)
{
    Util.WriteLine(ConsoleColor.Magenta, "  [CLI] Order {0} has been delivered.",id);
    mre.Set();
}
 
 
 
 
    这样,服务端完成pizza时,就可以调用客户端的OrderReady方法来实现通知功能了。
    
    下面就是一个整个的下单流程实现:
 
 
public void Run()
{
       ......
       StartService(Program.GetBinding());
 
       IPizzaService client = GetClient(Program.GetBinding());
       Order lunch = new Order();
       lunch.Items = new List<Pizza>();
       lunch.Items.Add(new Pizza(PizzaBase.ThinCrust, "Meat Feast"));
       client.PlaceOrder(lunch);
       ......
}

NET下RabbitMQ实践[WCF发布篇]

原文:http://www.cnblogs.com/systemnet123/p/3532209.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!