컨디션을 통한 스레드 동기화 예제

동시성을 다룰 때에는 특정한 자원을 동시에 액세스하지 못하도록 관리하거나 여러 작업들이 시작되는 시점을 맞추는 동기화 수단이 필요할 수 있다. Lock은 특정 코드 영역을 동시에 여러 스레드가 실행하지 못하도록 보호할 때 사용하며, 이벤트는 여러 스레드들이 특정 이벤트가 발생할 때까지 기다리다가 동시에 시작될 수 있도록 한다. 컨디션(Condition)은 락과 이벤트가 결합되어 있는 동기화 수단이다.

컨디션은 락을 내재하고 있는 이벤트라 할 수 있다. 락과 마찬가지로 acquire() ~ release() 구간이 있어 한 번에 하나의 스레드/프로세스가 실행되는 영역을 만들 수 있는데, 그 사이에 wait()를 통해서 이벤트를 기다릴 수 있다. 이때 한 스레드가 락을 잠근 상태에서 wait()를 호출하여 이벤트를 기다리게 되면, 같은 컨디션 객체를 점유하고자 하는 스레드가 다시 락을 얻어서 크리티컬 영역에 진입할 수 있다. 이와 같은 방식으로 여러 스레드가 크리티컬 영역에서 이벤트를 기다리는 상태가 될 때, 누군가가 해당 컨디션 이벤트를 set()하게 되면 대기 중인 모든 스레드가 깨어나게 된다. 하지만 이들은 모두 같은 크리티컬 영역에서 대기 중이었기 때문에 일반 이벤트와 달리 한꺼번에 동시에 시작하지 않고, 한 번에 하나씩 크리티컬 영역의 코드를 실행한다. 깨어난 스레드가 락을 릴리즈하는 시점에 wait()를 끝낸 다른 스레드가 실행되는 식으로 순차적으로 크리티컬 구간을 지나게 된다.

컨디션을 통한 스레드 동기화 예제 더보기

스레드를 이용한 데몬 만들기 – Python

이 블로그를 통해서 파이썬에서의 병렬처리에 대해서는 명시적으로 threading.Thread 대신에 concurrent.futures 에서 제공하는 API를 사용할 것을 여러 차례 권장해 왔다. 여기서 주목할 것은 바로 “병렬처리”라는 조건이다. 즉 concurrent.futures의 API는 일련의 데이터에 대해서 동일한 처리를 하려할 때, 이 “동일한 처리”를 여러 스레드 혹은 프로세스로 나눠서 동시에 진행하는 상황에 어울리는 기능이다. 하지만 실제 상황에서는 동시에 서로 다른 작업이 진행되어야 하는 경우가 존재한다.주로 메인 스레드와 백그라운드 스레드 (혹은 작업 스레드)에서 하는 일이 서로 다른 경우에 이러한 패턴이 필요할 수 있다.

이번 글에서는 파이썬의 threading 모듈을 사용해서 멀티스레드 프로그램을 어떻게 쉽게 작성하는지에 대해서 예제와 같이 살펴보도록 하겠다.

먼저 어떤 파일을 하나 열어서 한줄씩 읽는 프로그램을 생각해보자. 보통 파일을 열고 file.readline()을 사용했을 때, 파일의 마지막 라인을 읽은 후라면 None이 리턴되며 더 이상 내용을 읽지 않는다. 하지만 우리 컴퓨터에는 로그 파일과 같이 지속적으로 뭔가 내용이 계속 늘어나는 파일들이 있다. 따라서 파일을 열어 놓고 특정 시간 주기마다 파일로부터 한줄씩 읽어서 그 내용을 처리하는 프로그램을 작성한다고 생각해보자.

import time

def process_line(l):
  print(l)

with open('message.log') as f:
  while True:
    line = f.readline()
    if not line:
      time.sleep(0.5)
      continue
    process_line(line)

위 코드는 이렇게 동작한다.

  1. 파일 하나를 읽기 모드로 열었다.
  2. 한 줄씩 읽어서 process_line() 함수로 보내어 처리한다.
  3. 만약 마지막 줄을 읽은 후 더 읽을 내용이 없다면 0.5초 후에 재시도한다.

이 프로그램은 분명 무한루프를 돌게되지만, CPU 사용량이 치솟거나 하지는 않는다. 0.5초의 대기시간은 CPU 입장에서 봤을 때에는 무척이나 길고 지루한 시간이다. (쿨타임을 0.1초로 잡아도 마찬가지다.)

다음과 같이 빈 파일을 만든 후, 파이썬 코드를 백그라운드에서 실행해놓자. 이 상태에서 파일에 메시지를 추가하면 곧 추가된 라인들이 출력되는 것을 알 수 있다. (백그라운드 실행은 윈도 명령줄에서는 지원되지 않는다. 윈도에서 따라해본다면, 터미널 창을 2개 열어서 한쪽은 파이썬을 실행하고 다른 한쪽은 echo 명령으로 파일에 라인을 추가해보면 된다.)

$ touch message.log
$ python sample.py&
$ echo "hello" >> message.log

자 이 프로그램의 처리흐름을 다시 살펴보도록하자.  먼저 프로그램은 기본적으로 지정된 파일을 열어서 한 줄씩 읽고 그 내용을, 내용 처리를 담당하는 별도의 함수 (여기서는 데이터를 다루는 역할을 하고 있으니 ‘핸들러’라고 하자)로 넘겨준다. 핸들러 함수의 동작이 끝나면 다시 그 다음 줄을 읽는 식으로 루프를 도는데, 더 이상 읽어들일 내용이 없으면 지정된 시간(0.5초)만큼 기다린 후에 재시도 한다.

이 때 핸들러 함수의 실행에 필요한 시간이 매우 짧다면 문제가 되지 않겠지만 항상 그러하지는 못할 것이다. 하지만 텍스트 파일의 내용이 특정한 URL이고 핸들러가 해당 URL의 데이터를 다운로드 받는다면 어떨까? 위 예제의 구조에서는 핸들러는 블럭킹 함수로 동작하므로 현재 데이터를 처리하는 동안에는 추가된 데이터가 계속 남아있으리라고는 보장할 수 없다. 왜냐하면 해당 파일은 계속 내용이 추가되기 보다는 별도의 에디터에서 추가되었던 내용이 삭제처리 될 수도 있기 때문이다. 다음 코드와 그 실행 시나리오를 생각해보자.

from urllib.request import rulopen
import time

def handler(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(urlopen(url))

with open('inputs.txt') as f:
  while True:
    line = f.readline()
    if not line:
       time.sleep(0.1)
       continue
    handler(*(line.split()[:2]))

위에서 소개한 코드와 완전히 동일한데, 한가지 차이는 핸들러가 URL요청과 디스크IO를 수행하는 제법 시간이 오래걸릴 수 있는 작업이라는 점에서 차이가 있다. inputs.txt의 파일의 내용이 실시간으로 편집되고 있고, 네트워크 사정이 그리 좋지 못하다면 다음의 시나리오가 가능해질 수 있다.

  1. 파일에 데이터 A, B, C가 추가됨
  2. 프로그램은 데이터 A를 처리함 (그리고 처리되는 동안 블럭됨)
  3. 그 사이에 외부 프로그램에 의해서 데이터 B, C가 제거됨
  4. A를 처리하고 난 후 파일을 읽으려 시도하면 더 이상 남은 데이터가 없거나 세로운 데이터 D, E 등을 처리하게 됨

이 시나리오에서는 핸들러가 블럭킹으로 동작하는 동안 이후에 추가된 데이터를 확보하지 못한다는 문제가 있다. 이를 해결하기 위해서는 핸들러가 논블럭킹(non-blocking)으로 동작해야 할 필요가 있다. 즉 핸들러 함수를 메인 스레드가 아닌 별개의 스레드에서 실행하게하면 핸들러를 호출하는 구문은 즉시 다름 라인으로 진행이 가능해진다.

threading 모듈

threading 모듈은 파이썬에서 스레드를 사용할 수 있게 해주는 API를 담고 있다. 스레드를 위한 클래스인 Thread와 여러 가지 동기화 도구인 Lock, RLock, Condition, Semaphore 등이 지원된다. 스레드를 사용하는 방법에 대해서 오랜된 문서나 책에서는 Thread 클래스를 오버라이드하여 사용하는 방법을 소개하고 있는데, 스레드를 통한 분기가 함수의 실행단위로 쓰여진다면 굳이 번거롭게 오버라이드를 할 필요는 없다.

Thread 객체를 생성할 때 target= 이라는 인자가 있다. 이 인자로 백그라운드 스레드에서 실행할 함수를 넘겨주면 해당 함수가 별도의 스레드에서 실행될 수 있다. 위의 ‘파일 처리’ 코드를 스레드를 사용하는 방식으로 수정해보면 다음과 같이 작성될 수 있다. 시나리오를 잠시 수정해보자면, 파일에는 한 줄에 공백으로 구분되는 URL과 파일명이 들어온다. 우리의 프로그램은 각 라인을 읽어서 주어진 URL의 데이터를 주어진 파일 이름으로 저장하는 동작을 처리할 것이다.

from threading import Thread
from urllib.request import urlopen
import time

## 1
def hanlder(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(res.read())

## 2
with open('inputs.txt') as f:
  f.seek(2) ## 3
  while True:
    line = f.readline()
    if not line: ## 4
      time.sleep(0.1)
      continue
    data = line.split()[:2] 
    t = Thread(target=handler, args=data) ## 5
    t.start() ## 6

그다지 복잡한 코드는 아니다. 코드는 다음과 같이 설명된다.

  1. handler 함수는 URL과 파일이름을 인자로 받아서 주어진 URL의 콘텐츠를 다운로드 받아 해당 파일 이름으로 저장한다.
  2. 파일의 내용을 조사하는 루프를 시작한다.
  3. 현재 시점까지의 파일의 내용은 무시해야 할 것이므로 seek(2)를 호출해서 파일 스트림의 끝으로 커서를 옮긴다.
  4. 새로운 라인이 없으면 0.1초를 대기한다.
  5. 새로운 스레드에서 핸들러 함수를 실행하도록 설정한다.
  6. 만들어진 스레드를 실행한다.

과도한 스레드 생성을 피하는 법

위 코드는 다시 새로운 문제를 야기한다. 스레드를 만든 후 시작하는 t.start() 부분은 논블럭킹이기 때문에 호출하게 되면 새로운 스레드에서 네트워크 요청을 생성하는 일련의 작업을 처리하는데, 이 작업의 완료여부와 별개로 스레드가 시작되면 t.start()는 그 즉시 리턴된다. 따라서 inputs.txt 파일에 새로운 입력이 수십~수백개가 쏟아져 들어갔다면 이 프로그램은 그만큼 많은 스레드를 생성하면서 전체적으로 매우 느려지게 될 것이다.

이 상황에서 어쩌면 누군가는 세마포어같은 동기화 도구를 생각할지도 모르겠다. 혹은 스레드 풀을 구현하여 한 번에 동작하는 스레드의 수를 제한할 수도 있을 것이다. 하지만 이때 가장 중요한 것은 ‘입력으로 들어온 데이터를 나중에 처리하더라도 잃지는 말자’는 것이다. 따라서 가장 간단히 처리하는 방법은 큐를 사용하는 것이다.

  1. 파일로부터 데이터를 입력받는 무한루프는 계속 파일을 체크하여, 추가로 입력된 데이터가 있으면 큐에 넣는다.
  2. 큐를 watching하는 무한루프는 큐에 들어온 데이터를 순서대로 꺼내어 하나씩 처리한다.

여기서 중요한 점은 두 개의 무한루프가 독립적으로 돌아간다는 것이다. 이를 위해서는 둘 중의 하나의 무한루프는 별도의 스레드에서 돌릴 필요가 있다. 그리고 두 스레드 사이의 데이터 교환은 동기화된 상태여야 한다. (한쪽에서 큐에서 데이터를 빼는 동시에 데이터를 집어넣어서는 곤란하다.) 스레드 사이의 동기화된 데이터 교환은 queue 모듈 내의 Queue를 사용하면 된다.

또한 별도 스레드에서의 동작이 무한루프가 되어야 하므로 해당 스레드는 ‘데몬’으로서 동작해야 한다. 데몬은 그 수명이 메인 스레드에 의존적인 스레드로, 그 자신이 무한루프를 돌고있다 하더라도 메인스레드가 종료되면 (즉 프로세스가 종료되면) 자동으로 종료되면서 리소스를 반납한다. 스레드를 데몬으로 만들고 싶다면 Thread 를 생성할 때 daemon=True 옵션을 지정하면 된다.

위 구조에서 어떤 것이 메인 스레드가 되고 어떤 것이 데몬이 될지는 본인의 판단에 달렸다. 여기서는 데이터 핸들링을 메인스레드에서 하고 데이터 마이닝을 데몬에서 수행하도록 구조를 만들어 보도록 하자.

아래는 최종 코드이며, 이 프로그램은 2개의 스레드를 써서 파일 내용을 감시하고, 추가 입력된 데이터를 처리한다.

from threading import Thread
from queue import Queue
from urllib.request import urlopen
from time import sleep

queue = Queue()

def daemon(filename):
  with open(filename) as f:
    f.seek(2)
    while True:
      line = f.readline()
      if not line:
        sleep(0.5)
        continue
      data = line.split()[:2]
      queue.put(data)

def mainLoop():
   while True:
     url, filename = queue.get()
     with urlopen(url) as res:
       with open(filenamem, 'wb') as f:
         f.write(res.read())

if __name__ == '__main__':
   t = Thread(target=daemon, args=('inputs.txt',), daemon=True)
   t.start()
   mainLoop()

정리

프로그램 내에서 두 개 이상의 무한루프가 독립적으로 동작하게 하려면 각각의 루프를 서로 다른 스레드에서 돌려야 한다. 이 때 프로세스 종료 시 작업 스레드들을 안전하게 종료하기 위해서는 이들 스레드는 생성 시 데몬으로 생성한다. 또한 보통 데몬을 사용하는 경우에는 한쪽 스레드에서 수집한 데이터를 다른 스레드(들)에게 넘겨주어야 할텐데, 이 때는 동기화된 큐를 쓰는 것이 권장되며 이 방법이 가장 간편하기도 하다.

더보기

threading 모듈을 통해서 멀티스레드를 사용하는 대표적인 예는 소켓통신의 다중접속 구현일 것이다. 소켓으로부터 접속을 받아들이는 코드는 보통 다음과 같은 모양이다.

sock = ... # 소켓 초기화 및 설정
sock.listen(host, port)
while True:
  conn, client = sock.accept() # 1
  ## 2
  recivedData = conn.recv(1024)
  ## ... 데이터를 처리하고 응답을 보내기

위 코드 중에서 sock.accept()에 의해서 리턴되는 값 중 첫번째 인자는 연결을 표현하는 소켓객체이다.  이 단일 루프 구조는 소켓이 연결을 기다리고, 연결을 받아들이면 해당 연결로부터 송수신을 처리하고 다시 루프의 시작으로 돌아가서 새로운 연결을 기다린다. 즉 한 번에 하나의 연결만 허용하는 구조가 된다. 하지만 이것은 소켓의 구조적인 한계라기보다는 그냥 코드가 선형으로 루프를 돌기 때문이다. 다음와 같이 처리한다면 어떨까?

while True:
  conn, client = sock.accept()
  t = Thread(target=handler, args=(conn, client), daemon=True)
  t.start()

위 코드에서는 서버 소켓이 연결을 받을 때마다 접속 소켓과 접속 정보를 핸들러에게 넘기고, 핸들러는 새로운 스레드에서 돌아가게 된다. 그리고 그 즉시 메인 루프는 다시 처음으로 돌아가서 새로운 연결을 기다릴 수 있다. 이 말은 방금 전에 접속한 소켓의 통신을 별도 스레드에서 처리하도록 하고 (소켓 통신을 주고/받기의 반복으로 돌아가는 일종의 무한 루프(최소한 끝이 정해지지 않은)라 볼 수 있으므로) 개별 접속 소켓의 처리를 데몬으로 만들어주는 것이다.

ZMQ 디바이스 사용하기

일반적인 소켓 연결의 경우, 주로 서버는 bind()를 통해서 포트에 연결하고 클라이언트는 connect()를 사용해서 포트에 연결한다. ZMQ에서는 이 방식이 절대적인 규칙이 아니다. 간단한 소켓 통신의 예에서 양 끝단 중 상대적으로 안정적인 쪽이 서버인 경우가 많기 때문에 bind() 하는 것이며 클라이언트는 서버보다는 동적이기 때문에 connect() 하는 경우가 많을 뿐이다. 간단한 피어 통신의 예제에서는 사실 양 끝단이 모두 ‘고정’되어 있고, ZMQ에서는 연결의 순서에 구애받지 않으므로 클라이언트가 bind()를 하고 서버가 connect()를 해도 문제 없다.

유념해야하는 원칙 한가지는 안정적인 쪽이 bind()를, 그렇지 않은 쪽이 connect()를 해야 한다는 점이다.

.

하나의 서버에 여러 개의 클라이언트가 붙어서 통신하는 경우를 생각해보자. 이 때 클라이언트는 서버보다 가변적이다. 따라서 서버가 바인드하는 쪽이되고 각각의 클라이언트가 해당 포트에 connect를 사용하여 접속하는 것이 적절한 선택이 될 수 있다. 하지만 만약 서버가 여러 개로 가변적인 상황이 된다면 어떨까? 즉 서버도 네트워크 그래프 상에서 가변적인 요소가 되었으므로 connect()를 해야 하는 상황이다. 이런 상황에서는 특정 클라이언트들을 특정한 서버로 할당하는 작업이 수행되어야 하고 서버의 수가 바뀔 때마다 코드를 수정해야하는 번거로움과 복잡함이 동반된다.

그렇다면 커뮤니케이션에서의 역할이 아닌 ‘유동성’에 초점을 맞추고 접근해보자. 서버와 클라이언트를 중계해줄 수 있는 중간 역할을 담당하는 어떤 고정된 포인트가 있어서 서버와 클라이언트가 모두 이 지점으로 접속한다면 가변적인 클라이언트와 가변적인 서버의 상황에 모두 적절하게 대응할 수 있을 것이다.

ZMQ 디바이스는 이런 상황에 사용하여 전체적인 네트워크의 구성을 단순화하는 좋은 수단이 된다. ZMQ 디바이스는 크게 세 종류가 기본적으로 지원된다.

  1. QUEUE : REQ-REP 패턴으로 연결되는 노드들를 중계하는 장치
  2. FORWARDER : PUB-SUB 패턴으로 연결되는 노드들를 중계하는 장치
  3. STREAMER : PUSH-PULL 패턴으로 연결되는 노드들을 중계하는 장치

그러한 경우에, 서버와 클라이언트를 모두 ZMQ 장치에 접속한다. 이 장치는 서로 다른 두 개의 포트를 가지고 있고, 한 포트에서 다른 포트로 메시지를 포워딩 해준다. 따라서 양단이 모두 가변적인 경우에 ZMQ 장치가 고정적인 포인트 역할을 하게 된다. 이러한 얼개는 아래 그림에 잘 나타나 있다. ZMQ 디바이스는 양쪽에 소켓을 두 개 가지고 있어서 한 쪽은 서버에 다른 한 쪽은 클라이언트에 연결된다.

.

ZMQ 디바이스는 zmq.device()라는 함수를 통해서 실행되며 실행되면 양쪽을 중계하는 서버처럼 작동하게 된다. 이 함수에는 중계 장치의 타입과 두 개의 ZMQ 소켓 객체가 전달되면 된다. QUEUE를 사용하는 예제를 한 번 살펴보자.

ZMQ Queue Device

큐는 REQREP 관계로 이루어지는 노드들 사이에 위치하면서 일종의 로드 밸런서처럼 작동한다. 디바이스는 그 스스로가 로드밸런서인 동시에 네트워크 그래프를 단순하게 만들어주는 구심점이 된다. 다만 주의할 것은 다른 ZMQ 장치들은 일반적인 PUSH, PULL 타입 소켓이나, PUB, SUB 소켓을 사용하지만, 큐의 경우에는 XREQ, XREP 소켓을 사용해야 한다는 점이다. 두 개의 소켓을 네트워크 포트에 바인드 한 후 장치를 시작한다. 그런 다음 REP 소켓들과 REQ 소켓들이 여기에 접속한다.

.

REP 소켓이 접속해야 하는 포트는 XREQ 소켓의 포트여야 하고, REQ 소켓이 접속하는 포트는 XREP 소켓의 포트여야 한다는 점만 주의하면 되겠다.

참고로 원문의 예제는 각각의 파일이 쪼개져 있었는데, 여기서는 하나의 파일에 합쳐서 작동하는 예를 만들어볼 것이다. 때문에 멀티 프로세스로 작동할 것이며, 이 환경에서 동시에 출력되는 로그끼리 얽히지 않게 하기 위해서 별도의 로그 모듈을 작성하여 사용할 것이다.

''' logger.py
    통합 로깅 모듈

    run_logger : 로깅 서버 실행
    get_logger : 로깅 클라이언트를 리턴하는 제너레이터 함수
'''

import zmq
import logging


def get_logger(ctx: zmq:Context, port=7779):
  sock = ctx.socket(zmq.PUSH)
  sock.connect(f"tcp://localhost:{port}")
  def wrapped():
    while True:
      mes = yield
      sock.send_string(mes)
  result = wrapped()
  next(result)
  return result


def run_logger(port=7779):
  logging.basicConfig(level=logging.DEBUG)
  ctx = zmq.Context()
  sock = ctx.socket(zmq.PULL)
  sock.bind(f"tcp://*:{port}")
  while True:
    message = sock.recv_string()
    logging.debug(message)

ZMQ 디바이스 생성

QUEUE 디바이스를 생성해보자. 클라이언트들은 REQ 소켓을 사용할 것이며, 서버들은 REP 소켓을 사용할 것이다. 따라서 우리의 디바이스는 XREQ, XREP 소켓을 사용하여 구성하게 된다. 코드 자체는 무척 간단하다. 두 개의 소켓을 각각 미리 정해둔 포트에 바인딩하고 zmq.device()를 호출해서 중계 장치를 기동한다.

import zmq
from logger import get_logger, run_logger
# ...

def dev_main():
  try:
    ctx = zmq.Context()
    log = get_logger(ctx)
    # socket facing clients
    fsock = zmq.socket(zmq.XREP)
    fsock.bind("tcp://*:5559")
    # socket facing servers
    bsock = zmq.socket(zmq.XREQ)
    bsock.bind("tcp://*:5560")
    zmq.device(zmq.QUEUE, fsock, bsock)
  except Exception as e:
    log.send("{e} - Bringing down the ZMQ Device...")
  finally:
    fsock.close()
    bsock.close()
    ctx.term()

서버 만들기

QUEUE 패턴의 서버는 REP 소켓을 가진 간단한 서버의 형식을 그대로 따른다. XREQ 소켓이 바인딩된 5560 포트로 접속하게 한다.

import time
import random


def srv_main(srv_id=0):
  port = 5560
  srv_id = random.randrange(1, 100)
  ctx = zmq.Context()
  log = get_logger(ctx)
  sock = ctx.socket(zmq.REP)
  sock.connect(f"tcp://localhost:{port}")
  while True:
    mes = sock.recv_string()
    log.send(f"Server {srv_id} received request: {mes}")
    time.sleep(1)
    sock.send_string(f"world from server {srv_id}")

클라이언트 만들기

클라이언트 역시 간단한 REQ 클라이언트의 코드를 그대로 따른다. 로그를 출력하는 것을 print() 함수가 아닌 로깅 서버를 사용하는 것만 다르다.

def clnt_main(clnt_id=0):
  port = 5559
  ctx = zmq.Context()
  log = get_logger(ctx)
  sock = ctx.socket(zmq.REQ)
  sock.connect(f"tcp://localhost:{port}")
  for req in range(10):
    log.send(f"Client{clnt_id} - Sending request {req + 1}...")
    sock.send_string(f"Hello from {clnt_id}")
    reply = sock.recv_string()
    log.send(f"Client {clnt_id} - Received reply [{reply}]")

Put Together

로깅서버, 큐 장치와 서버, 클라이언트 프로세스들을 멀티 프로세스를 통해 구동한다. 클라이언트 프로세스들의 작업 종료시점까지 기다린 후, 서버 및 그외 프로세스들을 kill하면 모든 과정이 종료된다. 참고로 각각의 프로세스는 모두 거의 동시에 시작 요청을 받으며, 경우에 따라서는 클라이언트들이 서버보다 먼저 시작될 수 있기 때문에 서버 시작 후 약간의 딜레이를 주는 것이 좋다.

def main():
  # setup zmq device and logger
  p_log = Process(target=run_logger)
  p_log.start()
  p_dev = Process(target=dev_main)
  p_dev.start()
  # create and run server processes
  srvs = [Process(target=srv_main, args=(i + 1,)) for i in range(2)]
  for s in srvs:
    s.start()
  time.sleep(2)
  # create and run client processes
  clnts = [Process(target=clnt_main, args=(i + 1,)) for i in range(5)]
  for c in clnts:
    c.start()
  for c in clnts:
    c.join()
  # All jobs are done
  for s in srvs:
    s.kill()
  p_dev.kill()
  p_log.kill()


if __name__ == "__main__":
  main()

이제 파일을 실행하면 서버와 클라이언트들이 제각각 작동하면서 메시지들을 출력할 것이다.

Forwarder / Streamer

Forwarder는 앞서 언급한 것과 같이 PUB-SUB 기반의 중계 장치이고, Streamer는 PUSH-PULL 기반의 중계장치이다. Queue가 XREQ, XREP와 같은 별도 타입의 소켓을 사용한 것과 달리, Forwarder, Streamer는 동일한 PUB,SUB,PUSH, PULL 소켓을 그대로 사용한다. 사용하는 방법 자체는 zmq.device() 를 호출하는 것이므로 자세한 코드를 소개하지는 않고 대략적인 구현 부분만 소개하겠다.

# forwarder

def run_forwarder(fport, bport):
  ctx = zmq.Context()
  fsock = ctx.socket(zmq.XPUB)
  bsock = ctx.socket(zmq.XSUB)
  # XSUB는 중계목적의 소켓이므로 setsockopt_string()을 호출할 필요가 없다. 
  fsock.bind(f"tcp://*:{fport}")
  bsock.bind(f"tcp://*:{bport}")
  zmq.device(zmq.FORWARDER, fsock, bsock)

# Streamer

def run_streamer(fport, bport):
  ctx = zmq.Context()
  fsock = ctx.socket(zmq.PUSH)
  bsock = ctx.socket(zmq.PULL)
  fsock.bind(f"tcp://*:{fport}")
  bsock.bind(f"tcp://*:{bport}")
  zmq.device(zmq.STREAMER, fsock, bsock)