Параллелизм
Multiprocessing / Threading / Async
CPU-bound vs IO-bound
- CPU-bound: когда нужно считать
- IO-bound: когда нужно ходить в бд, по сети
Multiprocessing
- Нужно для cpu-bound задач
- Под капотом используется fork
Threading
- Нужно для IO-bound задач
GIL
- Mutex ~ Lock
Зачем
- Защита от конкурентного доступа (Race Condition / Вытесняющая многозадачность) внутри интерпретатора - чтоб интерпретатор не падал
- Упрощение разработки интерпретатора, С-расширения, безопасное/простое управление памятью, скорость, подсчет ссылок
Как работает
- Gil делает все потоки спящими кроме текущего
- Каждые 5 мс спящие потоки проверяются хотят ли они работать
- Если да, то текущий поток, заканчивая свой тик, засыпает и управление передается ожидающему потоку
- Gil отключается когда идут io-операции - когда данные интерпретатора не меняются - поднятие Gil
Как быть
- Логическая защита: Lock, Semaphore, Event, Queue
concurent.futures.ThreadPoolExecutor
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time
def func1():
print('func1')
time.sleep(2)
return 1
pool = ThreadPoolExecutor(max_workers=3)
for _ in range(10):
future = pool.submit(func1)
print('hello') # вывод сразу (pool еще продолжает работать)
print(future.done()) # проверка состояние джобы, вывод сразу
print(future.result()) # вывод после выполнения
pool.shutdown() # ожидаем завершения всех джобов
# с этой строки pool.submit не будет работать
# в виде контекст-менеджера
with ThreadPoolExecutor(max_workers=3) as pool:
...
# в рандомном порядке итерируемся по выполненным фьючам
for future in concurrent.futures.as_completed(futures):
print(future.result())
futures = pool.map(func, iterable) # выполняет func на каждом элементе iterable
# futures - список выполненных фьючей с сохранением порядка
Asyncio
- Нужно для IO-bound задач
Как устроено
- Event Loop - гоняет таски
asyncio.run
- запуск в лупе
await
- выполнение корутины (сама по себе корутина ничо не делает)
Примитивы
- Coroutine - функция с
async
/ объект, возвращаемый вызовом такой функции - Task - корутина, которая выполняется без
await
asyncio.create_task
- Future - аналог Promise
future = asyncio.get_running_loop().create_future()
future.set_result
- выставляем значение фьюче в другой корутине- При этом таска может еще продолжать выполняться
Несколько корутин
asyncio.gather
- запуск нескольких тасок параллельно- Минусы
- Выводит ошибку только первой таски
- Не отменяет другие таски, если одна упала
- Минусы
async with asyncio.TaskGroup() as tg: tg.create_task() -> {result()}
- Отменяет другие таски, если одна упала
- Выводит все ошибки
- Никаких
await
кста
- too much coro > problems > aio jobs / semaphore, socket open file limit
Примитивы синхронизации
asyncio.Lock
async with lock
- Когда lock заэквайрили, другие корутины не выполняют lock-блок
asyncio.Semaphore
async with semaphore
- Ограничивает кол-во одновременно выполняемых коро
- Типа заспавнили 100 тасок на вызов апи, но выполняем их по 2 одновременно, иначе апи нас заблочит/сдохнет
asyncio.Event
- Что-то типа флажка
- Передаем его в 2 коро:
event.wait
- ждем пока другая коро не вызоветevent.set
Материалы
- https://youtu.be/Qb9s3UiMSTA?si=5W2dvkU5cFqIAyot
- https://www.youtube.com/watch?v=AWX4JnAnjBE
- https://www.youtube.com/watch?v=zrA9WpSXrQE