首页 > 编程语言 > 详细

Python第十周 学习笔记(2)_多线程

时间:2018-05-27 16:16:29      阅读:272      评论:0      收藏:0      [点我收藏+]
多线程

并发

  • concurrency
    • 同一时间内出现多个请求,高并发就是短时间内出现大量请求

并行

  • parallel
    • 并行是解决并发的一个方法

并发的解决

  • 食堂打饭模型

    1、队列、缓冲区

  • queue(或LifoQueue、PriorityQueue)先进先出缓冲区(排队打饭),可以设置一个优先队列(女生优先)

2、争抢

  • 一旦一个请求获得资源后,会产生排他锁(不排队,谁抢到就是轮到谁打饭)
  • 缺点:有可能存在某些请求一直获得不到资源

3、预处理

  • 经常请求的资源可以预处理(缓存)。减少请求占用资源的时间(经常被点的菜品可以提前做)

4、并行

  • 通过购买服务器、或多开进程、线程实现并行处理,属于水平扩展(增加打饭窗口)

5、提速

  • 提高单个CPU性能,货单个服务器安装更多CPU,属于垂直扩展(提高单个窗口打饭速度)

6、消息中间件

  • 起缓冲作用,常见消息中间件:RabbitMQ、ActiveMQ、RocketMQ、kafka

进程

  • 进程间不可随意共享数据
  • 进程是线程的容器
  • linux存在父进程、子进程(fork)

线程

  • 一个进程可拥有多个线程
  • 线程可以共享进程的资源
  • 每一个线程都拥有线程ID、当前指令指针、寄存器集合与独立的堆栈
  • 创建线程比创建进程快10-100倍

线程的状态

ready

  • 线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占

running

  • 线程正在运行

blocked

  • 线程等待外部事件发生而无法运行,如IO操作

terminated

  • 线程完成,或退出被取消

技术分享图片

Python线程

  • 进程会启动一个解释器进程,线程共享一个解释器进程

Thread类

def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
  • target

    • 线程调用的对象,就是目标函数
  • name

    • 为线程起个名字
  • args

    • 为目标函数传递实参,参数类型需为元组
  • kwargs

    • 为目标函数关键字传参,需为字典
  • daemon
    • 下文详述

启动线程

threading.Thread(target=fn, name=‘fn‘).start()

线程退出

  • Python没有提供线程退出的方法,线程在下面情况时退出
    • 1、线程函数内语句执行完毕
    • 2、线程函数中抛出未处理异常

Python线程没有优先级、没有线程组的概念,不能被销毁、停止、挂起、恢复、中断

threading属性、方法

  • current_thread()

    • 返回当前线程对象
  • main_thread()

    • 返回主线程对象
  • active_count()

    • 当前处于alive状态的线程个数,包括主线程
  • enumerate()

    • 返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程,包括主线程
  • get_ident()
    • 返回当前线程的ID,非0整数

Thread实例属性、方法

  • name

    • 线程实例的名称 初始化中的name参数传入,getName()、setName()可获取、设置(一般不改)
  • ident

    • 线程ID 线程启动后才有ID,否则为None。线程退出此ID依旧可以访问。ID可重复使用
  • is_alive()

    • 返回线程是否存活
  • start()

    • 启动线程。每个线程只能执行此方法一次
    • start()会调用run()
  • start函数的实现:
if not self._initialized:
    raise RuntimeError("thread.__init__() not called")

if self._started.is_set():
    raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
    _limbo[self] = self
try:
    _start_new_thread(self._bootstrap, ())
except Exception:
    with _active_limbo_lock:
        del _limbo[self]
    raise
self._started.wait()
  • run()

    • 运行线程函数
    • 单运行run()不会建立新线程,只是单纯的函数调用
  • run函数的实现:
try:
    if self._target:
        self._target(*self._args, **self._kwargs)
finally:
    # Avoid a refcycle if the thread is running a function with
    # an argument that has a member that points to the thread.
    del self._target, self._args, self._kwargs

每次调用run都会删除引用self._target, self._args, self._kwargs

  • 示例:
import threading
import time

class MyThread(threading.Thread):
    def start(self):
        print(‘-----start-----‘)
        super().start()

    def run(self):
        print(‘---------run----------‘)
        super().run()

def pr1(lst):
    for _ in lst:
        print(‘{} is running‘.format(threading.current_thread().name))

def worker():
    name = ‘worker{}‘.format(1)
    t = MyThread(target=pr1, name=name, args=([1, 2],))
    # time.sleep(.5)
    t.start()
    # time.sleep(1)
    t.run()

if __name__ == ‘__main__‘:
    worker()
  • 结果(可能发生):
    技术分享图片

  • 由于多线程,有时start()中调用run()的del删除引用来不及,如上图结果所示在删除引用前第二次的run()依然可以调用,打印出两次MainThread is running

多线程


  • 一个进程中至少有一个主线程作为程序入口,其他线程称作工作线程

线程安全

  • 线程执行一段代码,不会产生不确定行结果,那这段代码是线程安全的

  • 示例:
import threading

def worker():
    for x in range(100):
        print("{} is running.".format(threading.current_thread().name))

for x in range(5):
    name="worker{}".format(x)
    t=threading.Thread(name=name,target=worker)
    t.start()
  • 结果

技术分享图片

  • 原因
    • print函数打印时先打印指定的内容再打印默认的换行符,多线程中,一个print的调用中可能会出现线程的切换
    • print函数是线程不安全的

如何解决?

  • 1.print只处理输入的字符
    • 改为

print("{} is running.\n".format(threading.current_thread().name), end=‘‘)
  • 2.logging

    • 日志处理模块,线程安全

    • 示例:
import threading
import logging

def worker():
    for x in range(100):
        logging.warning("{} is running.".format(threading.current_thread().name))

for i in range(1,6):
    name="worker{}".format(i)
    t=threading.Thread(name=name,target=worker)
    t.start()

daemon和non-daemon线程

  • daemon=True

    • 当主线程执行完,如果不存在正在执行的non-daemon线程,则强制所有线程同主线程结束
  • daemon=False

    • 即使主线程执行完,会等待non-daemon线程执行完毕
  • daemon=None
    • 与当前调用线程(父线程)的daemon类型相同

daemon必须在start()之前设置,否则引发RuntimeError异常

  • isDaemon()

    • 是否是daemon线程
  • setDaemon

    • 设置为daemon线程,必须在start之前设置
  • join(timeout=None)
    • 一个线程中调用另一个线程的join方法,调用者将变为阻塞状态,直到被调用线程终止
    • 一个线程可以被join多次

daemon线程应用场景

  • 1.后台任务,如心跳包、监控
  • 2.主线程才有用的线程,如主线程中维护公共资源,当主线程已经清理了,准备退出,而工作线程使用这些资源工作也已经没有意义。一起退出最合适

threading.local类

  • 将这个类实例化得到一个全局对象,但不同的线程使用这个对象存储的数据其他线程看不见。

threading.Timer

  • 继承自Thread,用来定义多久执行一个函数

  • threading.Timer(interval, function, args=None, kwargs=None)
    • start方法执行后,Timer对象等待interval时间后,执行function,
    • 如果在执行函数之前的等待阶段,使用了cancel方法,就会跳过函数执行

线程同步


概念

  • 线程同步,线程间协同,通过锁,让一个线程访问某些数据时,其他线程不能访问这些数据,直到该线程完成对数据的操作

  • 不同操作系统实现技术有所不同,有临界区(Critical Section)、互斥量(Mutex)、信号量(Semaphore)、事件(Event)等

Event

  • 是线程间通信机制中最简单的实现,使用一个内部的标记flag的变化来操作

  • set()

    • 将标记设为True
  • clear()

    • 将标记设为False
  • is_set()

    • 标记是否为True
  • wait(timeout=None)
    • 设置等待标记为True的时长,None为无限等待,直到等到返回True,未等到超时返回False

使用同一个Event对象的标记flag
不限制等待的个数

wait效率优于time.sleep,它会更快切换线程,提高并发效率

Lock

  • 一旦线程获得锁,其他试图获取锁的线程将被阻塞

  • acquire(blocking=True, timeout=-1)

    • 默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置。成功获取锁,返回True,否则返回False
  • release()
    • 释放锁。可以从任何线程调用释放
    • 已上锁的锁,会被重置为unlocked未上锁的锁上调用,抛RuntimeError异常

锁保护了数据完整性,但是性能下降很多

加锁、解锁常用语句:

  • 1.使用try...finallly语句
  • 2.with上下文

锁的应用场景

  • 锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候
  • 如果只读取同一个共享资源不需要锁

非阻塞锁

import threading
import time
import logging

FORMAT = ‘%(asctime)-15s\t [%(threadName)s %(thread)8d] %(message)s‘
logging.basicConfig(format=FORMAT, level=logging.INFO)

def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False):
            logging.info(‘{} {} begin to start‘.format(threading.current_thread(), task.name))
        else:
            logging.info(‘{} {} is working‘.format(threading.current_thread(), task.name))

class Task:
    def __init__(self, name):
        self.name = name
        self.lock = threading.Lock()

tasks = [Task(‘task-{}‘.format(x)) for x in range(10)]

for i in range(5):
    threading.Thread(target=worker, name=‘worker-{}‘.format(i), args=(tasks,)).start()

可重入锁

  • 线程相关锁
  • 线程A获得可重复锁,并可以多次成功获取,不会阻塞。最后要在线程A中做和acquire次数相同的release

  • 可重入锁,与线程相关,可在一个线程中获取锁,并可继续在同一线程中不阻塞获取锁。当锁未释放完,其他线程获取锁就会阻塞,直到当前持有锁的线程释放完锁

Condition

  • Condition(lock=None),可以传入一个Lock或Rlock对象,默认是Rlock

  • acquire(*args)

    • 获取锁
  • wait(self, timeout=None)

    • 等待或超时
  • notify(n=1)

    • 唤醒至多指定数目个数的等待的线程,没有等待的线程就没有任何操作
  • notify_all()

    • 唤醒所有等待的线程
  • 用于生产者、消费者模型,解决生产者消费者速度匹配问题
  • 采用了通知机制,非常有效率

使用方式

  • 使用Condition,必须先acquire,用完了要release,因为内部使用了锁,最好的方式是使用with上下文
  • 消费者wait,等待通知
  • 生产者生产好消息,对消费者发通知,可以使用notify或notify_all

Python第十周 学习笔记(2)_多线程

原文:http://blog.51cto.com/11281400/2120804

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!