网络通讯是大家在开发过程中常见的问题。
今天我针对项目中遇到的一个实际问题探讨一下高性能通讯模式。
网络通讯中常见到以下情况:线程以循环的方式等待接收信息。
接收完毕,sleep一下,释放CPU,sleep结束,再接收信息。
伪代码如下:
0001> while(...){
0002> sleep(...);
0003>
0004> accept(waitTime);
0005> ...
0006> }
乍看上去这种方式没有什么不妥。其实,这种方式有隐患。
第二行的 sleep周期很难调整。
如果需要最好的响应速度,就需要设置的很小。但是占用CPU就多;
如果需要少占CPU,就要设置的大些,可是响应速度(单位时间接收信息量的峰值)会下降。
该设计思路下的极限处理速度有限制。
修正方案思路如下:
1 采用观察者模式替代以前的轮循模式,释放CPU。
2 由于观察者的update函数是阻塞的,所以在update函数中启动单独的线程处理接收到的单元。
3 采用一个缓冲队列临时放数据,处理数据的线程通过事件驱动(Observer模式支持)工作。
4 以下分别列出各部分伪代码
1001> // 信息接收部分。
1002> while(...){
1003> sleep(0);
1004>
1005> pdu = accept(waitTime);
1006> if(pdu!=null){
1007> queue.push(pdu);
1008> }
1009> ...
1010> }
解释:
1 sleep等待的时限为0,就是说,仅仅在必要的时候可以释放CPU调度时间片,
2 只要CPU不忙,就不会等待。
3 在 accept 函数中采取的方式是不占用CPU的阻塞等待,当前MFC和Java中都支持。
4 等待多长时间以后返回,这个时间对其他进程的影响明显缩小。
5 如果有接收到数据,就放入缓冲区中。
2000> // 缓冲队列,从Observable继承。
2001> public final class BQueue extends Observable{
2002>
2003> // 数据放入缓冲区,就自动通知Observer进行处理。
2004> public synchronized boolean push(Object ms) {
2005> // 放入缓冲空间。
2006> ....
2007>
2008> // 这里通知队列的观察者,有数据加入。
2009> this.setChanged();
2010> this.notifyObservers();
2011>
2012> ....
2013> }
2014> }
解释:
1 缓冲队列继承Observable,队列观察者实现Observer接口。
2 在启动核心工作线程以前,把SendObserver(下面随即介绍)作为一个Observer注册给缓冲队列。
3 缓冲队列有数据的时候,通过 9、10 两行通知 SendObserver
3000> // 队列的观察者,实现Observer接口。
3001> public class SendObserver implements Observer{
3002>
3003> public void update(Observable queue, Object obj) {
3004>
3005> BQueue q = (BQueue)queue;
3006> .....
3007> while(q.size()>0){
3008> pdu = q.pop();
3009>
3010> // 这里对每个PDU,单独启动一个线程进行处理,可以达到响应速度最快
3011> BHandler handler = new BHandler(pdu);
3012> handler.startHandle();
3013> }
3014> }
3015> }
解释:
1 这里对每个数据的处理采用非阻塞的方式:启动一个线程。
2 已完成的测试结果:(本人在用机 高阳配置最差机器代表)
用 LinkedList(链表)做为缓冲队列的数据结构,每秒入队和出队的速度在 亿条量级
从缓冲队列中取出数据后,并启动线程进行处理,不考虑完成时间,可接收的浪涌:百万条量级。
4000> // 可以处理信息单元的线程。
4001> public class BHandler implements Runnable{
4002>
4003> // 在构造函数中把PDU保存下来,随后等待线程启动把PDU处理完毕。
4004> private RTCS_CTpdu pduForSend;
4005> BHandler(RTCS_CTpdu pdu){
4006> pduForSend = pdu;
4007> }
4008>
4009> public void run(){
4010> // ... 处理
4011> .....
4012> }
4013>
4014> public void startHandle(){
4015> new Thread(this).start();
4016> }
4017> }
总结
1 本文主要针对我们常见的一些通讯缓冲机制进行改良的方案。
全文引用的全是伪代码。仅说明编程思路。
不足之处,欢迎大家探讨指正 ludj@hisunsray.com 卢德君 。
2 我部门祝鹏先生曾提出动态调整sleep周期的方案,也很可取,
探讨请联系:zhupeng@hisunsray.com