【Akka扩展】Akka Extensions

如果你想为 Akka 添加功能,那么有一个非常优雅且功能强大的机制。它被称为 Akka 扩展(Akka Extensions),由2个基本组件组成:ExtensionExtensionId

每个 ActorSystem 只能加载一次扩展,由 Akka 管理。你可以选择通过 Akka 配置按需加载扩展程序或在 ActorSystem 创建时加载。 有关如何进行此操作的详细信息,请参见“从配置加载”一节。

由于扩展是钩入(hook into) Akka 本身的一种方式,扩展的实现者需要确保扩展的线程安全。

创建一个扩展

因此,让我们创建一个示例扩展,使我们计算发生事件的次数。

首先,我们定义我们的 Extension 应该做什么:

import akka.actor.Extension

class CountExtensionImpl extends Extension {
  //由于这个扩展是一个共享实例
  // 每个 ActorSystem 我们需要线程安全
  private val counter = new AtomicLong(0)

  //这是这个 Extension 提供的操作
  def increment() = counter.incrementAndGet()
}

然后,我们需要为我们的扩展创建一个 ExtensionId,以便我们可以获取它。

import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ExtendedActorSystem

object CountExtension
  extends ExtensionId[CountExtensionImpl]
  with ExtensionIdProvider {
  //ExtensionIdProvider需要查找方法,
  // 所以我们在这里返回,这允许我们
  // 配置我们的扩展程序以便
  // 在ActorSystem启动时加载
  override def lookup = CountExtension

  //这个方法将被Akka调用
  // 来实例化我们的扩展
  override def createExtension(system: ExtendedActorSystem) = new CountExtensionImpl

  /**
   * Java API:检索给定系统的计数扩展。
   */
  override def get(system: ActorSystem): CountExtensionImpl = super.get(system)
}

真顽皮!现在我们需要做的就是实际使用它:

CountExtension(system).increment

或者从 Akka Actor 内部使用:

class MyActor extends Actor {
  def receive = {
    case someMessage ⇒
      CountExtension(context.system).increment()
  }
}

你还可以隐藏 traits 后面的 extension:

trait Counting { self: Actor ⇒
  def increment() = CountExtension(context.system).increment()
}
class MyCounterActor extends Actor with Counting {
  def receive = {
    case someMessage ⇒ increment()
  }
}

这一切就是这么简单!

从配置加载

为了能够从 Akka 配置中加载扩展,您必须在您提供给您的 ActorSystem 的配置的 akka.extensions 部分中添加 ExtensionIdExtensionIdProvider 实现的FQCN。

akka {
  extensions = ["docs.extension.CountExtension"]
}

适应性

没有限制,一切都是可能的(The sky is the limit)!顺便说一句,你知道 Akka 的 Typed ActorsSerialization 和其他功能是作为 Akka 扩展实现吗?

应用程序特定配置

该配置可用于应用程序特定的设置。 一个好的做法是将这些设置放 Extention 中。

示例配置:

myapp {
  db {
    uri = "mongodb://example1.com:27017,example2.com:27017"
  }
  circuit-breaker {
    timeout = 30 seconds
  }
}

然后 Extension

import akka.actor.ActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ExtendedActorSystem
import scala.concurrent.duration.Duration
import com.typesafe.config.Config
import java.util.concurrent.TimeUnit

class SettingsImpl(config: Config) extends Extension {
  val DbUri: String = config.getString("myapp.db.uri")
  val CircuitBreakerTimeout: Duration =
    Duration(
      config.getMilliseconds("myapp.circuit-breaker.timeout"),
      TimeUnit.MILLISECONDS)
}
object Settings extends ExtensionId[SettingsImpl] with ExtensionIdProvider {

  override def lookup = Settings

  override def createExtension(system: ExtendedActorSystem) =
    new SettingsImpl(system.settings.config)

  /**
   * Java API: retrieve the Settings extension for the given system.
   */
  override def get(system: ActorSystem): SettingsImpl = super.get(system)
}

使用它:

class MyActor extends Actor {
  val settings = Settings(context.system)
  val connection = connect(settings.DbUri, settings.CircuitBreakerTimeout)
  // ......
}

库扩展

第三部分库可以在 actor 系统启动时注册它的自动加载扩展,方法是将其附加到 reference.conf 中的 akka.library-extensions

akka.library-extensions += "docs.extension.ExampleExtension"

由于没有办法有选择地删除这样的扩展名,所以应该小心使用它,并且只有在用户不希望禁用该扩展名的情况下或者对禁用这些子特性具有特定支持时才应使用它。有一个重要的例子:比如在测试中。

akka.library-extensions 绝不能分配(= [“Extension”])而不是附加,因为这会破坏库扩展机制并使行为依赖于类路径排序。

相关推荐