Параллелизм / Многозадачность / Concurrency
Multiprocessing / Threading / Async
Виды многозадачности
- Вытесняющая - на уровне ОС - треды/процессы
- Кооперативная - на уровне кода = асинхронщина
CPU-bound vs IO-bound
- CPU-bound: когда нужно считать
- IO-bound: когда нужно ходить в бд, по сети
Multiprocessing
- Нужно для cpu-bound задач
- Под капотом используется fork
Threading
- Нужно для IO-bound задач
GIL
- Mutex ~ Lock
Зачем
- Защита от конкурентного доступа (Race Condition / Вытесняющая многозадачность) внутри интерпретатора - чтоб интерпретатор не падал
- Упрощение разработки интерпретатора, С-расширения, безопасное/простое управление памятью, скорость, подсчет ссылок
Как работает
- Gil делает все потоки спящими кроме текущего
- Каждые 5 мс спящие потоки проверяются хотят ли они работать
- Если да, то текущий поток, заканчивая свой тик, засыпает и управление передается ожидающему потоку
- Gil отключается когда идут io-операции - когда данные интерпретатора не меняются - поднятие Gil
Как быть
- Логическая защита: Lock, Semaphore, Event, Queue
concurent.futures.ThreadPoolExecutor
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time
def func1():
print('func1')
time.sleep(2)
return 1
pool = ThreadPoolExecutor(max_workers=3)
for _ in range(10):
future = pool.submit(func1)
print('hello') # вывод сразу (pool еще продолжает работать)
print(future.done()) # проверка состояние джобы, вывод сразу
print(future.result()) # вывод после выполнения
pool.shutdown() # ожидаем завершения всех джобов
# с этой строки pool.submit не будет работать
# в виде контекст-менеджера
with ThreadPoolExecutor(max_workers=3) as pool:
...
# в рандомном порядке итерируемся по выполненным фьючам
for future in concurrent.futures.as_completed(futures):
print(future.result())
futures = pool.map(func, iterable) # выполняет func на каждом элементе iterable
# futures - список выполненных фьючей с сохранением порядка
Asyncio
- Нужно для IO-bound задач
Как устроено
- Event Loop - гоняет таски
asyncio.run
- запуск в лупе
await
- выполнение корутины (сама по себе корутина ничо не делает)
Примитивы
- Coroutine - функция с
async
/ объект, возвращаемый вызовом такой функции - Task - корутина, которая выполняется без
await
asyncio.create_task
- Future - аналог Promise
future = asyncio.get_running_loop().create_future()
future.set_result
- выставляем значение фьюче в другой корутине- При этом таска может еще продолжать выполняться
Несколько корутин
asyncio.gather
- запуск нескольких тасок параллельно- Минусы
- Выводит ошибку только первой таски
- Не отменяет другие таски, если одна упала
- Минусы
async with asyncio.TaskGroup() as tg: tg.create_task() -> {result()}
- Отменяет другие таски, если одна упала
- Выводит все ошибки
- Никаких
await
кста
- too much coro > problems > aio jobs / semaphore, socket open file limit
Примитивы синхронизации
asyncio.Lock
async with lock
- Когда lock заэквайрили, другие корутины не выполняют lock-блок
asyncio.Semaphore
async with semaphore
- Ограничивает кол-во одновременно выполняемых коро
- Типа заспавнили 100 тасок на вызов апи, но выполняем их по 2 одновременно, иначе апи нас заблочит/сдохнет
asyncio.Event
- Что-то типа флажка
- Передаем его в 2 коро:
event.wait
- ждем пока другая коро не вызоветevent.set
Как запустить синхронный код асинхронно?
future = asyncio.get_running_loop().run_in_executor(executor | None, func)
asyncio.as_completed(futures)
import asyncio
from concurrent.futures import ThreadPoolExecutor
def sync_function_1():
asyncio.sleep(2)
return "Result from sync function 1"
async def main():
loop = asyncio.get_running_loop()
with ThreadPoolExecutor() as executor:
tasks = [
loop.run_in_executor(executor, sync_function_1),
loop.run_in_executor(executor, sync_function_1),
loop.run_in_executor(executor, sync_function_1)
]
for completed_task in asyncio.as_completed(tasks):
result = await completed_task
print(result)
asyncio.run(main())
Материалы
- https://youtu.be/Qb9s3UiMSTA?si=5W2dvkU5cFqIAyot
- https://www.youtube.com/watch?v=AWX4JnAnjBE
- https://www.youtube.com/watch?v=zrA9WpSXrQE