Runnable/Callable

Runnable에는 메소드가 하나만 있다. 그 메소드는 반환하는 값이 없다.

trait Runnable {
  def run(): Unit
}

Callable도 비슷하지만, 반환값이 있다.

trait Callable[V] {
  def call(): V
}

Thread

스칼라 동시성은 자바 동시성 모델 위에 구현되어 있다.

Sun의 JVM에서 IO-작업 부하가 큰 경우 한 기계 위에 수만개의 쓰레드를 실행할 수 있다.

Thread는 Runnable을 받는다. Thread가 Runnable을 실행하게 만들려면 Thread의 start 를 호출해야 한다.

scala> val hello = new Thread(new Runnable {
  def run() {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start
hello world

Runnable을 구현한 클래스는 나중에 어디선가 쓰레드상에 실행될 것을 가정하고 만들어진 클래스이다.

단일 쓰레드 프로그램의 예

아래 프로그램은 잘 동작하긴 하지만 문제가 좀 있다.

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)

  def run() {
    while (true) {
      // This will block until a connection comes in.
      val socket = serverSocket.accept()
      (new Handler(socket)).run()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

요청이 오면 현재 쓰레드의 이름을 응답으로 내보낸다. 이 프로그램에서는 항상 main 이 전달된다.

이 프로그램의 가장 큰 문제점은 한번에 한 요청밖에 처리하지 못한다는 점이다!

각각의 요청을 쓰레드에 넣었다면 더 좋았을 것이다. 단지 아래 부분을

(new Handler(socket)).run()

다음과 같이 고치면 된다.

(new Thread(new Handler(socket))).start()

하지만 쓰레드를 재사용하고 싶거나, 쓰레드의 동작 방식에 어떤 정책을 지정하고 싶다면 어떻게 해야 할까?

Executor

쓰레드를 처리하기 위한 더 추상적인 인터페이스가 필요하다는 의견에 따라 자바 5부터 추가된 것이 있다.

Executors 객체의 정적 메소드를 호출하면 ExecutorService 객체를 얻을 수 있다. 이런 정적 메소드를 활용해 쓰레드 풀과 같은 여러 정책을 ExecutorService 에 지정할 수 있다.

아래는 동시 요청을 허용하기 위해 작성된 구식 블록킹 네트워크 서버이다.

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run() {
    try {
      while (true) {
        // This will block until a connection comes in.
        val socket = serverSocket.accept()
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

아래의 실행 예는 내부에서 쓰레드가 어떻게 재활용되는지 잘 보여준다.

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

Future

Future 는 비동기적 연산을 나타낸다. 필요한 계산을 Future로 감싼 다음 결과가 필요할 때 그 Future의 get() 메소드를 호출하면 된다. 이 get() 메소드는 블록킹 메소드이다.
ExecutorFuture 를 반환한다. Finagle RPC 시스템을 사용한다면 Future 인스턴스에 결과를 담는다. 결과는 경우에 따라 아직 도착하지 않았을 수도 있다.

FutureTask 는 Runnable이며 Executor 가 실행하도록 설계되었다.

val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target);
}})
executor.execute(future)

이제 결과가 필요해졌다. 결과가 나올때까지 블록시키도록 하자.

val blockingResult = future.get()

See Also 스칼라 학교의 Finagle 페이지에는 Future를 사용한 예제가 많이 있다. 그 중에는 Future를 서로 엮는 멋진 방법을 보여주는 것들도 있다. 효율적 스칼라에서도 Future에 대해 다룬다.

쓰레드 안전성 문제

class Person(var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

위의 프로그램은 다중 쓰레드 환경에서는 안전하지 않다. 두 쓰레드가 같은 Person에 대한 인스턴스를 참조하고 있는데 set 을 호출한다면, 각각 쓰레드에서 호출 결과가 어떤 값이 될지 예상할 수 없다.

자바 메모리 모델에서 각 프로세서는 L1, L2 캐시에 값을 넣어둘 수 있기 때문에 서로 다른 프로세서에서 실행중인 쓰레드마다 보고 있는 데이터가 다를 수 있다.

여러 쓰레드에서 데이터를 볼 때 일관성이 있게 해줄 수 있는 여러 도구를 살펴보자.

세가지 도구

동기화(synchronized)

뮤텍스는 소유권을 제공한다. 뮤텍스 안에 들어가면 그것을 소유한 것이다. JVM에서 뮤텍스를 사용하는 가장 일반적인 방법은 어떤 객체에 대해 동기화하는 것이다. 아래 예에서는 Person에 대해 동기화할 것이다.

JVM에서는 null이 아닌 인스턴스라면 어떤 것이든 동기화 대상이 될 수 있다.

class Person(var name: String) {
  def set(changedName: String) {
    this.synchronized {
      name = changedName
    }
  }
}

휘발성(volatile)

자바 5에서 메모리 모델이 바뀌면서 volatile과 synchronized가 같은 의미가 되었다. 다만, volatile에서는 null도 허용한다.

synchronized 는 좀 더 세밀한 잠금을 위해 사용된다. volatile 은 대상에 대한 억세스가 일어날 때마다 동기화한다.

class Person(@volatile var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

AtomicReference

자바 5부터 많은 저수준 동시성 도구들이 추가되었다. 그 중 하나가 AtomicReference 클래스이다.

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {
  def set(changedName: String) {
    name.set(changedName)
  }
}

사용시 얼마나 비용이 드나?

AtomicReference 는 값을 억세스하기 위해 메소드 디스패치를 거쳐야 하기 때문에 대부분의 경우 가장 비용이 크다.

volatilesynchronized 는 자바의 내장 모니터(역주: 컴퓨터 모니터나 감시용 모듈 등이 아니라 동시성 도구인 모니터이다)를 사용한다. 경쟁이 없는 경우 모니터는 비용이 거의 안든다. synchronized 가 더 세밀한 동시성 제어가 가능하기 때문에 경쟁상황이 더 적게 발생할 것이다. 따라서 synchronized 가 가장 비용이 적게 들 가능성이 높다.

자바에서는 synchronized로 동기화된 지점에 들어가거나, volatile로 지정된 참조를 억세스하거나, AtomicReferences를 역참조하는 경우 프로세서가 캐시 라인을 비워서 데이터를 메모리에서 가져오도록 만든다. 이를 통해 데이터를 볼 때 일관성을 부여해준다.

내(원저자)가 틀렸다면 지적해 주기 바란다. 이 주제는 복잡하기 때문에, 논의할 것이 많이 있다.

자바 5부터 지원되는 여러 도구들

AtomicReference 를 이야기할 때 말했듯이 자바 5부터 많은 도구가 추가되었다.

CountDownLatch

CountDownLatch 는 여러 쓰레드가 서로 통신하는 간단한 메카니즘을 제공한다.

val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)

doneSignal.await()
println("both workers finished!")

무엇보다 단위 테스트시 유용하다. 비동기 작업들이 있어서 각각의 함수가 마무리되었음을 확인하고 싶다 치자. 이때는 각 함수가 래치를 countDown 하고 테스트는 래치를 await 하면 된다.

AtomicInteger/Long

Int나 Long을 증가시키는 경우가 자주 있다. 그래서 AtomicIntegerAtomicLong 이 도입되었다.

AtomicBoolean

더이상의 자세한 설명은 생략한다.

ReadWriteLocks

ReadWriteLock 를 사용하면 읽기와 쓰기 자물쇠(Lock)를 얻을 수 있다. 읽기 자물쇠는 쓰기 자물쇠가 잠겨있는 경우에만 블록된다.

안전하지 않은 검색 엔진을 만들자

여기 쓰레드 안전성이 없는 역 인덱스가 있다. 이 역 인덱스는 이름의 일부에 대해 사용자를 연결해준다.

오직 한 쓰레드만 억세스할 수 있다는 순진한 가정하에 작성되어 있다.

mutable.HashMap 를 사용하게 되어 있는 추가 기본 생성자 this() 에 유의하라.

import scala.collection.mutable

case class User(name: String, id: Int)

class InvertedIndex(val userMap: mutable.Map[String, User]) {

  def this() = this(new mutable.HashMap[String, User])

  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }

  def add(term: String, user: User) {
    userMap += term -> user
  }

  def add(user: User) {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

이 인덱스에서 어떻게 사용자 정보를 가져올 수 있는지는 설명하지 않겠다. 어차피 나중에 보게될 것이다.

이제 안전하게 만들자

앞에서 본 역 인덱스 예제에서 userMap의 쓰레드 안전성은 보장되어 있지 않다. 여러 클라이언트가 동시에 원소를 추가하면 첫번째 Person 예제에서 보았던 시점의 문제가 똑같이 발생할 수 있다.

userMap이 쓰레드 안전성이 없다면, 어떻게 그 맵을 변경하는 쓰레드가 한 순간에 하나만 존재하게 강제할 수 있을까?

아마도 원소를 추가할 때만 userMap을 잠그면 어떨까 하고 생각했을 지 모르겠다.

def add(user: User) {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

불행하게도 잠그는 범위가 너무 넓다. 비싼 작업은 가능한 한 뮤텍스 바깥쪽에서 하도록 항상 노력 해야한다. 경합이 없어야 잠금의 비용이 줄어든다는 점을 기억하라. 뮤텍스 안에서 하는 일이 적을수록 경합 상황도 적어질 것이다.

def add(user: User) {
  // tokenizeName was measured to be the most expensive operation.
  val tokens = tokenizeName(user.name)

  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

SynchronizedMap

SynchronizedMap 트레잇을 사용하면 변경 가능한 HashMap에 동기화를 추가할 수 있다.

이를 활용하면 기존 InvertedIndex를 손쉽게 동기화된 인덱스로 만들 수 있다.

import scala.collection.mutable.SynchronizedMap

class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

하지만, SynchronizedMap의 구현을 살펴보면 모든 메소드를 동기화하고 있음을 알 수 있다. 따라서 안전하기는 하지만, 성능은 원하는 만큼 나오지 않을 수도 있다.

자바 ConcurrentHashMap

자바에 ConcurrentHashMap이라는 훌륭한 쓰레드 안전한 맵이 존재한다. 고맙게도 JavaConverters를 사용해 스칼라로 변환이 가능하다.

이를 활용하면 실제로 예전의 안전하지 않은 InvertedIndex를 매끄럽게 확장해 새 쓰레드 안전한 인덱스로 만들 수 있다.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User] asScala)
}

이제 InvertedIndex에 부하를 가해 보자

소박한 방법


trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class FileRecordProducer(path: String) extends UserMaker {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      index.add(makeUser(line))
    }
  }
}

파일의 각 줄에 대해 makeUser 을 부르고 이를 InvertedIndex에 add 한다. 동시적인 InvertedIndex를 사용한다면 makeUser가 부작용이 없어서 항상 쓰레드 안전하기 때문에 병렬로 추가가 가능하다.

파일을 병렬로 읽을 수는 없지만, User는 병렬로 만들 수 있고 , 이를 병렬로 인덱스에 추가할 수 있다.

해결책: 생산자/소비자

비동기적 계산에서 공통된 패턴은 생산자와 소비자를 분리해 둘 사이에는 Queue 를 사용한 통신만 허용하는 것이다. 앞의 검색엔진 인덱스 프로그램에 이를 어떻게 적용할 수 있나 살펴보자.

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// 구체적 생산자
class Producer[T](path: String, queue: BlockingQueue[T]) extends Runnable {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      queue.put(line)
    }
  }
}

// 추상 소비자
abstract class Consumer[T](queue: BlockingQueue[T]) extends Runnable {
  def run() {
    while (true) {
      val item = queue.take()
      consume(item)
    }
  }

  def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()

// 생산자는 쓰레드 하나로 동작
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String) = index.add(makeUser(t))
}

// 기계에 코어가 8개 있다 치자
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

// 코어마다 소비자를 하나씩 배당하자
for (i <- i to cores) {
  pool.submit(new IndexerConsumer[String](index, q))
}