카테고리 없음

kqueue in Unix AsynchronousSocketChannel

bknote71 2024. 11. 7. 17:03

비동기 소켓 I/O에서 사용되는 AsynchrounousServerSocketChannelI는 어떤 매커니즘으로 I/O를 비동기적으로 처리하는 것일까?

 

세운 가설

  • 내부에 I/O와 콜백을 처리하는 스레드 풀이 존재한다.
  • 유저가 비동기 함수를 호출했을 때 대신 요청하여 블로킹되고 즉시 반환된다.
  • 요청이 완료되면 비동기 콜백을 호출한다.

실제로는 가설 거의 비슷하였지만 다른 부분이 내부적으로 kqueue 논블로킹 I/O 통해서 요청을 처리하는 것이었다.

 

kqueue

I/O 멀티플렉싱 (이벤트 기반 비동기 I/O) 방법 중 하나로, BSD 계열에서 사용되는 방식이다.

맥OS는 BSD 기반이기 때문에 BSD에 설치된 JDK가 kqueue를 사용하는 것인 듯 싶다.

 

아마 운영체제 별 설치된 JDK마다 구현 내부는 달라질듯 싶다. 

- linux는 epoll, 윈도우는 iocp?

 

C#에서도 비동기 I/O를 지원하는 데, 이때 Completed 옵션이 있는걸로 보아선 아마 (완료 이벤트를 처리할 수 있는) overlapped를 사용하는 iocp를 내부적으로 사용하는 것이 아닌가 싶다.

 

kqueue vs iocp

이벤트 관점: iocp는 I/O 완료 여부에 대한 이벤트를 통지하고, kqueue는 I/O 이용 가능 여부에 대한 이벤트를 통지한다.

동시성 제어 관점: 중요한 차이점 중 하나는 iocp는 커널이 이벤트를 처리할 스레드를 선택하여, 동시성 제어를 할 필요가 없는 것에 비해 kqueue는 내부적으로 동시성 제어를 하지 않는다는 것이다.

kqueue를 사용하는 유저 애플리케이션 단에서 동시성 제어를 하지 않는다면, 여러 스레드에서 동일한 이벤트를 처리하게 되는 문제가 발생할 수 있다.

 

그렇다면 kqueue를 사용하는 UnixAsynchronousSocketChannelImpl에서는 어떻게 동시성을 제어할까?

그 방법은 BlockingQueue를 사용하는 것이다.

 

I/O를 처리하는 스레드들은 하나의 실제 kqueue 객체에 접근하는 것이 아닌 추상화된 BlockingQueue에 접근하여 이벤트를 대기(블로킹)한다.

오로지 하나의 스레드만이 kqueue에 접근하여 발생한 kqueue 이벤트를 꺼내어 BlockingQueue에 래핑한 이벤트를 전달한다.

그렇게되면 블로킹된 스레드들이 깨어나서 BlockingQueue에 존재하는 이벤트를 꺼내어 처리할 수 있는 것이다.

 

kqueue의 특징 중 하나는 ET(Edge-Triggered)만을 지원한다는 것이다.

이는 큐 사이즈가 충분하지 않다는 것을 가정하고 소켓을 non-blocking 모드로 처리해야 한다.

소켓 fd를 non-blocking 모드로 설정

 

논블로킹 I/O ?

  • 폴링 방식
  • I/O가 완료되어야 실행될 수 있는 블로킹 방식에 비해, I/O를 요청하고 이미 커널 버퍼에 채워진 값들을 fetch하여 즉시 리턴한다.
  • 이때 값이 있을 수도 있고 없을 수도 있다.
  • (C) 값이 없을 때는 -1이 반환되고 errno이 EAGAIN 혹은 EWOULDBLCOK으로 설정된다.

accept

 

Net.accept: (논블로킹으로 설정된) accept (read 동작)

  • 현재 listen queue(accept queue)에 존재하는 (ACK) 세그먼트를 읽고, 3-way handshake를 최종 처리(server established)하는 동작이다.
  • 논블로킹 모드에서는 listen queue에 아무것도 없다면 즉시 반환

자바에서 논블로킹 모드에서 아무것도 읽지 못했을 때 다음과 같은 값을 가진다.

public final class IOStatus {
	...
    @Native public static final int UNAVAILABLE = -2; // Nothing available (non-blocking)
}

 

읽기 시도 후 아무것도 읽지 못한 상태라면? (KQueuePort) port.startPoll(fdVal, Net.POLLIN);

소켓 fd를 필터링할 이벤트와 함께 kqueue 객체에 등록한다.

그렇게 되면 kqueue 객체에서 발생한 이벤트를 필터링하여 처리할 수 있게되는 것이다.

port.startPoll(fdVal, Net.POLLIN)

KQueuePort는 어떤 과정으로 이벤트를 처리할까?

 

1. channel provider

서버 소켓 채널을 사용할 때 open() (빈 파라미터)을 호출하게 된다.

AsynchronousServerSocketChannel open = AsynchronousServerSocketChannel.open();

 

open은 내부적으로 AsynchronousChannelProvider.provider() 를 이용하여 소켓 채널을 사용한다.

빈 파라미터로 호출한 open() 함수는 DefaultAsynchronousChannelProvider를 사용하는데, 이는 맥OS에서 BsdAsynchrnousChannelProvider이다.

 

이 BsdAsynchrnousChannelProvider에서 KQueuePort를 생성하고 start()를 호출한다.

 

2. (KQueuePort) start(): 실제 kqueue 이벤트 처리 과정

(KQueuePort) startThreads

 

internalThreadCount가 1이고, poolSize는 10이다.

즉 고정된 11개의 스레드(스레드 풀)가 task(KQueuePort.EventHandlerTask)를 실행하는 것이다.

 

3. KQueuePort.EventHandlerTask

3.1. run()

 public void run() {
 	...
    Event ev;
    try {
        for (;;) {
	    ...
            try {
                replaceMe = false;
                ev = queue.take(); // BlockingQueue

                if (ev == NEED_TO_POLL) {
                    try {
                        ev = poll();
                    } ...
                }
            } ...

           ...

            // process event
            try {
                ev.channel().onEvent(ev.events(), isPooledThread);
            } ...
        }
    } finally {
		...
    }
}

 

맨 처음 큐에는 NEED_TO_POLL 이벤트가 들어간다. (생성자에서 this.queue.offer(NEED_TO_POLL) 호출)

이전 위에서 오로지 하나의 스레드만이 발생한 kqueue 이벤트를 읽고, BlockingQueue에 래핑한 이벤트를 넣는다.

그러면 대기 중인 스레드가 깨어난다.

 

이 하나의 스레드가 별도의 스레드가 아닌 큐에서 추상화된 NEED_TO_POLL 이벤트를 읽는 스레드이다.

즉 최초의 접속한 스레드만이 NEED_TO_POLL 이벤트를 읽을 수 있고, poll 함수를 호출하여 kqueue에서 발생한 이벤트를 BlockingQueue에 채워넣는 역할을 하는 것이다.

 

3.2. poll()

private Event poll() throws IOException {
    try {
    	for (;;) {
            int n;
            do {
                n = KQueue.poll(kqfd, address, MAX_KEVENTS_TO_POLL, -1L);
            } while (n == IOStatus.INTERRUPTED);

            fdToChannelLock.readLock().lock();
            try {
                while (n-- > 0) {
                    long keventAddress = KQueue.getEvent(address, n);
                    int fd = KQueue.getDescriptor(keventAddress);

                   // wake up logic …

                    PollableChannel channel = fdToChannel.get(fd);
                    if (channel != null) {
                        int filter = KQueue.getFilter(keventAddress);
                        int events = 0;
                        if (filter == EVFILT_READ)
                            events = Net.POLLIN;
                        else if (filter == EVFILT_WRITE)
                            events = Net.POLLOUT;

                        Event ev = new Event(channel, events);

                        if (n > 0) {
                            queue.offer(ev);
                        } else {
                            return ev;
                        }
                    }
                }
            } finally {
                fdToChannelLock.readLock().unlock();
            }
        }
    } finally {
        queue.offer(NEED_TO_POLL);
    }
}

 

kqueue.poll(): 발생할 이벤트를 블로킹하여 읽는다.

 

발생한 kqueue events가 n개이면 n - 1개를 래핑하여 BlockingQueue에 넣는다.

마지막 1은 poll을 호출한 스레드가 처리하도록 한다.

 

현재 발생한 모든 이벤트를 넣었고, BlockingQueue 끝에는 NEED_TO_POLL 이벤트가 없다.

NEED_TO_POLL 이벤트가 있어야 실제로 kqueue에서 이벤트를 읽을 수 있는데, 만약 NEED_TO_POLL 이벤트가 없다면 이후 도착하는 스레드는 무한 대기하게 되는 것이다. 

 

이것을 방지하기 위해 finally에서 NEED_TO_POLL 이벤트를 넣어준다. 즉 발생한 모든 이벤트 뒤에 NEED_TO_POLL를 넣어 그 이후에 도착한 처음 스레드가 다시 kqueue 이벤트를 읽을 수 있도록 한다.

 

채워진 래핑된 이벤트를 꺼낸 스레드들은 onEvent를 호출하게 된다.

 

onEvent

(Port) onEvent

1. UnixAsynchronousServerSocketChannelImpl

accept를 처리하고, 완료되면 accept에 넘겨준 콜백이 호출된다.

 

2. UnixAsynchronousSocketChannelImpl 

클라이언트에 대한 소켓으로, 소켓 I/O(send, receive, connect)를 처리한다.

finish

 

read 처리만 살펴보자.

private void finishRead(boolean mayInvokeDirect) {
	if (scattering) {
	    n = (int)IOUtil.read(fd, readBuffers, true, nd);
	} else {
	    n = IOUtil.read(fd, readBuffer, -1, true, nd);
	}
    
	if (n == IOStatus.UNAVAILABLE) { // (논블로킹) 읽은 것이 없다면 즉시 리턴
	    ...
	    return;
	}

	// 읽기를 완료하면 전달한 콜백(handler) 호출
	if (handler == null) {
	    future.setResult(result, exc);
	} else {
	    if (mayInvokeDirect) {
	        Invoker.invokeUnchecked(handler, att, result, exc);
	    } else {
	        Invoker.invokeIndirectly(this, handler, att, result, exc);
	    }
	}
}

 

논블로킹 모드에서 읽기를 완료하여 전달한 콜백을 호출하는 것으로 비동기 I/O처리가 끝이 나게 된다.