Improve the Throttle for Akka(1)Local Cache

ImprovetheThrottleforAkka(1)LocalCache

1BasicIdeaofAkka

StarttheAkkaSystem,sendmessages.

packagecom.sillycat.throttle.demo

importakka.actor.{Actor,ActorSystem,Props}

classHelloActorextendsActor{

defreceive={

case"hello"=>{

println("hello!")

}

case_=>{

println("huh?")

}

}

}

objectHelloAppextendsApp{

valsystem=ActorSystem("HelloSystem")

system.actorOf(Props[HelloActor],name="test1")

system.actorOf(Props[HelloActor],name="test2")

system.actorOf(Props[HelloActor],name="test3")

//implicitvalresolveTimeout=Timeout(5seconds)

//system.actorSelection("/user/test*").resolveOne().map{helloActor=>

//println(helloActor)

//helloActor!"hello"

//helloActor!"bye"

//}

valhelloActor=system.actorSelection("/user/test1")

helloActor!"hello"

helloActor!"bye"

system.awaitTermination()

}

SimpleExamplewithRouter

packagecom.sillycat.throttle.demo

importakka.actor._

importakka.routing.FromConfig

importcom.typesafe.config.ConfigFactory

objectRoutingAppextendsApp{

//Asimpleactorthatprintswhateveritreceives

classPrinterextendsActor{

defreceive={

casex=>println(self.path+"saying"+x)

}

}

classShooterextendsActor{

defreceive={

casex=>println(self.path+"shouting"+x)

}

}

valsystem=ActorSystem("RoutingSystem",ConfigFactory.load())

valrouter1:ActorRef=system.actorOf(Props[Printer].withRouter(FromConfig()),name="Router1")

//Thesethreemessageswillbesenttotheprinterimmediately

router1!"11"

router1!"12"

router1!"13"

//Thesetwowillwaitatleastuntil1secondhaspassed

router1!"14"

router1!"15"

println("Router1"+router1.path)

valrouter2:ActorRef=system.actorOf(Props[Shooter].withRouter(FromConfig()),name="Router2")

router2!"21"

router2!"22"

valrouter3:ActorSelection=system.actorSelection("/user/Router2")

router3!"23"

println("Router2"+router2.path)

system.shutdown()

}

2ThrottlerBasedonLocalCache

Trytoimplementthissolution.

//FUNCTIONLIMIT_API_CALL(ip)

//ts=CURRENT_UNIX_TIME()

//keyname=ip+":"+ts

//current=GET(keyname)

//IFcurrent!=NULLANDcurrent>10THEN

//ERROR"toomanyrequestspersecond"

//ELSE

//MULTI

//INCR(keyname,1)

//EXPIRE(keyname,10)

//EXEC

//PERFORM_API_CALL()

//END

Ifirstbuildoneimplementationontopofguavalocalcache.

packageactors.throttle

importakka.actor.{Actor,ActorRef}

importcom.sillycat.util.IncludeLogger

importservices.LocalCache

importutils.IncludeDateTimeUtil

importscala.concurrent.ExecutionContext.Implicits.global

importscala.concurrent.duration._

classLocalCacheBasedThrottler(varrate:Rate,vartarget:ActorRef)extendsActorwithIncludeLoggerwithIncludeDateTimeUtil{

valthrottleKey="THROTTLE_"

defreceive={

casemsg:MessageTick=>{

valmsgKey=msg.key

valrealMsg=msg.msg

valcounter=msg.count

vallimitCalls=rate.numberOfCalls

valtimeWindows=rate.duration

valtimeKey=convertCurrentTime2Key(timeWindows)

valkey=throttleKey+timeKey+msgKey

LocalCache.throttleBucket.getIfPresent(key)match{

casecount:java.lang.Integerifcount>=limitCalls=>{

//delayrandomandtickself

LocalCache.throttleBucket.put(key,count+counter)

valdelay=calculateDelay(count+counter,limitCalls,timeWindows)

//ticktoselfwithinthedelay

context.system.scheduler.scheduleOnce(delaysecond,self,msg)

}

casecount:java.lang.Integer=>{

//count+1

LocalCache.throttleBucket.put(key,count+counter)

//passtheticket

target!realMsg

}

case_=>{

//initthecount

LocalCache.throttleBucket.put(key,newInteger(counter))

//passtheticket

target!realMsg

}

}

}

case_=>{

logger.error("ReceivedamessageIdon'tunderstand.")

}

}

}

caseclassRate(valnumberOfCalls:Int,valduration:Int)

caseclassMessageTick(key:String,msg:Any,count:Int=1)

TheLocalCacheClasses

packageservices

importjava.util.concurrent.TimeUnit

importcom.google.common.cache.CacheBuilder

importmodels.CountStep

objectLocalCache{

valbuilderThrottle=CacheBuilder.newBuilder().expireAfterWrite(60,TimeUnit.SECONDS)

valthrottleBucket=builderThrottle.build[java.lang.String,java.lang.Integer]()

valbuilderMsg=CacheBuilder.newBuilder().expireAfterWrite(30,TimeUnit.MINUTES)

valmsgBucket=builderMsg.build[java.lang.String,CountStep]()

}

Hereishowtouseit.

Akka.system.actorOf(Props(classOf[LocalCacheBasedThrottler],

Rate(4,15),

contextIOActor),

name="context-io-throttler")

References:

http://letitcrash.com/post/28901663062/throttling-messages-in-akka-2

https://github.com/hbf/akka-throttler

http://doc.akka.io/api/akka/2.3.4/index.html#akka.contrib.throttle.TimerBasedThrottler

http://redis.io/commands/incr

http://doc.akka.io/docs/akka/snapshot/scala/routing.html

相关推荐