Python의 Thread 사용 예제

1. 단순 연산 프로그램

numbers 리스트 안에 있는 숫자들을 인수분해 하는 프로그램을 작성해보자.

from time import time

def factorize(number):
    for i in range(1, number):
        if number % i == 0:
            yield i

def main():
    numbers = [2139079, 1214759, 1416637, 1839485]
    start = time()
    
    # 순서대로 계산
    for number in numbers:
        list(factorize(number))

    print(time()-start)

if __name__ == "__main__":
    main()
from time import time
from threading import Thread

class FactorizedThread(Thread):
    def __init__(self, number):
        Thread.__init__(self)
        self.number = number

    def run(self):
        self.factors = list(factorize(self.number))

def factorize(number):
    for i in range(1, number):
        if number % i == 0:
            yield i 

def main():
    numbers = [2139079, 1214759, 1416637, 1839485]
    start = time()
    
    # numbers 갯수만큼 theread 만들어서 실행
    threads = []
    for number in numbers:
        thread = FactorizedThread(number)
        thread.start()
        threads.append(thread)

    # 스레드 완료 기다림 
    for thread in threads :
        thread.join()

    print(time()-start)

if __name__ == "__main__":
    main()



2. Blocking I/O 에 사용

여기서는 유닉스 select() 시스템 콜을 사용해서 Blocking I/O 를 구현해본다.

do_systemcall이 실행되는 동안에는 제어권이 넘어가 있기 때문에 프로그램이 다른 일을 할 수 없다. 그러나 우리는 Blocking I/O도 사용하면서 동시에 연산도 하고싶다면, 시스템 호출 작업을 스레드로 옮기는 방법을 활용해야 한다.

import select 
from time import time

def do_systemcall():
    # 운영체제에 1초간 block 한 후 제어를 프로그램에 돌려달라고 요청
    select.select([], [], [], 1)

def compute(index): 
    index += 1
    print(index)

def main():
    # 이 시스템콜을 연속으로 실행한다면?
    start = time()
    for _ in range(5):
        do_systemcall()

    for i in range(5):
        compute(i)
    
    print(time()-start)

if __name__ == "__main__":
    main()

아래 코드는 do_systemcall 함수를 별도의 스레드를 두고 여러번 호출한다. 이때 파이썬 스레드가 시스템 콜을 만들기 전에 GIL 을 풀고, 시스템 콜이 끝나는 대로 GIL을 다시 얻기 때문에 빠른 속도로 처리할 수 있다.

import select 
from time import time
from threading import Thread

class SytemcallThread(Thread):
    def __init__(self):
        Thread.__init__(self)
    def run(self):
        do_systemcall()

def do_systemcall():
    select.select([],[],[],1)

def compute(index): 
    index += 1
    print(index)

def main():
    start = time()
    threads = []
    for _ in range(5):
        thread = SytemcallThread()
        thread.start()
        threads.append(thread)

    for i in range(5):
        compute(i)

    for thread in threads:
        thread.join()

    print(time()-start)

if __name__ == "__main__":
    main()



3. http request

import requests
from time import time

URL = 'https://httpbin.org/uuid'

def fetch(session, url):
    with session.get(url) as response:
        print(response.json()['uuid'])

if __name__ == "__main__":
    start = time()
    with requests.Session() as session:
        for _ in range(50):
            fetch(session, URL)

    print(time()-start)
import requests
from time import time
from concurrent.futures import ThreadPoolExecutor

URL = 'https://httpbin.org/uuid'
def fetch(session, url):
   with session.get(url) as response:
       print(response.json()['uuid'])

if __name__ == "__main__":
    start = time()
    with ThreadPoolExecutor(max_workers=10) as executor:
        with requests.Session() as session:
            executor.map(fetch, [session] * 50, [URL] * 50)
            executor.shutdown(wait=True) 

    print(time()-start)

이 버전에는 ThreadPoolExecutor를 사용한다. ThreadPoolExecutor를 살펴보면 ThreadPoolExecutor = Thread + Pool + Executor로 분리해서 볼 수 있다.

ThreadPoolExecutor 객체는 Thread의 Pool을 만들고, 각각의 Thread를 동시에 실행한다. 마지막으로 Executor는 Pool에 있는 각각의 Thread가 언제, 어떻게 실행하는지 제어하는 부분이다. 즉, Executor는 Pool에서 요청을 실행한다.



참고링크

참고1

synchronous VS multiprocessing VS multithreading VS asyncio참고2

Why is a Python I/O bound task not blocked by the GIL?