首页 > 编程语言 > 详细

# 进程/线程/协程 # IO:同步/异步/阻塞/非阻塞 # greenlet gevent # 事件驱动与异步IO # Select\Poll\Epoll异步IO 以及selectors模块 # Python队列/RabbitMQ队列

时间:2017-07-17 10:55:13      阅读:530      评论:0      收藏:0      [点我收藏+]
   1 # 进程/线程/协程
   2 # IO:同步/异步/阻塞/非阻塞
   3 #     greenlet gevent
   4 # 事件驱动与异步IO
   5 # Select\Poll\Epoll异步IO 以及selectors模块
   6 # Python队列/RabbitMQ队列  
   7 
   8 ##############################################################################################
   9 1.什么是进程?进程和程序之间有什么区别?
  10     进程:一个程序的执行实例称为进程;
  11     每个进程都提供执行程序所需的资源。
  12     进程有一个虚拟地址空间、可执行代码、对系统对象的开放句柄、一个安全上下文、一个惟一的进程标识符、环境变量、一个优先级类、最小和最大工作集大小,以及至少一个执行线程;
  13     每个进程都由一个线程启动,这个线程通常被称为主线程,但是可以从它的任何线程中创建额外的线程;
  14     程序并不能单独运行,只有将程序装载到内存中,系统为它分配资源才能运行,而这种执行的程序就称之为进程;
  15     程序和进程的区别就在于:程序是指令的集合,它是进程运行的静态描述文本;进程是程序的一次执行活动,属于动态概念。
  16     在多道编程中,我们允许多个程序同时加载到内存中,在操作系统的调度下,可以实现并发地执行,大大提高了CPU的利用率
  17 2.什么是线程?
  18     进程的缺点有:
  19         进程只能在一个时间干一件事,如果想同时干两件事或多件事,进程就无能为力了。
  20         进程在执行的过程中如果阻塞,例如等待输入,整个进程就会挂起,即使进程中有些工作不依赖于输入的数据,也将无法执行。
  21     线程是操作系统能够进行运算调度的最小单位。
  22     它被包含在进程之中,是进程中的实际运作单位。
  23     一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务
  24     线程是一个执行上下文,它是一个CPU用来执行指令流的所有信息。
  25 3.进程和线程之间的关系?
  26     线程共享创建它的进程的地址空间;进程有自己的地址空间。(内存地址)
  27     线程可以直接访问其进程的数据段;进程有自己的父进程数据段的副本。
  28     线程可以直接与进程的其他线程通信;进程必须使用进程间通信来与兄弟进程通信。
  29     新线程很容易创建;新进程需要复制父进程。
  30     线程可以对同一进程的线程进行相当大的控制;进程只能对子进程执行控制。
  31     对主线程的更改(取消、优先级更改等)可能会影响流程的其他线程的行为;对父进程的更改不会影响子进程。
  32 4.python GIL全局解释器锁
  33     无论你启多少个线程,你有多少个cpu, Python在执行的时候会淡定的在同一时刻只允许一个线程运行
  34     http: // www.dabeaz.com / python / UnderstandingGIL.pdf
  35 5.Python threading模块的使用
  36     基本调用方式1
  37         import threading
  38         import time
  39 
  40 
  41         def sayhi(num):  # 定义每个线程要运行的函数
  42 
  43             print("running on number:%s" % num)
  44 
  45             time.sleep(3)
  46 
  47 
  48         if __name__ == __main__:
  49             t1 = threading.Thread(target=sayhi, args=(1,))  # 生成一个线程实例
  50             t2 = threading.Thread(target=sayhi, args=(2,))  # 生成另一个线程实例
  51 
  52             t1.start()  # 启动线程
  53             t2.start()  # 启动另一个线程
  54 
  55             print(t1.getName())  # 获取线程名
  56             print(t2.getName())
  57     基本调用方式2
  58         import threading
  59         import time
  60 
  61 
  62         class MyThread(threading.Thread):
  63             def __init__(self, num):
  64                 threading.Thread.__init__(self)
  65                 self.num = num
  66 
  67             def run(self):  # 定义每个线程要运行的函数
  68 
  69                 print("running on number:%s" % self.num)
  70 
  71                 time.sleep(3)
  72 
  73 
  74         if __name__ == __main__:
  75             t1 = MyThread(1)
  76             t2 = MyThread(2)
  77             t1.start()
  78             t2.start()
  79 6.守护线程Daemon:
  80     非守护进程线程退出,就可以将守护线程杀死。
  81     # _*_coding:utf-8_*_
  82 
  83     import time
  84     import threading
  85 
  86 
  87     def run(n):
  88         print([%s]------running----\n % n)
  89         time.sleep(2)
  90         print(--done--)
  91 
  92 
  93     def main():
  94         for i in range(5):
  95             t = threading.Thread(target=run, args=[i, ])
  96             t.start()
  97             t.join(1)
  98             print(starting thread, t.getName())
  99 
 100 
 101     m = threading.Thread(target=main, args=[])
 102     m.setDaemon(True)  # 将main线程设置为Daemon线程,它做为程序主线程的守护线程,当主线程退出时,m线程也会退出,由m启动的其它子线程会同时退出,不管是否执行完任务
 103     m.start()
 104     m.join(timeout=2)
 105     print("---main thread done----")
 106 7.线程锁(互斥锁)
 107     一个进程下可以启动多个线程,多个线程共享父进程的内存空间,也就意味着每个线程可以访问同一份数据,此时,如果2个线程同时要修改同一份数据,可能会导致数据被同时修改而使得计算结果不准确(重复赋值)
 108     import time
 109     import threading
 110 
 111 
 112     def addNum():
 113         global num  # 在每个线程中都获取这个全局变量
 114         print(--get num:, num)
 115         time.sleep(1)
 116         num -= 1  # 对此公共变量进行-1操作
 117 
 118 
 119     num = 100  # 设定一个共享变量
 120     thread_list = []
 121     for i in range(100):
 122         t = threading.Thread(target=addNum)
 123         t.start()
 124         thread_list.append(t)
 125 
 126     for t in thread_list:  # 等待所有线程执行完毕
 127         t.join()
 128 
 129     print(final num:, num)
 130     加上线程锁
 131     import time
 132     import threading
 133 
 134 
 135     def addNum():
 136         global num  # 在每个线程中都获取这个全局变量
 137         print(--get num:, num)
 138         time.sleep(1)
 139         lock.acquire()  # 修改数据前加锁
 140         num -= 1  # 对此公共变量进行-1操作
 141         lock.release()  # 修改后释放
 142 
 143 
 144     num = 100  # 设定一个共享变量
 145     thread_list = []
 146     lock = threading.Lock()  # 生成全局锁
 147     for i in range(100):
 148         t = threading.Thread(target=addNum)
 149         t.start()
 150         thread_list.append(t)
 151 
 152     for t in thread_list:  # 等待所有线程执行完毕
 153         t.join()
 154 
 155     print(final num:, num)
 156 8.线程锁与GIL之间的关系?
 157     加入GIL主要的原因是为了降低程序的开发的复杂度,
 158     比如现在的你写python不需要关心内存回收的问题,因为Python解释器帮你自动定期进行内存回收,你可以理解为python解释器里有一个独立的线程,
 159     每过一段时间它起wake up做一次全局轮询看看哪些内存数据是可以被清空的,此时你自己的程序里的线程和 py解释器自己的线程是并发运行的,
 160     假设你的线程删除了一个变量,py解释器的垃圾回收线程在清空这个变量的过程中的clearing时刻,
 161     可能一个其它线程正好又重新给这个还没来及得清空的内存空间赋值了,结果就有可能新赋值的数据被删除了,
 162     为了解决类似的问题,python解释器简单粗暴的加了锁,即当一个线程运行时,其它人都不能动
 163 9.递归锁(Rlock 不用递归锁而多重加lock锁会导致被锁住,程序卡死)
 164     import threading, time
 165 
 166 
 167     def run1():
 168         print("grab the first part data")
 169         lock.acquire()
 170         global num
 171         num += 1
 172         lock.release()
 173         return num
 174 
 175 
 176     def run2():
 177         print("grab the second part data")
 178         lock.acquire()
 179         global num2
 180         num2 += 1
 181         lock.release()
 182         return num2
 183 
 184 
 185     def run3():
 186         lock.acquire()
 187         res = run1()
 188         print(--------between run1 and run2-----)
 189         res2 = run2()
 190         lock.release()
 191         print(res, res2)
 192 
 193 
 194     if __name__ == __main__:
 195 
 196         num, num2 = 0, 0
 197         lock = threading.RLock() #注意递归锁是Rlock
 198         for i in range(10):
 199             t = threading.Thread(target=run3)
 200             t.start()
 201 
 202     while threading.active_count() != 1:
 203         print(threading.active_count())
 204     else:
 205         print(----all threads done---)
 206         print(num, num2)
 207 10.Semaphore(信号量):
 208     同时允许一定数量的线程更改数据
 209     import threading, time
 210 
 211     def run(n):
 212         semaphore.acquire()
 213         time.sleep(1)
 214         print("run the thread: %s\n" % n)
 215         semaphore.release()
 216 
 217     if __name__ == __main__:
 218 
 219         num = 0
 220         semaphore = threading.BoundedSemaphore(5)  # 最多允许5个线程同时运行
 221         for i in range(20):
 222             t = threading.Thread(target=run, args=(i,))
 223             t.start()
 224 
 225     while threading.active_count() != 1:
 226         pass  # print threading.active_count()
 227     else:
 228         print(----all threads done---)
 229         print(num)
 230 11.Events事件:通过Event来实现两个或多个线程间的交互
 231     event = threading.Event()
 232     # a client thread can wait for the flag to be set
 233     event.wait()
 234     # a server thread can set or reset it
 235     event.set()
 236     event.clear()
 237     一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则:
 238     import threading,time
 239     import random
 240     def light():
 241         if not event.isSet():
 242             event.set() #wait就不阻塞 #绿灯状态
 243         count = 0
 244         while True:
 245             if count < 10:
 246                 print(\033[42;1m--green light on---\033[0m)
 247             elif count <13:
 248                 print(\033[43;1m--yellow light on---\033[0m)
 249             elif count <20:
 250                 if event.isSet():
 251                     event.clear()
 252                 print(\033[41;1m--red light on---\033[0m)
 253             else:
 254                 count = 0
 255                 event.set() #打开绿灯
 256             time.sleep(1)
 257             count +=1
 258     def car(n):
 259         while 1:
 260             time.sleep(random.randrange(10))
 261             if  event.isSet(): #绿灯
 262                 print("car [%s] is running.." % n)
 263             else:
 264                 print("car [%s] is waiting for the red light.." %n)
 265     if __name__ == __main__:
 266         event = threading.Event()
 267         Light = threading.Thread(target=light)
 268         Light.start()
 269         for i in range(3):
 270             t = threading.Thread(target=car,args=(i,))
 271             t.start()
 272 12.python queue队列(线程队列)
 273     class queue.Queue(maxsize=0) #先入先出
 274     class queue.LifoQueue(maxsize=0) #last in fisrt out
 275     class queue.PriorityQueue(maxsize=0) #存储数据时可设置优先级的队列
 276     Queue.qsize()
 277     Queue.empty() #return True if empty
 278     Queue.full() # return True if full
 279     Queue.put(item, block=True, timeout=None) #将项目放入队列中。如果可选的args块是true,则超时为None(缺省值),如果需要则阻塞,直到空闲槽可用。如果超时是一个正数,它会在大多数超时秒中阻塞,如果在那个时间内没有空闲槽,则会引发完全的异常。否则(块是false),如果一个空闲槽立即可用,则在队列上放置一个项,否则就会抛出完全异常(在这种情况下会忽略超时)。
 280     Queue.put_nowait(item) #Equivalent to put(item, False).
 281     Queue.get(block=True, timeout=None) #从队列中删除并返回一个项目。如果可选的args块是true,则超时为None(缺省值),如果需要则阻塞,直到有可用的项。如果超时是一个正数,它会在大多数超时秒中阻塞,如果在那个时间内没有可用的项,则会抛出空的异常。否则(块是false),如果立即可用,返回一个项目,否则将抛出空异常(在这种情况下忽略超时)。
 282     Queue.get_nowait() #Equivalent to get(False).
 283     两种方法来支持跟踪队列的任务是否已经被守护进程的消费者线程完全地处理。
 284     Queue.task_done()
 285     Queue.join() block直到queue被消费完毕
 286 13.生产者消费者模型
 287     生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。
 288     生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,
 289     所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,
 290     消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
 291     import threading
 292     import queue
 293 
 294 
 295     def producer():
 296         for i in range(10):
 297             q.put("骨头 %s" % i)
 298 
 299         print("开始等待所有的骨头被取走...")
 300         q.join()
 301         print("所有的骨头被取完了...")
 302 
 303 
 304     def consumer(n):
 305         while q.qsize() > 0:
 306             print("%s 取到" % n, q.get())
 307             q.task_done()  # 告知这个任务执行完了
 308 
 309 
 310     q = queue.Queue()
 311 
 312     p = threading.Thread(target=producer, )
 313     p.start()
 314 
 315     c1 = consumer("李闯")
 316 
 317 
 318     import time,random
 319     import queue,threading
 320     q = queue.Queue()
 321     def Producer(name):
 322       count = 0
 323       while count <20:
 324         time.sleep(random.randrange(3))
 325         q.put(count)
 326         print(Producer %s has produced %s baozi.. %(name, count))
 327         count +=1
 328     def Consumer(name):
 329       count = 0
 330       while count <20:
 331         time.sleep(random.randrange(4))
 332         if not q.empty():
 333             data = q.get()
 334             print(data)
 335             print(\033[32;1mConsumer %s has eat %s baozi...\033[0m %(name, data))
 336         else:
 337             print("-----no baozi anymore----")
 338         count +=1
 339     p1 = threading.Thread(target=Producer, args=(A,))
 340     c1 = threading.Thread(target=Consumer, args=(B,))
 341     p1.start()
 342     c1.start()
 343 14.多进程模块multiprocessing
 344     from multiprocessing import Process
 345     import time
 346 
 347 
 348     def f(name):
 349         time.sleep(2)
 350         print(hello, name)
 351 
 352 
 353     if __name__ == __main__:
 354         p = Process(target=f, args=(bob,))
 355         p.start()
 356         p.join()
 357     14.1展示进程号:
 358     from multiprocessing import Process
 359     import os
 360 
 361 
 362     def info(title):
 363         print(title)
 364         print(module name:, __name__)
 365         print(parent process:, os.getppid())
 366         print(process id:, os.getpid())
 367         print("\n\n")
 368 
 369 
 370     def f(name):
 371         info(\033[31;1mfunction f\033[0m)
 372         print(hello, name)
 373 
 374 
 375     if __name__ == __main__:
 376         info(\033[32;1mmain process line\033[0m)
 377         p = Process(target=f, args=(bob,))
 378         p.start()
 379         p.join()
 380     14.2进程间通讯
 381         14.2.1Queues方法
 382             from multiprocessing import Process, Queue
 383 
 384 
 385             def f(q):
 386                 q.put([42, None, hello])
 387 
 388 
 389             if __name__ == __main__:
 390                 q = Queue()
 391                 p = Process(target=f, args=(q,))
 392                 p.start()
 393                 print(q.get())  # prints "[42, None, ‘hello‘]"
 394                 p.join()
 395         14.2.2Pipes方法
 396             from multiprocessing import Process, Pipe
 397 
 398 
 399             def f(conn):
 400                 conn.send([42, None, hello])
 401                 conn.close()
 402 
 403 
 404             if __name__ == __main__:
 405                 parent_conn, child_conn = Pipe()
 406                 p = Process(target=f, args=(child_conn,))
 407                 p.start()
 408                 print(parent_conn.recv())  # prints "[42, None, ‘hello‘]"
 409                 p.join()
 410         14.2.3Managers方法
 411             from multiprocessing import Process, Manager
 412 
 413 
 414             def f(d, l):
 415                 d[1] = 1
 416                 d[2] = 2
 417                 d[0.25] = None
 418                 l.append(1)
 419                 print(l)
 420 
 421 
 422             if __name__ == __main__:
 423                 with Manager() as manager:
 424                     d = manager.dict()
 425 
 426                     l = manager.list(range(5))
 427                     p_list = []
 428                     for i in range(10):
 429                         p = Process(target=f, args=(d, l))
 430                         p.start()
 431                         p_list.append(p)
 432                     for res in p_list:
 433                         res.join()
 434 
 435                     print(d)
 436                     print(l)
 437     14.3进程同步
 438         from multiprocessing import Process, Lock
 439 
 440 
 441         def f(l, i):
 442             l.acquire()
 443             try:
 444                 print(hello world, i)
 445             finally:
 446                 l.release()
 447 
 448 
 449         if __name__ == __main__:
 450             lock = Lock()
 451 
 452             for num in range(10):
 453                 Process(target=f, args=(lock, num)).start()
 454 15.进程池
 455     进程池中有两个方法:
 456     apply;
 457     apply_async;
 458     from  multiprocessing import Process, Pool
 459     import time
 460     def Foo(i):
 461         time.sleep(2)
 462         return i + 100
 463     def Bar(arg):
 464         print(-->exec done:, arg)
 465     pool = Pool(5)
 466     for i in range(10):
 467         pool.apply_async(func=Foo, args=(i,), callback=Bar)
 468     # pool.apply(func=Foo, args=(i,))
 469     print(end)
 470     pool.close()
 471     pool.join()  # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
 472 
 473 
 474 16.协程:
 475     协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
 476     协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
 477     协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
 478     协程的好处:
 479         无需线程上下文切换的开销
 480         无需原子操作锁定及同步的开销
 481         原子操作(atomic operation)是不需要同步的,所谓原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何 context switch (切换到另一个线程)。
 482         原子操作可以是一个步骤,也可以是多个操作步骤,但是其顺序是不可以被打乱,或者切割掉只执行部分。视作整体是原子性的核心。
 483         方便切换控制流,简化编程模型
 484         高并发+高扩展性+低成本:一个CPU支持上万的协程都不是问题。所以很适合用于高并发处理
 485     缺点:
 486         无法利用多核资源:协程的本质是个单线程,它不能同时将 单个CPU 的多个核用上,协程需要和进程配合才能运行在多CPU上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu密集型应用。
 487         进行阻塞(Blocking)操作(如IO时)会阻塞掉整个程序
 488     16.1利用yield实现伪协程
 489         import time
 490         import queue
 491 
 492 
 493         def consumer(name):
 494             print("--->starting eating baozi...")
 495             while True:
 496                 new_baozi = yield
 497                 print("[%s] is eating baozi %s" % (name, new_baozi))
 498             # time.sleep(1)
 499 
 500 
 501         def producer():
 502             r = con.__next__()
 503             r = con2.__next__()
 504             n = 0
 505             while n < 5:
 506                 n += 1
 507                 con.send(n)
 508                 con2.send(n)
 509                 print("\033[32;1m[producer]\033[0m is making baozi %s" % n)
 510 
 511 
 512         if __name__ == __main__:
 513             con = consumer("c1")
 514             con2 = consumer("c2")
 515             p = producer()
 516     16.2协程的特点
 517         必须在只有一个单线程里实现并发
 518         修改共享数据不需加锁
 519         用户程序里自己保存多个控制流的上下文栈
 520         一个协程遇到IO操作自动切换到其它协程
 521     16.3Greenlet实现协程(手动)
 522         # -*- coding:utf-8 -*-
 523         from greenlet import greenlet
 524         def test1():
 525             print(12)
 526             gr2.switch()
 527             print(34)
 528             gr2.switch()
 529         def test2():
 530             print(56)
 531             gr1.switch()
 532             print(78)
 533         gr1 = greenlet(test1)
 534         gr2 = greenlet(test2)
 535         gr1.switch()
 536     16.4Gevent 协程自动切换
 537         import gevent
 538 
 539 
 540         def func1():
 541             print(\033[31;1m李闯在跟海涛搞...\033[0m)
 542             gevent.sleep(2)
 543             print(\033[31;1m李闯又回去跟继续跟海涛搞...\033[0m)
 544 
 545 
 546         def func2():
 547             print(\033[32;1m李闯切换到了跟海龙搞...\033[0m)
 548             gevent.sleep(1)
 549             print(\033[32;1m李闯搞完了海涛,回来继续跟海龙搞...\033[0m)
 550 
 551 
 552         gevent.joinall([
 553             gevent.spawn(func1),
 554             gevent.spawn(func2),
 555             # gevent.spawn(func3),
 556         ])
 557     16.5 比较同步与异步的性能差别
 558         from gevent import monkey;
 559 
 560         monkey.patch_all()
 561         import gevent
 562         from  urllib.request import urlopen
 563 
 564 
 565         def f(url):
 566             print(GET: %s % url)
 567             resp = urlopen(url)
 568             data = resp.read()
 569             print(%d bytes received from %s. % (len(data), url))
 570 
 571 
 572         gevent.joinall([
 573             gevent.spawn(f, https://www.python.org/),
 574             gevent.spawn(f, https://www.yahoo.com/),
 575             gevent.spawn(f, https://github.com/),
 576         ])
 577 17.通过gevent实现单线程下的多socket并发
 578     17.1server side :
 579         import sys
 580         import socket
 581         import time
 582         import gevent
 583 
 584         from gevent import socket, monkey
 585 
 586         monkey.patch_all()
 587 
 588 
 589         def server(port):
 590             s = socket.socket()
 591             s.bind((0.0.0.0, port))
 592             s.listen(500)
 593             while True:
 594                 cli, addr = s.accept()
 595                 gevent.spawn(handle_request, cli)
 596 
 597 
 598         def handle_request(conn):
 599             try:
 600                 while True:
 601                     data = conn.recv(1024)
 602                     print("recv:", data)
 603                     conn.send(data)
 604                     if not data:
 605                         conn.shutdown(socket.SHUT_WR)
 606 
 607             except Exception as  ex:
 608                 print(ex)
 609             finally:
 610                 conn.close()
 611 
 612 
 613         if __name__ == __main__:
 614             server(8001)
 615     17.2client side :
 616         import socket
 617 
 618         HOST = localhost  # The remote host
 619         PORT = 8001  # The same port as used by the server
 620         s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 621         s.connect((HOST, PORT))
 622         while True:
 623             msg = bytes(input(">>:"), encoding="utf8")
 624             s.sendall(msg)
 625             data = s.recv(1024)
 626             # print(data)
 627 
 628             print(Received, repr(data))
 629         s.close()
 630 18.事件驱动与异步IO
 631     方式一:创建一个线程,该线程一直循环检测是否有鼠标点击,那么这个方式有以下几个缺点:
 632     1. CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?
 633     2. 如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
 634     3. 如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题;
 635     所以,该方式是非常不好的。
 636 
 637     方式二:就是事件驱动模型
 638     目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
 639     1. 有一个事件(消息)队列;
 640     2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
 641     3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
 642     4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
 643 
 644     当我们面对如下的环境时,事件驱动模型通常是一个好的选择:
 645     1.程序中有许多任务,而且…
 646     2.任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
 647     3.在等待事件到来时,某些任务会阻塞。
 648     4.当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。
 649     网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。
 650 19.select多并发socket
 651     #_*_coding:utf-8_*_
 652     __author__ = Alex Li
 653 
 654     import select
 655     import socket
 656     import sys
 657     import queue
 658 
 659 
 660     server = socket.socket()
 661     server.setblocking(0)
 662 
 663     server_addr = (localhost,10000)
 664 
 665     print(starting up on %s port %s % server_addr)
 666     server.bind(server_addr)
 667 
 668     server.listen(5)
 669 
 670 
 671     inputs = [server, ] #自己也要监测呀,因为server本身也是个fd
 672     outputs = []
 673 
 674     message_queues = {}
 675 
 676     while True:
 677         print("waiting for next event...")
 678 
 679         readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里
 680 
 681         for s in readable: #每个s就是一个socket
 682 
 683             if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了,
 684                 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀
 685                 #新连接进来了,接受这个连接
 686                 conn, client_addr = s.accept()
 687                 print("new connection from",client_addr)
 688                 conn.setblocking(0)
 689                 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
 690                 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到
 691                 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的
 692 
 693                 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送
 694 
 695             else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了
 696                 #客户端的数据过来了,在这接收
 697                 data = s.recv(1024)
 698                 if data:
 699                     print("收到来自[%s]的数据:" % s.getpeername()[0], data)
 700                     message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端
 701                     if s not  in outputs:
 702                         outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
 703 
 704 
 705                 else:#如果收不到data代表什么呢? 代表客户端断开了呀
 706                     print("客户端断开了",s)
 707 
 708                     if s in outputs:
 709                         outputs.remove(s) #清理已断开的连接
 710 
 711                     inputs.remove(s) #清理已断开的连接
 712 
 713                     del message_queues[s] ##清理已断开的连接
 714 
 715 
 716         for s in writeable:
 717             try :
 718                 next_msg = message_queues[s].get_nowait()
 719 
 720             except queue.Empty:
 721                 print("client [%s]" %s.getpeername()[0], "queue is empty..")
 722                 outputs.remove(s)
 723 
 724             else:
 725                 print("sending msg to [%s]"%s.getpeername()[0], next_msg)
 726                 s.send(next_msg.upper())
 727 
 728 
 729         for s in exeptional:
 730             print("handling exception for ",s.getpeername())
 731             inputs.remove(s)
 732             if s in outputs:
 733                 outputs.remove(s)
 734             s.close()
 735 
 736             del message_queues[s]
 737 
 738     #_*_coding:utf-8_*_
 739     __author__ = Alex Li
 740 
 741 
 742     import socket
 743     import sys
 744 
 745     messages = [ bThis is the message. ,
 746                  bIt will be sent ,
 747                  bin parts.,
 748                  ]
 749     server_address = (localhost, 10000)
 750 
 751     # Create a TCP/IP socket
 752     socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM),
 753               socket.socket(socket.AF_INET, socket.SOCK_STREAM),
 754               ]
 755 
 756     # Connect the socket to the port where the server is listening
 757     print(connecting to %s port %s % server_address)
 758     for s in socks:
 759         s.connect(server_address)
 760 
 761     for message in messages:
 762 
 763         # Send messages on both sockets
 764         for s in socks:
 765             print(%s: sending "%s" % (s.getsockname(), message) )
 766             s.send(message)
 767 
 768         # Read responses on both sockets
 769         for s in socks:
 770             data = s.recv(1024)
 771             print( %s: received "%s" % (s.getsockname(), data) )
 772             if not data:
 773                 print(sys.stderr, closing socket, s.getsockname() )
 774 
 775 20.selectors模块
 776     import selectors
 777     import socket
 778 
 779     sel = selectors.DefaultSelector()
 780 
 781 
 782     def accept(sock, mask):
 783         conn, addr = sock.accept()  # Should be ready
 784         print(accepted, conn, from, addr)
 785         conn.setblocking(False)
 786         sel.register(conn, selectors.EVENT_READ, read)
 787 
 788 
 789     def read(conn, mask):
 790         data = conn.recv(1000)  # Should be ready
 791         if data:
 792             print(echoing, repr(data), to, conn)
 793             conn.send(data)  # Hope it won‘t block
 794         else:
 795             print(closing, conn)
 796             sel.unregister(conn)
 797             conn.close()
 798 
 799 
 800     sock = socket.socket()
 801     sock.bind((localhost, 10000))
 802     sock.listen(100)
 803     sock.setblocking(False)
 804     sel.register(sock, selectors.EVENT_READ, accept)
 805 
 806     while True:
 807         events = sel.select()
 808         for key, mask in events:
 809             callback = key.data
 810             callback(key.fileobj, mask)
 811 
 812 21.RabbitMQ队列
 813     安装 http://www.rabbitmq.com/install-standalone-mac.html
 814     安装python rabbitMQ module
 815     pip install pika
 816     or
 817     easy_install pika
 818     or
 819     源码
 820     https://pypi.python.org/pypi/pika
 821 
 822     21.1send端
 823     # !/usr/bin/env python
 824     import pika
 825 
 826     connection = pika.BlockingConnection(pika.ConnectionParameters(
 827         localhost))
 828     channel = connection.channel()
 829 
 830     # 声明queue
 831     channel.queue_declare(queue=hello)
 832 
 833     # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
 834     channel.basic_publish(exchange=‘‘,
 835                           routing_key=hello,
 836                           body=Hello World!)
 837     print(" [x] Sent ‘Hello World!‘")
 838     connection.close()
 839 
 840     21.2receive端
 841     # _*_coding:utf-8_*_
 842     __author__ = Alex Li
 843     import pika
 844 
 845     connection = pika.BlockingConnection(pika.ConnectionParameters(
 846         localhost))
 847     channel = connection.channel()
 848 
 849     # You may ask why we declare the queue again ? we have already declared it in our previous code.
 850     # We could avoid that if we were sure that the queue already exists. For example if send.py program
 851     # was run before. But we‘re not yet sure which program to run first. In such cases it‘s a good
 852     # practice to repeat declaring the queue in both programs.
 853     channel.queue_declare(queue=hello)
 854 
 855 
 856     def callback(ch, method, properties, body):
 857         print(" [x] Received %r" % body)
 858 
 859 
 860     channel.basic_consume(callback,
 861                           queue=hello,
 862                           no_ack=True)
 863 
 864     print( [*] Waiting for messages. To exit press CTRL+C)
 865     channel.start_consuming()
 866 
 867     21.3 Work Queues
 868         21.3.1消息提供者代码
 869             import pika
 870             import time
 871 
 872             connection = pika.BlockingConnection(pika.ConnectionParameters(
 873                 localhost))
 874             channel = connection.channel()
 875 
 876             # 声明queue
 877             channel.queue_declare(queue=task_queue)
 878 
 879             # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
 880             import sys
 881 
 882             message =  .join(sys.argv[1:]) or "Hello World! %s" % time.time()
 883             channel.basic_publish(exchange=‘‘,
 884                                   routing_key=task_queue,
 885                                   body=message,
 886                                   properties=pika.BasicProperties(
 887                                       delivery_mode=2,  # make message persistent
 888                                   )
 889                                   )
 890             print(" [x] Sent %r" % message)
 891             connection.close()
 892         21.3.2消费者代码
 893             # _*_coding:utf-8_*_
 894 
 895             import pika, time
 896 
 897             connection = pika.BlockingConnection(pika.ConnectionParameters(
 898                 localhost))
 899             channel = connection.channel()
 900 
 901 
 902             def callback(ch, method, properties, body):
 903                 print(" [x] Received %r" % body)
 904                 time.sleep(20)
 905                 print(" [x] Done")
 906                 print("method.delivery_tag", method.delivery_tag)
 907                 ch.basic_ack(delivery_tag=method.delivery_tag)
 908 
 909 
 910             channel.basic_consume(callback,
 911                                   queue=task_queue,
 912                                   no_ack=True
 913                                   )
 914 
 915             print( [*] Waiting for messages. To exit press CTRL+C)
 916             channel.start_consuming()
 917         21.3.3消息持久化 
 918             First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
 919             --channel.queue_declare(queue=hello, durable=True)
 920             Although this command is correct by itself, it wont work in our setup. Thats because weve already defined a queue called hello which is not durable. RabbitMQ doesnt allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - lets declare a queue with different name, for exampletask_queue:
 921             --channel.queue_declare(queue=task_queue, durable=True)
 922             This queue_declare change needs to be applied to both the producer and consumer code.
 923             At that point were sure that the task_queue queue wont be lost even if RabbitMQ restarts. Now we need to mark our messages as persistent - by supplying a delivery_mode property with a value 2.
 924             --channel.basic_publish(exchange=‘‘,
 925                                   routing_key="task_queue",
 926                                   body=message,
 927                                   properties=pika.BasicProperties(
 928                                       delivery_mode=2,  # make message persistent
 929                                   ))
 930         21.3.4消息公平分发
 931             channel.basic_qos(prefetch_count=1)
 932 
 933         21.3.5带消息持久化+公平分发的完整代码
 934             生产者端
 935             # !/usr/bin/env python
 936             import pika
 937             import sys
 938 
 939             connection = pika.BlockingConnection(pika.ConnectionParameters(
 940                 host=localhost))
 941             channel = connection.channel()
 942 
 943             channel.queue_declare(queue=task_queue, durable=True)
 944 
 945             message =  .join(sys.argv[1:]) or "Hello World!"
 946             channel.basic_publish(exchange=‘‘,
 947                                   routing_key=task_queue,
 948                                   body=message,
 949                                   properties=pika.BasicProperties(
 950                                       delivery_mode=2,  # make message persistent
 951                                   ))
 952             print(" [x] Sent %r" % message)
 953             connection.close()
 954             消费者端
 955             # !/usr/bin/env python
 956             import pika
 957             import time
 958 
 959             connection = pika.BlockingConnection(pika.ConnectionParameters(
 960                 host=localhost))
 961             channel = connection.channel()
 962 
 963             channel.queue_declare(queue=task_queue, durable=True)
 964             print( [*] Waiting for messages. To exit press CTRL+C)
 965 
 966 
 967             def callback(ch, method, properties, body):
 968                 print(" [x] Received %r" % body)
 969                 time.sleep(body.count(b.))
 970                 print(" [x] Done")
 971                 ch.basic_ack(delivery_tag=method.delivery_tag)
 972 
 973 
 974             channel.basic_qos(prefetch_count=1)
 975             channel.basic_consume(callback,
 976                                   queue=task_queue)
 977 
 978             channel.start_consuming()
 979         21.3.5Publish\Subscribe(消息发布\订阅) 
 980             之前的例子都基本都是1对1的消息发送和接收,即消息只能发送到指定的queue里,但有些时候你想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了,
 981 
 982             An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.
 983 
 984             Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息
 985 
 986 
 987             fanout: 所有bind到此exchange的queue都可以接收消息
 988             direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
 989             topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
 990 
 991                表达式符号说明:#代表一个或多个字符,*代表任何字符
 992                   例:#.a会匹配a.a,aa.a,aaa.a等
 993                       *.a会匹配a.a,b.a,c.a等
 994                  注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout 
 995 
 996             headers: 通过headers 来决定把消息发给哪些queue
 997 
 998         21.3.6消息publisher
 999 
1000             import pika
1001             import sys
1002 
1003             connection = pika.BlockingConnection(pika.ConnectionParameters(
1004                 host=localhost))
1005             channel = connection.channel()
1006 
1007             channel.exchange_declare(exchange=logs,
1008                                      type=fanout)
1009 
1010             message =  .join(sys.argv[1:]) or "info: Hello World!"
1011             channel.basic_publish(exchange=logs,
1012                                   routing_key=‘‘,
1013                                   body=message)
1014             print(" [x] Sent %r" % message)
1015             connection.close()
1016 
1017         21.3.7消息subscriber
1018             # _*_coding:utf-8_*_
1019             __author__ = Alex Li
1020             import pika
1021 
1022             connection = pika.BlockingConnection(pika.ConnectionParameters(
1023                 host=localhost))
1024             channel = connection.channel()
1025 
1026             channel.exchange_declare(exchange=logs,
1027                                      type=fanout)
1028 
1029             result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
1030             queue_name = result.method.queue
1031 
1032             channel.queue_bind(exchange=logs,
1033                                queue=queue_name)
1034 
1035             print( [*] Waiting for logs. To exit press CTRL+C)
1036 
1037 
1038             def callback(ch, method, properties, body):
1039                 print(" [x] %r" % body)
1040 
1041 
1042             channel.basic_consume(callback,
1043                                   queue=queue_name,
1044                                   no_ack=True)
1045 
1046             channel.start_consuming()
1047         21.3.8有选择的接收消息(exchange type=direct) 
1048             RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1049             publisher:
1050             import pika
1051             import sys
1052 
1053             connection = pika.BlockingConnection(pika.ConnectionParameters(
1054                 host=localhost))
1055             channel = connection.channel()
1056 
1057             channel.exchange_declare(exchange=direct_logs,
1058                                      type=direct)
1059 
1060             severity = sys.argv[1] if len(sys.argv) > 1 else info
1061             message =  .join(sys.argv[2:]) or Hello World!
1062             channel.basic_publish(exchange=direct_logs,
1063                                   routing_key=severity,
1064                                   body=message)
1065             print(" [x] Sent %r:%r" % (severity, message))
1066             connection.close()
1067 
1068             subscriber :
1069             import pika
1070             import sys
1071 
1072             connection = pika.BlockingConnection(pika.ConnectionParameters(
1073                 host=localhost))
1074             channel = connection.channel()
1075 
1076             channel.exchange_declare(exchange=direct_logs,
1077                                      type=direct)
1078 
1079             result = channel.queue_declare(exclusive=True)
1080             queue_name = result.method.queue
1081 
1082             severities = sys.argv[1:]
1083             if not severities:
1084                 sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
1085                 sys.exit(1)
1086 
1087             for severity in severities:
1088                 channel.queue_bind(exchange=direct_logs,
1089                                    queue=queue_name,
1090                                    routing_key=severity)
1091 
1092             print( [*] Waiting for logs. To exit press CTRL+C)
1093 
1094 
1095             def callback(ch, method, properties, body):
1096                 print(" [x] %r:%r" % (method.routing_key, body))
1097 
1098 
1099             channel.basic_consume(callback,
1100                                   queue=queue_name,
1101                                   no_ack=True)
1102 
1103         21.3.9 更细致的消息过滤
1104             Although using the direct exchange improved our system, it still has limitations - it cant do routing based on multiple criteria.
1105             In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).
1106             That would give us a lot of flexibility - we may want to listen to just critical errors coming from cron but also all logs from kern.
1107 
1108             publisher:
1109             import pika
1110             import sys
1111 
1112             connection = pika.BlockingConnection(pika.ConnectionParameters(
1113                 host=localhost))
1114             channel = connection.channel()
1115 
1116             channel.exchange_declare(exchange=topic_logs,
1117                                      type=topic)
1118 
1119             routing_key = sys.argv[1] if len(sys.argv) > 1 else anonymous.info
1120             message =  .join(sys.argv[2:]) or Hello World!
1121             channel.basic_publish(exchange=topic_logs,
1122                                   routing_key=routing_key,
1123                                   body=message)
1124             print(" [x] Sent %r:%r" % (routing_key, message))
1125             connection.close()
1126 
1127             ------------------------------------------------------------------
1128             subscriber:
1129             import pika
1130             import sys
1131 
1132             connection = pika.BlockingConnection(pika.ConnectionParameters(
1133                 host=localhost))
1134             channel = connection.channel()
1135 
1136             channel.exchange_declare(exchange=topic_logs,
1137                                      type=topic)
1138 
1139             result = channel.queue_declare(exclusive=True)
1140             queue_name = result.method.queue
1141 
1142             binding_keys = sys.argv[1:]
1143             if not binding_keys:
1144                 sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
1145                 sys.exit(1)
1146 
1147             for binding_key in binding_keys:
1148                 channel.queue_bind(exchange=topic_logs,
1149                                    queue=queue_name,
1150                                    routing_key=binding_key)
1151 
1152             print( [*] Waiting for logs. To exit press CTRL+C)
1153 
1154 
1155             def callback(ch, method, properties, body):
1156                 print(" [x] %r:%r" % (method.routing_key, body))
1157 
1158 
1159             channel.basic_consume(callback,
1160                                   queue=queue_name,
1161                                   no_ack=True)
1162 
1163             channel.start_consuming()
1164 
1165             ---------------------------------------------------------------
1166             To receive all the logs run:
1167 
1168             python receive_logs_topic.py "#"
1169             To receive all logs from the facility "kern":
1170 
1171             python receive_logs_topic.py "kern.*"
1172             Or if you want to hear only about "critical" logs:
1173 
1174             python receive_logs_topic.py "*.critical"
1175             You can create multiple bindings:
1176 
1177             python receive_logs_topic.py "kern.*" "*.critical"
1178             And to emit a log with a routing key "kern.critical" type:
1179 
1180             python emit_log_topic.py "kern.critical" "A critical kernel error"

 

# 进程/线程/协程 # IO:同步/异步/阻塞/非阻塞 # greenlet gevent # 事件驱动与异步IO # Select\Poll\Epoll异步IO 以及selectors模块 # Python队列/RabbitMQ队列

原文:http://www.cnblogs.com/fuyuhao/p/7193170.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!