Akka. Тестирование в общем и про тестирование кластера в частности

Продолжая заниматься hello world'ом на akka погрузился в вопрос тестирования акторов.

Общие вещи просты, не вижу смысла пересказывать документацию, остановлюсь только на ключевых моментах и выводах.

Асинхронное vs синхронное тестирование

Есть два подхода синхронное и асинхронное тестирование. Первое в реальной жизни почти никогда не нужно, если только не хочется протестировать какие-то уж совсем внутренние кишки актора. В остальных случаях, лучше тестировать честно, отправляя и принимая ответы от акторов.

TestProbe и TestActor

Часть которую важно понимать. Сначала я думал, что при тестировании будет какая-то чёрная магия, которая позволит мне получать сообщения, летающие между разными акторами.

На деле всё проще. Внутри вашего TestCase создаётся TestProbe и TestActor, которые затем используются для запросов к акторам и анализа приходящих результатов. К сожалению, в документации сразу показывается пример с trait ImplicitSender, который слегка "гримирует" наличие testActor, что вызвало по началу повышенное количество wtf-per-line.

Соответственно набор стандартных assert'ов на самом деле вызывается у стандартного TestProbe. Конечно же таких TestProbe можно даже создать несколько и, например, поместить в них дополнительные специфичные вам assert'ы.

Отсюда же вывод, что для тестирования parent-child взаимодействия придётся вставлять между ними тестовый актор с проксированием сообщений, в документации описаны способы сделать это. В любом случае production код нужно немного к такому подготовить, другой вопрос, что изменения полезны и для других целей.

Cluster testing

С тестированием кластерных конфигурацию всё не так тривиально.

Для начала есть решение multi-jvm тестирования с плагином для sbt, в документации к akka описано как это всё подружить с тестами, чтобы получить Multi Node Testing. Пригодится и для других задач, когда используется просто akka remote.

Печалит, что нужно серьёзно "испортить" конфиг sbt, но, наверное, можно решить выносом таких тестов в отдельный sbt проект. Также ваша IntelliJ IDEA по понятным причинам про такие тесты ничего знать не будет, так как всё магия работает только в связке с sbt. Думаю, в ScalaIDE будет аналогично.

Простого способа дебажить это тоже нет. Логи не очень удобны, так как валяться в параллель со всех JVM. В идеале нужно писать обёртки, которые будут собирать их по каждой ноде отдельно.

"... напоминает мне игру: "Что? Где? Когда?" называется! Непонятно, что где валяется и когда все это кончится!"

Общерекомендуемый подход писать multi-jvm тесты в одном классе, который будет одинаково выполняться на всех нодах. Это обязывает постоянно следить за тем какой код и где исполняется. Например, написанный в лоб assert будет выполнен на всех нодах, часть из которых может быть ещё не присоединена к кластеру.

Постоянно об это спотыкался, но потом написал себе пару удобных утилиток:

import scala.collection.mutable
import org.scalatest.{ BeforeAndAfterAll, Matchers, Suite }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberUp }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeSpec, MultiNodeConfig, MultiNodeSpecCallbacks }
import akka.testkit.ImplicitSender



abstract class MultiNodeBaseSpec(config: MultiNodeConfig)
  extends MultiNodeSpec(config)
  with Suite
  with BeforeAndAfterAll
  with MultiNodeSpecCallbacks
  with ImplicitSender
  with Matchers
{

  override def beforeAll() = {
    super.beforeAll()
    multiNodeSpecBeforeAll()
  }

  override def afterAll() = {
    enterBarrier("before-clean-up")
    cleanUp()
    enterBarrier("clean-up")
    multiNodeSpecAfterAll()
    super.afterAll()
  }

  def cluster: Cluster = Cluster(system)
  cluster.subscribe(testActor, classOf[MemberUp])
  expectMsgClass(classOf[CurrentClusterState])
  println(s"myself address: ${node(myself).address}, role: ${myself.name}")

  val currentClusterNodes = mutable.Set[RoleName]()

  def joinToCluster(nodes: Seq[RoleName], seedNode: RoleName): Unit = {
    currentClusterNodes ++= nodes
    // on new nodes await events for all cluster member
    runOn(nodes: _*) {
      cluster join node(seedNode).address
      (receiveN(currentClusterNodes.size).collect { case MemberUp(member) => member.address }.toSet
        should contain theSameElementsAs currentClusterNodes.map(node(_).address).toSet)
    }

    // on existing nodes await events for only new cluster members
    runOn((currentClusterNodes -- nodes.toSet).toList: _*) {
      (receiveN(nodes.size).collect { case MemberUp(member) => member.address }.toSet
        should contain theSameElementsAs nodes.map(node(_).address).toSet)
    }

    enterBarrier("join-"+ nodes.map(_.name).mkString(","))
  }

  def runOnJoinedNodes(a: => Unit): Unit =
    runOn(currentClusterNodes.toList: _*) {
      a
    }

  def cleanUp(): Unit =
    cluster.unsubscribe(testActor)
}

В базовом spec'е выше реализовано:

  • подписывание на события кластера
  • метод joinToCluster для правильного присоединения к кластеру нод
  • метод runOnJoinedNodes для выполнения кода на уже работающих нодах кластера, аналогичный по использованию встроенному runOn

Тестирование сети

Есть встроенная поддержка тестирования транспорта и сети с возможностью эмуляции проблем между нодами (blackhole). При этом я надеюсь как-нибудь попробовать приспособить docker, его API и iptables для данных целей, благо multi-jvm, кажется умеет сам в тестах упаковывать тестовую ноду в jar, раскладывать через ssh+rsync, а затем запускать.

Примеры

Можно глянуть, что получилось у меня. Много полезных примеров я обнаружил в самих исходниках akka и в проекте akka crdt.

Вывод

Тестировать akka, даже в сложных конфигурациях можно и нужно, но tooling ещё требует доработки.


Akka graceful shutdown и обработка системных сигналов

Когда начал писать для себя hello world на akka сразу захотелось сделать какой-то артефакт, удобный для жизни в дикой среде production. Хочется, чтобы приложение вело себя как примерный гражданин, умело правильно реагировать на всякие системные сигналы и прочее.

С упаковкой приложения в jar вместе с scala runtime удалось легко разобраться. Помогут sbt-native-packager и sbt-assembly. Напишу про это позже, когда ещё пойму как правильно результат сложить в .deb пакет.

Не сразу почему-то удалось сделать правильные реагирования на сигналы системы. Меня пока не интересуют всякие специфичные штуки, достаточно было двух сигналов выключения - SIGINT и SIGTERM, по которым хотелось аккуратно потушить ноду в akka cluster - убить её локальные акторы, отсоединиться и пр.

Сначала попробовал использовать модный_молодёжный метод scala.sys.addShutdownHook. Он принимает => Unit и выполняет его в отдельном треде, когда завершается jvm. Вызов не гарантируется, что в данном случае ок. Плюс метода - переносимость. Минус - код возврата не возможно поставить, метод System.exit() выполняется, но код не ставиться. Экспериментально понял, что на SIGINT код всегда 130, на SIGTERM - 146. Вторая проблема - akka при выключении часть логов пишет в stdout, которые при использовании хука уже не выводятся. Не проверял, но кажется stdout закрывается раньше, чем вызывается этот хук.

Пришлось взять в руки обработчик сигналов и делать на нём. Он из пакета sun.misc, так что с альтернативными jvm могут быть вопросы, но меня это не волнует в данный момент.

В итоге метод выглядит примерно так:

import java.util.concurrent.atomic.AtomicBoolean
import sun.misc.SignalHandler
import sun.misc.Signal
import scala.concurrent.duration._

import com.typesafe.config.ConfigFactory
import akka.cluster.Cluster
import akka.actor.ActorSystem
import akka.util.Timeout

object Main extends App with SignalHandler {

  val SIGING = "INT"
  val SIGTERM = "TERM"

  val terminated = new AtomicBoolean(false)

  // регистрируем сам App объект, как обработчик сигналов
  Signal.handle(new Signal(SIGING), this)
  Signal.handle(new Signal(SIGTERM), this)

  val config = ConfigFactory.load()
  val system = ActorSystem("main", config)
  implicit val executionContext = system.dispatcher
  implicit val defaultTimeout = Timeout(500.millis)
  val cluster = Cluster(system)

  //hook для akka, будет использоваться для любых остановок, не только по сигналам
  system.registerOnTermination {
    System.exit(0)
  }

  // собственно обработчик
  override def handle(signal: Signal): Unit = {
    if (terminated.compareAndSet(false, true) && List(SIGING, SIGTERM).contains(signal.getName)) {
        system.shutdown()
    }
  }
}

Теперь и сигналы честно обрабатываются, и коды возвратов правильные, и все выходы из приложения сводятся в хук для akka.


Akka для распределённых приложений - remote vs cluster

Буду делать небольшие заметки на полях по результатам изучения akka.

Akka – это система акторов на jvm, реализованная на scala. Можно использовать и из java. Про саму концепцию акторов можно почитать в интернетах, здесь я буду останавливаться на особенностях akka.

Что читать?

Я пока рекомендую такие источники:

  • официальная документация - она хорошая, содержит как теоретические основы, так и конкретные примеры кода и описания API. Можно читать из начала в конец, как книгу. Есть pdf.

  • "Akka in Action" by Raymond Roestenburg, Rob Bakker, and Rob Williams - неплохая книга. Я читаю 16 MEAP (early access), к осени-зиме 2015 обещают релиз. Цена 40$. В основном ориентируюсь на документацию, но в книге плюс - итеративный подход к реализации демо проекта. Начинают с hello world'ов и далее по нарастающей. Было полезно, когда хотелось быстрого старта без изучения всех тонкостей теории.

  • Stack Overflow - как всегда стоит делать аккуратно. Фреймворк активно развивается, некоторые вещи появляются, которые раньше нужно было самому велосипедить, некоторые становятся deprecated и пр., но есть не мало полезных объяснений.

  • Googling по тонким вопросам и непоняткам, который нередко приводит в google группу akka.

Если жалко 40$, то книгу можно не покупать. Благородя Typesafe хайпа в сети по akka хватает. Для фреймворка – это хорошо.

Применимость

Меня akka интересует в основном с точки зрения распределённых систем. Однако akka работает и на одной jvm. Даже есть успешные кейсы, когда в такой подход даёт выигрыш в противовес обычным многопоточным потокам.

При этом akka имеет грамотный подход – реализации работы различных механизмов по сети ставиться во главу угла, а локальная работа рассматривается, как один из способов оптимизации.

remote vs cluster

В akka есть понятие "akka remote" – это возможность работать с любыми акторами прозрачно, не зная, где именно они находятся - локально или удалённо. Вы должны каким-либо образом знать, как до этих акторов добраться и эта реализация на вашей ответственности.

Поверх akka remote строиться akka cluster, который реализует кластерное решение, т.е. количество нод может динамически меняться, каждой ноде может быть назначена роль, которую она исполняет в системе.

Ноды договариваются с друг другом по gossip протоколу и внутри используют для хранения состояния CRDT структуры. Одна из нод назначается мастером, если она погасла мастером станет автоматически другая нода.

Гашение нод вы определяете сами, система только говорит, что нода стала не доступна, вы сами можете решить, что делать - удалять её из кластера или нет. Есть стандартная реализация, когда нода удаляется после недоступности в течении какого-то времени.

На события кластера, или более точно на жизненный цикл его нод, можно подписаться.

Некоторые вещи в akka cluster ещё только в планах на реализацию.

Нет встроенного механизма сказать кластеру "я запустился", у ноды при старте должен быть список seed нод, хотя бы одна из которых должна работать. Seed нодой может быть любая нода работающего кластера. Т.е. в реальной жизни тут нужно будет брать какие-то решения для discovery. Как вариант предлагают также держать ноды, которые ничего не делают (не запускают пользовательские акторы), а только используются как seed ноды.

Пока всё. Некоторые из аспектов описанных выше я буду изучать подробнее и ещё про них напишу. Если найдёте в моих записях ошибки, буду раз замечаниям.