Chapter 7. Completing the worker-pool application

This chapter covers

  • Implementing the entire worker pool application
  • Building multiple supervision hierarchies
  • Dynamically creating Supervisors and workers

In this chapter, you’ll continue to evolve the design of the Pooly application, which you started in chapter 6. By the end of this chapter, you’ll have a full, working worker-pool application. You’ll get to explore the Supervisor API more thoroughly and also explore more advanced (read: fun!) Supervisor topics.

In chapter 6, you were left with a rudimentary worker-pool application, if we can even call it that. In the following sections, you’ll add some smarts to Pooly. For example, there’s currently no way to handle crashes and restarts gracefully. The current version of Pooly can only handle a single pool with a fixed number of workers. Version 3 of Pooly will implement support for multiple pools and a variable number of worker processes.

Sometimes the pool must deal with an unexpected load. What happens when there are too many requests? What happens when all the workers are busy? In version 4, you’ll make pools that are variable in size and allow for the overflowing of workers. You’ll also implement queuing for consumer processes when all workers are busy.

7.1. Version 3: error handling, multiple pools, and multiple workers

How can you tell if a process crashes? You can either monitor it or link to it. This leads to the next question: which should you choose? To answer that question, let’s think about what should happen when processes crash. There are two cases to consider:

  • Crashes between a server process and a consumer process
  • Crashes between a server process and a worker process

7.1.1. Case 1: crashes between the server and consumer process

A crash of the server process shouldn’t affect a consumer process, and the reverse is also true! When a consumer process crashes, it shouldn’t crash the server process. Therefore, monitors are the way to go.

You’re already monitoring the consumer process each time a worker is checked out. What’s left is to handle the :DOWN message of a consumer process, as the next listing shows.

Listing 7.1. Handling the consumer :DOWN message (lib/pooly/server.ex)
defmodule Pooly.Server do
  #############
  # Callbacks #
  #############
  def handle_info({:DOWN, ref, _, _, _}, state = %{monitors: monitors, workers: workers}) do
    case :ets.match(monitors, {:"$1", ref}) do
      [[pid]] ->
        true = :ets.delete(monitors, pid)
        new_state = %{state | workers: [pid|workers]}                #1
        {:noreply, new_state}
      [] ->
        {:noreply, state}
    end
  end
end

When a consumer process goes down, you match the reference in the monitors ETS table, delete the monitor, and add the worker back into the state.
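
The monitor mechanism itself can be tried outside Pooly. The following is a minimal sketch, assuming only the standard library: the current process monitors a stand-in consumer and receives a :DOWN message when that process exits. The names `consumer` and `result` are illustrative and not part of Pooly.

```elixir
# A stand-in "consumer" that waits until it is told to stop.
consumer =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Monitoring is one-directional: only the monitoring process is notified.
ref = Process.monitor(consumer)
send(consumer, :stop)

result =
  receive do
    {:DOWN, ^ref, :process, ^consumer, reason} -> {:consumer_down, reason}
  after
    1_000 -> :timeout
  end

IO.inspect(result)
```

Because the consumer only ever sends a message to the monitoring process, the monitoring process keeps running no matter why the consumer went away.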

7.1.2. Case 2: crashes between the server and worker

If the server crashes, should it bring down the worker process? It should, because otherwise, the state of the server will be inconsistent with the pool’s actual state. On the other hand, when a worker process crashes, should it bring down the server process? Of course not! What does this mean for you? Well, because of the bidirectional dependency, you should use links. But because the server should not crash when a worker process crashes, the server process should trap exits, as shown in the following listing.

Listing 7.2. Making the server process trap exits (lib/pooly/server.ex)
defmodule Pooly.Server do
  #############
  # Callbacks #
  #############
  def init([sup, pool_config]) when is_pid(sup) do
    Process.flag(:trap_exit, true)                                 #1
    monitors = :ets.new(:monitors, [:private])
    init(pool_config, %State{sup: sup, monitors: monitors})
  end
end

With the server process trapping exits, you now handle :EXIT messages coming from workers, as shown in the next listing.

Listing 7.3. Handling worker :EXIT messages (lib/pooly/server.ex)
defmodule Pooly.Server do

  #############
  # Callbacks #
  #############

  def handle_info({:EXIT, pid, _reason}, state = %{monitors: monitors,
workers: workers, worker_sup: worker_sup}) do
    case :ets.lookup(monitors, pid) do
      [{pid, ref}] ->
        true = Process.demonitor(ref)
        true = :ets.delete(monitors, pid)
        new_state = %{state | workers: [new_worker(worker_sup)|workers]}
        {:noreply, new_state}
      [] ->
        {:noreply, state}
    end
  end

end

When a worker process exits unexpectedly, its entry is looked up in the monitors ETS table. If an entry doesn’t exist, nothing needs to be done. Otherwise, the consumer process is no longer monitored, and its entry is removed from the monitors table. Finally, a new worker is created and added back into the workers field of the server state.
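
To see what trapping exits does in isolation, here is a small sketch that assumes nothing beyond the standard library. The current process plays the role of the server, and a stand-in worker exits with a made-up reason (`:boom`); neither name comes from Pooly.

```elixir
# Trapping exits turns exit signals from linked processes into messages.
Process.flag(:trap_exit, true)

# A stand-in worker that exits right away with a hypothetical reason.
worker = spawn_link(fn -> exit(:boom) end)

exit_message =
  receive do
    {:EXIT, ^worker, reason} -> {:worker_exited, reason}
  after
    1_000 -> :timeout
  end

IO.inspect(exit_message)
```

Without the call to Process.flag/2, the same exit signal would take the linked process down with the worker instead of arriving as a message.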

7.1.3. Handling multiple pools

After version 2, you have a basic worker pool in place. But any self-respecting worker-pool application should be able to handle multiple pools. Let’s go through a few possible designs before you start coding. The most straightforward way would be to design the supervision tree as shown in figure 7.1.

Figure 7.1. A possible design to handle multiple pools

Do you see a problem with this? You’re essentially sticking more WorkerSupervisors into Pooly.Supervisor. This is a bad design. The issue is the error kernel, or the lack thereof.

Allow me to elaborate. Issues with any of the WorkerSupervisors shouldn’t affect the Pooly.Server. It pays to think about what happens when a process crashes and what’s affected. A potential fix could be to add another Supervisor to handle all of the WorkerSupervisors: say, a Pooly.WorkersSupervisor (just another level of indirection!). Figure 7.2 shows how it could look.

Figure 7.2. Another possible design. Can you identify the bottleneck?

Do you notice another problem? The poor Pooly.Server process has to handle every request meant for any pool. This means the server process may pose a bottleneck if messages to it come fast and furious, and they can potentially flood its mailbox. Pooly.Server also presents a single point of failure, because it contains the state of every pool. The death of the server process would mean all the worker Supervisors would have to be brought down. Consider the design in figure 7.3.

Figure 7.3. The final design of Pooly

The top-level Supervisor Pooly.Supervisor supervises a Pooly.Server and a PoolsSupervisor. The PoolsSupervisor in turn supervises many PoolSupervisors. Each PoolSupervisor supervises its own PoolServer and WorkerSupervisor.

As you’ve probably guessed, Pooly is going to undergo a design overhaul. To make things easier to follow, you’ll implement the changes from the top down.

7.1.4. Adding the application behavior to Pooly

The first file to change is lib/pooly.ex, the main entry point into Pooly, shown in the next listing. Because you’re now supporting multiple pools, you want to refer to each pool by name. This means the various functions will also accept pool_name as a parameter.

Listing 7.4. Adding support for multiple pools (lib/pooly.ex)
defmodule Pooly do
  use Application
  def start(_type, _args) do
    pools_config =                                                      #1
      [                                                                 #2
        [name: "Pool1",                                                 #2
          mfa: {SampleWorker, :start_link, []}, size: 2],               #2
        [name: "Pool2",                                                 #2
          mfa: {SampleWorker, :start_link, []}, size: 3],               #2
        [name: "Pool3",                                                 #2
          mfa: {SampleWorker, :start_link, []}, size: 4],               #2
      ]                                                                 #2
    start_pools(pools_config)                                           #3
  end
  def start_pools(pools_config) do                                      #4
    Pooly.Supervisor.start_link(pools_config)                           #4
  end
  def checkout(pool_name) do                                            #5
    Pooly.Server.checkout(pool_name)                                    #5
  end
  def checkin(pool_name, worker_pid) do                                 #5
    Pooly.Server.checkin(pool_name, worker_pid)                         #5
  end
  def status(pool_name) do                                              #5
    Pooly.Server.status(pool_name)                                      #5
  end 
end

7.1.5. Adding the top-level Supervisor

Your next stop is the top-level Supervisor, lib/pooly/supervisor.ex. The top-level Supervisor is in charge of kick-starting Pooly.Server and Pooly.PoolsSupervisor. When Pooly.PoolsSupervisor starts, it starts up individual Pooly.PoolSupervisors that in turn start their own Pooly.PoolServer and Pooly.WorkerSupervisor (see figure 7.4).

Figure 7.4. Starting from the top-level Supervisor

Pooly.Supervisor supervises two processes: Pooly.PoolsSupervisor (as yet unimplemented) and Pooly.Server. You therefore need to add these two processes to the Pooly.Supervisor’s children list, as shown in the next listing.

Listing 7.5. Top-level Supervisor (lib/pooly/supervisor.ex)
defmodule Pooly.Supervisor do
  use Supervisor
  def start_link(pools_config) do                               #1
    Supervisor.start_link(__MODULE__, pools_config,             #2
                          name: __MODULE__)
  end
  def init(pools_config) do                                     #1
    children = [
      supervisor(Pooly.PoolsSupervisor, []),                    #3
      worker(Pooly.Server, [pools_config])                      #3
    ]
    opts = [strategy: :one_for_all]
    supervise(children, opts)
  end
end

The major changes to Pooly.Supervisor are mainly adding Pooly.PoolsSupervisor as a child and giving Pooly.Supervisor a name. Recall that you’re setting the name of Pooly.Supervisor to __MODULE__, which means you can refer to the process as Pooly.Supervisor instead of pid. Therefore, you don’t need to pass in self (see version 2 of Pooly.Supervisor) into Pooly.Server.

7.1.6. Adding the pools Supervisor

Create pools_supervisor.ex in lib/pooly. The next listing shows the implementation.

Listing 7.6. Pools Supervisor (lib/pooly/pools_supervisor.ex)
defmodule Pooly.PoolsSupervisor do
  use Supervisor
  def start_link do
    Supervisor.start_link(__MODULE__, [], name: __MODULE__)          #1
  end
  def init(_) do
    opts = [
      strategy: :one_for_one                                         #2
    ]
    supervise([], opts)
  end
end

Just like Pooly.Supervisor, you’re giving Pooly.PoolsSupervisor a name. Notice that this Supervisor has no child specifications. In fact, when it starts up, there are no pools attached to it. The reason is that just as in version 2, you want to validate the pool configuration before creating any pools. Therefore, the only information you supply is the restart strategy. Why :one_for_one? Because a crash in any of the pools shouldn’t affect every other pool.

7.1.7. Making Pooly.Server dumber

In versions 1 and 2, Pooly.Server was the brains of the entire operation. This is no longer the case. Some of Pooly.Server’s job will be taken over by the dedicated Pooly.PoolServer (see figure 7.5).

Figure 7.5. Logic from the top-level pool server from the previous version will be moved into individual pool servers.

Most of the APIs are the same from previous versions, with the addition of pool_name. Open lib/pooly/server.ex, and replace the previous implementation with the code in the following listing.

Listing 7.7. Top-level pool server (lib/pooly/server.ex)
defmodule Pooly.Server do
  use GenServer
  import Supervisor.Spec
  #######
  # API #
  #######
  def start_link(pools_config) do
    GenServer.start_link(__MODULE__, pools_config, name: __MODULE__)
  end
  def checkout(pool_name) do
    GenServer.call(:"#{pool_name}Server", :checkout)                    #1
  end
  def checkin(pool_name, worker_pid) do
    GenServer.cast(:"#{pool_name}Server", {:checkin, worker_pid})       #1
  end
  def status(pool_name) do
    GenServer.call(:"#{pool_name}Server", :status)                      #1
  end
  #############
  # Callbacks #
  #############
  def init(pools_config) do                                             #2
    pools_config |> Enum.each(fn(pool_config) ->                        #2
      send(self(), {:start_pool, pool_config})                          #2
    end)                                                                #2
    {:ok, pools_config}
  end
  def handle_info({:start_pool, pool_config}, state) do                 #3
    {:ok, _pool_sup} = Supervisor.start_child(Pooly.PoolsSupervisor, supervisor_spec(pool_config))                               #3
    {:noreply, state}
  end
  #####################
  # Private Functions #
  #####################
  defp supervisor_spec(pool_config) do
    opts = [id: :"#{pool_config[:name]}Supervisor"]                      #4
    supervisor(Pooly.PoolSupervisor, [pool_config], opts)
  end
end

In this version, Pooly.Server’s job is to delegate all the requests to the respective pools and to start the pools and attach the pools to Pooly.PoolsSupervisor. You assume that each individual pool server is named :"#{pool_name}Server". Notice that the name is an atom! Sadly, I’ve lost hours (and hair) to this because I failed to read the documentation properly.
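
The naming convention can be checked in isolation. This sketch assumes a stand-in Agent in place of a real pool server; only the `:"#{...}Server"` form is taken from the listing.

```elixir
pool_name = "Pool1"

# String interpolation inside :"..." builds an atom, not a string.
server_name = :"#{pool_name}Server"

# Register a stand-in process (an Agent, not a real pool server) under that atom.
{:ok, pid} = Agent.start_link(fn -> :idle end, name: server_name)

IO.inspect(is_atom(server_name))
IO.inspect(Process.whereis(server_name) == pid)
```

Atoms are never garbage collected, so building them dynamically is only reasonable for a small, bounded set of names such as the pools in a fixed configuration.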

The pools_config is iterated, and the {:start_pool, pool_config} message is sent. The message is handled, and Pooly.PoolsSupervisor is told to start a child based on the given pool_config.

There is one tiny caveat to look out for. Notice that you make sure each Pooly.PoolSupervisor is started with a unique Supervisor specification ID. If you forget to do this, you’ll get a cryptic error message such as the following:
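
The pattern of a process sending messages to itself during init can be sketched on its own. The module below is hypothetical (`DeferredStart` and its `started/1` function are not part of Pooly); it only records which items it was asked to start.

```elixir
defmodule DeferredStart do
  use GenServer

  def start_link(items), do: GenServer.start_link(__MODULE__, items)

  # Returns the items handled so far, in the order they were handled.
  def started(pid), do: GenServer.call(pid, :started)

  @impl true
  def init(items) do
    # Queue one message per item; init/1 itself returns right away.
    Enum.each(items, fn item -> send(self(), {:start_item, item}) end)
    {:ok, []}
  end

  @impl true
  def handle_info({:start_item, item}, started) do
    {:noreply, [item | started]}
  end

  @impl true
  def handle_call(:started, _from, started) do
    {:reply, Enum.reverse(started), started}
  end
end

{:ok, pid} = DeferredStart.start_link(["Pool1", "Pool2"])
IO.inspect(DeferredStart.started(pid))
```

The messages queued in init/1 are already in the mailbox when start_link/1 returns, so they are handled before any call made afterward.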

12:08:16.336 [error] GenServer Pooly.Server terminating
Last message: {:start_pool, [name: "Pool2", mfa: {SampleWorker,
   :start_link, []}, size: 2]}
State: [[name: "Pool1", mfa: {SampleWorker, :start_link, []}, size: 2],
   [name: "Pool2", mfa: {SampleWorker, :start_link, []}, size: 2]]

** (exit) an exception was raised:
    ** (MatchError) no match of right hand side value: {:error,
   {:already_started, #PID<0.142.0>}}
        (pooly) lib/pooly/server.ex:38: Pooly.Server.handle_info/2
        (stdlib) gen_server.erl:593: :gen_server.try_dispatch/4
        (stdlib) gen_server.erl:659: :gen_server.handle_msg/5
        (stdlib) proc_lib.erl:237: :proc_lib.init_p_do_apply/3

The clue here is {:error, {:already_started, #PID<0.142.0>}}. I spent a couple of hours trying to figure this out before stumbling onto this solution. But what happens when a Pooly.PoolSupervisor starts with a given pool_config?
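
The ID clash is easy to reproduce with a throwaway Supervisor. This sketch assumes Elixir 1.5 or later, where child specifications can be plain maps (the listings in this chapter use the older Supervisor.Spec helpers instead), and it uses stand-in Agents rather than pool Supervisors. The `child_spec` helper is hypothetical.

```elixir
# A Supervisor that starts with no children, like Pooly.PoolsSupervisor.
{:ok, sup} = Supervisor.start_link([], strategy: :one_for_one)

# Hypothetical helper: builds a map-based child spec for a stand-in Agent.
child_spec = fn id ->
  %{id: id, start: {Agent, :start_link, [fn -> :idle end]}}
end

{:ok, first} = Supervisor.start_child(sup, child_spec.(:Pool1Supervisor))

# Reusing the ID of a running child is rejected.
duplicate = Supervisor.start_child(sup, child_spec.(:Pool1Supervisor))

# A distinct ID is accepted.
{:ok, _second} = Supervisor.start_child(sup, child_spec.(:Pool2Supervisor))

IO.inspect(duplicate == {:error, {:already_started, first}})
```

Matching the result of start_child/2 against {:ok, pid}, as Pooly.Server does, turns that error tuple into the MatchError shown in the log.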

7.1.8. Adding the pool Supervisor

Pooly.PoolSupervisor takes the place of Pooly.Supervisor from previous versions (see figure 7.6). As such, you only need to make a few minor changes. First, you’ll initialize each Pooly.PoolSupervisor with a name. Second, you need to tell Pooly.PoolSupervisor to use Pooly.PoolServer instead. See the following listing.

Figure 7.6. Implementing the individual pool Supervisors
Listing 7.8. Individual pool Supervisor (lib/pooly/pool_supervisor.ex)
defmodule Pooly.PoolSupervisor do
  use Supervisor
  def start_link(pool_config) do
    Supervisor.start_link(__MODULE__, pool_config, name: :"#{pool_config[:name]}Supervisor")                      #1
  end
  def init(pool_config) do
    opts = [
      strategy: :one_for_all
    ]
    children = [
      worker(Pooly.PoolServer, [self(), pool_config])              #2
    ]
    supervise(children, opts)
  end
end

You give individual pool Supervisors a name, although this isn’t strictly necessary. It helps you easily pinpoint the pool Supervisors when viewing them in Observer.

The child specification is changed from Pooly.Server to Pooly.PoolServer. You’re passing the same parameters. Even though you’re naming Pooly.PoolSupervisor, you will not use the name in Pooly.PoolServer so that you can reuse much of the implementation of Pooly.Server from version 2.

7.1.9. Adding the brains for the pool

As noted in the previous section, much of the logic remains unchanged, except in places to support multiple pools. In the interest of saving trees and screen real estate, functions that are exactly the same as Pooly.Server version 2 have their implementation stubbed out with # .... In other words, if you’re following along, you can copy and paste the implementation of Pooly.Server version 2 to Pooly.PoolServer.

The next listing shows the implementation of Pooly.PoolServer.

Listing 7.9. Individual pool server (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  use GenServer
  import Supervisor.Spec
  defmodule State do
    defstruct pool_sup: nil, worker_sup: nil, monitors: nil, size: nil, workers: nil, name: nil, mfa: nil
  end
  def start_link(pool_sup, pool_config) do
    GenServer.start_link(__MODULE__, [pool_sup, pool_config], name: name(pool_config[:name]))                                                          #1
  end
  def checkout(pool_name) do                                                           #2
    GenServer.call(name(pool_name), :checkout)                                         #2
  end
  def checkin(pool_name, worker_pid) do                                                #2
    GenServer.cast(name(pool_name), {:checkin, worker_pid})                            #2
  end
  def status(pool_name) do                                                             #2 
    GenServer.call(name(pool_name), :status)                                           #2
  end  
  #############
  # Callbacks #
  #############
  def init([pool_sup, pool_config]) when is_pid(pool_sup) do
    Process.flag(:trap_exit, true)
    monitors = :ets.new(:monitors, [:private])
    init(pool_config, %State{pool_sup: pool_sup, monitors: monitors})                  #3
  end
  def init([{:name, name}|rest], state) do
    # ...
  end
  def init([{:mfa, mfa}|rest], state) do
    # ...
  end
  def init([{:size, size}|rest], state) do
    # ...
  end
  def init([], state) do
    send(self(), :start_worker_supervisor)                                            #4
    {:ok, state}  
  end
  def init([_|rest], state) do
    # ...
  end
  def handle_call(:checkout, {from_pid, _ref}, %{workers: workers, monitors: monitors} = state) do
    # ...
  end
  def handle_call(:status, _from, %{workers: workers, monitors: monitors} = state) do
    # ...
  end
  def handle_cast({:checkin, worker}, %{workers: workers, monitors: monitors} = state) do
    # ...
  end
  def handle_info(:start_worker_supervisor, state = %{pool_sup: pool_sup, name: name, mfa: mfa, size: size}) do 
    {:ok, worker_sup} = Supervisor.start_child(pool_sup, supervisor_spec(name, mfa))    #5
    workers = prepopulate(size, worker_sup)                                             #6
    {:noreply, %{state | worker_sup: worker_sup, workers: workers}}
  end

  def handle_info({:DOWN, ref, _, _, _}, state = %{monitors: monitors, workers: workers}) do
    # ...
  end
  def handle_info({:EXIT, pid, _reason}, state = %{monitors: monitors, workers: workers, worker_sup: worker_sup}) do
    case :ets.lookup(monitors, pid) do
      [{pid, ref}] ->
        true = Process.demonitor(ref)
        true = :ets.delete(monitors, pid)
        new_state = %{state | workers: [new_worker(worker_sup)|workers]}
        {:noreply, new_state}
      _ ->
        {:noreply, state}
    end
  end
  def terminate(_reason, _state) do
    :ok
  end
  #####################
  # Private Functions #
  #####################
  defp name(pool_name) do                                                                #7
    :"#{pool_name}Server"
  end
  defp prepopulate(size, sup) do
    # ...
  end
  defp prepopulate(size, _sup, workers) when size < 1 do
    # ...
  end
  defp prepopulate(size, sup, workers) do
    # ...
  end
  defp new_worker(sup) do
    # ...
  end
  defp supervisor_spec(name, mfa) do
    opts = [id: name <> "WorkerSupervisor", restart: :temporary]
    supervisor(Pooly.WorkerSupervisor, [self(), mfa], opts)                             #8
  end
end

There are a few notable changes. The server’s start_link/2 function takes the pool Supervisor as the first argument. The pid of the pool Supervisor is saved in the state of the server process. Also, note that the state of the server has been extended to store the pid of the pool Supervisor and worker Supervisor:

defmodule State do
  defstruct pool_sup: nil, worker_sup: nil, monitors: nil, size: nil,
            workers: nil, name: nil, mfa: nil
end

Once the server is done processing the pool configuration, it will eventually send the :start_worker_supervisor message to itself. This message is handled by the handle_info/2 callback. The pool Supervisor is told to start a worker Supervisor as a child, using the child specification defined in supervisor_spec/2. In addition to mfa, you also pass in the pid of the server process. Once the pid of the worker Supervisor is returned, it’s used to pre-populate itself with workers. You use name/1 to reference the appropriate pool server to call the appropriate functions.

7.1.10. Adding the worker supervisor for the pool

The last piece is the worker Supervisor, which is tasked with managing the individual workers (see figure 7.7). It manages any crashing workers. There’s a subtle detail: during initialization, the worker Supervisor creates a link to its corresponding pool server. Why bother? If either the pool server or worker Supervisor goes down, there’s no point in either continuing to exist. Let’s look at the full implementation in listing 7.10 for details.

Figure 7.7. Implementing the individual pool’s worker Supervisor
Listing 7.10. Pool’s worker Supervisor (lib/pooly/worker_supervisor.ex)
defmodule Pooly.WorkerSupervisor do
  use Supervisor
  def start_link(pool_server, {_,_,_} = mfa) do                     #1
    Supervisor.start_link(__MODULE__, [pool_server, mfa])           #1
  end
  def init([pool_server, {m,f,a}]) do
    Process.link(pool_server)
    worker_opts = [restart:  :temporary,
                   shutdown: 5000,
                   function: f]
    children = [worker(m, a, worker_opts)]
    opts     = [strategy:     :simple_one_for_one,
                max_restarts: 5,
                max_seconds:  5]
    supervise(children, opts)
  end
end

The only changes are the additional pool_server argument and the linking of pool_server to the worker Supervisor process. Why? As previously mentioned, there’s a dependency between the processes, and the pool server needs to be notified when the worker Supervisor goes down. Similarly, if the worker Supervisor crashes, it should also take down the pool server.

In order for the pool server to handle the message, you need to add another handle_info/2 callback in lib/pooly/pool_server.ex, as the following listing shows.

Listing 7.11. Detecting if the worker Supervisor goes down (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do

  #############
  # Callbacks #
  #############

  def handle_info({:EXIT, worker_sup, reason}, state = %{worker_sup:
   worker_sup}) do
    {:stop, reason, state}
  end

end

Whenever the worker Supervisor exits, it will also terminate the pool server for the same reason that it terminated the worker Supervisor.

7.1.11. Taking it for a spin

Let’s make sure you wired everything up correctly. First, open lib/pooly.ex to configure the pool. Make sure the start/2 function looks like the following listing.

Listing 7.12. Configuring Pooly to start three pools of various sizes (lib/pooly.ex)
defmodule Pooly do
  use Application

  def start(_type, _args) do
    pools_config =
      [
        [name: "Pool1", mfa: {SampleWorker, :start_link, []}, size: 2],
        [name: "Pool2", mfa: {SampleWorker, :start_link, []}, size: 3],
        [name: "Pool3", mfa: {SampleWorker, :start_link, []}, size: 4]
      ]

    start_pools(pools_config)
  end

  # ...
end

You tell Pooly to create three pools, each with a given size and type of worker. For simplicity (laziness, really), you’re using SampleWorker in all three pools. In a fresh terminal session, launch iex and start Observer:

% iex -S mix
iex> :observer.start


Bear witness to the glorious supervision tree you have created, shown in figure 7.8.

Figure 7.8. The Pooly supervision tree as seen in Observer

Now, starting from the leaves (lowest/rightmost) of the supervision tree, try right-clicking the process and killing it. You’ll again notice that a new process takes over.

Work your way higher. What happens when, say, Pool3Server is killed? You’ll notice that the corresponding WorkerSupervisor and the workers under it are all killed and then respawned. It’s important to note that Pool3Server is a brand-new process.

Go even higher. What happens when you kill a PoolSupervisor? As expected, everything under it is killed, another PoolSupervisor is respawned, and everything under it respawns, too. Notice what doesn’t happen: the rest of the application remains unaffected. Isn’t that wonderful? When crashes happen, as they inevitably will, having a nicely layered supervision hierarchy allows the error to be handled in an isolated way so it doesn’t affect the rest of the application.

7.2. Version 4: implementing overflowing and queuing

In the final version of Pooly, you’re going to extend it a little to support a variable number of workers by specifying a maximum overflow. I also want to introduce the notion of queuing up workers. That is, when the maximum overflow limit has been reached, Pooly can queue up workers for consumers that are willing to block and wait for the next available worker.

7.2.1. Implementing maximum overflow

As usual, in order to specify the maximum overflow, you add a new field to the pool configuration. In lib/pooly.ex, modify pools_config in start/2 to look as shown in the next listing.

Listing 7.13. Implementing maximum overflow (lib/pooly.ex)
defmodule Pooly do
  def start(_type, _args) do
    pools_config =
      [
        [name: "Pool1",
         mfa: {SampleWorker, :start_link, []},
         size: 2,
         max_overflow: 3                                      #1
        ],
        [name: "Pool2",
         mfa: {SampleWorker, :start_link, []},
         size: 3,
         max_overflow: 0                                      #1
        ],
        [name: "Pool3",
         mfa: {SampleWorker, :start_link, []},
         size: 4,
         max_overflow: 0                                      #1
        ]
      ]
    start_pools(pools_config)
  end
end

Now that you have a new option for the pool configuration, you must head over to lib/pooly/pool_server.ex to add support for max_overflow. This includes the following:

  • Adding an entry called max_overflow in State
  • Adding an entry called overflow in State to keep track of the current overflow count
  • Adding a function clause in init/2 to handle max_overflow

The next listing shows the additions.

Listing 7.14. Adding a maximum overflow option (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do

  defmodule State do
    defstruct pool_sup: nil, worker_sup: nil, monitors: nil, size: nil,
   workers: nil, name: nil, mfa: nil, overflow: nil, max_overflow: nil
  end

  #############
  # Callbacks #
  #############

  def init([{:name, name}|rest], state) do
    # ...
  end

  # ... more init/2 definitions

  def init([{:max_overflow, max_overflow}|rest], state) do
    init(rest, %{state | max_overflow: max_overflow})
  end

  def init([], state) do
    #...
  end

  def init([_|rest], state) do
    # ...
  end

end

Next, let’s consider the case of an actual overflow. An overflow is said to happen if the total number of busy workers exceeds size and is within the limits of max_overflow. When can overflows happen? When a worker is checked out. Therefore, the only place to look is handle_call({:checkout, block}, from, state), as shown in the next listing.

Listing 7.15. Handling overflows during checking out (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  #############
  # Callbacks #
  #############
  def handle_call({:checkout, block}, {from_pid, _ref} = from, state) do
    %{worker_sup:   worker_sup,
      workers:      workers,
      monitors:     monitors,
      overflow:     overflow,
      max_overflow: max_overflow} = state
    case workers do
      [worker|rest] ->
        # ...
        {:reply, worker, %{state | workers: rest}}
      [] when max_overflow > 0 and overflow < max_overflow ->          #1
        {worker, ref} = new_worker(worker_sup, from_pid)               #1
        true = :ets.insert(monitors, {worker, ref})                    #1
        {:reply, worker, %{state | overflow: overflow+1}}              #1
      [] ->
        {:reply, :full, state}
    end
  end
end

Handling this case is simple. You check whether you’re within the limits of overflowing. If so, a new worker is created and the necessary bookkeeping information is added to the monitors ETS table. A reply containing the worker pid is given to the consumer process, along with an increment of the overflow count.
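
The checkout decision can be modeled without any processes. The following sketch is a simplified, hypothetical stand-in for the callback: `OverflowSketch` is not part of Pooly, the state is a plain map, and workers are atoms rather than pids.

```elixir
defmodule OverflowSketch do
  # An idle worker is available: hand it out.
  def checkout(%{workers: [worker | rest]} = state) do
    {:ok, worker, %{state | workers: rest}}
  end

  # No idle workers, but still within the overflow limit: create an extra one.
  def checkout(%{workers: [], overflow: overflow, max_overflow: max} = state)
      when max > 0 and overflow < max do
    {:ok, {:overflow_worker, overflow + 1}, %{state | overflow: overflow + 1}}
  end

  # No idle workers and no overflow capacity left.
  def checkout(%{workers: []} = state), do: {:full, state}
end

state = %{workers: [:w1], overflow: 0, max_overflow: 1}
{:ok, :w1, state} = OverflowSketch.checkout(state)
{:ok, {:overflow_worker, 1}, state} = OverflowSketch.checkout(state)
{:full, state} = OverflowSketch.checkout(state)
IO.inspect(state)
```

The three clauses mirror the three branches of the case expression in the listing, in the same order.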

7.2.2. Handling worker check-ins

Now that you can handle overflows, how do you handle worker check-ins? In version 2, all you did was add the worker pid back into the workers field of the PoolServer state:

{:noreply, %{state | workers: [pid|workers]}}

But when handling a check-in of an overflowed worker, you don’t want to add it back into the workers field. It’s sufficient to dismiss the worker. You’ll implement a helper function to handle check-ins, as shown in the following listing.

Listing 7.16. Handling worker overflows (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do

  #####################
  # Private Functions #
  #####################

  defp handle_checkin(pid, state) do
    %{worker_sup:   worker_sup,
      workers:      workers,
      monitors:     monitors,
      overflow:     overflow} = state

    if overflow > 0 do
      :ok = dismiss_worker(worker_sup, pid)
      %{state | overflow: overflow-1}
    else
      %{state | workers: [pid|workers], overflow: 0}
    end
  end

  defp dismiss_worker(sup, pid) do
    true = Process.unlink(pid)
    Supervisor.terminate_child(sup, pid)
  end

end

handle_checkin/2 checks that the pool is indeed overflowed when a worker is being checked back in. If so, it delegates to dismiss_worker/2 to terminate the worker and decrement overflow. Otherwise, the worker is added back into workers as before.

The function for dismissing workers isn’t difficult to understand. All you need to do is unlink the worker from the pool server and tell the worker Supervisor to terminate the child. Now you can update handle_cast({:checkin, worker}, state), as shown in the next listing.

Listing 7.17. Updating the check-in callback (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  #############
  # Callbacks #
  #############
  def handle_cast({:checkin, worker}, %{monitors: monitors} = state) do
    case :ets.lookup(monitors, worker) do
      [{pid, ref}] ->
        # ...
        new_state = handle_checkin(pid, state)                   #1
        {:noreply, new_state}
      [] ->
        {:noreply, state}
    end
  end
end
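
The check-in rule can also be modeled as a pure function. This is a hypothetical sketch (`CheckinSketch` is not part of Pooly) that uses a plain map for the state and atoms for workers, and that reports what happened to the worker instead of terminating a real process.

```elixir
defmodule CheckinSketch do
  # While the pool is overflowed, a returning worker is dismissed.
  def checkin(worker, %{overflow: overflow} = state) when overflow > 0 do
    {:dismissed, worker, %{state | overflow: overflow - 1}}
  end

  # Otherwise the worker goes back onto the idle list.
  def checkin(worker, %{workers: workers} = state) do
    {:returned, worker, %{state | workers: [worker | workers]}}
  end
end

{:dismissed, :w9, dismissed_state} =
  CheckinSketch.checkin(:w9, %{workers: [], overflow: 1})

{:returned, :w1, returned_state} =
  CheckinSketch.checkin(:w1, %{workers: [:w2], overflow: 0})

IO.inspect({dismissed_state, returned_state})
```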

7.2.3. Handling worker exits

What happens when an overflowed worker exits? Let’s turn to the callback function handle_info({:EXIT, pid, _reason}, state). Similar to the case when handling worker check-ins, you delegate the task of handling worker exits to a helper function in the next listing.

Listing 7.18. Computing the state for worker exits (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do

  #####################
  # Private Functions #
  #####################

  defp handle_worker_exit(pid, state) do
    %{worker_sup:   worker_sup,
      workers:      workers,
      monitors:     monitors,
      overflow:     overflow} = state

    if overflow > 0 do
      %{state | overflow: overflow-1}
    else
      %{state | workers: [new_worker(worker_sup)|workers]}
    end
  end
end

The logic is the reverse of handle_checkin/2, as shown in listing 7.19. You check whether the pool is overflowed, and if so, you decrement the counter. Because the pool is overflowed, you don’t bother to add the worker back into the pool. On the other hand, if the pool isn’t overflowed, you need to add a worker back into the worker list.

Listing 7.19. Handling worker exits (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  #############
  # Callbacks #
  #############
  def handle_info({:EXIT, pid, _reason}, state = %{monitors: monitors, workers: workers, worker_sup: worker_sup}) do
    case :ets.lookup(monitors, pid) do
      [{pid, ref}] ->
        # ...
        new_state = handle_worker_exit(pid, state)                  #1
        {:noreply, new_state}
      _ ->
        {:noreply, state}
    end
  end
end

7.2.4. Updating status with overflow information

Let’s give Pooly the ability to report whether it’s overflowed. The pool will have three states: :overflow, :full, and :ready. The following listing shows the updated implementation of handle_call(:status, from, state).

Listing 7.20. Adding overflow information to the status (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do

  #############
  # Callbacks #
  #############

  def handle_call(:status, _from, %{workers: workers, monitors: monitors} = state) do
    {:reply, {state_name(state), length(workers), :ets.info(monitors, :size)}, state}
  end

  #####################
  # Private Functions #
  #####################

  defp state_name(%State{overflow: overflow, max_overflow: max_overflow, workers: workers})
       when overflow < 1 do
    case length(workers) == 0 do
      true ->
        if max_overflow < 1 do
          :full
        else
          :overflow
        end
      false ->
        :ready
    end
  end

  defp state_name(%State{overflow: max_overflow, max_overflow: max_overflow}) do
    :full
  end

  defp state_name(_state) do
    :overflow
  end

end
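The three clauses are easier to check when you can call them directly. The following is a sketch under stated assumptions: StatusSketch is a made-up module, and it matches on plain maps rather than Pooly’s %State{} struct so that it runs by itself. The decision logic is meant to mirror the listing above.

```elixir
defmodule StatusSketch do
  # Same decision as state_name/1 above, but on a plain map with only the
  # fields the decision needs.
  def state_name(%{overflow: overflow, max_overflow: max, workers: workers})
      when overflow < 1 do
    cond do
      workers != [] -> :ready   # idle workers remain
      max < 1 -> :full          # none idle and overflow is disabled
      true -> :overflow         # none idle, but overflow workers may be started
    end
  end

  # Overflow workers are in use and the cap has been reached.
  def state_name(%{overflow: max, max_overflow: max}), do: :full

  # Some, but not all, of the allowed overflow workers are in use.
  def state_name(_state), do: :overflow
end

ready    = StatusSketch.state_name(%{overflow: 0, max_overflow: 1, workers: [:w1]})
no_extra = StatusSketch.state_name(%{overflow: 0, max_overflow: 0, workers: []})
capped   = StatusSketch.state_name(%{overflow: 1, max_overflow: 1, workers: []})
partial  = StatusSketch.state_name(%{overflow: 1, max_overflow: 2, workers: []})

IO.inspect({ready, no_extra, capped, partial})
```

The second clause relies on binding the same variable twice in one pattern, which only matches when overflow equals max_overflow.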

7.2.5. Queuing worker processes

For the last bit of Pooly, you’re going to handle the case where consumers are willing to wait for a worker to be available. In other words, the consumer process is willing to block until the worker pool frees up a worker. For this to work, you need to queue up the waiting consumer processes and match a newly freed worker process with a waiting consumer process.

A blocking consumer

A consumer must tell Pooly if it’s willing to block. You can do this by extending the API for checkout in lib/pooly.ex:

defmodule Pooly do
  @timeout 5000

  #######
  # API #
  #######

  def checkout(pool_name, block \\ true, timeout \\ @timeout) do
    Pooly.Server.checkout(pool_name, block, timeout)
  end

end

In this new version of checkout, you add two extra parameters: block and timeout. Head over to lib/pooly/server.ex, and update the checkout function accordingly:

defmodule Pooly.Server do

  #######
  # API #
  #######

  def checkout(pool_name, block, timeout) do
    Pooly.PoolServer.checkout(pool_name, block, timeout)
  end

end

Now to the real meat of the implementation, lib/pooly/pool_server.ex, shown in the following listing.

Listing 7.21. Using a queue for waiting consumers (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  defmodule State do
    defstruct pool_sup: nil, ..., waiting: nil, ..., max_overflow: nil  #A
  end
  #############
  # Callbacks #
  #############
  def init([pool_sup, pool_config]) when is_pid(pool_sup) do
    Process.flag(:trap_exit, true)
    monitors = :ets.new(:monitors, [:private])
    waiting  = :queue.new
    state    = %State{pool_sup: pool_sup, monitors: monitors,              #1
                       waiting: waiting, overflow: 0}                      #1
    init(pool_config, state)
  end
  #######
  # API #
  #######
  def checkout(pool_name, block, timeout) do
    GenServer.call(name(pool_name), {:checkout, block}, timeout)           #2
  end
end

First, you update the state with a waiting field. That will store the queue of consumers. Although Elixir doesn’t come with a queue data structure, it doesn’t need to. Erlang comes with a queue implementation. There’s a bigger lesson in this: whenever something is missing from Elixir, instead of reaching for a third-party library,[1] find out whether Erlang has the functionality you need. This highlights the wonderful interoperability between Erlang and Elixir.

1 Or even worse, building one yourself (unless it’s for educational purposes)!

Queues in Erlang

The queue implementation that Erlang provides is interesting. I’ll let the examples do the talking. Let’s look at the basics of using a queue: creating a queue, adding items to a queue, and removing items from a queue. In a fresh iex session, create a queue:

iex(1)> q = :queue.new
{[], []}

Notice that the return value is a tuple of two elements: lists, to be more precise. Why two? To answer that question, add a couple of items to the queue:

iex(2)> q = :queue.in("uno", q)
{["uno"], []}

iex(3)> q = :queue.in("dos", q)
{["dos"], ["uno"]}

iex(4)> q = :queue.in("tres", q)
{["tres", "dos"], ["uno"]}

The first element (the head of the queue) is the second element of the tuple, and the remainder of the queue is represented by the first element. Now, try removing an element from the queue:

iex(5)> :queue.out(q)
{{:value, "uno"}, {["tres"], ["dos"]}}

This is an interesting-looking tuple. Let’s break it down a little:

{{:value, "uno"}, ...}

This tagged tuple (with :value) contains the value of the first element of the queue. Now for the other part:

{..., {["tres"], ["dos"]}}

This tuple is the new queue, after the first element has been removed. The representation of the new queue is the same as the one you saw earlier, with the first element being the second element of the tuple and the remaining part of the queue in the first element.

Yes, I know it’s slightly confusing, but hang in there. Arranging the result this way makes sense because, remember, data structures are immutable in Elixir/Erlang land. Also, this is a perfect case for pattern matching:

iex(6)> {{:value, head}, q} = :queue.out(q)
{{:value, "uno"}, {["tres"], ["dos"]}}

iex(7)> {{:value, head}, q} = :queue.out(q)
{{:value, "dos"}, {[], ["tres"]}}

iex(8)> {{:value, head}, q} = :queue.out(q)
{{:value, "tres"}, {[], []}}

What happens when you try to get something out of an empty queue?

iex(9)> {{:value, head}, q} = :queue.out(q)
** (MatchError) no match of right hand side value: {:empty, {[], []}}

Whoops! For an empty queue, the return value is a tuple that contains :empty as the first element. This concludes our brief detour of using the queue; this is all you need to understand the examples that follow.
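Matching on both return shapes avoids the MatchError. As a small sketch (the QueueSketch module and its drain/1 function are made up for illustration), here is a recursive function that empties a queue by handling the {:value, item} and :empty cases explicitly:

```elixir
defmodule QueueSketch do
  # Removes items one at a time until :queue.out/1 reports :empty,
  # collecting them in the order they were dequeued.
  def drain(queue), do: drain(queue, [])

  defp drain(queue, acc) do
    case :queue.out(queue) do
      {{:value, item}, rest} -> drain(rest, [item | acc])
      {:empty, _queue} -> Enum.reverse(acc)
    end
  end
end

# Build the same queue as in the iex session above.
q = Enum.reduce(["uno", "dos", "tres"], :queue.new(), fn item, acc -> :queue.in(item, acc) end)

drained = QueueSketch.drain(q)
IO.inspect(drained)
```

The items come out in the order they went in, which is the first-in, first-out behavior the pool needs for waiting consumers.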

Next, you’ll add block and timeout to the invocation of the callback function in the following listing.

Listing 7.22. Handling waiting consumers (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  #############
  # Callbacks #
  #############
  def handle_call({:checkout, block}, {from_pid, _ref} = from, state) do
    %{worker_sup:   worker_sup,
      workers:      workers,
      monitors:     monitors,
      waiting:      waiting,
      overflow:     overflow,
      max_overflow: max_overflow} = state                           #1
    case workers do
      [worker|rest] ->
        # ...
      [] when max_overflow > 0 and overflow < max_overflow ->
        # ...
      [] when block == true ->                                      #2
        ref = Process.monitor(from_pid)                      
        waiting = :queue.in({from, ref}, waiting)                   #2
        {:noreply, %{state | waiting: waiting}, :infinity}
      [] ->
        {:reply, :full, state}
    end
  end
end

You add two things:

  • waiting to the state
  • Handling the case when a consumer is willing to block

Let’s deal with the case when you’re overflowed and there’s a request for a worker where the consumer is willing to wait. This case is covered next.

Handling a consumer that’s willing to block

When a consumer is willing to block, you’ll first monitor it. That’s because if it crashes for some reason, you must know about it and remove it from the queue.

Next, you add to the waiting queue a tuple of the form {from, ref}. from is the same from of the callback. Note that from is itself a tuple, containing the consumer pid and a tag; the tag is a reference.
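Storing the monitor reference next to from is what lets you find a crashed consumer’s queue entry later. The chapter’s listings don’t show that cleanup, so the following is only one possible sketch: remove_waiting/2 and the WaitingSketch module are hypothetical names, and make_ref/0 stands in for the tag that GenServer places in from.

```elixir
defmodule WaitingSketch do
  # Drops the queue entry whose monitor reference matches `ref`.
  # Entries have the {from, ref} shape used in listing 7.22.
  def remove_waiting(waiting, ref) do
    :queue.filter(fn {_from, r} -> r != ref end, waiting)
  end
end

# Two idle processes play the waiting consumers.
consumer_a = spawn(fn -> Process.sleep(:infinity) end)
consumer_b = spawn(fn -> Process.sleep(:infinity) end)
ref_a = Process.monitor(consumer_a)
ref_b = Process.monitor(consumer_b)

waiting = :queue.in({{consumer_a, make_ref()}, ref_a}, :queue.new())
waiting = :queue.in({{consumer_b, make_ref()}, ref_b}, waiting)

# Simulate consumer_a crashing while it waits.
Process.exit(consumer_a, :kill)

waiting =
  receive do
    # The monitor delivers a :DOWN message carrying the same reference.
    {:DOWN, ^ref_a, :process, _pid, _reason} ->
      WaitingSketch.remove_waiting(waiting, ref_a)
  after
    1_000 -> raise "expected a :DOWN message for consumer_a"
  end

IO.inspect(:queue.len(waiting), label: "still waiting")
```

In a pool server, the same filter would sit inside a handle_info clause for :DOWN messages rather than in a bare receive.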

Finally, note that the reply is a :noreply, with :infinity as the timeout. Returning :noreply means GenServer.reply(from, message) must be called from somewhere else. Because you don’t know how long you must wait, you pass in :infinity.
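If the :noreply-then-reply pattern is new to you, this toy server shows it on its own. It is a sketch, not part of Pooly: the DeferredReply module, the :wait call, and the :release message are all made up, and a timer stands in for a worker being checked in later.

```elixir
defmodule DeferredReply do
  use GenServer

  def init(:ok), do: {:ok, []}

  def handle_call(:wait, from, parked) do
    # Stand-in for "a worker becomes available later".
    Process.send_after(self(), :release, 50)
    # No reply yet; keep `from` so a later callback can answer the caller.
    {:noreply, [from | parked]}
  end

  def handle_info(:release, parked) do
    # Each parked GenServer.call/3 returns as soon as its reply is sent.
    Enum.each(parked, &GenServer.reply(&1, :worker_ready))
    {:noreply, []}
  end
end

{:ok, server} = GenServer.start_link(DeferredReply, :ok)

# Blocks here until handle_info/2 calls GenServer.reply/2.
result = GenServer.call(server, :wait, :infinity)
IO.inspect(result)
```

The caller can’t tell that the reply came from a different callback than the one that received the request.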

Where do you need to call GenServer.reply/2? In other words, when do you need to reply to the consumer process? During a check-in of a worker! Time to update handle_checkin/2. This time, you’ll use the waiting queue and pattern matching, as shown in the following listing.

Listing 7.23. Handling a check-in that’s willing to block (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do
  #####################
  # Private Functions #
  #####################
  defp handle_checkin(pid, state) do
    %{worker_sup:   worker_sup,
      workers:      workers,
      monitors:     monitors,
      waiting:      waiting,
      overflow:     overflow} = state
    case :queue.out(waiting) do
      {{:value, {from, ref}}, left} ->
        true = :ets.insert(monitors, {pid, ref})
        GenServer.reply(from, pid)                              #1
        %{state | waiting: left}
      {:empty, empty} when overflow > 0 ->
        :ok = dismiss_worker(worker_sup, pid)
        %{state | waiting: empty, overflow: overflow-1}
      {:empty, empty} ->
        %{state | waiting: empty, workers: [pid|workers], overflow: 0}
    end
  end
end

Depending on the output of the queue, you have to handle three cases. The first case is when the queue isn’t empty. This means you have at least one consumer process waiting for a worker. You insert a two-element tuple, {pid, ref}, into the monitors ETS table. Now you can finally tell the consumer process that you have an available worker using GenServer.reply/2.

The second case is when there are no consumers currently waiting, but you’re in an overflow state. This means you have to decrement the overflow count by 1.

The last case to handle is when there are no consumers currently waiting and you’re not in an overflow state. For this, you can add the worker back into the workers field.

Getting a worker from worker exits

There’s another way a waiting consumer can get a worker: if some other worker process exits. The modification is simple. Head to handle_worker_exit/2, as shown in the next listing.

Listing 7.24. Handling worker exits (lib/pooly/pool_server.ex)
defmodule Pooly.PoolServer do

  #####################
  # Private Functions #
  #####################

  defp handle_worker_exit(pid, state) do
    %{worker_sup:   worker_sup,
      workers:      workers,
      monitors:     monitors,
      waiting:      waiting,
      overflow:     overflow} = state

    case :queue.out(waiting) do
      {{:value, {from, ref}}, left} ->
        new_worker = new_worker(worker_sup)
        true = :ets.insert(monitors, {new_worker, ref})
        GenServer.reply(from, new_worker)
        %{state | waiting: left}

      {:empty, empty} when overflow > 0 ->
        %{state | overflow: overflow-1, waiting: empty}

      {:empty, empty} ->
        workers = [new_worker(worker_sup) | workers]
        %{state | workers: workers, waiting: empty}
    end
  end
end

Similar to handle_checkin/2, you use pattern matching on the result of :queue.out/1. The first case is when you have a waiting consumer process. Because a worker has crashed or exited, you create a new one and hand it to the consumer process. The rest of the cases are self-explanatory.

7.2.6. Taking it for a spin

Now to reap the fruits of your labor. Configure the pool as follows:

defmodule Pooly do

  def start(_type, _args) do
    pools_config =
      [
        [name: "Pool1",
         mfa: {SampleWorker, :start_link, []},

         size: 2,
         max_overflow: 1
        ],
        [name: "Pool2",
         mfa: {SampleWorker, :start_link, []},
         size: 3,
         max_overflow: 0
        ],
        [name: "Pool3",
         mfa: {SampleWorker, :start_link, []},
         size: 4,
         max_overflow: 0
        ]
      ]

    start_pools(pools_config)
  end
end

Here, only Pool1 has overflow configured. Open a new iex session:

% iex -S mix
iex(1)> w1 = Pooly.checkout("Pool1")
#PID<0.97.0>
iex(2)> w2 = Pooly.checkout("Pool1")
#PID<0.96.0>
iex(3)> w3 = Pooly.checkout("Pool1")
#PID<0.111.0>



With max overflow set to 1, the pool can handle one extra worker. What happens when you try to check out another worker? The client will be blocked indefinitely or time out, depending on how you try to check out the worker. For example, doing this will block indefinitely:

iex(4)> Pooly.checkout("Pool1", true, :infinity)



On the other hand, doing this will time out after five seconds:

iex(5)> Pooly.checkout("Pool1", true, 5000)
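When GenServer.call/3 times out, the calling process exits rather than getting an error value back. The following sketch shows that in isolation, under the assumption of a made-up NeverReplies server that parks every caller, much like a pool with no free workers. It uses a 100-millisecond timeout so the example finishes quickly.

```elixir
defmodule NeverReplies do
  use GenServer

  def init(:ok), do: {:ok, nil}

  # Never answers, so every caller waits until its own timeout elapses.
  def handle_call(:checkout, _from, state), do: {:noreply, state}
end

{:ok, server} = GenServer.start_link(NeverReplies, :ok)

result =
  try do
    GenServer.call(server, :checkout, 100)
  catch
    # The timeout surfaces as an exit in the caller, not as a return value.
    :exit, {:timeout, _details} -> :timed_out
  end

IO.inspect(result)
```

The server keeps running after the caller gives up; only the caller sees the timeout.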



If you’re following along, you’ll realize that the session is blocked. Before you continue, open lib/pooly/sample_worker.ex, and add the work_for/2 function and its corresponding callback, as the following listing shows.

Listing 7.25. Simulating processing (lib/pooly/sample_worker.ex)
defmodule SampleWorker do
  use GenServer

     # ...
  def work_for(pid, duration) do
    GenServer.cast(pid, {:work_for, duration})
  end

  def handle_cast({:work_for, duration}, state) do
    :timer.sleep(duration)
    {:stop, :normal, state}
  end

end

This function simulates a short-lived worker by telling the worker to sleep for some time and then exiting normally. Restart the session as you did earlier. Check out the three workers:

iex(1)> w1 = Pooly.checkout("Pool1")
#PID<0.97.0>

iex(2)> w2 = Pooly.checkout("Pool1")
#PID<0.96.0>

iex(3)> w3 = Pooly.checkout("Pool1")
#PID<0.111.0>



This time, tell the first worker to work for 10 seconds:

iex(4)> SampleWorker.work_for(w1, 10_000)
:ok



Now, try to check out a worker. Because you’ve exceeded the maximum overflow, the pool will cause the client to block:

iex(5)> Pooly.checkout("Pool1", true, :infinity)



Ten seconds later, the console prints out a pid:

#PID<0.114.0>

Success! Even though you were in an overflowed state, once the first worker completed its job, another slot became available and was handed to the waiting client.

7.3. Exercises

1.  Restart strategies—Play around with the different restart strategies. For example, pick one Supervisor and change its restart strategy to something different. Launch :observer.start, and see what happens. Did the Supervisor restart the child/children processes as you expected?

2.  Transactions—There’s a limitation with this implementation. It’s assumed that all consumers behave like good citizens of the pool and check workers back in when they’re finished with them. In general, the pool shouldn’t make assumptions like this, because it’s easy to cause a shortage of workers. To get around this, Poolboy has transactions. Here’s the skeleton for you to complete:

defmodule Pooly.Server do

  def transaction(pool_name, fun, timeout) do
    worker = <FILL ME IN>
    try do
      <FILL ME IN>
    after
      <FILL ME IN>
    end
  end

end

3.  Currently, it’s possible to check in the same worker multiple times. Fix this!

7.4. Summary

Believe it or not, you’re finished with Pooly! If you’ve made it this far, you deserve a break. Not only that, you’ve re-implemented 96.777% of Poolboy, but in Elixir. This is probably the most complicated example in this book. But I’m pretty sure that after working through it, you’ve gained a deeper appreciation of Supervisors and how they interact with other processes, as well as how Supervisors can be structured in a layered way to provide fault tolerance.

If you struggled with chapters 6 and 7, don’t worry;[2] there’s nothing wrong with you. I struggled with grasping these concepts, too. Pooly has a lot of moving parts. But if you step back and look at the code again, it’s amazing how everything fits together. In this chapter, you learned about the following:

2 If you didn’t, I don’t want to hear about it.

  • Using the OTP Supervisor behavior
  • Building multiple supervision hierarchies
  • Dynamically creating Supervisors and workers using the OTP Supervisor API
  • A grand tour of building a non-trivial application using a mixture of Supervisors and GenServers

In the next chapter, you look at an equally exciting topic: distribution.
