Chapter 9. Commands
This chapter covers
- Understanding the role of commands in the unified log
- Modeling commands
- Using Apache Avro to define schemas for commands
- Processing commands in our unified log
So far, we have concerned ourselves almost exclusively with events. Events, as we explained in chapter 1, are discrete occurrences that take place at a specific point in time. Throughout this book, we have created events, written them to our unified log, read them from our unified log, validated them, enriched them, and more. There is little that we have not done with events!
But we can represent another unit of work in our unified log: the command. A command is an order or instruction for a specific action to be performed in the future—each command, when executed, produces an event. In this chapter, we will show how representing commands explicitly as one or more streams within our unified log is a powerful processing pattern.
We will start with a simple definition for commands, before moving on to show how decision-making apps can operate on events in our unified log and express decisions in the form of commands. Modeling commands for maximum flexibility and utility is something of an art; we will cover this next before launching into the chapter’s applied example.
Working for Plum, our fictitious global consumer-electronics manufacturer, we will define a command intended to alert Plum maintenance engineers of overheating machines on the factory floor. We will represent this command in the Apache Avro schema language and use the Kafka producer script from chapter 2 to publish those alert commands to our unified log.
We will then write a simple command-executor app by using the Kafka Java client’s producer and consumer capabilities. Our command executor will read the Kafka topic containing our alert commands and send an email to the appropriate support engineer for each alert; we will use Rackspace’s Mailgun transactional email service to send the emails.
Finally, we will wrap up the chapter with some design questions to consider before adding commands into your company’s own unified log. We will consider the pros and cons of two designs for your command topic(s) in Kafka or Kinesis and will also introduce the idea of a hierarchy of commands.
Let’s get started!
What is a command exactly, and what does it mean in the context of the unified log? In this section, we’ll introduce commands as complementary to events, and argue for making decision-making apps, and the commands they generate, a central part of your unified log.
A command is an order or instruction for a specific action to be performed in the future. Here are some example commands:
- “Order a pizza, Dad.”
- “Tell my boss that I quit.”
- “Renew our household insurance, Jackie.”
If an event is a record of an occurrence in the past, then a command is the expression of intent that a new event will occur in the future:
- My dad ordered a pizza.
- I quit my job.
- Jackie renewed our household insurance.
Figure 9.1 illustrates the behavioral flow underpinning the first example: a decision (I want pizza) produces a command (“Order a pizza, Dad”), and if or when that command is executed, then we can record an event as having taken place (my dad ordered a pizza). In grammatical terms: an event is a past-tense verb in the indicative mood, whereas a command is a verb in the imperative mood.
Figure 9.1. A decision produces a command—an order or instruction to do something specific. If that command is then executed, we can record an event as having occurred.
You can see that there is a symbiotic relationship between commands and events: strictly speaking, without commands, we would have no events.
If behind every event is a command, how could we have reached chapter 9 in a book all about events without encountering a single command? The answer is that almost all software relies on implicit commands: the decision to do something is immediately followed by the execution of that decision, all in the same block of code. Here is a simple example of this in pseudocode:
function decide_and_act(obj):
  if obj is an apple:
    eat the apple
    emit_event(we ate the apple)
  else if obj is a song:
    sing the song
    emit_event(we sang the song)
  else if obj is a beer:
    drink the beer
    emit_event(we drank the beer)
  else:
    emit_event(we don't recognize the obj)
This code includes no explicit commands: instead, we have a block of code that mixes decision-making in with the execution of those decisions. How would this code look if we had explicit commands? Something like this:
function make_decision(obj):
  if obj is an apple:
    return command(eat the apple)
  else if obj is a song:
    return command(sing the song)
  else if obj is a beer:
    return command(drink the beer)
  else:
    return command(complain we don't recognize the obj)
Of course, this version is not functionally identical to the previous code block. In this version, we are only returning commands as the result of our decisions; we are not actually executing those commands. There will have to be another piece of code downstream of make_decision(obj), looking something like this:
function execute_command(cmd):
  if cmd is eat the apple:
    eat the apple
    emit_event(we ate the apple)
  else if cmd is sing the song:
    sing the song
    emit_event(we sang the song)
  else if cmd is drink the beer:
    drink the beer
    emit_event(we drank the beer)
  else if cmd is complain we don't recognize the obj:
    emit_event(we don't recognize the obj)
It looks like we have turned something simple (making a decision and acting on it) into something more complicated: making a decision, emitting a command, and then executing that command. The reason we do this is in support of separation of concerns.[1] Turning our commands into explicit, first-class entities brings major advantages:
[1] You can learn more about separation of concerns at Wikipedia: https://en.wikipedia.org/wiki/Separation_of_concerns.
- It makes our decision-making code simpler. All make_decision(obj) needs to be able to do is emit a command. It doesn’t need to understand the mechanics of eating apples or singing songs; nor does it need to know how to track events.
- It makes our decision-making code easier to test. We have to test only that our code makes the right decision, not that it then executes on that decision correctly.
- It makes our decision-making process more auditable. All decisions lead to concrete commands that can be reviewed. By contrast, with decide_and_act(obj), we have to explore the outcomes of the actions to understand what decisions were made.
- It makes our execution code more DRY (“don’t repeat yourself”). We can implement eating an apple in a single code location, and any code that decides to eat an apple emits a command instructing the eating.
- It makes our execution code more flexible. If our command is “send Jenny an email,” we can swap the one transactional email service provider (for example, Mandrill) for another (for example, Mailgun or SendGrid).
- It makes our execution code repeatable. We can replay a sequence of commands in the same order as they were issued at any time if the context requires it.
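To make the split concrete, here is a minimal Java sketch of the same idea. It is an illustration only: the class name ExplicitCommands, the Command enum, and the use of a plain list as an event log are all assumptions made for this example, not code from the chapter's project.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: decisions return commands; a separate executor acts on them. */
final class ExplicitCommands {

    /** The commands our decision-making code can issue (names are illustrative). */
    enum Command { EAT_THE_APPLE, SING_THE_SONG, DRINK_THE_BEER, COMPLAIN_UNRECOGNIZED }

    /** Decision-making only: maps an object to a command and has no side effects. */
    static Command makeDecision(String obj) {
        switch (obj) {
            case "apple": return Command.EAT_THE_APPLE;
            case "song":  return Command.SING_THE_SONG;
            case "beer":  return Command.DRINK_THE_BEER;
            default:      return Command.COMPLAIN_UNRECOGNIZED;
        }
    }

    /** Execution only: carries out the command and records the resulting event. */
    static String executeCommand(Command cmd, List<String> eventLog) {
        String event;
        switch (cmd) {
            case EAT_THE_APPLE:  event = "we ate the apple"; break;
            case SING_THE_SONG:  event = "we sang the song"; break;
            case DRINK_THE_BEER: event = "we drank the beer"; break;
            default:             event = "we don't recognize the obj"; break;
        }
        eventLog.add(event); // stand-in for emit_event(...)
        return event;
    }

    public static void main(String[] args) {
        List<String> eventLog = new ArrayList<>();
        for (String obj : new String[] {"apple", "song", "rock"}) {
            executeCommand(makeDecision(obj), eventLog);
        }
        System.out.println(eventLog);
    }
}
```

Because makeDecision has no side effects, it can be unit tested by checking the returned command alone, which is the testability benefit listed above.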
This decoupling of decision-making and command execution clearly has benefits, but if we involve our unified log, things get even more powerful. Let’s explore this next.
In the previous section, we split a single function, decide_and_act(obj), into two separate functions:
- make_decision(obj): made a decision based on the supplied obj and returned a command to execute
- execute_command(cmd): executed the supplied command cmd and emitted an event recording the execution
With a unified log, there is no reason that these two functions need to live in the same application: we can create a new stream called commands and write the output of make_decision(obj) into that stream. We can then write a separate stream-processing job that consumes the commands stream and executes the commands found within by using execute_command(cmd). This is visualized in figure 9.2.
Figure 9.2. Within our unified log, a source event stream is used to drive a decision-making app, which emits commands to a second stream. A command-execution app reads that stream of commands, executes the commands, and then emits a stream of events recording the command executions.
With this architecture, we have completely decoupled the decision-making process from the execution of the commands. Our stream of commands is now a first-class entity: we can attach whatever applications we want to execute (or even just observe) those commands.
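The flow in figure 9.2 can be sketched without any streaming infrastructure. In the following hedged example, an in-memory queue stands in for the commands stream and plain lists stand in for the source and output event streams; the class name CommandStreamSketch and the "executed: ..." event format are assumptions for illustration, not part of Plum's system.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical sketch: in-memory collections stand in for the streams in figure 9.2. */
final class CommandStreamSketch {

    /** Same decision logic as make_decision(obj), returning the command as text. */
    static String makeDecision(String obj) {
        switch (obj) {
            case "apple": return "eat the apple";
            case "song":  return "sing the song";
            case "beer":  return "drink the beer";
            default:      return "complain we don't recognize the obj";
        }
    }

    /** Decision-making app: reads source events and appends commands to the stream. */
    static void decisionApp(List<String> sourceEvents, Queue<String> commandStream) {
        for (String obj : sourceEvents) {
            commandStream.add(makeDecision(obj));
        }
    }

    /** Command-execution app: drains the command stream, emitting one event per command. */
    static void executionApp(Queue<String> commandStream, List<String> eventStream) {
        String cmd;
        while ((cmd = commandStream.poll()) != null) {
            // A real executor would perform the action here before recording it.
            eventStream.add("executed: " + cmd);
        }
    }

    public static void main(String[] args) {
        Queue<String> commandStream = new ArrayDeque<>();
        List<String> eventStream = new ArrayList<>();
        decisionApp(Arrays.asList("apple", "beer"), commandStream);
        executionApp(commandStream, eventStream);
        System.out.println(eventStream);
    }
}
```

The two apps share nothing except the queue, so either one could be replaced, or a second reader attached, without touching the other.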
In the next section, we will add commands to our unified log.
To generate commands, we’ll first have to make some decisions. In this section, we’ll continue working with Plum, our fictional company with a unified log introduced in chapter 7, and wire some decision-making into that unified log. Let’s get started.
Let’s imagine again that we are in the BI team at Plum, a global consumer-electronics manufacturer. Plum is in the process of implementing a unified log; for unimportant reasons, the unified log is a hybrid, using Amazon Kinesis for certain streams and Apache Kafka elsewhere. In reality, this is a fairly unusual setup, but it enables us to work with both Kinesis and Kafka in this section of the book.
At the heart of the Plum production line is the NCX-10 machine, which stamps new laptops out of a single block of steel. Plum has 1,000 of these machines in each of its 10 factories. Each machine emits key metrics as a form of health check to a Kafka topic every 5 minutes, as shown in figure 9.3.
Figure 9.3. All of the NCX-10 machines in Plum’s factories emit a standard health-check event to a Kafka topic every 5 minutes.
We have been asked by the plant maintenance team at Plum to use the health-check data to help keep the NCX-10 machines humming. The team members are particularly concerned about one scenario that they want to tackle first: if a machine shows signs of overheating, a maintenance engineer needs to be alerted immediately in order to be able to inspect the machine and fix any issues with the cooling system.
We can deliver what plant maintenance wants by writing two stream-processing jobs:
- The decision-making job: This will parse the event stream of NCX-10 health checks to detect signs of machines overheating. If these are detected, this job will emit a command to alert a maintenance engineer.
- The command-execution job: This will read the event stream containing commands to alert a maintenance engineer, and will send each of these alerts to the engineer.
Figure 9.4 depicts these two jobs and the Kafka streams between them.
Figure 9.4. Within Plum’s unified log, a stream of health-check events is read by a decision-making app to detect overheating machines and emit alert commands to a second stream. A command-execution app reads the stream of commands and sends emails to alert the maintenance engineers.
Does the decision-making job sound familiar? It’s likely a piece of either single or multiple (stateful) event processing, as we explored in part 1. Because this chapter is about commands, we’ll skip over the implementation of the decision-making job and head straight to defining the command itself.
Our decision-making job needs to emit a command to alert a maintenance engineer that a specific NCX-10 machine shows signs of overheating. It’s important that we model this command correctly: it will represent the contract between the decision-making job, which will emit the command, and the command-executing job, which will read the command.
We know what this command needs to do (alert a maintenance engineer), but how do we decide what fields to put in our command? Like a tasty muffin, a good command should have these characteristics:
- Shrink-wrapped: The command must contain everything that could possibly be required to execute the command; the executor should not need to look up additional information to execute the command.
- Fully baked: The command should define exactly the action for the executor to perform; we should not have to add business logic into the executor to turn each command into something actionable.
Here is an example of a badly designed self-describing alert command for Plum:
{
  "command": "alert_overheating_machine",
  "recipient": "Suzie Smith",
  "machine": "cz1-123",
  "createdAt": 1539576669992
}
Let’s consider how the executor would have to be written for this command, in pseudocode:
function execute_command(cmd):
  if cmd.command == "alert_overheating_machine":
    email_address = lookup_email_address(cmd.recipient)
    subject = "Overheating machine"
    message = "Machine {{cmd.machine}} may be overheating!"
    send_email(email_address, subject, message)
    emit_event(alerted_the_maintenance_engineer)
Unfortunately, this breaks both of the rules for a good command:
- Our command executor has to look up a crucial piece of information, the engineer’s email address, in another system.
- Our command executor has to include business logic to translate the command type plus the machine ID into an actionable alert.
Here is a better version of an alert command for Plum:
{
  "command": "alert",
  "notification": {
    "summary": "Overheating machine",
    "detail": "Machine cz1-123 may be overheating!",
    "urgency": "MEDIUM"
  },
  "recipient": {
    "name": "Suzie Smith",
    "phone": "(541) 754-3010",
    "email": "s.smith@plum.com"
  },
  "createdAt": 1539576669992
}
The command may be a little more verbose, but the executor implementation for this command is much simpler:
function execute_command(cmd):
  if cmd.command == "alert":
    send_email(cmd.recipient.email, cmd.notification.summary, cmd.notification.detail)
    emit_event(alerted_the_maintenance_engineer)
This command requires the executor to do much less work: the executor now only has to send the email to the recipient and record an event to the same effect. The executor does not need to know that this specific alert relates to an overheating machine in a factory; the specifics of this alert were “fully baked” upstream, in the business logic of the decision-making job. This strong decoupling has the major advantage of making our executor general-purpose. Plum can write other decision-making jobs that can reuse this command-executing job without requiring any code changes in the executor.
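Although we skip the real decision-making job in this chapter, a rough Java sketch may help show what “baking” the alert upstream could look like. Everything here is hypothetical: the HealthCheck shape, the temperatureCelsius field, and the 90-degree threshold are assumptions invented for this example, because the chapter does not specify what the NCX-10 health check contains.

```java
import java.util.Optional;

/** Hypothetical sketch of a decision-making job that emits fully baked alert commands. */
final class OverheatingDecision {

    /** Assumed overheating threshold; not taken from Plum's real system. */
    static final double THRESHOLD_CELSIUS = 90.0;

    /** Assumed shape of a health check; the chapter does not define these fields. */
    static final class HealthCheck {
        final String machineId;
        final double temperatureCelsius;
        HealthCheck(String machineId, double temperatureCelsius) {
            this.machineId = machineId;
            this.temperatureCelsius = temperatureCelsius;
        }
    }

    /** Mirrors the fields of the improved alert command shown above. */
    static final class AlertCommand {
        final String summary, detail, urgency;
        final String recipientName, recipientPhone, recipientEmail;
        final long createdAt;
        AlertCommand(String summary, String detail, String urgency, String recipientName,
                     String recipientPhone, String recipientEmail, long createdAt) {
            this.summary = summary;
            this.detail = detail;
            this.urgency = urgency;
            this.recipientName = recipientName;
            this.recipientPhone = recipientPhone;
            this.recipientEmail = recipientEmail;
            this.createdAt = createdAt;
        }
    }

    /** Returns a shrink-wrapped, fully baked command, or empty if no alert is needed. */
    static Optional<AlertCommand> decide(HealthCheck check, String name, String phone,
                                         String email, long nowMillis) {
        if (check.temperatureCelsius <= THRESHOLD_CELSIUS) {
            return Optional.empty();
        }
        // All wording and contact details are resolved here, so the executor needs no lookups.
        return Optional.of(new AlertCommand(
            "Overheating machine",
            "Machine " + check.machineId + " may be overheating!",
            "MEDIUM", name, phone, email, nowMillis));
    }

    public static void main(String[] args) {
        decide(new HealthCheck("cz1-123", 97.5), "Suzie Smith", "(541) 754-3010",
               "s.smith@plum.com", System.currentTimeMillis())
            .ifPresent(cmd -> System.out.println(cmd.summary + ": " + cmd.detail));
    }
}
```

The point of the sketch is the division of labor: the machine ID, the message wording, and the recipient's contact details are all resolved before the command is emitted.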
The other thing to note is that we have made the alert command definitive but not overly restrictive. If we wanted to, we could update our executor to include a prioritization system for processing alerts, like so:
function execute_command(cmd):
  if cmd.command == "alert":
    if cmd.notification.urgency == "HIGH":
      send_sms(cmd.recipient.phone, cmd.notification.detail)
    else:
      send_email(cmd.recipient.email, cmd.notification.summary, cmd.notification.detail)
    emit_event(alerted_the_maintenance_engineer)
Using our unified log for commands makes updates like this achievable:
- We only have to update our command-executor job to add the prioritization logic. There’s no need to update the decision-making jobs in any way.
- We can test the new functionality by replaying old commands through the new executor.
- We can switch over to the new executor gradually, potentially running it in parallel to the old executor and allocating only a portion of commands to the new executor.
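One way to picture the gradual switchover is a small router that sends a fixed share of commands to the new executor. The sketch below is only one possible approach under stated assumptions: the GradualRollout class and the idea of bucketing by the command's hash code are invented for illustration, and a production system might instead split traffic by Kafka partition or consumer group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: route a percentage of commands to a new executor. */
final class GradualRollout {

    /** Deterministically assigns a command to a bucket 0..99 and compares with the cutoff. */
    static boolean useNewExecutor(String command, int percentToNew) {
        return Math.floorMod(command.hashCode(), 100) < percentToNew;
    }

    /** Sends each command to exactly one of the two executors. */
    static void route(List<String> commands, int percentToNew,
                      Consumer<String> oldExecutor, Consumer<String> newExecutor) {
        for (String command : commands) {
            if (useNewExecutor(command, percentToNew)) {
                newExecutor.accept(command);
            } else {
                oldExecutor.accept(command);
            }
        }
    }

    public static void main(String[] args) {
        List<String> replayed = Arrays.asList("alert-1", "alert-2", "alert-3", "alert-4");
        List<String> handledByOld = new ArrayList<>();
        List<String> handledByNew = new ArrayList<>();
        // Replay old commands, sending roughly a quarter of the bucket space to the new executor.
        route(replayed, 25, handledByOld::add, handledByNew::add);
        System.out.println("old=" + handledByOld.size() + " new=" + handledByNew.size());
    }
}
```

Because the bucketing is deterministic, replaying the same commands always exercises the same executor, which makes comparisons between the old and new behavior repeatable.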
Mk tsx wnv eryda rv defnei z schema elt ktq ralet codmmna. Mv tkc oiggn rv zkq Apache Avro (https://avro.apache.org) xr lomde aurj mceash. Xz enidapxel jn chapter 6, Avro schemas szn yx defined nj laipn ISGQ liesf. Xuk ecahsm jklf ndsee rv xfvj reweehosm, ae wx’ff uus rj rxjn eht mamdnoc-crtuexoe uqz. Mx’ff vgnk rx trcaee qjrz qdz evrn.
Mv ktz ginog rx wreti ord onammcd excerout sz s Java paoiptcilan, cdalel ExecutorApp, suign Gradle. Fkr’z oru dettrsa. Vtcrj, eraetc s riodcetry cldeal yfmh. Rvnb tsciwh er srqr trydiecro cbn htn gjrc:
$ gradle init --type java-library ... BUILD SUCCESSFUL ...
Gradle ccd eratecd z lseoenkt cpetroj nj srur royrtidec, nitganncio c eopclu xl Java rcuoes fesli tlx ryaq lscsesa, cldlae Pirbyar.ikzs npz ErabyriYzrx.xcic. Uteele etseh wrk fseil, zz xw fjfw dk writing thx wkn kavg soyrthl.
Ovrv frv’c paerrep teb Gradle tjecpro ibudl fkjl. Ljur rky jlfk bdlui.daelrg nhc alecepr zrj turnrce sntoectn prjw kur lwnofiogl tingsil.
Listing 9.1. build.gradle
plugins {
    id "java"
    id "application"
    // Gradle plugin that generates Java bindings from Avro schemas
    id "com.commercehub.gradle.plugin.avro" version "0.8.0"
}

sourceCompatibility = '1.8'

mainClassName = 'plum.ExecutorApp'

repositories {
    mavenCentral()
}

version = '0.1.0'

// Kafka client, Avro, the Mailgun client, and logging
dependencies {
    compile 'org.apache.kafka:kafka-clients:2.0.0'
    compile 'org.apache.avro:avro:1.8.2'
    compile 'net.sargue:mailgun:1.9.0'
    compile 'org.slf4j:slf4j-api:1.7.25'
}

jar {
    manifest {
        attributes 'Main-Class': mainClassName
    }

    from {
        configurations.compile.collect {
            it.isDirectory() ? it : zipTree(it)
        }
    } {
        exclude "META-INF/*.SF"
        exclude "META-INF/*.DSA"
        exclude "META-INF/*.RSA"
    }
}
Let’s check that we can build this:
$ gradle compileJava
...
BUILD SUCCESSFUL
...
Now we are ready to work on the schema for our new command.
To add our new schema for alerts to the project, create a file at this path:
src/main/resources/avro/alert.avsc
Populate this file with the Avro JSON schema in the following listing.
Listing 9.2. alert.avsc
{
  "name": "Alert",
  "namespace": "plum.avro",
  "type": "record",
  "fields": [
    { "name": "command", "type": "string" },
    {
      "name": "notification",
      "type": {
        "name": "Notification",
        "namespace": "plum.avro",
        "type": "record",
        "fields": [
          { "name": "summary", "type": "string" },
          { "name": "detail", "type": "string" },
          {
            "name": "urgency",
            "type": {
              "type": "enum",
              "name": "Urgency",
              "namespace": "plum.avro",
              "symbols": ["HIGH", "MEDIUM", "LOW"]
            }
          }
        ]
      }
    },
    {
      "name": "recipient",
      "type": {
        "type": "record",
        "name": "Recipient",
        "namespace": "plum.avro",
        "fields": [
          { "name": "name", "type": "string" },
          { "name": "phone", "type": "string" },
          { "name": "email", "type": "string" }
        ]
      }
    },
    { "name": "createdAt", "type": "long" }
  ]
}
- Our top-level entity is a record called Alert that belongs in the plum.avro namespace (as do all of our entities).
- Our Alert consists of a command field, a Notification child record, a Recipient child record, and a createdAt timestamp for the alert.
- Our Notification record contains summary, detail, and urgency fields, where urgency is an enum with three possible values.
- Our Recipient record contains name, phone, and email fields.
A minor piece of housekeeping: we need to soft-link the resources/avro subfolder to another location so that the Avro plugin for Gradle can find it:
$ cd src/main && ln -s resources/avro .
With our Avro schema defined, let’s use the Avro plugin in our build.gradle file to automatically generate Java bindings for our schema:
$ gradle generateAvroJava
:generateAvroProtocol UP-TO-DATE
:generateAvroJava

BUILD SUCCESSFUL

Total time: 8.234 secs
...
You will find the generated files inside your Gradle build folder:
$ ls build/generated-main-avro-java/plum/avro/
Alert.java  Notification.java  Recipient.java  Urgency.java
These files are too lengthy to reproduce here, but if you open them in your text editor, you will see that the files contain POJOs to represent the three records and one enumeration that make up our alert.
That’s a wrap for designing our alert command for Plum and modeling that command in Apache Avro. Next, we can proceed to building our command executor.
Imagine that Plum now has a Kafka topic containing a constant stream of our alert commands, all stored in Avro format. With this stream in place, Plum now needs to implement a command executor, which will consume those alerts and execute them. First, we will get all the plumbing in place and check that we can successfully deserialize the incoming Avro event.
We could use lots of different stream-processing frameworks to execute our commands, but remember that command execution involves only two tasks:
- Reading each command from the stream and executing it
- Emitting an event to record that the command has been executed
Figure 9.5 shows the specifics of these two tasks for Plum.
Figure 9.5. The command-execution app for Plum will send an email to the support engineer via Mailgun and then emit an email_sent event to record that the alert has been executed.
Both tasks are performed on one command at a time; there’s no need to consider multiple commands at once. Our command execution is analogous to the single-event processing we explored in chapter 2. As with single-event processing, we will need only a simple framework to execute our commands, so the simple consumer and producer capabilities of the Kafka Java client library from chapter 2 will fit the bill nicely.
As a first step, we need to read in our individual commands as records from our Kafka topic commands. Remember from chapter 2 that this is called a consumer in Kafka parlance. As in that chapter, we will write our own consumer in Java, using the Kafka Java client library.
Let’s create a file for our consumer, called src/main/java/plum/Consumer.java, and add in the code in listing 9.3. This is a direct copy of the consumer code in chapter 2, except for the following changes:
- The new package name is plum.
- This code passes each consumed record to an executor rather than a producer.
Listing 9.3. Consumer.java
package plum;                                                  // New package name

import java.util.*;

import org.apache.kafka.clients.consumer.*;

public class Consumer {

  private final KafkaConsumer<String, String> consumer;
  private final String topic;

  public Consumer(String servers, String groupId, String topic) {
    this.consumer = new KafkaConsumer<String, String>(
      createConfig(servers, groupId));
    this.topic = topic;
  }

  // Polls the topic forever, handing each record to the executor
  public void run(IExecutor executor) {
    this.consumer.subscribe(Arrays.asList(this.topic));
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);
      for (ConsumerRecord<String, String> record : records) {
        executor.execute(record.value());
      }
    }
  }

  private static Properties createConfig(
    String servers, String groupId) {

    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");
    return props;
  }
}
So far, so good: we have defined a consumer that will read all the records from a given Kafka topic and hand them over to the execute method of the supplied executor. Next, we will implement an initial version of the executor. This won’t yet carry out the command, but it will show that we can successfully parse our Avro-structured events into Java objects.
See how our consumer is going to run the IExecutor.execute() method for each incoming command? To keep things flexible, the two executors we write in this chapter will both conform to the IExecutor interface, letting us easily swap the one for the other. Let’s now define this interface in another file, called src/main/java/plum/IExecutor.java. Add in the code in the following listing.
Listing 9.4. IExecutor.java
package plum;

import java.util.Properties;

import org.apache.kafka.clients.producer.*;

public interface IExecutor {

  // The abstract method that each concrete executor implements
  public void execute(String message);

  // Writes a single message to the given Kafka topic
  public static void write(KafkaProducer<String, String> producer,
    String topic, String message) {

    ProducerRecord<String, String> pr = new ProducerRecord<>(
      topic, message);
    producer.send(pr);
  }

  // Builds the configuration for a Kafka producer of string records
  public static Properties createConfig(String servers) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 1000);
    props.put("linger.ms", 1);
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }
}
Again, this code is extremely similar to the IProducer we wrote in chapter 2. As with that IProducer, this interface contains static helper methods to configure a Kafka record producer and write events to Kafka. We need these capabilities in our executor so that we can fulfill the second part of executing any given command: emitting an event back into Kafka and recording this command as having been successfully executed.
With the interface in place, let’s write our first concrete implementation of IExecutor. At this stage, we won’t execute the command, but we want to check that we can successfully parse the incoming command. Add the code in the following listing into a new file called src/main/java/plum/EchoExecutor.java.
Listing 9.5. EchoExecutor.java
package plum;

import java.io.*;

import org.apache.kafka.clients.producer.*;

import org.apache.avro.*;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;

import plum.avro.Alert;                                        // Generated from alert.avsc

public class EchoExecutor implements IExecutor {

  private final KafkaProducer<String, String> producer;
  private final String eventsTopic;

  private static Schema schema;
  static {
    try {
      // Load the Avro schema from the classpath once, at class-load time
      schema = new Schema.Parser()
        .parse(EchoExecutor.class.getResourceAsStream("/avro/alert.avsc"));
    } catch (IOException ioe) {
      throw new ExceptionInInitializerError(ioe);
    }
  }

  public EchoExecutor(String servers, String eventsTopic) {
    this.producer = new KafkaProducer<>(IExecutor.createConfig(servers));
    this.eventsTopic = eventsTopic;
  }

  public void execute(String command) {
    InputStream is = new ByteArrayInputStream(command.getBytes());
    DataInputStream din = new DataInputStream(is);
    try {
      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
      DatumReader<Alert> reader = new SpecificDatumReader<Alert>(schema);
      // Deserialize the JSON command into an Alert POJO
      Alert alert = reader.read(null, decoder);
      // Echo the salient details to stdout
      System.out.println("Alert " + alert.recipient.name + " about " +
        alert.notification.summary);
    } catch (IOException | AvroTypeException e) {
      System.out.println("Error executing command:" + e.getMessage());
    }
  }
}
The EchoExecutor is simple. Every time the execute method is invoked with a serialized command, it does the following:
- Attempts to deserialize the incoming command into an Alert POJO, where that Alert POJO has been statically generated (by the Gradle Avro plugin) from the alert.avsc schema
- If successful, prints the salient information about the alert out to stdout
We can now stitch these three files together via a new ExecutorApp class containing our main method. Create a new file called src/main/java/plum/ExecutorApp.java and populate it with the contents of the following listing.
Listing 9.6. ExecutorApp.java
package plum;

public class ExecutorApp {

  public static void main(String[] args) {
    String servers       = args[0];
    String groupId       = args[1];
    String commandsTopic = args[2];
    String eventsTopic   = args[3];

    Consumer consumer = new Consumer(servers, groupId, commandsTopic);
    EchoExecutor executor = new EchoExecutor(servers, eventsTopic);
    consumer.run(executor);
  }
}
We will pass four arguments into our ExecutorApp on the command line:
- servers specifies the host and port for talking to Kafka.
- groupId identifies our code as belonging to a specific Kafka consumer group.
- commandsTopic is the Kafka topic of commands to read from.
- eventsTopic is the Kafka topic we will write events to.
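The listing reads args[0] through args[3] directly, so a missing argument would surface as an ArrayIndexOutOfBoundsException. As an optional refinement, the following sketch validates the four arguments up front; the ExecutorArgs class and its usage message are hypothetical additions for illustration and are not part of the chapter's code.

```java
/** Hypothetical helper: validates and names the four command-line arguments. */
final class ExecutorArgs {

    final String servers;        // host and port for talking to Kafka
    final String groupId;        // Kafka consumer group for this app
    final String commandsTopic;  // topic of commands to read from
    final String eventsTopic;    // topic to write events to

    private ExecutorArgs(String servers, String groupId,
                         String commandsTopic, String eventsTopic) {
        this.servers = servers;
        this.groupId = groupId;
        this.commandsTopic = commandsTopic;
        this.eventsTopic = eventsTopic;
    }

    /** Parses the arguments, failing fast with a usage message if the count is wrong. */
    static ExecutorArgs parse(String[] args) {
        if (args == null || args.length != 4) {
            throw new IllegalArgumentException(
                "Usage: <servers> <groupId> <commandsTopic> <eventsTopic>");
        }
        return new ExecutorArgs(args[0], args[1], args[2], args[3]);
    }

    public static void main(String[] args) {
        ExecutorArgs parsed = parse(
            new String[] {"localhost:9092", "ulp-ch09", "alerts", "events"});
        System.out.println(parsed.commandsTopic + " -> " + parsed.eventsTopic);
    }
}
```

With a helper like this, main could call ExecutorArgs.parse(args) and pass the named fields to the Consumer and executor constructors.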
Let’s now build our app:
$ gradle jar
...
BUILD SUCCESSFUL

Total time: 25.532 secs
We are now ready to test our stream processing app.
To test our new application, we are going to need five terminal windows. Figure 9.6 sets out what we’ll be running in each of these terminals.
Figure 9.6. The five terminals we need to run in order to test our command-executor app include ZooKeeper, Kafka, one command producer, the command executor app, and a consumer for the events emitted by the command executor.
Each of our first three terminal windows will run a shell script from inside our Kafka installation directory:
$ cd ~/kafka_2.12-2.0.0
In our first terminal, we start up ZooKeeper:
$ bin/zookeeper-server-start.sh config/zookeeper.properties
In our second terminal, we start up Kafka:
$ bin/kafka-server-start.sh config/server.properties
In our third terminal, let’s start a script that lets us send commands into our alerts Kafka topic:
$ bin/kafka-console-producer.sh --topic alerts \
    --broker-list localhost:9092
Let’s now give this producer an alert command, by pasting this into the same terminal, making sure to add a newline after the command to send it into the Kafka topic:
{ "command" : "alert", "notification": { "summary": "Overheating machine", "detail": "Machine cz1-123 may be overheating!", "urgency": "MEDIUM" }, "recipient": { "name": "Suzie Smith", "phone": "(541) 754-3010", "email": "s.smith@plum.com" }, "createdAt": 1539576669992 }
Phew! We are finally ready to start up our new command-executing application. In a fourth terminal, head back to your project root, the plum folder, and run this:
$ cd ~/plum
$ java -jar ./build/libs/plum-0.1.0.jar localhost:9092 ulp-ch09 \
    alerts events
This has kicked off our app, which will now read all commands from alerts and execute them. Wait a second and you should see the following output in the same terminal:
Alert Suzie Smith about Overheating machine
Good news: our simple EchoExecutor is working a treat. Now we can move on to the more complex version to execute the alert and log the sent email. Shut down the stream processing app with Ctrl-Z and then type kill %%, but make sure to leave that terminal and the other terminal windows open for the next section.
Now that we can successfully parse our alert commands from Avro-based JSON into POJOs, we can move on to executing those commands. In this section, we will wire in a third-party email service to send our alerts, and then finish up by emitting an event to record that the email has been sent. Let’s get started.
Our overlords at Plum want the monitoring alerts about the NCX-10 machines to be sent to the maintenance engineers via email. So, we need to integrate some kind of email-sending mechanism into our executor. We have many to choose from out there, but we will go with a hosted transactional email service called Mailgun (www.mailgun.com).
Mailgun lets us sign up for a new account and start sending emails without providing any billing information, which is perfect for this kind of experimentation. If you prefer, you could equally use an alternative provider like Amazon Simple Email Service (SES), SendGrid, or even a local mail server option such as Postfix. If you go with an alternative email service provider, you will need to adjust the following instructions accordingly.
Head to the signup page for Mailgun: https://signup.mailgun.com/new/signup.
Fill in your details; you don’t need to provide any billing information. Click the Create Account button, and on the next screen you will see some Java code for sending an email, as shown in figure 9.7. Before you can send an email, you have to do two more things:
- Activate your Mailgun account. Mailgun will have sent a confirmation email to your signup email; click on this and follow the instructions to authenticate your account.
- Add an authorized recipient. You can follow the link at the bottom of figure 9.7 to add an email recipient for testing. If you don’t have a second email address, you can add +ulp or something similar to your existing address (for example, an address of the form name+ulp@example.com). Again, you need to click on the confirmation email to authorize this.
Figure 9.7. Click the Java tab on this get-started screen from Mailgun and you’ll see some basic code for sending an email via Mailgun from Java.
With these steps completed, we are now ready to update our executor to send emails. First, we need a thin wrapper around the Mailgun email-sending functionality: create a file called src/main/java/plum/Emailer.java and add in the contents of the following listing.
Listing 9.7. Emailer.java
package plum;

import net.sargue.mailgun.*;

import plum.avro.Alert;

public final class Emailer {

  // Replace with the API key from your Mailgun account
  static final String MAILGUN_KEY = "XXX";
  // Replace with the sandbox server from your Mailgun account
  static final String MAILGUN_SANDBOX = "sandboxYYY.mailgun.org";

  private static final Configuration configuration = new Configuration()
    .domain(MAILGUN_SANDBOX)
    .apiKey(MAILGUN_KEY)
    .from("Test account", "postmaster@" + MAILGUN_SANDBOX);

  // Sends the alert to its recipient as a plain-text email
  public static void send(Alert alert) {
    Mail.using(configuration)
      .to(alert.recipient.email.toString())
      .subject(alert.notification.summary.toString())
      .text(alert.notification.detail.toString())
      .build()
      .send();
  }
}
Emailer.java contains a simple wrapper around the Mailgun emailing library that we are using. Make sure to update the constants with the API key and sandbox server details found in your Mailgun account.
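If you would rather not paste the API key into source code, one option is to read both constants from the environment. The following is a minimal sketch, not part of the Plum project: the class name MailgunSettings and the variable names MAILGUN_KEY and MAILGUN_SANDBOX are assumptions chosen for illustration.

```java
import java.util.Map;

// Hypothetical helper: resolves the Mailgun settings from a map of
// environment variables instead of hardcoding them in Emailer.
final class MailgunSettings {
    final String apiKey;
    final String sandboxDomain;

    private MailgunSettings(String apiKey, String sandboxDomain) {
        this.apiKey = apiKey;
        this.sandboxDomain = sandboxDomain;
    }

    // MAILGUN_KEY and MAILGUN_SANDBOX are assumed variable names.
    static MailgunSettings fromEnv(Map<String, String> env) {
        return new MailgunSettings(
            require(env, "MAILGUN_KEY"),
            require(env, "MAILGUN_SANDBOX"));
    }

    // Fails fast with a clear message when a setting is absent or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value.trim();
    }

    // Mirrors the "postmaster@" + sandbox address used in Emailer.
    String fromAddress() {
        return "postmaster@" + sandboxDomain;
    }

    public static void main(String[] args) {
        MailgunSettings settings = fromEnv(System.getenv());
        System.out.println("Sending as " + settings.fromAddress());
    }
}
```

Emailer could then build its Configuration from a MailgunSettings instance, which keeps the credentials out of version control.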
With this in place, we can now write our full executor. Like the EchoExecutor, this will implement the IExecutor interface, but the FullExecutor will also send the email via Mailgun and log the event to our events topic in Kafka. Add the contents of the following listing into the file src/main/java/plum/FullExecutor.java.
Listing 9.8. FullExecutor.java
package plum;

import java.io.*;
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.*;

import org.apache.avro.*;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;

import plum.avro.Alert;

public class FullExecutor implements IExecutor {

  private final KafkaProducer<String, String> producer;
  private final String eventsTopic;

  private static Schema schema;
  static {
    // Load the Avro schema for alert commands from the classpath
    try {
      schema = new Schema.Parser()
        .parse(FullExecutor.class.getResourceAsStream("/avro/alert.avsc"));
    } catch (IOException ioe) {
      throw new ExceptionInInitializerError(ioe);
    }
  }

  public FullExecutor(String servers, String eventsTopic) {
    this.producer = new KafkaProducer<>(IExecutor.createConfig(servers));
    this.eventsTopic = eventsTopic;
  }

  public void execute(String command) {
    InputStream is = new ByteArrayInputStream(
      command.getBytes(StandardCharsets.UTF_8));
    DataInputStream din = new DataInputStream(is);
    try {
      // Decode the JSON-encoded command into an Alert using the Avro schema
      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
      DatumReader<Alert> reader = new SpecificDatumReader<Alert>(schema);
      Alert alert = reader.read(null, decoder);
      // Send the email to the support engineer via Mailgun
      Emailer.send(alert);
      // Record the successful send as an event in the events topic
      IExecutor.write(this.producer, this.eventsTopic,
        "{ \"event\": \"email_sent\" }");
    } catch (IOException | AvroTypeException e) {
      System.out.println("Error executing command: " + e.getMessage());
    }
  }
}
FullExecutor is similar to our earlier EchoExecutor, but with two additions:
- We are now sending the email to our support engineer via our new Emailer.
- Following the successful email send, we are logging a basic event to record that success to a Kafka topic containing events.
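The event in listing 9.8 is deliberately basic. As a hedged illustration of how the executor could record the action in more detail, the sketch below builds a richer email_sent event. The class name EmailSentEvent and the field names recipient, summary, and sentAt are our assumptions for this example and are not defined anywhere in the Plum schema.

```java
// Hypothetical builder for a more detailed email_sent event.
final class EmailSentEvent {

    // Escapes the characters that would otherwise break a JSON string literal.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Field names (recipient, summary, sentAt) are illustrative assumptions.
    static String toJson(String recipientEmail, String summary, long sentAtMillis) {
        return "{ \"event\": \"email_sent\""
            + ", \"recipient\": \"" + escape(recipientEmail) + "\""
            + ", \"summary\": \"" + escape(summary) + "\""
            + ", \"sentAt\": " + sentAtMillis + " }";
    }

    public static void main(String[] args) {
        System.out.println(
            toJson("engineer@example.com", "Overheating machine", System.currentTimeMillis()));
    }
}
```

In FullExecutor, the string returned by toJson could be passed to IExecutor.write in place of the fixed literal. In a fuller implementation you would more likely define an Avro schema for the event, as we did for the alert command.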
Now we need to rewrite the entry point to our app to use our new executor. Edit the file src/main/java/plum/ExecutorApp.java and repopulate it with the contents of the following listing.
Listing 9.9. ExecutorApp.java
package plum;

public class ExecutorApp {

  public static void main(String[] args) {
    String servers       = args[0];
    String groupId       = args[1];
    String commandsTopic = args[2];
    String eventsTopic   = args[3];

    Consumer consumer = new Consumer(servers, groupId, commandsTopic);
    // Use our new FullExecutor, which needs the events topic to write to
    FullExecutor executor = new FullExecutor(servers, eventsTopic);
    consumer.run(executor);
  }
}
Let's rebuild our app now. From the project root, the plum folder, run this:
$ gradle jar
...
BUILD SUCCESSFUL

Total time: 25.532 secs
We are now ready to rerun our command executor.
Head back to the terminal running the previous build of the executor. If it is still running, kill it with Ctrl-C. Now restart it:
$ java -jar ./build/libs/plum-0.1.0.jar localhost:9092 ulp-ch09 \
  alerts events
Leave ZooKeeper and Kafka running in their respective terminals; we now need to start the fifth and final terminal, to tail our Kafka topic containing events:
$ bin/kafka-console-consumer.sh --topic events --from-beginning \
  --bootstrap-server localhost:9092
Now let's head back to the terminal that is running a producer connected to our alerts topic. We'll take the alert command we ran before, update it so that the recipient's email is the same one that we authorized with Mailgun, and then paste it into the producer:
$ bin/kafka-console-producer.sh --topic alerts \
  --broker-list localhost:9092
{ "command" : "alert", "notification": { "summary": "Overheating machine", "detail": "Machine cz1-123 may be overheating!", "urgency": "MEDIUM" }, "recipient": { "name": "Suzie Smith", "phone": "(541) 754-3010", "email": "alex+test@snowplowanalytics.com" }, "createdAt": 1543392786232 }
For this to work, you must update the email in the alert to your own email, the one that you authorized with Mailgun. Check back in your new terminal that is tailing our Kafka events topic and you should see this:
$ bin/kafka-console-consumer.sh --topic events --from-beginning \
  --bootstrap-server localhost:9092
{ "event": "email_sent" }
That's encouraging; it suggests that our command executor has sent our email. Check in your email client and you should see an incoming email, something like figure 9.8.
Great! We have now implemented a command executor that can take incoming commands (in our case, alerts for a long-suffering Plum maintenance engineer) and convert them into emails to that engineer. Our command executor even emits an email_sent event to track that the action has been performed.
This chapter has walked you through a simple example of executing an alert command as it passes through Plum’s unified log. So far, so good—but how do we scale this up to a real company with hundreds or thousands of possible commands? This section has some ideas.
In this chapter, we called our stream of commands alerts on the basis that it would contain only alert commands, and our command-executor app processed all incoming commands as alerts. If we want to extend our implementation to support hundreds or thousands of commands, we have several options:
- Have one stream (topic, in Kafka parlance) per command; in other words, hundreds or thousands of streams.
- Make the commands self-describing but write them to, say, three or five streams; each stream represents a different command priority.
- Make the commands self-describing and have a single stream. Include a header in each record that tells the command executor what type of command this is.
Figure 9.9 illustrates these three options.
Figure 9.9. We could define one stream for each command type (option A), associate commands to priority-based streams (option B), or use one stream for all commands (option C).
When choosing one of these options, you will want to consider the operational overhead of having multiple streams versus the development overhead of making your commands self-describing. It is also important to understand how these different setups fare in the face of command-execution failures.
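To make the self-describing options a little more concrete, here is a minimal sketch of an executor that routes each record by its command type. It assumes the type has already been extracted from the record (for example, from a header or from the command field of the payload); the class name CommandDispatcher and its methods are hypothetical and not part of the Plum codebase.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical router for a stream that carries many command types.
final class CommandDispatcher {

    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final List<String> unhandled = new ArrayList<>();

    // Registers the handler that will execute payloads of the given type.
    CommandDispatcher register(String commandType, Consumer<String> handler) {
        handlers.put(commandType, handler);
        return this;
    }

    // Returns true if a handler ran; otherwise remembers the unknown type.
    boolean dispatch(String commandType, String payload) {
        Consumer<String> handler = handlers.get(commandType);
        if (handler == null) {
            unhandled.add(commandType);
            return false;
        }
        handler.accept(payload);
        return true;
    }

    // Command types seen so far that had no registered handler.
    List<String> unhandledTypes() {
        return new ArrayList<>(unhandled);
    }

    public static void main(String[] args) {
        CommandDispatcher dispatcher = new CommandDispatcher()
            .register("alert", payload -> System.out.println("Alerting: " + payload));
        dispatcher.dispatch("alert", "{ \"summary\": \"Overheating machine\" }");
        dispatcher.dispatch("lights_off", "{}");
        System.out.println("Unhandled: " + dispatcher.unhandledTypes());
    }
}
```

With one stream per command type, no such routing is needed, which is the development overhead that the single-stream and priority-stream options trade against operational simplicity.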
Imagine that Mailgun has an outage (whether planned or unplanned) for several hours. What will happen to our command executor? Remember that our command executor is performing single-event processing; it has no way of knowing that a systemic problem has arisen with Mailgun. If we are lucky, our Mailgun email-sending code will simply time out after perhaps 30 seconds, for each alert command that is processed. In this case, we would likely emit an Email Failed to Send event or similar back into Kafka.
Now imagine that our command executor is reading a stream containing many types of commands. The failure of each alert command costs us 30 seconds of processing time, so if our decision-making apps are generating new commands at a faster rate than this, our command executor will fall further and further behind. Other high-priority commands that Plum wants to execute will be extremely delayed.
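One way to bound the cost of each failure is to enforce our own timeout around the external call rather than relying on the library's default. The sketch below uses only the standard java.util.concurrent classes; the class name TimedExecution and the event names it returns are assumptions for illustration, and the action passed in stands in for a call such as Emailer.send.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical wrapper that caps how long one command may take to execute.
final class TimedExecution {

    // Runs the action on the pool and returns the name of the event to emit.
    static String runWithTimeout(ExecutorService pool, Runnable action, long timeoutMillis) {
        Future<?> future = pool.submit(action);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return "email_sent";
        } catch (TimeoutException e) {
            future.cancel(true);  // interrupt the stuck call so the worker is freed
            return "email_failed_to_send";
        } catch (ExecutionException e) {
            return "email_failed_to_send";  // the action itself threw an exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "email_failed_to_send";
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        String outcome = runWithTimeout(pool, () -> { /* call the email provider here */ }, 5000);
        System.out.println(outcome);
        pool.shutdownNow();
    }
}
```

A shorter timeout limits how far the executor falls behind, but it does not remove the underlying problem: every failing command still occupies the worker for the full timeout.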
Note that the sharded nature of a Kafka or Kinesis stream doesn't help you: assuming that the failing command type is distributed across all of the shards, the worker assigned to each shard is equally likely to suffer slowdowns as the other workers. This is illustrated in figure 9.10.
Figure 9.10. With a command stream containing two partitions, a single malfunctioning command can cause both threads within the command executor (one per partition) to get “stuck.” Here, the lights_off command is failing to execute because of the unavailability of the Lights_off external API.
Separating our commands into streams with different priorities does not necessarily help either: it is still possible for a high-priority command type to slow other high-priority commands, as also seen in figure 9.10.
Two potential solutions to consider are as follows:
- Having a separate command executor for all high- or medium-priority command types, so that each command executor operates (and fails) independently of the others.
- Copying all commands into a separate publish-subscribe message queue, such as NSQ (see chapter 1) or SQS, with a scalable pool of workers to execute the commands. This queue would not have the Kafka or Kinesis notion of ordered processing, so there would be less scope for functioning command types to be “blocked” by failing commands.
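The first of these ideas can be approximated inside a single process by giving each command type its own worker thread, so that a stuck command type cannot hold up the others. This is only a sketch of the isolation principle, under the assumption that commands arrive already tagged with their type; the class name IsolatedExecutors is hypothetical, and a production setup would more likely run separate executor apps as described above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical in-process isolation: one single-threaded worker per command type.
final class IsolatedExecutors {

    private final Map<String, ExecutorService> pools = new ConcurrentHashMap<>();

    // Commands of the same type run in order on their own thread;
    // commands of different types never wait for one another.
    Future<?> submit(String commandType, Runnable action) {
        return pools
            .computeIfAbsent(commandType, type -> Executors.newSingleThreadExecutor())
            .submit(action);
    }

    // Interrupts any stuck workers and releases all threads.
    void shutdown() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        IsolatedExecutors executors = new IsolatedExecutors();
        executors.submit("lights_off", () -> {
            // Simulates an external API that is unavailable.
            try { Thread.sleep(60_000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
        executors.submit("alert", () -> System.out.println("alert executed")).get();
        executors.shutdown();
    }
}
```

The trade-off is one thread per command type, which is manageable for a handful of types but not for thousands; at that scale the pooled message-queue approach is likely the better fit.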
In this chapter, we have argued in favor of creating commands that are shrink-wrapped and fully baked. A command executor should have discretion as to how each command should be best executed, but the executor should not have to look up additional data or contain command-specific business logic. Following this design, we created an alert command for Plum, which was resolved by our executor into an email sent via Mailgun.
The alert command is a useful one, but under the hood our command executor was resolving it into a send email command, and then ultimately into a send email via Mailgun command. For your unified log, you may want to model these more granular commands as well as the high-level ones, so that a decision-making app can be as precise or flexible as it likes as to the commands it emits. Figure 9.11 illustrates this idea of a form of command hierarchy.
Figure 9.11. In a hierarchy of commands, each generation is more specific than the one above it. Under alert, we have a choice between alerting via email and alerting via SMS; within email and SMS, we have even more-specific commands to determine which provider is used for the sending.
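A command hierarchy like this can be represented very simply as a parent lookup. The sketch below is one possible encoding, not a design prescribed by this chapter: the class name CommandHierarchy and the command identifiers (alert, send_email, send_sms, send_email_via_mailgun) are names we have made up to mirror figure 9.11.

```java
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical encoding of a command hierarchy as child -> parent links.
final class CommandHierarchy {

    private static final Map<String, String> PARENT = new HashMap<>();
    static {
        // Identifiers are illustrative assumptions based on figure 9.11.
        PARENT.put("send_email", "alert");
        PARENT.put("send_sms", "alert");
        PARENT.put("send_email_via_mailgun", "send_email");
    }

    // Returns the path from the most general command down to the given one.
    static List<String> lineage(String command) {
        LinkedList<String> path = new LinkedList<>();
        String current = command;
        while (current != null) {
            path.addFirst(current);
            current = PARENT.get(current);
        }
        return path;
    }

    // True if the command is the ancestor itself or a more specific form of it.
    static boolean isA(String command, String ancestor) {
        return lineage(command).contains(ancestor);
    }

    public static void main(String[] args) {
        System.out.println(lineage("send_email_via_mailgun"));
        System.out.println(isA("send_sms", "alert"));
    }
}
```

An executor that understands only the alert level could use isA to accept any more specific command beneath it, while a decision-making app stays free to emit whichever level of precision it needs.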
- A command is an order or instruction for a specific action to be performed in the future. Each command, when executed, produces an event.
- Whereas most software uses implicit decision-making, apps in a unified log architecture can perform explicit decision-making that emits commands to one or more streams within the unified log.
- A stream of commands in a unified log can be executed using dedicated applications performing single-event processing.
- Commands should be carefully modeled to ensure that they are shrink-wrapped and fully baked. This ensures that decision-making and command execution can stay loosely coupled, with a clear separation of concerns.
- Commands can be modeled in Apache Avro or another schema tool (see chapter 6).
- Command executor apps can be implemented using simple Kafka consumer and producer patterns in Java.
- On successful execution of a command, the executor should emit an event recording the exact action performed.
- When scaling up an implementation of commands in your unified log, consider how many command streams to operate, how to handle execution failures, and whether to implement command hierarchies.