Improve the Throttle for Akka(1)Local Cache
Improve the Throttle for Akka (1) Local Cache
1 Basic Idea of Akka
Start the Akka system, then send messages.
packagecom.sillycat.throttle.demo
importakka.actor.{Actor,ActorSystem,Props}
/**
 * Minimal demo actor.
 *
 * Prints "hello!" when it receives the string "hello" and "huh?" for any
 * other message.
 */
class HelloActor extends Actor {
  def receive: Receive = {
    case "hello" => println("hello!")
    case _       => println("huh?")
  }
}
/**
 * Entry point of the first demo: starts an actor system, creates three
 * HelloActor instances and sends two messages to the one named "test1".
 */
object HelloApp extends App {
  val system = ActorSystem("HelloSystem")

  // Create the three demo actors; as top-level actors they live under /user/<name>.
  Seq("test1", "test2", "test3").foreach { actorName =>
    system.actorOf(Props[HelloActor], name = actorName)
  }

  // Disabled alternative kept for reference: resolve a wildcard selection
  // to a concrete ActorRef before sending.
  // implicit val resolveTimeout = Timeout(5 seconds)
  // system.actorSelection("/user/test*").resolveOne().map { helloActor =>
  //   println(helloActor)
  //   helloActor ! "hello"
  //   helloActor ! "bye"
  // }

  // Address the first actor by path and send it two messages.
  val helloActor = system.actorSelection("/user/test1")
  helloActor ! "hello"
  helloActor ! "bye"

  // Blocks the main thread until the actor system has terminated.
  // Nothing in this object shuts the system down.
  system.awaitTermination()
}
Simple Example with Router
packagecom.sillycat.throttle.demo
importakka.actor._
importakka.routing.FromConfig
importcom.typesafe.config.ConfigFactory
/**
 * Router demo: creates two routers whose routing setup is taken from
 * configuration (`FromConfig`) and sends a handful of messages through them,
 * both via the router's ActorRef and via an ActorSelection on its path.
 */
object RoutingApp extends App {

  /** Prints every message it receives, prefixed with its own actor path. */
  class Printer extends Actor {
    def receive: Receive = {
      case x => println(s"${self.path} saying $x")
    }
  }

  /** Like Printer but with a different verb, so the two routers' output can be told apart. */
  class Shooter extends Actor {
    def receive: Receive = {
      case x => println(s"${self.path} shouting $x")
    }
  }

  val system = ActorSystem("RoutingSystem", ConfigFactory.load())

  // The router definition for /Router1 is expected in the loaded configuration (not shown here).
  val router1: ActorRef =
    system.actorOf(Props[Printer].withRouter(FromConfig()), name = "Router1")

  // Sends are asynchronous; which routee handles each message is decided by the router.
  router1 ! "11"
  router1 ! "12"
  router1 ! "13"
  router1 ! "14"
  router1 ! "15"
  println(s"Router1 ${router1.path}")

  val router2: ActorRef =
    system.actorOf(Props[Shooter].withRouter(FromConfig()), name = "Router2")
  router2 ! "21"
  router2 ! "22"

  // The same router can also be addressed by its path.
  val router3: ActorSelection = system.actorSelection("/user/Router2")
  router3 ! "23"
  println(s"Router2 ${router2.path}")

  // NOTE(review): shutdown is requested right after the asynchronous sends above;
  // messages that have not been processed yet may presumably never be printed — confirm
  // whether the demo should wait before shutting down.
  system.shutdown()
}
2 Throttler Based on Local Cache
Try to implement this solution.
// FUNCTION LIMIT_API_CALL(ip)
// ts = CURRENT_UNIX_TIME()
// keyname = ip+":"+ts
// current = GET(keyname)
// IF current != NULL AND current > 10 THEN
//     ERROR "too many requests per second"
// ELSE
//     MULTI
//         INCR(keyname,1)
//         EXPIRE(keyname,10)
//     EXEC
//     PERFORM_API_CALL()
// END
First, I built an implementation on top of the Guava local cache.
packageactors.throttle
importakka.actor.{Actor,ActorRef}
importcom.sillycat.util.IncludeLogger
importservices.LocalCache
importutils.IncludeDateTimeUtil
importscala.concurrent.ExecutionContext.Implicits.global
importscala.concurrent.duration._
/**
 * Actor that throttles [[MessageTick]] messages before forwarding their payload to `target`.
 *
 * Every tick is counted in `LocalCache.throttleBucket` under a key made of a fixed
 * prefix, a key derived from the current time and `rate.duration`, and the tick's own key.
 * If the stored count has already reached `rate.numberOfCalls`, the tick is re-sent to this
 * actor after a computed delay; otherwise its payload is forwarded to `target` immediately.
 * In both cases the stored count is increased by the tick's `count`.
 *
 * `convertCurrentTime2Key` and `calculateDelay` are not defined in this class; presumably
 * they come from `IncludeDateTimeUtil` — verify.
 *
 * @param rate   limit (`numberOfCalls`) and window length (`duration`) used for throttling
 * @param target actor that receives the payload of every tick that is let through
 */
class LocalCacheBasedThrottler(var rate: Rate, var target: ActorRef)
    extends Actor
    with IncludeLogger
    with IncludeDateTimeUtil {

  /** Prefix of every key this actor writes to the throttle cache. */
  val throttleKey = "THROTTLE_"

  def receive: Receive = {
    case msg: MessageTick =>
      val limitCalls = rate.numberOfCalls
      val timeWindows = rate.duration
      val key = throttleKey + convertCurrentTime2Key(timeWindows) + msg.key

      // getIfPresent returns null for an absent key; Option(...) turns that into None.
      val current: Option[Int] =
        Option(LocalCache.throttleBucket.getIfPresent(key)).map(_.intValue)

      // The counter is bumped whether the tick is forwarded or delayed.
      val updated = current.getOrElse(0) + msg.count
      LocalCache.throttleBucket.put(key, Integer.valueOf(updated))

      if (current.exists(_ >= limitCalls)) {
        // Limit already reached: re-deliver the same tick to ourselves later.
        // The delay value is interpreted as seconds.
        val delay = calculateDelay(updated, limitCalls, timeWindows)
        context.system.scheduler.scheduleOnce(delay.second, self, msg)
      } else {
        // Still under the limit (or first tick for this key): pass the payload on.
        target ! msg.msg
      }

    case other =>
      logger.error(s"Received a message I don't understand: ${other.getClass.getName}")
  }
}
/**
 * Throttling rate: a call limit together with the length of the time window it applies to.
 *
 * @param numberOfCalls call count limit for one window
 * @param duration      length of the window (presumably in seconds — confirm against callers)
 */
case class Rate(numberOfCalls: Int, duration: Int)
/**
 * Envelope for a message together with the key and weight it is counted under.
 *
 * @param key   identifier the message is counted under
 * @param msg   the wrapped payload
 * @param count how much this message adds to a counter; defaults to 1
 */
case class MessageTick(
    key: String,
    msg: Any,
    count: Int = 1
)
The LocalCache Classes
packageservices
importjava.util.concurrent.TimeUnit
importcom.google.common.cache.CacheBuilder
importmodels.CountStep
/**
 * Singleton holder for the in-memory Guava caches.
 *
 * Both caches expire entries a fixed time after they were written:
 * 60 seconds for the throttle counters and 30 minutes for the message bucket.
 */
object LocalCache {

  // Throttle counters: String key -> java.lang.Integer count.
  val builderThrottle = CacheBuilder.newBuilder().expireAfterWrite(60, TimeUnit.SECONDS)
  val throttleBucket = builderThrottle.build[java.lang.String, java.lang.Integer]()

  // Message bucket: String key -> CountStep.
  val builderMsg = CacheBuilder.newBuilder().expireAfterWrite(30, TimeUnit.MINUTES)
  val msgBucket = builderMsg.build[java.lang.String, CountStep]()
}
Here is how to use it.
// Create a throttler in front of contextIOActor with a rate of 4 calls per
// window of 15, registered under the name "context-io-throttler".
Akka.system.actorOf(
  props = Props(
    classOf[LocalCacheBasedThrottler],
    Rate(numberOfCalls = 4, duration = 15),
    contextIOActor
  ),
  name = "context-io-throttler"
)
References:
http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2
https://github.com/hbf/akka-throttler
http://doc.akka.io/api/akka/2.3.4/index.html#akka.contrib.throttle.TimerBasedThrottler
http://redis.io/commands/incr
http://doc.akka.io/docs/akka/snapshot/scala/routing.html