from random import randint from threading import Thread from time import time, sleep def download(filename): print(‘开始下载%s...‘ % filename) time_to_download = randint(5, 10) sleep(time_to_download) print(‘%s下载完成! 耗费了%d秒‘ % (filename, time_to_download)) def main(): start = time() t1 = Thread(target=download, args=(‘Python从入门到住院.pdf‘,)) t1.start() t2 = Thread(target=download, args=(‘Peking Hot.avi‘,)) t2.start() t1.join() t2.join() end = time() print(‘总共耗费了%.3f秒‘ % (end - start)) if __name__ == ‘__main__‘: main()
from multiprocessing import Process from os import getpid from random import randint from time import time, sleep def download_task(filename): print(‘启动下载进程,进程号[%d].‘ % getpid()) print(‘开始下载%s...‘ % filename) time_to_download = randint(5, 10) sleep(time_to_download) print(‘%s下载完成! 耗费了%d秒‘ % (filename, time_to_download)) def main(): start = time() p1 = Process(target=download_task, args=(‘Python从入门到住院.pdf‘, )) p1.start() p2 = Process(target=download_task, args=(‘Peking Hot.avi‘, )) p2.start() p1.join() p2.join() end = time() print(‘总共耗费了%.2f秒.‘ % (end - start)) if __name__ == ‘__main__‘: main()
import concurrent.futures import urllib.request URLS = [‘http://www.foxnews.com/‘, ‘http://www.cnn.com/‘, ‘http://europe.wsj.com/‘, ‘http://www.bbc.co.uk/‘, ‘http://some-made-up-domain.com/‘] # Retrieve a single page and report the url and contents def load_url(url, timeout): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() # We can use a with statement to ensure threads are cleaned up promptly with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: # Start the load operations and mark each future with its URL future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print(‘%r generated an exception: %s‘ % (url, exc)) else: print(‘%r page is %d bytes‘ % (url, len(data)))
import concurrent.futures import math PRIMES = [ 112272535095293, 112582705942171, 112272535095293, 115280095190773, 115797848077099, 1099726899285419] def is_prime(n): if n % 2 == 0: return False sqrt_n = int(math.floor(math.sqrt(n))) for i in range(3, sqrt_n + 1, 2): if n % i == 0: return False return True def main(): with concurrent.futures.ProcessPoolExecutor() as executor: for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)): print(‘%d is prime: %s‘ % (number, prime)) if __name__ == ‘__main__‘: main()
from multiprocessing import Process, Queue from random import randint from time import time def task_handler(curr_list, result_queue): total = 0 for number in curr_list: total += number result_queue.put(total) def main(): processes = [] number_list = [x for x in range(1, 100000001)] result_queue = Queue() index = 0 # 启动8个进程将数据切片后进行运算 for _ in range(8): p = Process(target=task_handler, args=(number_list[index:index + 12500000], result_queue)) index += 12500000 processes.append(p) p.start() # 开始记录所有进程执行完成花费的时间 start = time() for p in processes: p.join() # 合并执行结果 total = 0 while not result_queue.empty(): total += result_queue.get() print(total) end = time() print(‘Execution time: ‘, (end - start), ‘s‘, sep=‘‘) if __name__ == ‘__main__‘: main()
import asyncio async def main(): print(‘Hello ...‘) await asyncio.sleep(1) print(‘... World!‘) # Python 3.7+ asyncio.run(main())
asyncio is a library to write concurrent code using the async/await syntax.
asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc.
asyncio is often a perfect fit for IO-bound and high-level structured network code.
asyncio provides a set of high-level APIs to:
run Python coroutines concurrently and have full control over their execution;
perform network IO and IPC;
control subprocesses;
distribute tasks via queues;
synchronize concurrent code;
Additionally, there are low-level APIs for library and framework developers to:
create and manage event loops, which provide asynchronous APIs for
, runningsubprocesses
, handlingOS signals
, etc;implement efficient protocols using transports;
bridge callback-based libraries and code with async/await syntax.
import asyncio import concurrent.futures def blocking_io(): # File operations (such as logging) can block the # event loop: run them in a thread pool. with open(‘/dev/urandom‘, ‘rb‘) as f: return f.read(100) def cpu_bound(): # CPU-bound operations will block the event loop: # in general it is preferable to run them in a # process pool. return sum(i * i for i in range(10 ** 7)) async def main(): loop = asyncio.get_running_loop() ## Options: # 1. Run in the default loop‘s executor: result = await loop.run_in_executor( None, blocking_io) print(‘default thread pool‘, result) # 2. Run in a custom thread pool: with concurrent.futures.ThreadPoolExecutor() as pool: result = await loop.run_in_executor( pool, blocking_io) print(‘custom thread pool‘, result) # 3. Run in a custom process pool: with concurrent.futures.ProcessPoolExecutor() as pool: result = await loop.run_in_executor( pool, cpu_bound) print(‘custom process pool‘, result) asyncio.run(main())
import asyncio def callback(loop, i): print("success time {} {}".format(i, loop.time())) async def get_html(url): print("start get url") await asyncio.sleep(1) print("end get url") # 两种创建的方法 if __name__ == "__main__": loop = asyncio.get_event_loop() # get_future = asyncio.ensure_future(get_html("http://www.imooc.com")) task = loop.create_task(get_html("http://www.imooc.com")) loop.run_until_complete(task) # 接收的是一个future对象
import asyncio import random import time async def worker(name, queue): while True: # Get a "work item" out of the queue. sleep_for = await queue.get() # Sleep for the "sleep_for" seconds. await asyncio.sleep(sleep_for) # Notify the queue that the "work item" has been processed. queue.task_done() print(f‘{name} has slept for {sleep_for:.2f} seconds‘) async def main(): # Create a queue that we will use to store our "workload". queue = asyncio.Queue() # Generate random timings and put them into the queue. total_sleep_time = 0 for _ in range(20): sleep_for = random.uniform(0.05, 1.0) total_sleep_time += sleep_for queue.put_nowait(sleep_for) # Create three worker tasks to process the queue concurrently. tasks = [] for i in range(3): task = asyncio.create_task(worker(f‘worker-{i}‘, queue)) tasks.append(task) # Wait until the queue is fully processed. started_at = time.monotonic() await queue.join() total_slept_for = time.monotonic() - started_at # Cancel our worker tasks. for task in tasks: task.cancel() # Wait until all worker tasks are cancelled. await asyncio.gather(*tasks, return_exceptions=True) print(‘====‘) print(f‘3 workers slept in parallel for {total_slept_for:.2f} seconds‘) print(f‘total expected sleep time: {total_sleep_time:.2f} seconds‘) asyncio.run(main())
- 计算机的cpu物理核数是同时可以并行的线程数量(cpu只能看到线程,线程是cpu调度分配的最小单位),由于超线程技术,实际上可以并行的线程数量通常是物理核数的两倍,这也是操作系统看到的核数。我们只care可以并行的线程数量,所以之后所说的核数是操作系统看到的核数,所指的核也是超线程技术之后的那个核(不是物理核)。
- 进程是操作系统资源分配(内存,显卡,磁盘)的最小单位,线程是执行调度(即cpu调度)的最小单位(cpu看到的都是线程而不是进程),一个进程可以有一个或多个线程,线程之间共享进程的资源,通过这样的范式,就可以减少进程的创建和销毁带来的代价,可以让进程少一点,保持相对稳定,不断去调度线程就好。如果计算机有多个cpu核,且计算机中的总的线程数量小于核数,那线程就可以并行运行在不同的核中,如果是单核多线程,那多线程之间就不是并行,而是并发,即为了均衡负载,cpu调度器会不断的在单核上切换不同的线程执行,但是我们说过,一个核只能运行一个线程,所以并发虽然让我们看起来不同线程之间的任务是并行执行的,但是实际上却由于增加了线程切换的开销使得代价更大了。如果是多核多线程,且线程数量大于核数,其中有些线程就会不断切换,并发执行,但实际上最大的并行数量还是当前这个进程中的核的数量,所以盲目增加线程数不仅不会让你的程序更快,反而会给你的程序增加额外的开销。
- 任务可以分为计算密集型和IO密集型,假设我们现在使用一个进程来完成这个任务,对计算密集型任务,可以使用【核心数】个线程,就可以占满cpu资源,进而可以充分利用cpu,如果再多,就会造成额外的开销;对于IO密集型任务(涉及到网络、磁盘IO的任务都是IO密集型任务),线程由于被IO阻塞,如果仍然用【核心数】个线程,cpu是跑不满的,于是可以使用更多个线程来提高cpu使用率。
- 实现并行计算有三种方式,多线程,多进程,多进程+多线程。如果是多进程,因为每个进程资源是独立的(地址空间和数据空间),就要在操作系统层面进行通信,如管道,队列,信号等;多线程的话会共享进程中的地址空间和数据空间,一个线程的数据可以直接提供给其他线程使用,但方便的同时会造成变量值的混乱,所以要通过线程锁来限制线程的执行
- 其他语言,CPU 是多核时是支持多个线程同时执行。但在 Python 中,无论是单核还是多核,一个进程同时只能由一个线程在执行。其根源是 GIL 的存在。GIL 的全称是 Global Interpreter Lock(全局解释器锁),来源是 Python 设计之初的考虑,为了数据安全所做的决定。某个线程想要执行,必须先拿到 GIL,我们可以把 GIL 看作是“通行证”,并且在一个 Python 进程中,GIL 只有一个。拿不到通行证的线程,就不允许进入 CPU 执行。所以多线程在python中很鸡肋。