728x90

기초

동기와 비동기

  • 동기 : 호출대상(=함수나 메서드)를 호출했을 때, 그 처리가 완료될 때까지 호출자는 다음 처리를 하지 않는 것
  • 비동기 : 호출대상(=함수나 메서드)를 호출했을 때, 호출자는 다음 처리를 진행할 수 있는 것
    • 스레드 기반 비동기를 다중스레드라 한다.
    • 프로세스 기반 비동기를 다중프로세스라 한다.
    • asyncio를 기반으로도 비동기가 가능하다.

다중스레드의 문제점

  • thread-safe 보장 필요

다중프로세스의 문제점

  • 오버헤드가 다중스레드에 비해 큼
  • 호출대상과 반환값은 피클가능한 객체만 가능
    • 이는 multiprocessing 모듈의 Queue를 내부적으로 사용하기 때문
  • OS가 subprocess 생성 시 부모 process를 fork하는게 기본인 OS에서는 난수생성시 같은 값을 그대로 사용할 수 있음
    • random seed를 초기화하거나 파이썬의 경우 built-in random을 사용하면 알아서 random seed 초기화함

다중프로세스vs다중스레드vsAsyncio

  • 다중프로세스가 유용할 때
    • CPU-bound
  • 다중스레드가 유용할 때
    • I/O-bound
  • Asyncio가 유용할 때
    • I/O-bound
    • thread-safe를 비교적 덜 생각해서, 유지보수가 잘 되어야하는 부분
      • 다중스레드의 경우 lock을 걸어야하는 경우가 많아지면 유지보수가 힘들어짐

concurrent.futures

  • 동시 처리를 수행하기 위한 표준 라이브러리
  • 예전에는 threading, multiprocessing 라이브러리를 활용했지만, 지금은 concurrent.futures로 둘 다 구현 가능하고 권장함

concurrent.futures.Future와 concurrent.futures.Executor

concurrent.futures.Executor는 추상 클래스

  • 구현한 서브 클래스로는 ThreadPoolExecutor, ProcessPoolExecutor가 있다.
  • ThreadPoolExecutor나 ProcessPoolExecutor나 API 사용법이 유사하여 서로간 변경이 간단하다.
  • Executor 클래스에 비동기로 수행하고 싶은 callable 객체를 전달하면(submit메서드), 처리 실행을 스케줄링한 future(=Future 객체)를 반환한다.
    • 처리 실행을 스케줄링했다란, 여러 스레드에 실행을 위임하는 것
    • 첫 callable은 반드시 submit과 동시에 실행되지만, 그 이후 callable은 여유 worker가 있으면 submit과 동시에 실행되지만, 여유 worker가 없으면 pending으로 시작됨
    • future의 메서드
      • return값 확인은 result
      • 상태 확인용 메서드(done, running, cancelled)
  • max_workers의 기본값은 코어수 * 5
  • concurrent.futures.Future는
    • 테스트를 위해 직접 만들 수는 있지만, executor.submit을 통해 만들어 사용하기를 권장한다.(공식문서)
    • submit과 동시에 실행할 수 있으면 하고 안되면 pending으로, 상태값을 갖는 객체이다.

concurrent.futures.wait

  • futures, timeout, return_when을 받아 지정한 return_when 규칙과 timeout에 따라 완료된 futures와 완료되지 않은 futures를 나눠 반환한다.

concurrent.futures.as_completed

  • futures, timeout을 받아 완료된 순으로 반환하는 iterators over futures를 받는다.
  • as_completed를 호출하기 전에 완료된 futures가 argument에 있었다면 그것을 먼저 반환함
  • 메인 스레드에서 blocking된 상태로 완료되는 future 순으로 받는 형태이다.

asyncio

  • async/await 문법을 사용하여 동시성 코드를 작성하는 라이브러리
  • I/O-bound and 고수준의 정형화된 네트워크를 작성하는데 적합할 때가 많다.
  • 동시성은 task단위로 이루어진다. coro단위가 아니다.
    • 즉, await가 붙는 순간 await의 오른쪽을 현재 task에서 실행시키고 제어권을 other task로 이동시킨다.
    • 다수의 task로 코드를 작성하지 않는다면 asyncio는 의미가 없다.
  • thread-safe일 경우가 많다. 단일 스레드 기반이므로
  • asyncio로 작성되지 않은 타 라이브러리와 함께 사용할 경우에는 loop.run_in_executor를 통해 다중스레드 형태로 사용한다.

High-level APIs

Coroutines and Tasks

  • Coroutines
    • coroutine function
      • async def로 작성된 함수
    • coroutine object
      • coroutine function을 호출하여 얻은 객체
      • coro라고 부를 때가 많다.
      • coro자체가 function body를 실행하지는 않는다.
    • coro(coroutine function body)를 실행하는 방법
      • asyncio.run()에 넘긴다.
        • 대게는 asyncio.run(main()) 형태로 top-level entry point로서 사용
      • await coro
        • 이것은 coro가 완료될 때까지 current task에서는 기다리고 제어권은 other task가 가져간다.
      • asyncio.create_task(coro, *, name=None):
        • coro를 실행과 동시에 task로 변환하여 반환한다.(task_1)
          • coro를 실행하고 내부에 제어권을 계속 가져가다가 await를 만나면 그 때 task_1의 제어권을 other task에 넘긴다.
      • asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
        • coro들은 task로 wrap된다. 꼭 입력받은 순으로 wrap되지는 않음
          • coro1의 body를 실행하다가 await를 만나면 coro1을 task1으로 바꾸고 이벤트 루프에 등록되고, 다른 argument에 대해서도 똑같이 한다. 하지만 순서는 보장안됨
        • one future를 반환한다. 따라서 awaitable
        • res = await asyncio.gather(coro1, coro2, ...) 의 형태로 사용
        • res는 완료순이 아니라 입력받은 순의 결과로 정렬된다.
      • asyncio.wait(fs, *, timeout=None, return_when=ALL_COMPLETED)
        • awaitable objects(=fs)를 동시적으로 실행한다.
        • timeout, return_when을 고려하여 wait
        • two sets of Future를 반환(done, not_done)
      • asyncio.as_completed(fs, *, loop=None, timeout=None)
        • awaitable objects(=fs)를 동시적으로 실행하고
        • iterator over coro
          • 각 coro는 fs에 있는 coro가 아니라, fs에 있는 것 중 return이 먼저 나오는 것을 받아오는 새로운 coro인 것이다.(wait for one)
  • Awaitables
    • 키워드 await가 붙을 수 있는 객체를 awaitable object라 한다.
    • await “something”은 current task에서 something이 완료될 때까지 기다린다는 것이다. 그리고 제어권은 other task로 간다.
      • other task가 없으면 current task에서 그냥 기다릴 뿐
      • 동시에 여러 coros을 실행케하려면 asyncio.gather나 asyncio.wait, asyncio.as_completed 를 실행한다.
      • 완전 동시는 아니지만 제어권을 금방 다시 가져와서 여러 coro를 순차적으로 실행시키는 것은 asyncio.create_task를 사용한다.
    • awaitable은
      • coro
      • task/future
      • __await__가 정의된 클래스의 객체
  • current task에서 주도적으로 other task로 동작시키는 방법(제어권을 넘기는 방법)
    • asyncio.sleep(0)을 사용한다.
      • asyncio.sleep은 coro를 반환한다.
      • await asyncio.sleep(0)을 하면 current task에서는 0초만큼 block하고 other task로 제어권을 넘김
  • event loop
    • 이벤트 루프는 다수의 tasks를 가질 수 있고 각 task는 각자의 call stack을 갖는다. 하지만 각 시점마다 한개의 task만 처리한다.
    • coroutine function 내부에서 현재 event loop를 얻으려면 asyncio.get_running_loop()를 호출
    • loop.run_in_executor를 통해 동기 함수도 다중스레드로 처리하여 동시성을 얻을 수 있다.
    • asyncio.{run, gather, wait} 등으로 coro를 실행시킨게 아니라, get_event_loop로 얻은 loop를 두고 loop.create_task했을 때는 coro가 즉시 동작하지 않는다.
    • get_event_loop는 하나의 스레드 사용을 위해 만들어진 메소드다. 다수 스레드 경우 스레드 내부에 new_event_loop 호출하고 set_event_loop해서 스레드에 새 이벤트 루프를 매핑하고 new event loop를 해당 스레드에서 사용하면 된다.
    • asyncio.create_task(coro)도 결국 loop.create_task(coro)을 쓰는 것이고, 후자의 경우 loop를 get_running_loop를 통해 얻은 loop여야 한다.
      • asyncio.create_task(coro)을 쓰는 게 현대식 방법

asyncio.Future

  • concurrent.futures.Future처럼, 언젠가 완료될 작업 클래스를 가리킨다.

asyncio.Task

  • asyncio.Future의 subclass
  • coroutin을 wraps했을 때의 future를 task라 한다.

asyncio.run

  • parameter로 받은 coro를 실행하고 result를 반환함
  • 호출하면 이벤트 루프를 생성하고 이 이벤트 루프가 coro의 실행을 제어한다.
  • 단일 이벤트 루프로 돌기 때문에, asyncio.run(main()) 형태로 한번의 호출만 존재한다.

asyncio.ensure_future

  • future(혹은 task)를 넣으면 그대로 반환
  • coro를 넣으면 task로 만들어 반환
  • 프레임워크 설계자를 위한 함수(최종 사용자는 create_task 사용하면 됨)
    • 무조건 결과가 future임을 만들기 위한 함수이다.
    • asyncio.gather(*aws, …)도 내부에 보면 awaitable 객체가 오면 다 future가 되게 만드는 ensure_future를 호출한다.

loop.run_in_executor

  • asyncio를 활용하다가 동기 I/O를 동시적으로 처리하려고 할 때 사용
  • parameter로 concurrent.futures.Executor 구현 클래스 객체를 넣든가, None(default executor를 사용하며, set_default_executor를 따로 하지 않았다면 ThreadPoolExecutor를 사용)
  • executor, func, *args를 parameter로 받는다.

async with

  • 기존 동기 context manager의 with 사용과 다른 점은 딱 하나다.
    • enter와 exit의 동작이 coro로 수행된다는 점이다.
      • 즉, enter와 exit가 I/O-bound여서 event loop가 그 시간에 다른 task를 했으면할 때 사용한다.
  • 즉, aenter와 aexit가 I/O-bound일 때, async로 둬서 다른 task가 cpu사용할 수 있게끔하기 위함이다.

async for

  • 기존 동기 이터레이터 만드는 것과 차이점은 딱 하나다.
    • anext가 coro로 동작한다는 것
    • 대표적인 예로 next 원소를 db에서 가져오는 경우, I/O-bound이므로 이 때 event loop가 타 task를 다룰 수 있게 하는 상황에 필요

예시

순차 처리와 멀티스레딩 기본 예제

"""
아래 max_workers를 바꿔가며 테스트한다.
max_workers=1이면 첫 download url은 submit과 동시에 running이지만
다음 url부터는 submit해도 pending으로 시작됨

max_workers=3으로 두면 sequential보다 다중스레드가 낫다는걸 확인가능
"""
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
import time
from hashlib import md5
from pathlib import Path
from urllib import request

def elapsed_time(f):
  def wrapper(*args, **kwargs):
    st = time.time()
    v = f(*args, **kwargs)
    print(f"{f.__name__}: {time.time() - st}")
    return v
  return wrapper

urls = [
  '<https://twitter.com>',
  '<https://facebook.com>',
  '<https://instagram.com>'
]

def download(url):
  print(f"DOWNLOAD START, url = {url}")
  req = request.Request(url)

  # 파일 이름에 / 등이 포함되지 않도록 함
  name = md5(url.encode('utf-8')).hexdigest()
  file_path = './' + name
  with request.urlopen(req) as res:
    Path(file_path).write_bytes(res.read())
    return url, file_path

@elapsed_time
def get_sequential():
  for url in urls:
    print(download(url))

@elapsed_time
def get_multi_thread():
  with ThreadPoolExecutor(max_workers=1) as executor:
    futures = [executor.submit(download, url) for url in urls]
    print(futures)
    for future in as_completed(futures):
      print(future.result())

if __name__ == '__main__':
  get_sequential()
  get_multi_thread()

thread-unsafe 예시와 thread-safe 수정 예시

from concurrent.futures import ThreadPoolExecutor, wait

# thread-unsafe
class Counter:
  def __init__(self):
    self.count = 0
  def increment(self):
    self.count += 1

def count_up(counter):
  for _ in range(1_000_000):
    counter.increment()

if __name__ == "__main__":
  counter = Counter()
  thread = 2
  with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter) for _ in range(thread)]
    done, not_done = wait(futures)  # (*)

  print(f'{counter.count=:,}')  # 2,000,000이 표시되지 않음

"""(*)
wait(fs, timeout=None, return_when=ALL_COMPLETED)
futures를 받아 기다린다.
	- parameter timeout
		timeout(seconds)까지 완료된 것과 완료되지 않은 것을 
		tuple로 반환한다.
	- parameter return_when
		FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED 상수 설정
		모두 concurrent.futures에 존재하는 상수들
		default는 ALL_COMPLETED
"""
import threading
from concurrent.futures import ThreadPoolExecutor, wait

# thread-unsafe
class ThreadSafeCounter:
  lock = threading.Lock()  # 
  def __init__(self):
    self.count = 0
  def increment(self):
    with self.lock:
      self.count += 1

def count_up(counter):
  for _ in range(1_000_000):
    counter.increment()

if __name__ == "__main__":
  counter = ThreadSafeCounter()
  thread = 2
  with ThreadPoolExecutor() as e:
    futures = [e.submit(count_up, counter) for _ in range(thread)]
    done, not_done = wait(futures)  # (*)

  print(f'{counter.count=:,}')  # 2,000,000

피보나치수열 - Sequential, 다중스레드, 다중프로세스 비교

"""
피보나치 수열 - Sequential
"""

import sys
import os
import time

def elapsed_time(f):
  def wrapper(*args, **kwargs):
    st = time.time()
    v = f(*args, **kwargs)
    print(f"{f.__name__}: {time.time() - st}")
    return v
  return wrapper

def fibonacci(n):
  a, b = 0, 1
  for _ in range(n):
    a, b = b, b + a
  else:
    return a

@elapsed_time
def get_sequential(nums):
  for num in nums:
    _ = fibonacci(num)

def main():
  n = 1_000_000
  nums = [n] * os.cpu_count()
  get_sequential(nums)

if __name__ == '__main__':
  main()  # 168초(cpu 개수마다 다를 값)
"""
피보나치 수열 - multi-process
"""

import os
import sys
import time

from concurrent.futures import ProcessPoolExecutor, as_completed

def elapsed_time(f):
  def wrapper(*args, **kwargs):
    st = time.time()
    v = f(*args, **kwargs)
    print(f"{f.__name__}: {time.time() - st}")
    return v
  return wrapper

def fibonacci(n):
  a, b = 0, 1
  for _ in range(n):
    a, b = b, b + a
  else:
    return a

@elapsed_time
def get_multi_process(nums):
  with ProcessPoolExecutor() as e:
    futures = [e.submit(fibonacci, num) for num in nums]
    for future in as_completed(futures):
      _ = future.result()

def main():
  n = 1_000_000
  nums = [n] * os.cpu_count()
  get_multi_process(nums)

if __name__ == '__main__':
  main()  # 약 14초
"""
피보나치 수열 - multi-thread
"""

import os
import sys
import time

from concurrent.futures import ThreadPoolExecutor, as_completed

def elapsed_time(f):
  def wrapper(*args, **kwargs):
    st = time.time()
    v = f(*args, **kwargs)
    print(f"{f.__name__}: {time.time() - st}")
    return v
  return wrapper

def fibonacci(n):
  a, b = 0, 1
  for _ in range(n):
    a, b = b, b + a
  else:
    return a

@elapsed_time
def get_multi_thread(nums):
  with ThreadPoolExecutor() as e:
    futures = [e.submit(fibonacci, num) for num in nums]
    for future in as_completed(futures):
      _ = future.result()

def main():
  n = 1_000_000
  nums = [n] * os.cpu_count()
  get_multi_thread(nums)

if __name__ == '__main__':
  main()

unpickable callable을 다중프로세스에 사용시 에러와 해결

from concurrent.futures import ProcessPoolExecutor, wait

func = lambda: 1
# def func():
#   return 1

def main():
  with ProcessPoolExecutor() as e:
    future = e.submit(func)
    done, not_done = wait([future])
  print(future.result())  # (*) 

if __name__ == "__main__":
  main()

"""
(*) 여기서 error raised, multiprocessing.Queue에서 반환값을 가져올 때 
pickle dump를 사용하는데 lambda가 pickle가능하지 않아 에러 발생
"""
from concurrent.futures import ThreadPoolExecutor, wait

func = lambda: 1
# def func():
#   return 1

def main():
  with ThreadPoolExecutor() as e:
    future = e.submit(func)
    done, not_done = wait([future])
  print(future.result())  # (*) 

if __name__ == "__main__":
  main()

"""
ThreadPoolExecutor는 에러 발생하지 않음
"""
from concurrent.futures import ProcessPoolExecutor, wait

def func():
  return 1

def main():
  with ProcessPoolExecutor() as e:
    future = e.submit(func)
    done, not_done = wait([future])
  print(future.result())  # (*) 

if __name__ == "__main__":
  main()

"""
일반함수는 ProcessPoolExecutor여도 pickle가능하므로 error not raised
"""

다중프로세스에서 fork 방식의 난수 생성 문제와 해결

# np_random_multiprocess.py

from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np

def use_numpy_random():
  return np.random.random()

def main():
  with ProcessPoolExecutor() as e:
    futures = [e.submit(use_numpy_random) for _ in range(3)]
    for future in as_completed(futures):
      print(future.result())

if __name__ == "__main__":
  main()

"""
해당코드를 WINDOWS, MAC에서는 문제 없음
다만 UNIX 환경에서 실행하면 같은 값이 중복해서 나온다.
이는 UNIX에서는 자식 프로세스 만드는 방식이 부모 프로세스를 복제하는 fork방식이 기본값
"""
# 해결방법 1 np.random.seed() 추가

from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np

def use_numpy_random():
	np.random.seed()  # (*)
  return np.random.random()

def main():
  with ProcessPoolExecutor() as e:
    futures = [e.submit(use_numpy_random) for _ in range(3)]
    for future in as_completed(futures):
      print(future.result())

if __name__ == "__main__":
  main()

"""
(*) np.random.seed()를 통해 난수 생성기를 초기화해서 해결
"""
# 해결방법 2 np.random말고 빌트인 random 사용

from concurrent.futures import ProcessPoolExecutor, as_completed

import numpy as np

def use_random():  # (*)
  return random.random()

def main():
  with ProcessPoolExecutor() as e:
    futures = [e.submit(use_random) for _ in range(3)]
    for future in as_completed(futures):
      print(future.result())

if __name__ == "__main__":
  main()

"""
(*) built-in random의 경우 fork할 떄 자동으로 난수생성기를 초기화함
"""

asyncio의 helloworld

import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(10)
    print('... World!')

# Python 3.7+
asyncio.run(main())  # Hello ...  출력 후 10초 후에 ... World!가 출력

asyncio.gather 예제

import asyncio
import random

async def call_web_api(url):
  # Web API 처리를 sleep으로 대체
  print(f'send a request: {url}')
  await asyncio.sleep(random.random())
  print(f'got a response: {url}')
  return url

async def async_download(url):
  # await를 사용해 코루틴을 호출
  response = await call_web_api(url)
  return response

async def main():
  task = asyncio.gather(
    async_download('<https://twitter.com/>'),
    async_download('<https://facebook.com/>'),
    async_download('<https://instagram.com/>'),
  )
  return await task

result = asyncio.run(main())

asyncio.create_task를 통해 코루틴 함수 내부도 동시적으로 실행하기

# create_task를 쓰지 않은 예
# 6초 걸려 끝남
import asyncio

async def coro(n):
  await asyncio.sleep(n)
  return n

async def main():
  print(await coro(3))
  print(await coro(2))
  print(await coro(1))

asyncio.run(main())
# create_task를 사용한 예
# 3초만에 끝남
import asyncio

async def coro(n):
  await asyncio.sleep(n)
  return n

async def main():
  task1 = asyncio.create_task(coro(3))
  task2 = asyncio.create_task(coro(2))
  task3 = asyncio.create_task(coro(1))  
  print(await task1)
  print(await task2)
  print(await task3)
  

asyncio.run(main())

"""
3
2
1
"""

loop.run_in_executor를 통해 동기 I/O를 동시적으로 처리하기

import asyncio
from concurrent.futures import as_completed
from concurrent.futures import ThreadPoolExecutor
import time
from hashlib import md5
from pathlib import Path
from urllib import request

urls = [
  '<https://twitter.com>',
  '<https://facebook.com>',
  '<https://instagram.com>'
]

def download(url):
  print(f"DOWNLOAD START, url = {url}")
  req = request.Request(url)

  # 파일 이름에 / 등이 포함되지 않도록 함
  name = md5(url.encode('utf-8')).hexdigest()
  file_path = './' + name
  with request.urlopen(req) as res:
    Path(file_path).write_bytes(res.read())
    return url, file_path

async def main():
  loop = asyncio.get_running_loop()
  # 동기 I/O를 이용하는 download를 동시적으로 처리
  futures = []
  for url in urls:
    future = loop.run_in_executor(None, download, url)
    futures.append(future)

  for result in await asyncio.gather(*futures):
    print(result)

asyncio.run(main())

ForLoop도 task마다 돌게 만든 예제

import asyncio

async def counter(name: str):
    for i in range(0, 100):
        print(f"{name}: {i}")
        await asyncio.sleep(0)

async def main():
    tasks = []
    for n in range(0, 4):
        tasks.append(asyncio.create_task(counter(f"task{n}")))

    while True:
        tasks = [t for t in tasks if not t.done()]
        if len(tasks) == 0:
            return

        await tasks[0]

asyncio.run(main())

"""
CounterStart of task0
task0: 0
CounterStart of task1
task1: 0
CounterStart of task2
task2: 0
CounterStart of task3
task3: 0
task0: 1
task1: 1
task2: 1
task3: 1
task0: 2
task1: 2
task2: 2
task3: 2
task0: 3
task1: 3
task2: 3
task3: 3
task0: 4
task1: 4
task2: 4
task3: 4
task0: 5
task1: 5
task2: 5
task3: 5
task0: 6
task1: 6
task2: 6
task3: 6
task0: 7
task1: 7
task2: 7
task3: 7
task0: 8
task1: 8
task2: 8
task3: 8
task0: 9
task1: 9
task2: 9
task3: 9
"""

async with 예제

import asyncio
import sys

async def log(msg, l=10, f='.'):
  for i in range(l*2+1):
    if i == l:
      for c in msg:
        sys.stdout.write(c)
        sys.stdout.flush()
        await asyncio.sleep(0.05)
    else:
      sys.stdout.write(f)
      sys.stdout.flush()
    await asyncio.sleep(0.2)
  sys.stdout.write('\\n')
  sys.stdout.flush()

class AsyncCM:
  def __init__(self, i):
    self.i = i
  async def __aenter__(self):
    await log('Entering Context')
    return self
  async def __aexit__(self, *args):
    await log('Exiting Context')
    return self

async def main1():
  '''Test Async Context Manager'''
  async with AsyncCM(10) as c:
    for i in range(c.i):
      print(i)
## 실행

# loop = asyncio.get_event_loop()
# loop.run_until_complete(main1())
async def main():
  task = asyncio.gather(main1(), main1())
  return await task

asyncio.run(main())

"""
....................EEnntteerriinngg  CCoonntteexxtt....................
0
1
2
3
4
5
6
7
8
9
.
0
1
2
3
4
5
6
7
8
9
...................EExxiittiinngg  CCoonntteexxtt....................
"""
"""
__aenter__와 __aexit__가 각각 다른 task로 동작함을 알 수 있다.
"""

async for 예제

# 블로킹 이터레이터
class A:
	def __iter__(self):
		self.x = 0
		return self
	def __next__(self):
		if self.x > 2:
			raise StopIteration
		else:
			self.x += 1
			return self.x

for i in A():
	print(i)  

"""
1
2
3
"""

# 비동기(논블로킹) 이터레이터
import asyncio
from aioredis import create_redis

async def main():
	redis = await create_redis(('localhost', 6379))
	keys = ["Americas", "Africa", "Europe", "Asia"]
	async for value in OneAtATime(redis, keys):  # (1)
		await do_something_with(value)  # (2)

class OneAtATime:
	def __init__(self, redis, keys):
		self.redis = redis
		self.keys = keys
	def __aiter__(self):
		self.ikeys = iter(self.keys)
		return self
	async def __anext__(self):
		try:
			k = next(self.ikeys)
		except StopIteration:
			raise StopAsyncIteration  # (3)
		value = await self.redis.get(k)  # (4)
		return value

asyncio.run(main())

"""
- def __aiter__를 구현해야한다.(not async def)
- __aiter__()는 async def __anext__()를 구현한 객체를 반환해야한다.
- __anext__()는 반복의 각 단계에 대한 값을 반환하고, 반복이 끝나면 StopAsyncIteraction을 발생시켜야 한다.

(1) async for를 사용한다. 중요한 점은 반복 중에 다음 데이터를 얻기 전까지 반복 자체를 일시 정지할 수 있다는 점이다.
(2) I/O 동작을 수행한다고 하자. 예를 들면 데이터를 변환하고 다른 데이터베이스에 전달하는 동작
(3) 일반적인 iteration이 끝나서 StopIteration을 발생시키고 그것을 StopAsyncIteration으로 변환시키는 방법
(4) redis에서 값을 가져올 때도 await를 줘서 이벤트 루프가 다른 작업을 할 수 있게 했다.
"""

# 비동기(논블로킹) 제너레이터로 바꾼 예제
import asyncio
from aioredis import create_redis

async def main():
	redis = await create_redis(('localhost', 6379))
	keys = ["Americas", "Africa", "Europe", "Asia"]
	async for value in one_at_a_time(redis, keys):
		await do_something_with(value)

async def one_at_a_time(redis, keys):  # (1)
	for k in keys:
		value = await redis.get(k)
		yield value  # (2)

asyncio.run(main())

"""
- 코루틴과 제너레이터는 완전 다른 개념이다.
- 비동기 제너레이터는 일반 제너레이터와 유사하게 작동한다.
- 반복 수행 시, for 대신 async for를 사용한다.
(1) 비동기 제너레이터는 async def로 정의한다.
(2) 비동기 제너레이터는 제너레이터처럼 yield를 쓴다.

"""

 

contextlib 사용한 async with 예제

# 먼저 일반적인 블로킹 방식
from contextlib import contextmanager

@contextmanager
def web_page(url):
	data = download_webpage(url)  # (1)
	yield data
	update_stats(url)  # (2)

with web_page('google.com') as data:
	process(data)

"""
contextmanager 데커레이터는 제너레이터함수를 콘텍스트 관리자로 변환한다.
yield한 것이 as data의 data로 들어간다.
with문이 끝나면 update_stats(url)이 실행된다.

(1) 위 블로킹 방식에서는 (1)을 수행하는 동안 프로그램이 중지 된다.
download_webpage(url)이 coro가 되게 수정을 하든가, 아래 executor 방식을 사용한다.
coro가 되게 수정하면 베스트겠지만, third-pary library인 경우 수정이 쉽지 않다.

(2) URL을 통해 전달받은 데이터를 처리할 때마다 다운로드 횟수와 같은 통계를 갱신하는 상황을 가정한 것이다. 만약 이 함수가 데이터베이스를 갱신하는 것과 같은 I/O 동작을 내부적으로 포함하고 있다면 마찬가지로 블로킹 호출이 되므로, coro가 되게 하든 executor를 활용한다.
"""

# 논블로킹(단, download_webpage, update_stats이 coroutine function이어야함)
from contextlib import asynccontextmanager

@asynccontextmanager
async def web_page(url):
	data = await download_webpage(url)
	yield data
	await update_stats(url)

async with web_page('google.com') as data:
	process(data)

"""
download_webpage와 update_stats를 coroutine function으로 수정했을 때의 예제이다.
@asynccontextmanager를 사용하려면 async def로 정의해야한다.
yield가 있으니 제너레이터 함수인데, async def까지 사용했으니, 이 함수를 호출하면 비동기 제너레이터 객체를 반환한다.
비동기 제너레이터 함수/객체임을 확인하는 방법은 
inspect 모듈의 isasyncgenfunction()/isasyncgen()가 있다.
asynccontextmanager를 활용하려면 async with로 시작해야한다.
"""

# executor를 활용한 논블로킹(download_webpage, update_stats를 coro로 바꾸기 힘들 때)
from contextlib import asynccontextmanager

@asynccontextmanager
async def web_page(url):
	loop = asyncio.get_event_loop()
	data = await loop.run_in_executor(None, download_webpage, url)
	yield data
	await loop.run_in_executor(None, update_stats, url)

async with web_page('google.com') as data:
	process(data)

"""
별도의 스레드에서 executor로 블로킹 호출함수를 넘겨 논블로킹을 구현한 방식

"""

728x90

+ Recent posts