Мы начинаем создавать простой распределенный поисковый движок, используя Scala, и обсуждаемый ранее Finagle фреймфорк.
В широком смысле, наши цели должны включать абстракцию (возможность использовать полученную систему, без всех внутренних деталей); модульность (возможность системы делиться на маленькие, простые части, которые просты для понимания и/или могут быть легко заменены); и масштабирование (возможность возможность увеличить производительность системы простыми способами).
Система, которую мы собираемся описать состоит из трех частей: (1) клиенты, которые делают запросы к (2) серверам, которые посылают ответ на запрос, и (3) транспорт – механизм, который упаковывает эти сообщения. Обычно клиент и сервер будут находиться на разных машинах и общаться по сети через конкретный порт, но в данном примере, они будут работать на одной и той же машине (и тоже общаться с помощью портов). В нашем примере, клиенты и серверы будут написаны на Scala, и транспорт будет обрабатываться с помощью Thrift. Основной целью этого урока является показать работу простого сервера и клиента, которые могут быть расширены, чтобы обеспечить необходимый уровень производительность при масштабировании.
Во-первых, создадим скелет проекта (“Searchbird”) с использованием scala-bootstrapper. С его помощью создается простой Finagle на основе Scala сервиса, который экспортирует из памяти хранилище ключ-значение. Сначала мы расширим его, чтобы он мог искать значения, а затем расширим его, чтобы он мог поддерживать поиск во многих, находящихся в памяти, базах, используя несколько процессов.
$ mkdir searchbird ; cd searchbird $ scala-bootstrapper searchbird writing build.sbt writing config/development.scala writing config/production.scala writing config/staging.scala writing config/test.scala writing console writing Gemfile writing project/plugins.sbt writing README.md writing sbt writing src/main/scala/com/twitter/searchbird/SearchbirdConsoleClient.scala writing src/main/scala/com/twitter/searchbird/SearchbirdServiceImpl.scala writing src/main/scala/com/twitter/searchbird/config/SearchbirdServiceConfig.scala writing src/main/scala/com/twitter/searchbird/Main.scala writing src/main/thrift/searchbird.thrift writing src/scripts/searchbird.sh writing src/scripts/config.sh writing src/scripts/devel.sh writing src/scripts/server.sh writing src/scripts/service.sh writing src/test/scala/com/twitter/searchbird/AbstractSpec.scala writing src/test/scala/com/twitter/searchbird/SearchbirdServiceSpec.scala writing TUTORIAL.md
Давайте сначала изучим стандартный проект scala-bootstrapper
, созданный для нас. Он является шаблоном. Вы в конечном счете замените большую часть проекта, но он послужит удобным средством для старта. Проект определяет простое (но полное) хранилище ключ-значение. Конфигурация, thrift интерфейс, статистика экспорта и ведение логов все это включено изначально.
Прежде чем мы вглянем на код, давайте запустим клиент и сервер, чтобы увидеть, как все работает. Вот то, что мы делаем:
а вот интерфейс, который экспортирует наш сервис. Поскольку сервис Searchbird это Thrift сервис (как и большинство наших сервисов), его внешний интерфейс определен в Thrift IDL (“язык описания интерфейса”).
service SearchbirdService { string get(1: string key) throws(1: SearchbirdException ex) void put(1: string key, 2: string value) }
Здесь все просто: наш сервис SearchbirdService
экспортирует 2 RPC метода, get
и put
. Они включают в себя простой интерфейс для хранилища ключ-значение.
Теперь давайте запустим стандартный сервис и клиент, который подключается к этому сервису, и исследуем их через этот интерфейс. Откройте два окна, одно для сервера и одно для клиента.
В первом окне, запустите SBT в интерактивном режиме (запустите ./sbt
из командной строки 1), а затем собирите и запустите проект из SBT. После этого запустится программа main
в Main.scala
.
$ ./sbt ... > compile > run -f config/development.scala ... [info] Running com.twitter.searchbird.Main -f config/development.scala
Файл конфигурации (development.scala
) создает новый сервис, привязывая сервис к порту 9999 на нашей локальной машине. Клиенты могут общаться с этим сервисом при подключении к порту 9999.
Теперь мы создадим и запустим клиент, используя предоставленный @console@ шелл скрипт, который создает экземпляр SearchbirdConsoleClient
(из SearchbirdConsoleClient.scala
). Запустите этот сценарий в другом окне:
$ ./console 127.0.0.1 9999 [info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999 'client' is bound to your thrift client. finagle-client>
Клиентский объект client
теперь подключен к порту 9999 на нашем локальном компьютере и может общаться с сервисом, который мы ранее запустили на этом порту. Теперь давайте отправим несколько запросов:
scala> client.put("marius", "Marius Eriksen") res0: ... scala> client.put("stevej", "Steve Jenson") res1: ... scala> client.get("marius") res2: com.twitter.util.Future[String] = ... scala> client.get("marius").get() res3: String = Marius Eriksen
(Второй вызов get()
создает Future
, который является возвращаемым типом client.get()
, блокируя процесс до того, пока не будет получено значение.)
Сервер также экспортирует статистику времени выполнения (файл конфигурации определяет для этого порт 9900). Это удобно как для проверки отдельных серверов, а также для объединения в глобальную статистику сервиса (предоставляется также машиночитаемый JSON интерфейс). Откройте третье окно и проверьте эту статистику:
$ curl localhost:9900/stats.txt counters: Searchbird/connects: 1 Searchbird/received_bytes: 264 Searchbird/requests: 3 Searchbird/sent_bytes: 128 Searchbird/success: 3 jvm_gc_ConcurrentMarkSweep_cycles: 1 jvm_gc_ConcurrentMarkSweep_msec: 15 jvm_gc_ParNew_cycles: 24 jvm_gc_ParNew_msec: 191 jvm_gc_cycles: 25 jvm_gc_msec: 206 gauges: Searchbird/connections: 1 Searchbird/pending: 0 jvm_fd_count: 135 jvm_fd_limit: 10240 jvm_heap_committed: 85000192 jvm_heap_max: 530186240 jvm_heap_used: 54778640 jvm_nonheap_committed: 89657344 jvm_nonheap_max: 136314880 jvm_nonheap_used: 66238144 jvm_num_cpus: 4 jvm_post_gc_CMS_Old_Gen_used: 36490088 jvm_post_gc_CMS_Perm_Gen_used: 54718880 jvm_post_gc_Par_Eden_Space_used: 0 jvm_post_gc_Par_Survivor_Space_used: 1315280 jvm_post_gc_used: 92524248 jvm_start_time: 1345072684280 jvm_thread_count: 16 jvm_thread_daemon_count: 7 jvm_thread_peak_count: 16 jvm_uptime: 1671792 labels: metrics: Searchbird/handletime_us: (average=9598, count=4, maximum=19138, minimum=637, p25=637, p50=4265, p75=14175, p90=19138, p95=19138, p99=19138, p999=19138, p9999=19138, sum=38393) Searchbird/request_latency_ms: (average=4, count=3, maximum=9, minimum=0, p25=0, p50=5, p75=9, p90=9, p95=9, p99=9, p999=9, p9999=9, sum=14)
В дополнение к нашей статистике сервиса, мы также получаем общую статистику JVM, которая часто полезна.
Теперь давайте заглянем в код, который реализует конфигурацию, сервер и клиент.
Конфигурация является Scala трейтом, который имеет метод apply: RuntimeEnvironment => T
для некоторых T
, которые мы хотим создать. В этом смысле конфигурации – это “фабрики”. Во время выполнения файла конфигурации выполняется как скрипт (с помощью компилятора Scala в качестве библиотеки), и ожидается, что будет получен объект конфигурации.RuntimeEnvironment
является объектом, запрашивающим различные параметры во время выполнения (флаги командной строки, версия JVM, временные метки сборки и т.д.).
SearchbirdServiceConfig
определяет такой же класс. Он определяет параметры конфигурации вместе с их значениями по умолчанию. (Finagle поддерживает общие системы отслеживания, которые мы можем игнорировать для целей настоящего учебника; Zipkin распределенная система отслеживания, является коллектором/агрегатором таких путей следования.)
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] { var thriftPort: Int = 9999 var tracerFactory: Tracer.Factory = NullTracer.factory def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this) }
В нашем случае, мы хотим создать SearchbirdService.ThriftServer
. Он является серверным типом, который создается thrift-ым генератором кода 2.
При наборе “run” в sbt консоли вызывается main
, который настраивает и создает экземпляр этого сервера. Считывается конфигурация (указанная в development.scala
и используется как аргумент внутри “run”), создает SearchbirdService.ThriftServer
и запускает его. RuntimeEnvironment.loadRuntimeConfig
выполняет файл конфигурации и вызывает метод apply
, передавая себя в качестве аргумента3.
object Main { private val log = Logger.get(getClass) def main(args: Array[String]) { val runtime = RuntimeEnvironment(this, args) val server = runtime.loadRuntimeConfig[SearchbirdService.ThriftServer] try { log.info("Starting SearchbirdService") server.start() } catch { case e: Exception => log.error(e, "Failed starting SearchbirdService, exiting") ServiceTracker.shutdown() System.exit(1) } } }
Это основа сервиса: мы расширяем SearchbirdService.ThriftServer
с помощью нашей собственной реализации. Напомним, что SearchbirdService.ThriftServer
был создан для нас thrift-ым генератором кода. Он генерирует метод Scala используя thrift метод. В нашем примере, сгенерированный интерфейс выглядит так:
trait SearchbirdService { def put(key: String, value: String): Future[Void] def get(key: String): Future[String] }
Future[Value]
возвращаются вместо непосредственных значений, так что их вычисление может быть отложено (finagle документация имеет более подробную информацию о Future
). Для целей настоящего учебника, единственное, что вам нужно знать о Future
, то что вы можете получить его значение с помощью get()
.
Стандартная реализация хранилища ключ-значение предоставляемая scala-bootstrapper
очень проста: она обеспечивает структуру данных database
и вызовы get
и put
, которые имеют доступ к этой структуре данных.
class SearchbirdServiceImpl(config: SearchbirdServiceConfig) extends SearchbirdService.ThriftServer { val serverName = "Searchbird" val thriftPort = config.thriftPort override val tracerFactory = config.tracerFactory val database = new mutable.HashMap[String, String]() def get(key: String) = { database.get(key) match { case None => log.debug("get %s: miss", key) Future.exception(SearchbirdException("No such key")) case Some(value) => log.debug("get %s: hit", key) Future(value) } } def put(key: String, value: String) = { log.debug("put %s", key) database(key) = value Future.Unit } def shutdown() = { super.shutdown(0.seconds) } }
Результатом является простой thrift интерфейс для Scala HashMap
.
Теперь мы расширим наш пример, чтобы создать простой поисковой движок. Затем мы расширим его так, чтобы он стал распределенным поисковым движком, состоящим из нескольких распределенных частей, так что мы сможем вместить данных больше, чем может поместиться в памяти одной машины.
Для простоты, мы будем расширять наш нынешний сервис thrift по частям, реализуя поддержку операции поиска. Моделью использования является put
документы в поисковую систему, где каждый документ содержит ряд лексем (слов), тогда мы можем искать в строке лексем, чтобы вернуть все документы, которые содержат все лексемы в наборе. Архитектура совпадает с предыдущим примером, но для добавления вызываем search
.
Чтобы реализовать такой поисковый движок, сделайте следующие изменения в двух файлах:
service SearchbirdService { string get(1: string key) throws(1: SearchbirdException ex) void put(1: string key, 2: string value) list<string> search(1: string query) }
Мы добавили метод search
, который ищет в текущей хеш-таблице, возвращая список ключей, значения которых соответствуют запросу. Реализация также проста:
Большинство наших изменения происходят в этом файле.
В настоящее время database
hashmap содержит начальный индекс, который включает ключ на документ. Мы переименуем его в forward
и добавим вторую карту для обратного
индекса (который отображает лексему на набор документов, которые содержат эту лексему). Таким образом, внутри SearchbirdServiceImpl.scala
, заменим определение database
с помощью:
val forward = new mutable.HashMap[String, String] with mutable.SynchronizedMap[String, String] val reverse = new mutable.HashMap[String, Set[String]] with mutable.SynchronizedMap[String, Set[String]]
Внутри вызова get
, заменим database
с помощью forward
, иначе get
останется тем же (он выполняется перед поиском). Тем не менее put
требует изменений: мы также должны заполнить обратный индекс для каждой лексемы в документе путем добавления ключа документа в список, связанный с этой лексемой. Замените вызов put
с помощью следующего кода. Учитывая особый знак поиска, теперь мы можем использовать reverse
карту, чтобы просмотреть документы.
def put(key: String, value: String) = { log.debug("put %s", key) forward(key) = value // serialize updaters synchronized { value.split(" ").toSet foreach { token => val current = reverse.getOrElse(token, Set()) reverse(token) = current + key } } Future.Unit }
Заметим, что (хотя HashMap
является потокобезопасным) только один поток может обновлять reverse
карту за раз, чтобы убедиться, что чтение-модификация-запись конкретной записи – это атомарные операции. (Код является чрезмерно консервативным, он блокирует всю карту, вместо блокировки каждой отдельной операции получение-модификация-запись). Также обратите внимание на использование Set
как структуры данных, это гарантирует, что если похожая лексема появляется дважды в документе, то она будет обработана только один раз с помощью foreach
.
Реализация имеет проблему, которая остается в качестве упражнения для читателя: когда мы перезаписываем ключ с новым документом, мы не удаляем все ссылки на старый документ в обратном индексе.
Вернемся к поисковой системе: используем новый метод search
. Он должен разметить свой запрос, просмотреть все соответствующие документы, а затем находит общее в этих списках. После этого получаем список документов, которые содержат все лексемы в запросе. Все это просто выразить в Scala, добавив это к классу SearchbirdServiceImpl
:
def search(query: String) = Future.value { val tokens = query.split(" ") val hits = tokens map { token => reverse.getOrElse(token, Set()) } val intersected = hits reduceLeftOption { _ & _ } getOrElse Set() intersected.toList }
Несколько вещей, которые важны в этом небольшом фрагменте кода. При построении списка, если ключ (token
) не найден, getOrElse
будет отдавать значение второго параметра (в данном случае, пустой Set
). Мы выполняем пересечение, используя left-reduce. Специфический способ, reduceLeftOption
, не будет пытаться выполнить reduce, если hits
пуст, возвращая вместо этого None
. Он позволяет указать значение по умолчанию, вместо исключения. На самом деле, это эквивалентно:
def search(query: String) = Future.value { val tokens = query.split(" ") val hits = tokens map { token => reverse.getOrElse(token, Set()) } if (hits.isEmpty) Nil else hits reduceLeft { _ & _ } toList }
Какой вариант использовать это в основном дело вкуса, хотя функциональный стиль часто избегает условные конструкции для разумных значений по умолчанию.
Теперь мы можем экспериментировать с нашей новой реализацией с помощью консоли. Снова запустите сервер:
$ ./sbt ... > compile > run -f config/development.scala ... [info] Running com.twitter.searchbird.Main -f config/development.scala
и потом из директории searchbird, запустите клиент:
$ ./console 127.0.0.1 9999 ... [info] Running com.twitter.searchbird.SearchbirdConsoleClient 127.0.0.1 9999 'client' is bound to your thrift client. finagle-client>
Вставьте следующее описание лекции в консоль:
client.put("basics", " values functions classes methods inheritance try catch finally expression oriented") client.put("basics", " case classes objects packages apply update functions are objects (uniform access principle) pattern") client.put("collections", " lists maps functional combinators (map foreach filter zip") client.put("pattern", " more functions! partialfunctions more pattern") client.put("type", " basic types and type polymorphism type inference variance bounds") client.put("advanced", " advanced types view bounds higher kinded types recursive types structural") client.put("simple", " all about sbt the standard scala build") client.put("more", " tour of the scala collections") client.put("testing", " write tests with specs a bdd testing framework for") client.put("concurrency", " runnable callable threads futures twitter") client.put("java", " java interop using scala from") client.put("searchbird", " building a distributed search engine using")
Теперь мы можем выполнить несколько запросов, которые возвращают ключи документов, которые содержат условия поиска.
> client.search("functions").get() res12: Seq[String] = ArrayBuffer(basics) > client.search("java").get() res13: Seq[String] = ArrayBuffer(java) > client.search("java scala").get() res14: Seq[String] = ArrayBuffer(java) > client.search("functional").get() res15: Seq[String] = ArrayBuffer(collections) > client.search("sbt").get() res16: Seq[String] = ArrayBuffer(simple) > client.search("types").get() res17: Seq[String] = ArrayBuffer(type, advanced)
Напомним, что если вызов возвращает Future
, мы должны использовать блокировку вызова get()
для получения значения, содержащегося в этом future. Мы можем использовать команду Future.collect
, чтобы сделать несколько одновременных запросов и подождать, пока все они завершатся успешно:
> import com.twitter.util.Future ... > Future.collect(Seq( client.search("types"), client.search("sbt"), client.search("functional") )).get() res18: Seq[Seq[String]] = ArrayBuffer(ArrayBuffer(type, advanced), ArrayBuffer(simple), ArrayBuffer(collections))
На одной машине, наши простой поисковый движок, находящийся в памяти, не сможет искать фрагменты больше, чем размер оперативной памяти. Мы будем теперь пытаться исправить эту ситуацию, распределяя узлы с помощью простой схемы распределения (шардинга). Вот блок-схема:
В помощь нашему делу, мы введем другую абстаркцию -- для того чтобы отделить раелизацию индекса от Index
SearchbirdService
. Сделать это просто. Мы начнем с добавления файла Index в сборку (create the file searchbird/src/main/scala/com/twitter/searchbird/Index.scala
):
package com.twitter.searchbird import scala.collection.mutable import com.twitter.util._ import com.twitter.conversions.time._ import com.twitter.logging.Logger import com.twitter.finagle.builder.ClientBuilder import com.twitter.finagle.thrift.ThriftClientFramedCodec trait Index { def get(key: String): Future[String] def put(key: String, value: String): Future[Unit] def search(key: String): Future[List[String]] } class ResidentIndex extends Index { val log = Logger.get(getClass) val forward = new mutable.HashMap[String, String] with mutable.SynchronizedMap[String, String] val reverse = new mutable.HashMap[String, Set[String]] with mutable.SynchronizedMap[String, Set[String]] def get(key: String) = { forward.get(key) match { case None => log.debug("get %s: miss", key) Future.exception(SearchbirdException("No such key")) case Some(value) => log.debug("get %s: hit", key) Future(value) } } def put(key: String, value: String) = { log.debug("put %s", key) forward(key) = value // admit only one updater. synchronized { (Set() ++ value.split(" ")) foreach { token => val current = reverse.get(token) getOrElse Set() reverse(token) = current + key } } Future.Unit } def search(query: String) = Future.value { val tokens = query.split(" ") val hits = tokens map { token => reverse.getOrElse(token, Set()) } val intersected = hits reduceLeftOption { _ & _ } getOrElse Set() intersected.toList } }
Сейчас мы превратили наш thrift сервис в простой механизм диспетчеризации: он обеспечивает thrift интерфейс для любого экземпляра Index
. Это мощная абстракция, потому что она отделяет реализацию сервиса от реализации индекса. Сервис не должен знать никаких подробностей об основных индексах, индекс может быть локальным или удаленным, или может быть составной частью многих удаленных индексов, но сервис не заботится об этом, и реализация индекса может изменяться без изменения сервиса.
Замените определение класса SearchbirdServiceImpl
тем, что ниже (которая уже не содержит подробностей реализации индекса и гораздо проще). Обратите внимание, инициализация сервера теперь принимает второй аргумент Index
.
class SearchbirdServiceImpl(config: SearchbirdServiceConfig, index: Index) extends SearchbirdService.ThriftServer { val serverName = "Searchbird" val thriftPort = config.thriftPort def get(key: String) = index.get(key) def put(key: String, value: String) = index.put(key, value) flatMap { _ => Future.Unit } def search(query: String) = index.search(query) def shutdown() = { super.shutdown(0.seconds) } }
Обновите ваш вызов apply
в SearchbirdServiceConfig
соответственно:
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] { var thriftPort: Int = 9999 var tracerFactory: Tracer.Factory = NullTracer.factory def apply(runtime: RuntimeEnvironment) = new SearchbirdServiceImpl(this, new ResidentIndex) }
Мы создали нашу простую распределенную систему, в которой есть один главный узел, который координирует запросы к его дочерним узлам. Для того чтобы добиться этого, нам нужно два новых типа Index
. Один представляет собой удаленный индекс, другой сводный индекс нескольких других экземпляров Index
. Таким образом, мы можем построить распределенный индекс, определяя экземпляр сводного индекса для удаленных индексов. Заметим, что оба индекса типы Index
имеют одинаковый интерфейс, так что серверам не нужно знать, является ли индекс, к которому они подключаются удаленным или составным.
В Index.scala
, определим CompositeIndex
:
class CompositeIndex(indices: Seq[Index]) extends Index { require(!indices.isEmpty) def get(key: String) = { val queries = indices.map { idx => idx.get(key) map { r => Some(r) } handle { case e => None } } Future.collect(queries) flatMap { results => results.find { _.isDefined } map { _.get } match { case Some(v) => Future.value(v) case None => Future.exception(SearchbirdException("No such key")) } } } def put(key: String, value: String) = Future.exception(SearchbirdException("put() not supported by CompositeIndex")) def search(query: String) = { val queries = indices.map { _.search(query) rescue { case _=> Future.value(Nil) } } Future.collect(queries) map { results => (Set() ++ results.flatten) toList } } }
Сводный индекс работает с набором базовых экземпляров Index
. Обратите внимание, что он не заботится, как они на самом деле реализованы. Этот тип композиции обеспечивает большую гибкость в построении различных схем запросов. Мы не определяем схему распределения, и сводный индекс не поддерживает операции put
. Они выполняются непосредственно дочерними узлами. get
реализуется с помощью запросов всех наших дочерних узлов и выбирается первый успешный результат. Если таковых нет, то выбрасывается исключение. Отметим, что поскольку сообщается отсутствие значения, выбрасывая исключение, мы handle
(управляем) этим с помощью Future
, превращая любое исключение в занчение None
. В реальной системе, мы бы, вероятно, имели соответствующие коды ошибок для отсутствующих значений, а не использовали исключения. Исключения являются удобными и целесообразными для создания прототипов, но в других случаях они не подходят. Для того, чтобы отличить реальное исключение и отсутствующее значение, мне нужно посмотреть само исключение. Скорее всего, это лучший способ для указания различий непосредственно в типе возвращаемого значения.
search
работает похожим образом, как и раньше. Вместо того, чтобы взять первый результат, мы объединяем их, обеспечивая их уникальность с помощью конструкции Set
.
RemoteIndex
предоставляет интерфейс Index
для удаленного сервера.
class RemoteIndex(hosts: String) extends Index { val transport = ClientBuilder() .name("remoteIndex") .hosts(hosts) .codec(ThriftClientFramedCodec()) .hostConnectionLimit(1) .timeout(500.milliseconds) .build() val client = new SearchbirdService.FinagledClient(transport) def get(key: String) = client.get(key) def put(key: String, value: String) = client.put(key, value) map { _ => () } def search(query: String) = client.search(query) map { _.toList } }
Это создает finagle thrift клиент с некоторыми подходящими значениями по умолчанию и прокси вызовами, незначительно меняя типы.
Теперь у нас есть все, что нам нужно. Прежде нужно настроить конфигурацию для того, чтобы иметь возможность вызвать данный узел либо как главный, либо как узел разделения данных. Для того, чтобы сделать это, мы перечислим шарды (распределенные узлы) в нашей системе путем создания нового элемента конфигурации. Мы также должны добавить Index
аргумент в наш экземпляр SearchbirdServiceImpl
. Мы будем использовать аргументы командной строки (напомним, что Config
имеет к ним доступ), чтобы запустить сервер в любом режиме.
class SearchbirdServiceConfig extends ServerConfig[SearchbirdService.ThriftServer] { var thriftPort: Int = 9999 var shards: Seq[String] = Seq() def apply(runtime: RuntimeEnvironment) = { val index = runtime.arguments.get("shard") match { case Some(arg) => val which = arg.toInt if (which >= shards.size || which < 0) throw new Exception("invalid shard number %d".format(which)) // override with the shard port val Array(_, port) = shards(which).split(":") thriftPort = port.toInt new ResidentIndex case None => require(!shards.isEmpty) val remotes = shards map { new RemoteIndex(_) } new CompositeIndex(remotes) } new SearchbirdServiceImpl(this, index) } }
Теперь будем настраивать свою конфигурацию: добавляем инициализацию “шардов” в экземпляр SearchbirdServiceConfig
(мы можем общаться с шардом 0 через порт 9000, с шардом 1 через порт 9001, и так далее).
new SearchbirdServiceConfig { // Add your own config here shards = Seq( "localhost:9000", "localhost:9001", "localhost:9002" ) ...
Закомментируйте строку admin.httpPort
(мы не хотим, чтобы были запущены одни и теже сервисы на данной машине, каждый из которых пытается открыть один и тот же порт):
// admin.httpPort = 9900
Теперь, если мы вызываем наш сервер без каких-либо аргументов, он стартует как главный узел, который общается со всеми даннымы шардами. Если мы зададим аргумент с номером шарда, то сервер запускается на порту, который принадлежит этому шарду.
Давайте попробуем! Мы запустим 3 сервиса: 2 шарда и 1 главный узел. Во-первых компилируем изменения:
$ ./sbt > compile ... > exit
Потом запускаем 3 сервера:
$ ./sbt 'run -f config/development.scala -D shard=0' $ ./sbt 'run -f config/development.scala -D shard=1' $ ./sbt 'run -f config/development.scala'
Вы можете запустить их либо в 3 различных окнах, либо (в одном окне) каждый будет ждать своей очереди для запуска, нажмите Ctrl-Z, если вы хотите приостановить сервис, или используйте bg
, чтобы запустить его в фоновом режиме.
Мы будем взаимодействовать с ним через консоль. Во-первых, давайте заполним данные на двух шардах. Запускаем из директории searchbird:
$ ./console localhost 9000 ... > client.put("fromShardA", "a value from SHARD_A") > client.put("hello", "world")
$ ./console localhost 9001 ... > client.put("fromShardB", "a value from SHARD_B") > client.put("hello", "world again")
Вы можете выйти из этих сеансов консоли, как только вы их завершите. Теперь сделаем запрос к нашей базе данных на главном узле (порт 9999):
$ ./console localhost 9999 [info] Running com.twitter.searchbird.SearchbirdConsoleClient localhost 9999 'client' is bound to your thrift client. finagle-client> client.get("hello").get() res0: String = world finagle-client> client.get("fromShardC").get() SearchbirdException(No such key) ... finagle-client> client.get("fromShardA").get() res2: String = a value from SHARD_A finagle-client> client.search("hello").get() res3: Seq[String] = ArrayBuffer() finagle-client> client.search("world").get() res4: Seq[String] = ArrayBuffer(hello) finagle-client> client.search("value").get() res5: Seq[String] = ArrayBuffer(fromShardA, fromShardB)
Этот дизайн имеет несколько абстракций данных, которые позволяют использовать более модульную и масштабируемую реализацию:
ResidentIndex
ничего не знает о сети, серверах или клиентах.CompositeIndex
ничего не знает о том, как его составные индексы реализованы или их базовую структуру данных, он просто распределяет свои запросы к ним.search
для серверов, который позволяет серверу запросить его локальные структуры данных (ResidentIndex
) или распространять запросы на другие серверы (CompositeIndex
) без необходимости знать различия, которые скрыты от вызывающего объекта.SearchbirdServiceImpl
и Index
– это отдельные модули, позволяющие простую реализацию сервиса и отделяют реализацию структуры данных сервиса, которая обращается к нему.Возможные улучшения этой реализации будут включать:
put()
на все узлы. Вместо этого, мы могли бы использовать хэш-таблицу для отправки вызовов put()
только одному узлу хранения и распространяя хранилище на все узлы.
1 Локальный скрипт ./sbt
гарантирует, что версия SBT не изменяет то, что мы знаем обо всех имеющися библиотеках.
2 В target/gen-scala/com/twitter/searchbird/SearchbirdService.scala
.
3 Смотрите Ostrich’s “README”: https://github.com/twitter/ostrich/blob/master/README.md для получения дополнительной информации.