Producer notify: item N° 126 appended to queue by Thread-1 Consumer notify : 126 popped from queue by Thread-2 Producer notify: item N° 36 appended to queue by Thread-1 Consumer notify : 36 popped from queue by Thread-3 Producer notify: item N° 205 appended to queue by Thread-1 Consumer notify : 205 popped from queue by Thread-2 Producer notify: item N° 55 appended to queue by Thread-1 Consumer notify : 55 popped from queue by Thread-3 Producer notify: item N° 152 appended to queue by Thread-1 Consumer notify : 152 popped from queue by Thread-2 Producer notify: item N° 70 appended to queue by Thread-1 Consumer notify : 70 popped from queue by Thread-3 Producer notify: item N° 168 appended to queue by Thread-1 Consumer notify : 168 popped from queue by Thread-2 Producer notify: item N° 74 appended to queue by Thread-1 Consumer notify : 74 popped from queue by Thread-3 Producer notify: item N° 87 appended to queue by Thread-1 Consumer notify : 87 popped from queue by Thread-2 Producer notify: item N° 59 appended to queue by Thread-1 Consumer notify : 59 popped from queue by Thread-3 Producer done Thread-1 Consumer done Thread-2 Consumer done Thread-3
Process
basic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
import multiprocessing
from datetime import datetime, timedelta, timezone
def task(task_id: int) -> None:
    """Print this task's id, the pid of the process running it, and the
    current time shifted to UTC+8.

    Intended to run inside a child process started by multiprocessing.Process,
    so each task prints a different pid.
    """
    # datetime.utcnow() is deprecated since Python 3.12; build the same naive
    # UTC timestamp from an aware datetime, then shift by +8 hours as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    print('task {} process id {}, {}'.format(
        task_id,
        multiprocessing.current_process().pid,
        now_utc + timedelta(hours=8)))
    return
if __name__ == '__main__':
    # Spawn five worker processes, starting each one as soon as it is created,
    # then wait for all of them to finish.
    workers = []
    for task_id in range(5):
        proc = multiprocessing.Process(target=task, args=(task_id,))
        workers.append(proc)
        proc.start()
    for proc in workers:
        proc.join()
输出:
1 2 3 4 5
task 0 process id 7651, 2022-07-25 11:54:22.553022 task 3 process id 7654, 2022-07-25 11:54:22.554469 task 1 process id 7652, 2022-07-25 11:54:22.555789 task 4 process id 7655, 2022-07-25 11:54:22.564164 task 2 process id 7653, 2022-07-25 11:54:22.567585
Producer notify: item N° 119 appended to queue by producer-1 Consumer notify : 119 popped from queue by consumer-2 Producer notify: item N° 250 appended to queue by producer-1 Consumer notify : 250 popped from queue by consumer-2 Producer notify: item N° 215 appended to queue by producer-1 Consumer notify : 215 popped from queue by consumer-2 Producer notify: item N° 210 appended to queue by producer-1 Consumer notify : 210 popped from queue by consumer-2 Producer notify: item N° 70 appended to queue by producer-1 Consumer notify : 70 popped from queue by consumer-2 Producer notify: item N° 29 appended to queue by producer-1 Consumer notify : 29 popped from queue by consumer-2 Producer notify: item N° 237 appended to queue by producer-1 Consumer notify : 237 popped from queue by consumer-2 Producer notify: item N° 55 appended to queue by producer-1 Consumer notify : 55 popped from queue by consumer-2 Producer notify: item N° 10 appended to queue by producer-1 Consumer notify : 10 popped from queue by consumer-2 Producer notify: item N° 224 appended to queue by producer-1 Consumer notify : 224 popped from queue by consumer-2 Producer done producer-1 Consumer done consumer-2 Consumer done consumer-3
if __name__ == '__main__':
    # (file_name, url) pairs to download concurrently.
    task_lists = [
        ('regex101.txt', 'https://regex101.com/'),
        ('morning.txt', 'https://www.politico.com/tipsheets/morning-money'),
        ('economics.txt', 'https://www.bloomberg.com/markets/economics')
    ]
    pool = ThreadPoolExecutor(10)
    futures: list[Future] = []
    for file_name, url in task_lists:
        fut = pool.submit(task, file_name, url)
        fut.add_done_callback(done_callback)
        futures.append(fut)

    # Returns only after every pending future has finished executing and the
    # pool's resources have been released.
    pool.shutdown(True)
    response_lists = [fut.result() for fut in futures]
    print(response_lists)
输出:
1 2 3 4 5 6 7 8 9
economics.txt 2022-07-25 09:49:59.536341 task done[thread name=ThreadPoolExecutor-0_2]: task thread name ThreadPoolExecutor-0_2 regex101.txt 2022-07-25 09:49:59.766962 task done[thread name=ThreadPoolExecutor-0_0]: task thread name ThreadPoolExecutor-0_0 morning.txt 2022-07-25 09:50:00.081044 task done[thread name=ThreadPoolExecutor-0_1]: task thread name ThreadPoolExecutor-0_1 ['task thread name ThreadPoolExecutor-0_0', 'task thread name ThreadPoolExecutor-0_1', 'task thread name ThreadPoolExecutor-0_2']
if __name__ == '__main__':
    # (file_name, url) pairs to download, one child process per task.
    task_lists = [
        ('regex101.txt', 'https://regex101.com/'),
        ('morning.txt', 'https://www.politico.com/tipsheets/morning-money'),
        ('economics.txt', 'https://www.bloomberg.com/markets/economics')
    ]
    pool = ProcessPoolExecutor(10)
    futures: list[Future] = []
    for file_name, url in task_lists:
        fut = pool.submit(task, file_name, url)
        fut.add_done_callback(done_callback)
        futures.append(fut)

    # Returns only after every pending future has finished executing and the
    # pool's resources have been released.
    pool.shutdown(True)
    response_lists = [fut.result() for fut in futures]
    print(response_lists)
输出:
1 2 3 4 5 6 7 8 9
morning.txt 2022-07-25 09:43:24.237463 task done [process id=6920]: task process id 6924 economics.txt 2022-07-25 09:43:24.389305 task done [process id=6920]: task process id 6923 regex101.txt 2022-07-25 09:43:24.849390 task done [process id=6920]: task process id 6922 ['task process id 6922', 'task process id 6924', 'task process id 6923']
import multiprocessing as mp
import os
import random
import time


def calculate(func, args):
    """Apply func to args and report the result tagged with the worker name."""
    outcome = func(*args)
    worker = mp.current_process().name
    return '%s says that %s%s = %s' % (worker, func.__name__, args, outcome)
def calculatestar(args):
    """Unpack a single (func, args) pair and forward it to calculate.

    Lets pool.map-style APIs, which pass one argument per task, drive
    the two-argument calculate helper.
    """
    return calculate(*args)
def mul(a, b):
    """Return a * b after a random delay (< 0.5 s) that simulates real work."""
    time.sleep(random.random() * 0.5)
    return a * b
def plus(a, b):
    """Return a + b after a random delay (< 0.5 s) that simulates real work."""
    time.sleep(random.random() * 0.5)
    return a + b
# Ten multiplication jobs (i * 7) followed by ten addition jobs (i + 8),
# each stored as a (callable, args) pair.
TASKS = [(mul, (i, 7)) for i in range(10)]
TASKS += [(plus, (i, 8)) for i in range(10)]
if __name__ == '__main__':
    # Submit every task asynchronously, then collect and print results in
    # submission order (get() blocks until each result is ready).
    pool = mp.Pool(os.cpu_count())
    async_results = [pool.apply_async(calculate, t) for t in TASKS]
    for res in async_results:
        print(res.get())
    pool.close()
    pool.join()
if __name__ == '__main__':
    # Fire-and-forget submission: results and errors are delivered to the
    # callbacks, so nothing is collected here. A plain loop replaces the
    # original side-effect-only list comprehension.
    pool = mp.Pool(os.cpu_count())
    for t in TASKS:
        pool.apply_async(calculate, t,
                         callback=on_success, error_callback=on_error)
    pool.close()
    pool.join()
if __name__ == '__main__':
    # Submit a single task; success goes to on_success, any raised exception
    # is routed to on_error instead of propagating here.
    pool = mp.Pool(os.cpu_count())
    result = pool.apply_async(f, (5,),
                              callback=on_success, error_callback=on_error)
    pool.close()
    pool.join()
# 输出: # on_error: float division by zero
捕获进程抛出的异常
1 2 3 4 5 6 7 8 9 10 11
if __name__ == '__main__':
    # pool.map re-raises an exception from any worker in the parent process,
    # so a ZeroDivisionError thrown inside f can be caught right here.
    pool = mp.Pool(os.cpu_count())
    try:
        pool.map(f, [1, 2, 3, 4, 5, 6, 7, 8, 9])
    except ZeroDivisionError as exc:
        print(exc)
    pool.close()
    pool.join()