首页 > 系统服务 > 详细

并发编程 进程基础

时间:2018-09-20 23:15:13      阅读:245      评论:0      收藏:0      [点我收藏+]

操作系统

  • 多道 、分时、实时

  • 同步异步

    • 同步:一件事情完成后再做另一件事
    • 异步:同时做多件事
  • 阻塞和非阻塞

    • 阻塞:recv,accept,recvfrom
      • 会让整个进程进入阻塞队列
    • 非阻塞:进程只会在就绪和 运行状态中切换
  • 进程三状态:就绪 运行 阻塞

  • 并发并行

    • 并发是包含并行的
    • 并发:宏观上多个程序同时运行,实际是同一时间只运运行了一次
    • 并行:微观上多个程序同时运行
  • 子进程和主进程

    • pid ppid
  • 多并发的tcp服务端

    • import socket
      from multiprocessing import Process
      def communicate(conn):
          while True:
              conn.send("hello".encode("utf-8"))
              print(conn.recv(1024))
      if __name__ == ‘__main__‘:
          sk = socket.socket()
          sk.bind((‘127.0.0.1‘,9001))
          sk.listen()
          while True:
              conn,addr = sk.accept()
              Process(target=communicate,args=(conn,)).start()
      import socket
      sk = socket.socket()
      sk.connect((‘127.0.0.1‘,9001))
      while True:
          print(sk.recv(1024))
          mv = input(">>>>>>>>>>:").strip()
          sk.send(mv.encode("utf-8"))
  • 进程是操作系统中最小的资源分配单位

  • 进程

    • multiprocessing
    • multiprocessing.Process
    • 如何开启一个子进程
  • Process 开启子进程

    • 第二种开启子进程的方式

      • def func(index):
            time.sleep(random.random())
            print(‘第%s个邮件已经发送完毕‘%index)
        if __name__ == ‘__main__‘:
            p_lst = []
            for i in range(10):
                p = Process(target=func,args=(i,))
                p.start()
                p_lst.append(p)
            for p in p_lst:
                p.join()
            print(‘全部发送完毕‘)
    • join控制子进程

      • #子进程同步,执行完毕后才执行主程序后面的程序
        # import time
        # from multiprocessing import Process
        # def f(name):
        #     print("hello",name)
        #     time.sleep(1)
        # if __name__ == ‘__main__‘:
        #     p_list = []
        #     for i in range(5):
        #         p = Process(target=f,args=(i,))
        #         p.start()
        #         p_list.append(p)
        #         p.join()      #阻塞,
        #     print("主进程执行")
        
        #子程序异步执行,执行完了阻塞结束
        import time
        from multiprocessing import Process
        def f(name):
            print("hello",name)
            time.sleep(1)
        if __name__ == ‘__main__‘:
            p_list = []
            for i in range(10):
                p = Process(target=f,args=(i,))
                p.start()
                p_list.append(p)
            for i in p_list:
                i.join()
            print("主进程执行完毕")
    • 守护进程 daemon

      • 守护进程会随着主进程代码执行完毕而结束

      • 守护进程内无法再开启子进程,否则会抛出异常

      • 注意:进程之间是相互独立的,主进程代码运行结束,守护进程也会随即终止

      • import time
        from multiprocessing import Process
        def func1():
            count = 1
            while True:
                time.sleep(0.5)
                print(count*"*")
                count += 1
        def func2():
            print("func strat")
            time.sleep(5)
            print("func2 end")
        if __name__ == ‘__main__‘:
            p1 = Process(target=func1)
            p1.daemon = True      #定义为守护进程
            p1.start()          #执行
            Process(target=func2).start()
            time.sleep(3)
            print("主进程")
        #输出
        # func strat
        # *
        # **
        # ***
        # ****
        # *****
        # 主进程
        # func2 end

        如果主进程执行完毕那么守护进程也会结束,但是其他子进程如果没执行完还会继续执行

    • 作业:在进程之间保证数据安全性

    • from multiprocessing import Process,Lock

    • lock= Lock()实例对象

    • lock.acquire() 取钥匙开门

    • lock.release() 关门放钥匙

    • 例题 模拟抢票

    • import time
      import json
      from multiprocessing import Process,Lock
      def search(person):         #查票
          with open("ticket") as f:   #文件中保存着一个字典{"count":4}
              dic = json.load(f)   #读出文件中的字典
          time.sleep(0.2)
          print("%s查询余票"%person,dic["count"])
      def get_ticket(person):         #抢票
          with open("ticket") as f:
              dic = json.load(f)
          time.sleep(0.2)             #模拟延迟
          if dic["count"] >0:
              print("%s买到票了"%person)
              dic["count"] -= 1
              time.sleep(0.2)
              with open("ticket","w") as f:
                  json.dump(dic,f)    #写回文件
          else:
              print("%s没买到票"%person)
      def ticket(person,lock):
          search(person)
          lock.acquire()      #开门,一次只能进一个
          get_ticket(person)
          lock.release()      #关门
      if __name__ == ‘__main__‘:
          lock = Lock()
          for i in range(10):
              p = Process(target=ticket,args=("person%s"%i,lock))
              p.start()

      为了保证数据的安全,在异步的情况下,多个进程又可能同时修改同一份数据的时候,需要给这个数据上锁

    • 加锁的作用

      • 降低了程序的效率,让原来能够同时执行的代码编程顺序执行了,异步变同步的过程,保证了数据的安全
  • 同步控制

    • import time
      from multiprocessing import Process,Lock
      def func(num,lock):
          time.sleep(1)
          print("异步执行",num)
          lock.acquire()
          time.sleep(0.5)
          print("同步执行",num)
          lock.release()      #同步执行是依次执行,间隔0.5秒
      if __name__ == ‘__main__‘:
          lock = Lock()
          for i in range(10):
              p = Process(target=func,args=(i,lock))
              p.start()
  • 信号量 机制:计数器+锁实现的 Semaphore

    • 主程序控制一定数量的子程序同时执行,这些数量的子程序执行完一个就会有下一个子程序补充进来

    • import time
      import random
      from multiprocessing import Process,Semaphore
      def ktv(person,sem):
          sem.acquire()       #进
          print("%s走进KTV"%person)
          time.sleep(random.randint(1,3))     #随机延迟一到三秒
          print("%s走出ktv"%person)
          sem.release()       #出
      if __name__ == ‘__main__‘:
          sem = Semaphore(4)      #信号量为4,默认为1
          for i in range(10):
              Process(target=ktv,args=(i,sem)).start()
  • 事件 Event

    • 阻塞事件 wait() 方法

      • wait 是否阻塞是看event对象你不的一个属性
    • 控制这个属性的值

      • set()将这个属性的值改成True

      • clear() 将这个属性的值改成False

      • is_set() 判断当前属性是否为True

      • #模拟红绿灯,只有全部车通过后才停止
        import time
        import random
        from multiprocessing import Process,Event
        def traffic_light(e):
            print("红灯亮")
            while True:
                if e.is_set():
                    time.sleep(2)
                    print("红灯亮")
                    e.clear()
                else:
                    time.sleep(2)
                    print("绿灯亮")
                    e.set()
        def car(e,i):
            if not e.is_set():
                print("car%s在等待"%i)
                e.wait()
            print("car%s通过了"%i)
        if __name__ == ‘__main__‘:
            e = Event()
            p = Process(target=traffic_light,args=(e,))
            p.daemon =True    #变成守护进程
            p.start()
            p_list = []
            for i in range(10):
                time.sleep(random.randrange(0,3,2))
                p = Process(target=car,args=(e,i))
                p.start()
                p_list.append(p)
            for p in p_list:p.join()
  • 进程之间的通信(IPC)
    • 多个进程之间有一些固定的通信内容

    • socket给予文件家族通信

    • 进程之间虽然内存不共享,但是可以通信,

    • 进程队列 Queue
      • 进程之间数据是安全的
    • 队列是基于管道实现的

    • 管道是基于socket实现的

    • 队列 + 锁 简便的IPC机制 使得进程之间的数据安全

    • def consume(q):
          print(‘son-->‘,q.get())
          q.put(‘abc‘)
      if __name__ == ‘__main__‘:
          q = Queue()
          p = Process(target=consume,args=(q,))
          p.start()
          q.put({‘123‘:123})
          p.join()
          print(‘Foo-->‘,q.get())

      简单的生产消费模型

      def consume(q):
          print(‘son-->‘,q.get())
          q.put(‘abc‘)
      if __name__ == ‘__main__‘:
          q = Queue()
          p = Process(target=consume,args=(q,))
          p.start()
          q.put({‘123‘:123})
          p.join()
          print(‘Foo-->‘,q.get())
      • 相同的原理 JoinableQueue

      • task_done 通知队列已经有一个数据被处理了

      • q.join() 阻塞直到放入队列中所有的数据都被处理掉(有多少个数据就接受到多少taskdone)

      • import time
        import random
        from multiprocessing import Process,JoinableQueue
        def consumer(q,name):
            while True:
                food = q.get()
                time.sleep(random.uniform(0.3,0.8))
                print("%s吃了一个%s"%(name,food))
                q.task_done()
        def producer(q,name,food):
            for i in range(10):
                time.sleep(random.uniform(0.3,0.8))
                print("%s生产了%s%s"%(name,food,i))
                q.put(food+str(i))
        if __name__ == ‘__main__‘:
            jq = JoinableQueue()
            c1 = Process(target=consumer,args=(jq,"alex"))
            c1.daemon = True
            p1 = Process(target=producer,args=(jq,"libai","包子"))
            c1.start()
            p1.start()
            p1.join()
            jq.join()
  • 管道 进程之间数据不安全 且存取数据复杂

  • 进程池

    • 开启过多的进程并不能提高你的效率,反而会降低效率

    • 计算密集型 充分占用CPU 多进程可以充分利用多核 适合开启多进程,但是不适合开启很多多进程

    • IO密集型 大部分时间都在阻塞队列,而不是在运行状态 根本不太适合开启多进程

    • 提交任务:

      • 同步提交 apply

        • 返回值:子进程对应函数的返回值

        • 一个一个顺序执行的,并没有任何的并发效果

        • # import os
          # import time
          # from multiprocessing import Process,Pool
          # def task(num):
          #     time.sleep(0.5)
          #     print("%s: %s"%(num,os.getpid()))
          #     return num ** 2
          # if __name__ == ‘__main__‘:
          #     p = Pool(4)
          #     for i in range(20):
          #         res = p.apply(task,args=(i,)) #apply   提交任务方法,同步提交
          #         print("--->",res)
          #四个任务依次执行,轮换
      • 异步提交 apply_async

        • 没有返回值,要想所有任务能够顺利的执行完毕

          • p.close()
          • p.join() 必须先close在join,阻塞直到进程池中所有任务都执行完毕
        • 有返回值的情况下

          • res.get() #get不能再提交任务之后立刻执行,应该是先提交所有的任务再通过get获取结果

          • map()方法

            • 异步提交的简化版本
            • 自带close和join方法
          • import os
            import time
            from multiprocessing import Pool
            def task(num):
                time.sleep(1)
                print("%s: %s"%(num,os.getpid()))
                return num **2
            if __name__ == ‘__main__‘:
                p = Pool(4)
                for i in range(20):
                    res = p.apply_async(task,args=(i,))     #apply_async   异步提交
                p.close()
                p.join()
            #输出结果同时四个认识执行

并发编程 进程基础

原文:https://www.cnblogs.com/yuncong/p/9683881.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!