Skip to content

Параллелизм / Многозадачность / Concurrency

Multiprocessing / Threading / Async

Виды многозадачности

  • Вытесняющая - на уровне ОС - треды/процессы
  • Кооперативная - на уровне кода = асинхронщина

CPU-bound vs IO-bound

  • CPU-bound: когда нужно считать
  • IO-bound: когда нужно ходить в бд, по сети

Multiprocessing

  • Нужно для cpu-bound задач
  • Под капотом используется fork

Threading

  • Нужно для IO-bound задач

GIL

  • Mutex ~ Lock

Зачем

  • Защита от конкурентного доступа (Race Condition / Вытесняющая многозадачность) внутри интерпретатора - чтоб интерпретатор не падал
  • Упрощение разработки интерпретатора, С-расширения, безопасное/простое управление памятью, скорость, подсчет ссылок

Как работает

  • Gil делает все потоки спящими кроме текущего
  • Каждые 5 мс спящие потоки проверяются хотят ли они работать
  • Если да, то текущий поток, заканчивая свой тик, засыпает и управление передается ожидающему потоку
  • Gil отключается когда идут io-операции - когда данные интерпретатора не меняются - поднятие Gil

Как быть

  • Логическая защита: Lock, Semaphore, Event, Queue

concurent.futures.ThreadPoolExecutor

import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import time


def func1():
    print('func1')
    time.sleep(2)
    return 1


pool = ThreadPoolExecutor(max_workers=3)
for _ in range(10):
    future = pool.submit(func1)

print('hello')  # вывод сразу (pool еще продолжает работать)
print(future.done())  # проверка состояние джобы, вывод сразу
print(future.result())  # вывод после выполнения

pool.shutdown()  # ожидаем завершения всех джобов
# с этой строки pool.submit не будет работать

# в виде контекст-менеджера
with ThreadPoolExecutor(max_workers=3) as pool:
    ...

# в рандомном порядке итерируемся по выполненным фьючам
for future in concurrent.futures.as_completed(futures):
    print(future.result())

futures = pool.map(func, iterable)  # выполняет func на каждом элементе iterable
# futures - список выполненных фьючей с сохранением порядка

Asyncio

  • Нужно для IO-bound задач

Как устроено

  • Event Loop - гоняет таски
    • asyncio.run - запуск в лупе
  • await - выполнение корутины (сама по себе корутина ничо не делает)

Примитивы

  • Coroutine - функция с async / объект, возвращаемый вызовом такой функции
  • Task - корутина, которая выполняется без await
    • asyncio.create_task
  • Future - аналог Promise
    • future = asyncio.get_running_loop().create_future()
    • future.set_result - выставляем значение фьюче в другой корутине
      • При этом таска может еще продолжать выполняться

Несколько корутин

  • asyncio.gather - запуск нескольких тасок параллельно
    • Минусы
      • Выводит ошибку только первой таски
      • Не отменяет другие таски, если одна упала
  • async with asyncio.TaskGroup() as tg: tg.create_task() -> {result()}
    • Отменяет другие таски, если одна упала
    • Выводит все ошибки
    • Никаких await кста
  • too much coro > problems > aio jobs / semaphore, socket open file limit

Примитивы синхронизации

  • asyncio.Lock
    • async with lock
    • Когда lock заэквайрили, другие корутины не выполняют lock-блок
  • asyncio.Semaphore
    • async with semaphore
    • Ограничивает кол-во одновременно выполняемых коро
    • Типа заспавнили 100 тасок на вызов апи, но выполняем их по 2 одновременно, иначе апи нас заблочит/сдохнет
  • asyncio.Event
    • Что-то типа флажка
    • Передаем его в 2 коро: event.wait - ждем пока другая коро не вызовет event.set

Как запустить синхронный код асинхронно?

  • future = asyncio.get_running_loop().run_in_executor(executor | None, func)
  • asyncio.as_completed(futures)
import asyncio
from concurrent.futures import ThreadPoolExecutor

def sync_function_1():
    asyncio.sleep(2)
    return "Result from sync function 1"

async def main():
    loop = asyncio.get_running_loop()

    with ThreadPoolExecutor() as executor:
        tasks = [
            loop.run_in_executor(executor, sync_function_1),
            loop.run_in_executor(executor, sync_function_1),
            loop.run_in_executor(executor, sync_function_1)
        ]

        for completed_task in asyncio.as_completed(tasks):
            result = await completed_task
            print(result)

asyncio.run(main())

Материалы

ThreadPoolExecutor