Barrier를 사용한 동기화

Barrier는 동시성 프로그래밍에서 사용되는 동기화 수단 중 하나로 여러 스레드를 특정한 시점까지 기다린 후 한꺼번에 재개하는 방법이다. 비슷한  방식의 동기화 프리미티브로 이벤트(Event)가 있는데, 이벤트는 재개 시점을 판단하는 제 3의 스레드가 재개를 위한 시그널을 set해주어야 한다. 배리어는 그와 달리 미리 정해진 개수만큼의 스레드가 모이면 자동으로 해제된다.

따라서 배리어는 여러 워커 스레드로 데이터를 분산시켜 병렬로 처리하고 최종적인 작업 완료 이전에 모든 워커 스레드가 작업을 완료했는지를 기다리는 용도로 사용할 수 있다.

간단한 예를 생각해보자.  예를 들어 주어진 URL의 데이터를 저장하는 함수를 여러 워커 스레드에서 한 번에 실행하려고 한다. 보통은 이런 예제를 보면 스레드의 리스트를 만든 후 각각의 스레드에 대해 모두 start()를 호출한 후 다시 루프를 돌면서 각각의 스레드를 join()하는 식으로 구성해서 모든 워커가 동작을 마무리 하기를 기다린다.

하지만 배리어를 쓰는 것도 나쁘지 않은 방법이다. 전역 범위에 공통으로 사용할 배리어 객체를 두거나, 워커 스레드가 실행하는 함수가 배리어에 대한 참조를 인자로 받아서 사용하게 하는 것도 좋은 방법이다.

from threading import Thread, Barrier
from urllib.request import urlopen

NO_THREADS = 4
bar = Barrier(NO_THREADS+1)


def load(url: str):
    res = urlopen(url)
    if res.code != 200:
        bar.wait()
        return
    data = res.read()
    fname = url.rsplit('/', 1)[1]
    with open(fname, 'wb') as f:
        f.write(data)
    bar.wait()


def main():
    urls = ( ... # 웹 상의 이미지 경로 4 개 ... )
    ts = [Thread(target=load, args=(u,)) for u in urls]
    for t in ts:
        t.start()
    bar.wait()


if __name__ == '__main__':
    main()

참고로 threading이 아닌 multiprocessing을 사용하여 프로세스로 대체하는 경우에는 배리어나 락과 같은 동기화 수단을 전역 공간에 두는 방식으로 코드를 작성하면 안된다. 이 경우에는 앞서 언급한 후자의 케이스처럼, 각 워커의 target 함수가 메인 함수로부터 인자로 전달받아서 사용해야 한다.