Runnable имеет один метод, который не возвращает значения.
trait Runnable { def run(): Unit }
Callable за исключением предыдущего трейта возвращает значение
trait Callable[V] { def call(): V }
Параллельные вычисления в Scala построены поверх модели параллельных вычислений в Java.
На Sun JVM, с большой нагрузкой на IO, мы можем запустить десятки тысяч потоков на одном компьютере.
Thread принимает параметром Runnable. Вы должны вызвать start
в Thread, для того чтобы запустить Runnable.
scala> val hello = new Thread(new Runnable { def run() { println("hello world") } }) hello: java.lang.Thread = Thread[Thread-3,5,main] scala> hello.start hello world
Когда вы видите класс реализующий Runnable, знайте, что он предназначен для запуска кем-то в Thread определенной работы.
Ниже представлен кусочек кода, который работает, но имеет некоторые проблемы
import java.net.{Socket, ServerSocket} import java.util.concurrent.{Executors, ExecutorService} import java.util.Date class NetworkService(port: Int, poolSize: Int) extends Runnable { val serverSocket = new ServerSocket(port) def run() { while (true) { // Здесь будет блокировка, пока не произойдет соединение. val socket = serverSocket.accept() (new Handler(socket)).run() } } } class Handler(socket: Socket) extends Runnable { def message = (Thread.currentThread.getName() + "\n").getBytes def run() { socket.getOutputStream.write(message) socket.getOutputStream.close() } } (new NetworkService(2020, 2)).run
Каждый запрос будет посылать ответ с именем текущего потока, который всегда main
.
Основным недостатком этого кода является то, что только один запрос может отвечать в данный момент!
Вы можете поместить каждый запрос в Thread. Просто поменяйте
(new Handler(socket)).run()
на
(new Thread(new Handler(socket))).start()
а вдруг вам захочется заново использовать потоки или изменить политику поведения потока?
С релизом Java 5, было решено, что требуется более абстрактный интерфейс для Потоков.
Вы можете получить ExecutorService
, используя статические методы с объектом Executors
. Эти методы позволяют вам конфигурировать ExecutorService
с множеством возможностей, таких как пул потоков.
Ниже представлен наш старый блокирующий сетевой сервер, позовляющий использовать параллельные запросы.
import java.net.{Socket, ServerSocket} import java.util.concurrent.{Executors, ExecutorService} import java.util.Date class NetworkService(port: Int, poolSize: Int) extends Runnable { val serverSocket = new ServerSocket(port) val pool: ExecutorService = Executors.newFixedThreadPool(poolSize) def run() { try { while (true) { // This will block until a connection comes in. val socket = serverSocket.accept() pool.execute(new Handler(socket)) } } finally { pool.shutdown() } } } class Handler(socket: Socket) extends Runnable { def message = (Thread.currentThread.getName() + "\n").getBytes def run() { socket.getOutputStream.write(message) socket.getOutputStream.close() } } (new NetworkService(2020, 2)).run
Ниже, представленные соединения показывают, как внутренние потоки могут использоваться повторно.
$ nc localhost 2020 pool-1-thread-1 $ nc localhost 2020 pool-1-thread-2 $ nc localhost 2020 pool-1-thread-1 $ nc localhost 2020 pool-1-thread-2
Future
предоставляет возможность асинхронных вычислений. Вы можете обернуть ваши вычисления с помощью Future и когда вам будет нужен результат, вы просто вызовите блокирование метода get()
. Executor
возвращают Future
.
FutureTask
– это трейт Runnable и спроектирован для запуска с помощью Executor
val future = new FutureTask[String](new Callable[String]() { def call(): String = { searcher.search(target); }}) executor.execute(future)
Теперь мне нужны результаты, поэтому устанавливаем блокировку пока они не будут получены.
val blockingResult = future.get()
Смотрите также: В Effective Scala есть описание Futures .
class Person(var name: String) { def set(changedName: String) { name = changedName } }
Эта программа не является безопасной в многопоточной среде. Если два потока имеют ссылки на тот же экземпляр Person и вызывает set
, вы не можете предсказать, что name
будет в конце обоих вызовов.
В модели памяти Java, каждый процессор имеет право кэшировать значения в L1 и L2 кэш, и два потока работающих на разных процессорах, могут иметь свои собственные виды данных.
Давайте поговорим о некоторых инструментах, которые позволяют держать виды данных в потоках согласованными.
Мьютексы позволяют владеть семантикой вычислений. Когда вы входите в мьютекс, вы являетесь его владельцем. Наиболее распространенный способ использования мьютекса в JVM является синхронизация каких-то состояний. В этом случае, мы будем синхронизировать наш userMap.
В JVM, можно синхронизировать любой экземпляр, если только это не null.
class Person(var name: String) { def set(changedName: String) { this.synchronized { name = changedName } } }
В Java 5 перешли к модели памяти, изменчивые и синхронизированные потоки в основном похожи, за исключением того, что в изменчивых потоках разрешен null.
synchronized
позволяет более тонкую блокировку. volatile
синхронизирован при каждом доступе.
class Person(@volatile var name: String) { def set(changedName: String) { name = changedName } }
Также в Java 5 был добвлен целый набор низкоуровневых примитивов для параллельных вычислений. Одним из них является класс AtomicReference
import java.util.concurrent.atomic.AtomicReference class Person(val name: AtomicReference[String]) { def set(changedName: String) { name.set(changedName) } }
@AtomicReference является самым дорогостоящим из этих двух вариантов, поскольку вы должны пройти через метод диспетчер для доступа к значениям.
volatile
и synchronized
строятся поверх встроенных мониторов в Java. Мониторы стоят очень мало, если нет никаких разногласий. synchronized
позволяет более тонкий контроль над синхронизацией, будет меньше конкуренции, поэтому synchronized
, как правило, самый дешевый вариант.
Когда вы входите в точки синхронизации, пытаетесь обратиться к изменчивым ссылкам или используете AtomicReferences, Java заставляет процессор очистить кэш-память и обеспечивает согласованное представление данных.
ПОЖАЛУЙСТА, ПОПРАВЬТЕ МЕНЯ, ЕСЛИ Я ЗДЕСЬ ОШИБАЮСЬ. Это сложная тема, и я уверен, что будут продолжительные дискуссии на этот счет.
Как я ранее заметил, Java 5 принес много полезных вещей благодаря AtomicReference
.
CountDownLatch
– это простой механизм для использования множества потоков и их взаимодействия.
val doneSignal = new CountDownLatch(2) doAsyncWork(1) doAsyncWork(2) doneSignal.await() println("both workers finished!")
Среди прочего, он отлично подходит для юнит-тестов. Допустим, вы делаете некоторую асинхронную работу и хотите убедиться, что функции завершена. Просто ваши функции должны иметь блокировку CountDown
и await
в тесте.
Для испльзования Int и Long во многих задачах были добавлены AtomicInteger
и AtomicLong
.
Я думаю ненужно объяснять зачем это нужно.
ReadWriteLock
позволяет вам блокировать потоки читателей и писателей. Читатель блокируется, когда писатель установил блокировку.
У нас есть простой инвертированный индекс, который потоконебезопасен. Наши инвертированные индексные карты – это часть имени конкретного пользователя.
Все написано в простой форме, предполагая лишь однопоточный доступ.
Обратите внимание на альтернативный стандартный конструктор this()
, который использует mutable.HashMap
import scala.collection.mutable case class User(name: String, id: Int) class InvertedIndex(val userMap: mutable.Map[String, User]) { def this() = this(new mutable.HashMap[String, User]) def tokenizeName(name: String): Seq[String] = { name.split(" ").map(_.toLowerCase) } def add(term: String, user: User) { userMap += term -> user } def add(user: User) { tokenizeName(user.name).foreach { term => add(term, user) } } }
Я пока оставил возможность получения пользователей вне нашего индекса. Мы вернемся к этому позже.
В нашем инвертированном индексе, для userMap не гарантируется безопасность. Несколько клиентов могут попытаться добавить элементы в одно и то же время и имеют ту же видимость ошибок, которую мы видели в нашем первом примере Person
.
Так как userMap не является потокобезопасным, то как мы можем держать только один поток изменив его?
Вы могли бы рассмотреть блокировку userMap при добавлении.
def add(user: User) { userMap.synchronized { tokenizeName(user.name).foreach { term => add(term, user) } } }
К сожалению, это слишком грубо. Всегда старайтесь сделать так много дорогостоящей работы за пределами мьютекса насколько это возможно. Помните, что я говорил, блокировка дожна быть дешевой, если нет никаких разногласий. Если вы сделаете меньше работы внутри блока, будет меньше разногласий.
def add(user: User) { // tokenizeName как было измерено, самая дорога операция val tokens = tokenizeName(user.name) tokens.foreach { term => userMap.synchronized { add(term, user) } } }
Мы может смешать синронизацию с изменяющимся HashMap, используя трейт SynchronizedMap.
Мы можем расширить наш существующий InvertedIndex, давая пользователям простой способ построения синхронизированного индекса.
import scala.collection.mutable.SynchronizedMap class SynchronizedInvertedIndex(userMap: mutable.Map[String, User]) extends InvertedIndex(userMap) { def this() = this(new mutable.HashMap[String, User] with SynchronizedMap[String, User]) }
Если вы посмотрите на реализацию, вы поймете, что это просто синхронизация каждого метода, так что пока это безопасно, оно не может быть таким производительным как бы вам этого хотелось.
Java поставляется с прекрасным потокобезопасным ConcurrentHashMap. К счастью, мы можем использовать JavaConversions, чтобы использовать семантику Scala.
В самом деле, мы можем легко создать новый код, используя потокобезопасный InvertedIndex, как продолжение старого небезопасного.
import java.util.concurrent.ConcurrentHashMap import scala.collection.JavaConversions._ class ConcurrentInvertedIndex(userMap: collection.mutable.ConcurrentMap[String, User]) extends InvertedIndex(userMap) { def this() = this(new ConcurrentHashMap[String, User]) }
trait UserMaker { def makeUser(line: String) = line.split(",") match { case Array(name, userid) => User(name, userid.trim().toInt) } } class FileRecordProducer(path: String) extends UserMaker { def run() { Source.fromFile(path, "utf-8").getLines.foreach { line => index.add(makeUser(line)) } } }
Для каждой строки в нашем файле, мы вызываем makeUser
, а затем add
для нашего InvertedIndex. Если мы используем параллельный InvertedIndex, мы можем вызвать добавление в параллельном потоке и makeUser не будет иметь побочных эффектов, он является потокобезопасным.
Мы не можем прочитать файл параллельно, но мы можем создать пользователя и добавить его в индекс параллельно.
Общий шаблон для асинхронных вычислений состоит в том, чтобы отделить производителей от потребителей и заставить их взаимодействовать только через Queue(Очередь)
. Давайте рассмотрим, как это будет работать для нашего индексатора в поисковом движке.
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue} // Concrete producer class Producer[T](path: String, queue: BlockingQueue[T]) implements Runnable { public void run() { Source.fromFile(path, "utf-8").getLines.foreach { line => queue.put(line) } } } // Абстрактный потребитель abstract class Consumer[T](queue: BlockingQueue[T]) implements Runnable { public void run() { while (true) { val item = queue.take() consume(item) } } def consume(x: T) } val queue = new LinkedBlockingQueue[String]() // Один поток для потребителя val producer = new Producer[String]("users.txt", q) new Thread(producer).start() trait UserMaker { def makeUser(line: String) = line.split(",") match { case Array(name, userid) => User(name, userid.trim().toInt) } } class IndexerConsumer(index: InvertedIndex, queue: BlockingQueue[String]) extends Consumer[String](queue) with UserMaker { def consume(t: String) = index.add(makeUser(t)) } // Давайте представим, что у нас 8 ядер на данной машине. val cores = 8 val pool = Executors.newFixedThreadPool(cores) // Распределим по одному потребителю на каждое ядро. for (i <- i to cores) { pool.submit(new IndexerConsumer[String](index, q)) }