Runnable/Callable

Runnable имеет один метод, который не возвращает значения.

trait Runnable {
  def run(): Unit
}

Callable за исключением предыдущего трейта возвращает значение

trait Callable[V] {
  def call(): V
}

Потоки

Параллельные вычисления в Scala построены поверх модели параллельных вычислений в Java.

На Sun JVM, с большой нагрузкой на IO, мы можем запустить десятки тысяч потоков на одном компьютере.

Thread принимает параметром Runnable. Вы должны вызвать start в Thread, для того чтобы запустить Runnable.

scala> val hello = new Thread(new Runnable {
  def run() {
    println("hello world")
  }
})
hello: java.lang.Thread = Thread[Thread-3,5,main]

scala> hello.start
hello world

Когда вы видите класс реализующий Runnable, знайте, что он предназначен для запуска кем-то в Thread определенной работы.

Что-нибудь однопоточное

Ниже представлен кусочек кода, который работает, но имеет некоторые проблемы

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)

  def run() {
    while (true) {
      // Здесь будет блокировка, пока не произойдет соединение.
      val socket = serverSocket.accept()
      (new Handler(socket)).run()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

Каждый запрос будет посылать ответ с именем текущего потока, который всегда main.

Основным недостатком этого кода является то, что только один запрос может отвечать в данный момент!

Вы можете поместить каждый запрос в Thread. Просто поменяйте

(new Handler(socket)).run()

на

(new Thread(new Handler(socket))).start()

а вдруг вам захочется заново использовать потоки или изменить политику поведения потока?

Executors

С релизом Java 5, было решено, что требуется более абстрактный интерфейс для Потоков.

Вы можете получить ExecutorService, используя статические методы с объектом Executors. Эти методы позволяют вам конфигурировать ExecutorService с множеством возможностей, таких как пул потоков.

Ниже представлен наш старый блокирующий сетевой сервер, позовляющий использовать параллельные запросы.

import java.net.{Socket, ServerSocket}
import java.util.concurrent.{Executors, ExecutorService}
import java.util.Date

class NetworkService(port: Int, poolSize: Int) extends Runnable {
  val serverSocket = new ServerSocket(port)
  val pool: ExecutorService = Executors.newFixedThreadPool(poolSize)

  def run() {
    try {
      while (true) {
        // This will block until a connection comes in.
        val socket = serverSocket.accept()
        pool.execute(new Handler(socket))
      }
    } finally {
      pool.shutdown()
    }
  }
}

class Handler(socket: Socket) extends Runnable {
  def message = (Thread.currentThread.getName() + "\n").getBytes

  def run() {
    socket.getOutputStream.write(message)
    socket.getOutputStream.close()
  }
}

(new NetworkService(2020, 2)).run

Ниже, представленные соединения показывают, как внутренние потоки могут использоваться повторно.

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

$ nc localhost 2020
pool-1-thread-1

$ nc localhost 2020
pool-1-thread-2

Futures

Future предоставляет возможность асинхронных вычислений. Вы можете обернуть ваши вычисления с помощью Future и когда вам будет нужен результат, вы просто вызовите блокирование метода get(). Executor возвращают Future.

FutureTask – это трейт Runnable и спроектирован для запуска с помощью Executor

val future = new FutureTask[String](new Callable[String]() {
  def call(): String = {
    searcher.search(target);
}})
executor.execute(future)

Теперь мне нужны результаты, поэтому устанавливаем блокировку пока они не будут получены.

val blockingResult = future.get()

Смотрите также: В Effective Scala есть описание Futures .

Проблема безопасности потока

class Person(var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

Эта программа не является безопасной в многопоточной среде. Если два потока имеют ссылки на тот же экземпляр Person и вызывает set, вы не можете предсказать, что name будет в конце обоих вызовов.

В модели памяти Java, каждый процессор имеет право кэшировать значения в L1 и L2 кэш, и два потока работающих на разных процессорах, могут иметь свои собственные виды данных.

Давайте поговорим о некоторых инструментах, которые позволяют держать виды данных в потоках согласованными.

Три инструмента

Синхронизация

Мьютексы позволяют владеть семантикой вычислений. Когда вы входите в мьютекс, вы являетесь его владельцем. Наиболее распространенный способ использования мьютекса в JVM является синхронизация каких-то состояний. В этом случае, мы будем синхронизировать наш userMap.

В JVM, можно синхронизировать любой экземпляр, если только это не null.

class Person(var name: String) {
  def set(changedName: String) {
    this.synchronized {
      name = changedName
    }
  }
}

Изменчивость

В Java 5 перешли к модели памяти, изменчивые и синхронизированные потоки в основном похожи, за исключением того, что в изменчивых потоках разрешен null.

synchronized позволяет более тонкую блокировку. volatile синхронизирован при каждом доступе.

class Person(@volatile var name: String) {
  def set(changedName: String) {
    name = changedName
  }
}

AtomicReference

Также в Java 5 был добвлен целый набор низкоуровневых примитивов для параллельных вычислений. Одним из них является класс AtomicReference

import java.util.concurrent.atomic.AtomicReference

class Person(val name: AtomicReference[String]) {
  def set(changedName: String) {
    name.set(changedName)
  }
}

Значит это ничего не стоит?

@AtomicReference является самым дорогостоящим из этих двух вариантов, поскольку вы должны пройти через метод диспетчер для доступа к значениям.

volatile и synchronized строятся поверх встроенных мониторов в Java. Мониторы стоят очень мало, если нет никаких разногласий. synchronized позволяет более тонкий контроль над синхронизацией, будет меньше конкуренции, поэтому synchronized, как правило, самый дешевый вариант.

Когда вы входите в точки синхронизации, пытаетесь обратиться к изменчивым ссылкам или используете AtomicReferences, Java заставляет процессор очистить кэш-память и обеспечивает согласованное представление данных.

ПОЖАЛУЙСТА, ПОПРАВЬТЕ МЕНЯ, ЕСЛИ Я ЗДЕСЬ ОШИБАЮСЬ. Это сложная тема, и я уверен, что будут продолжительные дискуссии на этот счет.

Другие полезные инструменты Java 5

Как я ранее заметил, Java 5 принес много полезных вещей благодаря AtomicReference.

CountDownLatch

CountDownLatch – это простой механизм для использования множества потоков и их взаимодействия.

val doneSignal = new CountDownLatch(2)
doAsyncWork(1)
doAsyncWork(2)

doneSignal.await()
println("both workers finished!")

Среди прочего, он отлично подходит для юнит-тестов. Допустим, вы делаете некоторую асинхронную работу и хотите убедиться, что функции завершена. Просто ваши функции должны иметь блокировку CountDown и await в тесте.

AtomicInteger/Long

Для испльзования Int и Long во многих задачах были добавлены AtomicInteger и AtomicLong.

AtomicBoolean

Я думаю ненужно объяснять зачем это нужно.

ReadWriteLocks

ReadWriteLock позволяет вам блокировать потоки читателей и писателей. Читатель блокируется, когда писатель установил блокировку.

Давайте построим небезопасный поисковый движок

У нас есть простой инвертированный индекс, который потоконебезопасен. Наши инвертированные индексные карты – это часть имени конкретного пользователя.

Все написано в простой форме, предполагая лишь однопоточный доступ.

Обратите внимание на альтернативный стандартный конструктор this(), который использует mutable.HashMap

import scala.collection.mutable

case class User(name: String, id: Int)

class InvertedIndex(val userMap: mutable.Map[String, User]) {

  def this() = this(new mutable.HashMap[String, User])

  def tokenizeName(name: String): Seq[String] = {
    name.split(" ").map(_.toLowerCase)
  }

  def add(term: String, user: User) {
    userMap += term -> user
  }

  def add(user: User) {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

Я пока оставил возможность получения пользователей вне нашего индекса. Мы вернемся к этому позже.

А теперь давайте сделаем код безопасным

В нашем инвертированном индексе, для userMap не гарантируется безопасность. Несколько клиентов могут попытаться добавить элементы в одно и то же время и имеют ту же видимость ошибок, которую мы видели в нашем первом примере Person.

Так как userMap не является потокобезопасным, то как мы можем держать только один поток изменив его?

Вы могли бы рассмотреть блокировку userMap при добавлении.

def add(user: User) {
  userMap.synchronized {
    tokenizeName(user.name).foreach { term =>
      add(term, user)
    }
  }
}

К сожалению, это слишком грубо. Всегда старайтесь сделать так много дорогостоящей работы за пределами мьютекса насколько это возможно. Помните, что я говорил, блокировка дожна быть дешевой, если нет никаких разногласий. Если вы сделаете меньше работы внутри блока, будет меньше разногласий.

def add(user: User) {
  // tokenizeName как было измерено, самая дорога операция
  val tokens = tokenizeName(user.name)

  tokens.foreach { term =>
    userMap.synchronized {
      add(term, user)
    }
  }
}

SynchronizedMap

Мы может смешать синронизацию с изменяющимся HashMap, используя трейт SynchronizedMap.

Мы можем расширить наш существующий InvertedIndex, давая пользователям простой способ построения синхронизированного индекса.

import scala.collection.mutable.SynchronizedMap

class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) {
  def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User])
}

Если вы посмотрите на реализацию, вы поймете, что это просто синхронизация каждого метода, так что пока это безопасно, оно не может быть таким производительным как бы вам этого хотелось.

Java ConcurrentHashMap

Java поставляется с прекрасным потокобезопасным ConcurrentHashMap. К счастью, мы можем использовать JavaConversions, чтобы использовать семантику Scala.

В самом деле, мы можем легко создать новый код, используя потокобезопасный InvertedIndex, как продолжение старого небезопасного.

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._

class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User])
    extends InvertedIndex(userMap) {

  def this() = this(new ConcurrentHashMap[String, User])
}

Давайте загрузим наш InvertedIndex

Простой способ


trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class FileRecordProducer(path: String) extends UserMaker {
  def run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      index.add(makeUser(line))
    }
  }
}

Для каждой строки в нашем файле, мы вызываем makeUser, а затем add для нашего InvertedIndex. Если мы используем параллельный InvertedIndex, мы можем вызвать добавление в параллельном потоке и makeUser не будет иметь побочных эффектов, он является потокобезопасным.

Мы не можем прочитать файл параллельно, но мы можем создать пользователя и добавить его в индекс параллельно.

Решение: Producer/Consumer

Общий шаблон для асинхронных вычислений состоит в том, чтобы отделить производителей от потребителей и заставить их взаимодействовать только через Queue(Очередь). Давайте рассмотрим, как это будет работать для нашего индексатора в поисковом движке.

import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Concrete producer
class Producer[T](path: String, queue: BlockingQueue[T]) implements Runnable {
  public void run() {
    Source.fromFile(path, "utf-8").getLines.foreach { line =>
      queue.put(line)
    }
  }
}

// Абстрактный потребитель
abstract class Consumer[T](queue: BlockingQueue[T]) implements Runnable {
  public void run() {
    while (true) {
      val item = queue.take()
      consume(item)
    }
  }

  def consume(x: T)
}

val queue = new LinkedBlockingQueue[String]()

// Один поток для потребителя
val producer = new Producer[String]("users.txt", q)
new Thread(producer).start()

trait UserMaker {
  def makeUser(line: String) = line.split(",") match {
    case Array(name, userid) => User(name, userid.trim().toInt)
  }
}

class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker {
  def consume(t: String) = index.add(makeUser(t))
}

// Давайте представим, что у нас 8 ядер на данной машине.
val cores = 8
val pool = Executors.newFixedThreadPool(cores)

// Распределим по одному потребителю на каждое ядро.
for (i <- i to cores) {
  pool.submit(new IndexerConsumer[String](index, q))
}