Chapter 5. Stateful stream processing
This chapter covers
- Processing multiple events from a stream by using state
- The most popular stream processing frameworks
- Using Apache Samza for detecting abandoned shopping carts
- Deploying a Samza job on Apache Hadoop YARN
In chapter 3, we introduced the idea of processing continuous event streams and implemented a simple application that processed individual shopping events from the Nile website. The app we wrote did a few neat things: it read individual events from Kafka, filtered out bad input events, enriched the event with location information, and finally wrote the newly filtered and enriched event back out to Kafka.
Chapter 3’s app was relatively simple because it operated on only a single event at a time: it read each individual event off a Kafka topic, and then decided whether it would either filter the event (discard it), or enrich the event and write that enriched event back to a new Kafka topic. In the terminology introduced in chapter 3, our app was performing single-event processing, whereby one input event generates zero or more output events, in contrast to what we call multiple-event processing, whereby one or more input events generates zero or more output events.
This chapter is all about multiple-event processing, or as we will start calling it (for reasons I'll explain soon), stateful stream processing. In this chapter, we will write an application that generates outgoing events based on multiple incoming events. Continuing our employment at fictitious online retailer Nile, this time we will be striving to improve the online shopping experience, by detecting whenever a Nile shopper has abandoned their shopping cart.
As we hinted in chapter 3, processing multiple events at a time is more complex than processing single events. We need to maintain some form of state to keep track of shopper behavior across multiple events; this brings with it attendant challenges, such as distributing the processing and the state safely over multiple servers. To meet these challenges, we will introduce stream processing frameworks at a high level.
We will implement our abandoned shopping cart detector in Java by using the Apache Samza stream processing framework. Samza is not the most featureful or well-known stream processing framework; the API it provides is relatively basic, slightly reminiscent of Hadoop’s original MapReduce API in Java. But Samza’s simplicity is also a strength: it will make it easier for us to see and understand the essential stateful nature of this stream processing job. Writing this job with Samza should give you the confidence to try out the newer and “buzzier” frameworks like Spark Streaming and Flink.
But first we will introduce our new business challenge and outline its stream processing requirements.
Remember: we currently work for a sells-everything e-commerce website, called Nile. The management team at Nile wants the company to become much more dynamic and responsive by reacting to customer behavior in a timely and effective fashion. This will be a great opportunity for us to implement stateful stream processing.
As part of their goal of creating a more dynamic and responsive business, the management team at Nile has identified a key opportunity around identifying and reacting to shopper-abandoned shopping carts. A shopping cart is defined as abandoned when a shopper adds products to their shopping cart but doesn’t continue through to checkout.
For online retailers like Nile, it is worthwhile contacting shoppers who have abandoned their carts and asking if they would like to complete their orders. Timing is everything here: it’s important to identify and react to abandoned shopping carts quickly, but not so quickly that a shopper feels pestered or rushed; UK handbag company Radley released a study showing that 30 minutes after abandonment is the optimal time to get in touch.[1]
1Details of this study are available at http://d34w0339mx0ifp.cloudfront.net/global/images/uploads/2013/11/Radley-Client-Story.pdf.
Typically, an online retailer responds to an abandoned shopping cart by emailing the shopper, or by showing the shopper retargeting ads on other websites, but we don’t need to worry about the exact response mechanism. The important thing is to define a new event, Shopper abandons cart, and generate one of these new events whenever we detect an abandoned cart. Nile’s data engineers can then read these new events from the relevant stream and decide how to handle them.
How do we detect an abandoned shopping cart? For the purposes of this chapter, let’s use a simple algorithm:
- Our shopper adds a product to the cart.
- Derive a Shopper abandons cart event if 30 minutes pass without one of the following occurring:
- Our shopper adding any further products to cart
- Our shopper placing an order
- If our shopper adds a further product to the cart during the 30 minutes, restart the timer.
- If our shopper places an order during the 30 minutes, clear the timer.
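The timer logic above can be sketched in plain Java. This is only an illustration of the rule, not Nile's actual code; names such as AbandonmentRule and onAddToCart are our own inventions:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Sketch of the abandonment rule: track each shopper's last add-to-cart
// time, restart it on further adds, clear it on checkout.
class AbandonmentRule {
    static final Duration ABANDON_AFTER = Duration.ofMinutes(30);
    private final Map<String, Instant> lastActivity = new HashMap<>();

    // Shopper adds a product: (re)start the 30-minute timer
    void onAddToCart(String shopper, Instant when) {
        lastActivity.put(shopper, when);
    }

    // Shopper places an order: clear the timer
    void onPlaceOrder(String shopper) {
        lastActivity.remove(shopper);
    }

    // Checked periodically: has this cart been idle for 30+ minutes?
    boolean isAbandoned(String shopper, Instant now) {
        Instant last = lastActivity.get(shopper);
        return last != null
            && Duration.between(last, now).compareTo(ABANDON_AFTER) >= 0;
    }
}
```

A real implementation would run the periodic check from a scheduler rather than polling by hand, but the rule itself is exactly this comparison.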
Figure 5.1 presents two examples of applying this algorithm. This algorithm is not a particularly sophisticated one, but it should help Nile get started tackling its abandoned shopping carts problem, and it can always be refined further later.
Figure 5.1. On the left side, Shopper A has added two products to the shopping cart, and then 45 minutes have passed without any further activity, so we can derive a Shopper abandons cart event. On the right side, Shopper B has added a product to the shopping cart and checked out within 20 minutes, not abandoning the cart.

When an abandoned cart is detected, a new Shopper abandons cart event is generated, per the preceding algorithm. But where should we write this new event to? We have two options:
- We write it back to the raw-events-ch05 topic, colocating it with the original shopper events that were fed into the algorithm.
- We write it to a new Kafka topic, called derived-events-ch05.
Both approaches have their merits. Writing the new event back to raw-events-ch05 is simple to reason about: all events can now be found in a single stream, regardless of the process that generated them. On the other hand, writing to a separate stream, derived-events-ch05, makes it clear that these new events are second-order events, derived from the original events. A data engineer who cares only about this event (for example, to send abandoned-shopping-cart emails) can read this new stream and ignore the original stream.
For this chapter, we will opt for the second approach and write our Shopper abandons cart events to a new Kafka topic, called derived-events-ch05. Figure 5.2 shows our new stream processing job, reading events from the raw-events-ch05 topic, and writing new events to a derived-events-ch05 topic.
Figure 5.2. Our abandoned shopping cart detector consumes events from the raw-events-ch05 topic in Kafka and generates new Shopper abandons cart events to write back to a new Kafka topic, called derived-events-ch05.

There is a lot to take in here: we have defined an algorithm to meet Nile’s goals around abandoned shopping carts and introduced a second Kafka topic to receive the new events. Before we go any further, we need to model the various event types that we will be dealing with here.
Remember back in chapter 2, when we introduced our first e-commerce-related event for Nile, Shopper views product? Drawing on our standard definition of an event as subject-verb-object, we can identify three further discrete events required for tracking abandoned carts:
- Shopper adds item to cart— The shopper adds one of those products to the shopping basket. A product is added to the basket with a quantity of one or more attached.
- Shopper places order— The shopper checks out, paying for the items in the shopping basket.
- Shopper abandons cart— The derived event itself, representing the act of cart abandonment by the shopper.
Before we jump into writing our stream processing application, let’s first model these three new event types. Although it’s tempting to skip this step and dive into the coding, having clear definitions of the events that we will be working with will save us a lot of time and should prevent us from getting stuck down any coding cul de sacs.
The Shopper adds item to cart event involves a Nile shopper adding a product to their shopping cart (also known as a shopping basket), specifying a quantity of that product as they do this. Let’s break out the various components here:
- Subject: Shopper
- Verb: Adds
- Direct object: Item (consisting of product and quantity)
- Indirect object: Cart
- Context: Timestamp of this event
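As a purely illustrative example, such an event could be serialized as JSON along these lines; the field names here are our assumptions, not Nile's actual schema:

```json
{
  "subject": { "shopper": "123" },
  "verb": "add",
  "directObject": { "item": { "product": "aabattery", "quantity": 2 } },
  "indirectObject": "cart",
  "context": { "timestamp": "2018-01-01T10:59:00" }
}
```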
Figure 5.3 illustrates these components.
Figure 5.3. Our shopper (again, subject) adds (verb) an item, consisting of a product and its quantity (direct object) to the shopping cart (indirect, aka prepositional, object) at a given time (context).

The next event, Shopper places order, breaks down like so:
- Subject: Shopper
- Verb: Places
- Direct object: Order
- Context: Timestamp of this event
The slight complexity is in modeling the order. This is a complicated entity: it needs to contain an order ID and a total order value, plus a list of items that were purchased in the order; each item should be a product and the quantity of that product ordered.
Putting it all together, we get the event drawn in figure 5.4.
Figure 5.4. Our shopper (always the subject) places (verb) an order (direct object) at a given time (context). The order contains ID and value attributes, plus an array of order items, each consisting of a product and quantity of that product.

Now we come to our derived event. By derived, we mean an event that we are generating ourselves in our stream processing application, as opposed to an incoming raw event that we are simply consuming.
This event looks like this:
- Subject: Shopper
- Verb: Abandons
- Direct object: Cart (consisting of multiple items, each of a product and quantity)
- Context: Timestamp of this event
Our fourth and final event for this chapter is illustrated in figure 5.5.
Figure 5.5. Our shopper (subject) abandons (verb) their shopping cart (direct object) at a given time (context). The shopping cart contains an array of order items, each consisting of a product and quantity of that product.

This completes the set of Nile e-commerce events required for our abandoned shopping cart detector. In the next section, we will look at our final building block: stateful stream processing.
To detect abandoned shopping carts for Nile, we have to do some stream processing on the incoming events. Clearly, it’s a little more complex than chapter 3, where we were able to work on only a single event at a time: our algorithm expects us to understand the flow of multiple events in a sequence over time. The key building block for processing multiple events like this is state.
If single-event processing is a little like being a goldfish, multiple-event processing is like being an elephant. The goldfish stream processor can forget each event as soon as it has processed it, whereas the elephant stream processor requires a memory of prior events in order to generate its output. We can call this memory of prior events state and say that multiple-event processing is therefore stateful. Figure 5.6 illustrates this slightly tortuous metaphor.
Figure 5.6. Our goldfish-like single-event processor can process each event and then immediately forget about it. By contrast, our elephant-like stateful event processor must remember multiple events to perform aggregations, sorting, pattern detection, and suchlike.
What kind of technology should we be using to implement this state in our stream processing application? There are lots of options, but they largely boil down to three:
- In-process memory— A mutable variable or variables available inside the stream processing application’s own code
- Local data store— Some kind of simple data store (for example, LevelDB, RocksDB, or SQLite) that is local to the server or container running this instance of the stream processing application
- Remote data store— Some kind of database server or cluster (for example, Cassandra, DynamoDB, or Apache HBase) hosted remotely
Figure 5.7 depicts these three options. Regardless of the specific approach, the important thing to understand is that processing multiple events needs state.
Figure 5.7. A distributed stream-processing application (here with three instances) can keep state in in-process memory, in an instance-local data store, or in a remote data store—or even in a combination of all three.
We know that we need some form of state to process multiple events, but what exactly is going to be kept in that state? When we think of data processing, we are almost always thinking of processing a bounded set of data: a collection of data points that are of a somehow finite size. After all, even an elephant has a limit on how much they can remember. Here are some examples of data queries and transformations from a bounded set, with the bounded set in italics:
- Who is the fastest marathon runner since records began?
- How many T-shirts did we give away in January 2018?
- What are our year-to-date revenues?
Unfortunately for our purposes, a continuous event stream is unterminated, meaning that it has no end (and possibly no beginning either), so the first challenge in multiple-event processing is to come up with a sensible way of bounding a continuous event stream. We typically solve this problem by applying a processing window to our event stream.
At its simplest, processing on a continuous event stream can be cut into discrete windows by using a timer, or heartbeat, function that runs at a regular interval. This timer function can contain custom code to apply whatever window or windows make sense for the use case. Figure 5.8 illustrates this idea of slicing an unending event stream into specific windows for further processing.
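As a toy illustration of time-based windowing, assigning each event timestamp to a fixed-size, non-overlapping window reduces to simple integer arithmetic. This sketch is ours, under our own assumptions, and is not the API of any particular framework:

```java
// Toy illustration of time-based windowing: map an event timestamp (in
// seconds) onto the start of the fixed-size window that contains it.
class Windowing {
    // Start of the window containing the given timestamp
    static long windowStart(long timestampSecs, long windowSizeSecs) {
        return (timestampSecs / windowSizeSecs) * windowSizeSecs;
    }

    // Do two timestamps fall into the same window?
    static boolean sameWindow(long a, long b, long windowSizeSecs) {
        return windowStart(a, windowSizeSecs) == windowStart(b, windowSizeSecs);
    }
}
```

Real frameworks offer richer options (sliding, session, and count-based windows), but all of them bound the unterminated stream in some such way.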
Figure 5.8. A stream processing framework typically applies windowing to a continuous (unterminated) event stream to process it and generate meaningful outputs.
In the case of our abandoned shopping cart, the processing window is well-defined: we are looking for shopping carts that have been abandoned by their owner for at least 30 minutes. We don’t want to leave an abandoned shopping cart much longer than the required 30 minutes, so a timer checking each cart every 30 seconds would be ideal.
If stream windowing doesn’t make a lot of sense yet, don’t worry; we will be exploring it in more detail when we start building our abandoned cart detector. The key takeaway for now is that stream windowing is an important building block for stateful stream processing.
All this talk of processing multiple events, maintaining state, and creating stream windows probably sounds like a lot of work! To solve these and other problems in a repeatable, reliable way, a handful of stream processing frameworks have emerged. These frameworks exist to abstract away the mechanics of running stateful stream processing at scale, leaving developers to focus on the actual business logic that their jobs require.
Stream processing frameworks exhibit some or all of the following capabilities:
- State management— State is a key building block for processing multiple events at a time. Stream processing frameworks give you the option of storing state in some or all of the following: in-memory, in a local filesystem, or in a dedicated key-value store such as RocksDB or Redis.
- Stream windowing— As described previously, a stream processing framework provides one or more ways of expressing a bounded window for the event processing. This is typically time-based, although sometimes it can be based on a count of events instead.
- Delivery guarantees— All stream processing frameworks pessimistically track their progress, to ensure that every event is processed at least once. Some of these systems go further, adding in transactional guarantees to ensure that each event is processed exactly once.
- Task distribution— A high-volume event stream consists of multiple Kafka topics or Amazon Kinesis streams, and requires multiple instances of the job, or tasks, to process it. Most stream processing frameworks are designed to be run on a sophisticated third-party scheduler such as Apache Mesos or Apache Hadoop YARN; some stream processing frameworks are also embeddable, meaning that they can be added as a library to a regular application (requiring bespoke scheduling).
- Fault tolerance— Failures happen regularly in the kind of large-scale distributed systems required to process high-volume event streams, and most frameworks have built-in mechanisms to automatically recover from these failures. Fault tolerance typically involves a distributed backup of either the incoming events or the generated state.
Let’s see how some of the most popular stream processing frameworks relate to these capabilities.
A variety of stream processing frameworks have emerged over the past few years, with most of the popular ones having been successfully incubated by the Apache Software Foundation. Table 5.1 introduces five of the most widely used frameworks and lays out their design relative to the capabilities introduced in the preceding section.
Table 5.1. A nonexhaustive list of stream processing frameworks (view table figure)
| Capability | Storm | Samza | Spark Streaming | Kafka Streams | Flink |
|---|---|---|---|---|---|
| State management | In-memory, Redis | In-memory, RocksDB | In-memory, filesystem | In-memory, RocksDB | In-memory, filesystem, RocksDB |
| Stream windowing | Time-, count-based | Time-based | Time-based (microbatch) | Time-based | Time-, count-based |
| Delivery guarantees | At least once; exactly once (Trident) | At least once | At least once, exactly once | At least once, exactly once | Exactly once |
| Task distribution | YARN or Mesos | YARN or embeddable | YARN or Mesos | Embeddable | YARN or Mesos |
| Fault tolerance | Record acks | Local, distributed snapshots | Checkpoints | Local, distributed snapshots | Distributed snapshots |
Let’s go through each of these stream processing frameworks briefly.
Apache Storm was the first of the “new wave” of stream processing frameworks. Storm was written by Nathan Marz at BackType; Twitter acquired BackType and open sourced Storm in 2011. As a pioneering piece of technology, Storm did things slightly differently than its successors:
- Fault tolerance is achieved by record acknowledgments with upstream record backups, rather than state snapshots or checkpoints.
- Rather than using a third-party scheduler, Storm originally created its own (Nimbus), although Storm supports both YARN and Mesos now.
- Storm introduced a separate library, Storm Trident, to support exactly-once processing.
Storm was popularized by the book Big Data by Nathan Marz and James Warren (Manning) and continues to be widely used; Twitter’s successor system, Heron, and Apache Flink (covered later in this section) both offer Storm-compatible APIs to ease adoption.
Apache Samza is a stateful stream-processing framework from the team at LinkedIn who also originated Apache Kafka. As such, Samza has tight integration with Apache Kafka, using Kafka for the backing up of the state held in RocksDB databases. Samza has a relatively low-level API, letting you interact directly with Samza’s stream windowing and state management features. Samza promotes building any required complex stream-processing topology out of many simpler Samza jobs, reading from and writing back to separate Kafka topics.
Samza jobs were originally designed to be run from the YARN application scheduler; more recently, Samza can also be embedded as a library inside a regular application, letting you choose your own scheduler (or no scheduler). Samza has also outgrown its original interdependence with Kafka, now also supporting Amazon Kinesis.
Samza continues to evolve, is a great teaching tool for stream processing, and is used by major companies. But Kafka Streams (covered later in this section) has perhaps stolen some of Samza’s thunder.
Spark Streaming is an extension of the Apache Spark project for working with continuous event streams. Technically speaking, Spark Streaming is a microbatch processing framework, not a stream processing framework: Spark Streaming slices the incoming event stream into microbatches, which it then feeds to the standard Spark engine for processing.
Spark Streaming’s microbatching gives us exactly-once processing “for free” and lets us reuse the existing Spark experience and code, should we have it. But it comes at a cost: the microbatching increases latency and reduces our flexibility around stream windowing and scaling.
Spark Streaming is not tied to any one stream technology: almost any incoming stream technology can be sliced and fed into Spark in microbatches; out of the box, Spark Streaming supports Flume, Kafka, Amazon Kinesis, files, and sockets, and you can write your own custom receiver if you like.
For readers interested in digging further into Spark Streaming, Spark in Action by Petar Zečević and Marko Bonaći, and Streaming Data by Andrew G. Psaltis, both published by Manning, are great resources.
Kafka Streams is a stream processing library for Apache Kafka, conceived by the LinkedIn Kafka team after they decamped from LinkedIn for Kafka startup Confluent. Kafka Streams has shared conceptual DNA with Samza, but is on a somewhat different trajectory:
- Kafka Streams was designed from the start as a library to be embedded in your own application code; it doesn’t support any existing scheduler such as YARN or Mesos.
- Kafka Streams is closely tied to Kafka; Kafka Streams uses Kafka for all aspects of its state management, fault tolerance, and delivery guarantees; you cannot use Kafka Streams with any other stream technology.
- Although Kafka Streams does expose a low-level API, it also has a higher-level query API that will be more familiar to Spark users.
Although a young project, Kafka Streams is quickly gaining serious developer mindshare as a first-class citizen within the Kafka ecosystem. Confluent is positioning Kafka Streams as a toolkit for asynchronous or event-driven microservices, as distinct from the more analytics or data science use cases that Spark is noted for.
Another great Manning book to dig into Kafka Streams is Kafka Streams in Action by William P. Bejeck Jr.
Apache Flink is a relative newcomer but is rapidly emerging as a credible challenger to Spark and Spark Streaming. Unlike Spark, which takes a batch model and handles streaming use cases via microbatching, Flink was built from the start as a streaming engine: it handles batch data by treating it as a bounded stream, one with a beginning and an end.
Like Spark, Flink is not tied to Kafka. Flink supports various other data sources and sinks, including Amazon Kinesis. Flink has sophisticated stream windowing capabilities, and supports exactly-once processing and a rich query API; Flink is also closely following the Apache Beam project, which aims to provide a standard, full-featured API for interacting with stream processing frameworks.
This concludes our introduction to five major stream-processing frameworks, but how do we choose between these for our work at Nile?
As mentioned at the beginning of this chapter, we are going to use Apache Samza to build our abandoned cart detector. Samza is a full-blown stream processing framework, but unlike the other tools we’ve introduced, Samza makes no effort to abstract away or otherwise hide the stateful nature of the event processing involved: with Samza, you will be working directly with a simple key-value data store to keep track of events that you have already processed.
What is a key-value store? It’s a super-simple database in which you store a single value (an array of bytes) against an individual unique key (also represented by an array of bytes); the key and the value can be any values that you like. A key-value store is a crude tool, but an effective one. It will give us the state we need to track shopper behavior across multiple events. Samza uses RocksDB, an embeddable persistent (versus in-memory only) key-value store, created by Facebook.
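In code, a key-value store boils down to little more than a map. The sketch below is an in-memory stand-in for illustration only; it is not RocksDB's actual API, which persists to disk and operates on byte arrays:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal in-memory stand-in for a key-value store such as RocksDB.
// Real stores persist to disk and work on byte arrays, not Strings.
class SimpleKeyValueStore {
    private final Map<String, String> store = new HashMap<>();

    void put(String key, String value) { store.put(key, value); }
    String get(String key) { return store.get(key); }  // null if absent
    void delete(String key) { store.remove(key); }
}
```

The entire interface is put, get, and delete: crude, but enough to hold per-shopper state.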
Samza has other tricks up its sleeves. The core of a Samza job consists of just two functions:
- process()—Called for each incoming event
- window()—Called on a regular, configurable interval
Both functions have access to the key-value store, plus any mutable state defined in the job; this lets them effectively communicate with each other.
If other stream processing frameworks are fishing sticks, Samza is a fishing rod and tackle: the way that Apache Samza exposes the underlying key-value store as a first-class entity for direct manipulation makes Samza an excellent teaching tool for stateful stream processing. Similarly, the no-nonsense process() and window() functions make it easy to visualize exactly what is going on in our stream processing job at any moment.
A recap: we are writing a stateful stream-processing job that will search the incoming stream, looking for a specific pattern of events, and when that pattern is found, the job will emit a new event, Shopper abandons cart. We are going to implement this job in Apache Samza. Let’s get started.
To detect abandoned carts, we need a way of encapsulating shopper behavior in Samza’s key-value store. We must represent each observed shopper’s current state in the key-value store and keep this state up-to-date so that we can generate a Shopper abandons cart event as soon as that behavior is detected.
With a key-value store, it is completely up to us how we design our keyspace, meaning the layout of keys (and thus their values) in the database. In this case, we can use a pair of namespaced keys to maintain our shopper’s current state in Samza, like so:
- <shopper>-ts should be kept up-to-date with the timestamp at which our job saw the most recent Shopper adds item to cart event for the given shopper.
- <shopper>-cart should be kept up-to-date with the current contents of the shopper’s cart, based on aggregating all Shopper adds item to cart events.
It’s important that we don’t pester Nile’s customers about abandoned carts. So, we should delete both keys from Samza (effectively “resetting” tracking for this user) as soon as we see a Shopper places order event; this should be done regardless of which products were purchased in that order. We should also delete these keys immediately after sending a new Shopper abandons cart event, so we don’t send the event twice.
Putting this together, we have two events that we are interested in for our process() function:
- Shopper adds item to cart— For tracking the shopper’s cart state and the time when they were last active with their cart
- Shopper places order— For telling us to “reset” tracking for this user
For our window() function, we will scan through the whole key-value store, looking for shoppers who were last active more than 30 minutes ago, based on the <shopper>-ts values. Whenever we find one, we will generate a new Shopper abandons cart event, containing the cart contents for this shopper as fetched from <shopper>-cart.
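This two-keys-per-shopper design can be sketched against a plain in-memory map. This is illustrative only: the real job uses Samza's key-value store and JSON-encoded carts, and the class and method names here are our own:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the <shopper>-ts / <shopper>-cart keyspace design against a
// plain Map standing in for Samza's key-value store.
class CartTracker {
    static final Duration CUTOFF = Duration.ofMinutes(30);
    private final Map<String, String> store = new HashMap<>();

    // process(): on "add to cart", refresh the timestamp and grow the cart
    void onAddToCart(String shopper, String item, Instant ts) {
        store.put(shopper + "-ts", ts.toString());
        String cart = store.getOrDefault(shopper + "-cart", "");
        store.put(shopper + "-cart", cart.isEmpty() ? item : cart + "," + item);
    }

    // process(): on "place order", reset tracking for this shopper
    void onPlaceOrder(String shopper) { reset(shopper); }

    // window(): scan for shoppers idle 30+ minutes, emit and reset them
    List<String> findAbandonedCarts(Instant now) {
        List<String> abandoned = new ArrayList<>();
        for (Map.Entry<String, String> e : new HashMap<>(store).entrySet()) {
            if (e.getKey().endsWith("-ts")
                    && Instant.parse(e.getValue()).plus(CUTOFF).isBefore(now)) {
                String shopper = e.getKey().substring(0, e.getKey().length() - 3);
                abandoned.add(shopper + ":" + store.get(shopper + "-cart"));
                reset(shopper); // delete both keys so we never emit twice
            }
        }
        return abandoned;
    }

    // Delete both keys for a shopper
    private void reset(String shopper) {
        store.remove(shopper + "-ts");
        store.remove(shopper + "-cart");
    }
}
```

Note how resetting after emission guarantees that each abandoned cart produces exactly one event, mirroring the delete-both-keys rule described above.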
There’s a lot to take in here. Don’t worry; it should become clearer after you see the Java task code. In the meantime, figure 5.9 sets out the overall design of this job.
Figure 5.9. Our process() function parses incoming events from Nile’s shoppers and updates the Samza key-value store to track the shoppers’ behavior. The window() function then runs regularly to scan the key-value store and identify shoppers who have abandoned their carts. A Shopper abandons cart event is then emitted for those shoppers.
Unfortunately, a lot of ceremony exists around setting up a new Samza project, which is a distraction from the actual stream processing we want to implement. Happily, we can skip most of this ceremony by starting from the Samza team’s own Hello World project. This will serve as our cuckoo’s nest. Let’s check out the project like so:
$ git clone https://git.apache.org/samza-hello-samza.git nile-carts
$ cd nile-carts
$ git checkout f488927
Note: checking out 'f488927'.
...
Checking out the specific commit ensures that we are using the release of Hello World that works with Samza version 0.14.0.
Let’s clean up the folder structure a little:
$ rm wikipedia-raw.json
$ rm src/main/config/*.properties
$ rm -rf src/main/java/samza
Apache projects use a Maven plugin called Rat. Using this with our project will cause problems, so let’s delete it from the Maven pom.xml file:
$ sed -i '' '257,269d' pom.xml
Finally, let’s rename the artifact to prevent confusion later. Edit the pom.xml file in the root and change the artifactId element on line 29 from hello-samza to the following value:
<artifactId>nile-carts</artifactId>
Similarly, edit the file src/main/assembly/src.xml and change the include element on line 69 from hello-samza to the following value:
<include>org.apache.samza:nile-carts</include>
That’s enough freshening of the nest. Let’s move on to configuring our job.
Although Samza jobs are written in Java, they are typically assembled with a Java properties file that tells Samza exactly how to run the job. Let’s create this configuration file now. Open your editor and create a file at this path:
src/main/config/nile-carts.properties
Populate your new properties file with the following configuration.
Listing 5.1. nile-carts.properties
# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=nile-carts
job.coordinator.system=kafka
job.coordinator.replication.factor=1

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=nile.tasks.AbandonedCartStreamTask
task.inputs=kafka.raw-events-ch05
task.window.ms=30000

# Serializers
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.metadata.broker.list=localhost:9092
systems.kafka.producer.producer.type=sync
systems.kafka.producer.batch.num.messages=1

# Key-value storage
stores.nile-carts.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.nile-carts.changelog=kafka.nile-carts-changelog
stores.nile-carts.changelog.replication.factor=1
stores.nile-carts.key.serde=string
stores.nile-carts.msg.serde=string
stores.nile-carts.write.batch.size=0
stores.nile-carts.object.cache.size=0
- The Job section tells us that this is the nile-carts job, and that it will be run on YARN.
- The YARN section defines the location of our Samza job package for YARN.
- The Task section specifies the Java class that will contain the job’s stream processing logic, which we’ll write in the next section. The Java class will process events coming from the Kafka topic raw-events-ch05, and it will have a processing window of 30 seconds (30,000 milliseconds).
- The Serializers section declares two serdes (serializer-deserializers) that will let us read and write strings and JSON in our job.
- The Systems section configures Kafka for our job. We specify that the events we consume and produce from Samza will be in JSON format.
- The Key-value storage section configures our local state. Our keys and values in the key-value store will all be strings.
And that’s it for the configuration—now on to the code.
Remember that this Samza job needs to generate a stream of well-structured Shopper abandons cart events. Rather than jumping straight into our task code, let’s first create a base Event class, and then extend it for our AbandonedCartEvent.
In your editor, add the code in the following listing into a file at this path:
src/main/java/nile/events/Event.java
Listing 5.2. Event.java
package nile.events;

import org.joda.time.DateTime;                                     #1
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.codehaus.jackson.map.ObjectMapper;

public abstract class Event {                                      #2

  public Subject subject;
  public String verb;
  public Context context;

  protected static final ObjectMapper MAPPER = new ObjectMapper();
  protected static final DateTimeFormatter EVENT_DTF = DateTimeFormat
    .forPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(DateTimeZone.UTC);

  public Event(String shopper, String verb) {
    this.subject = new Subject(shopper);
    this.verb = verb;
    this.context = new Context();
  }

  public static class Subject {
    public final String shopper;                                   #3

    public Subject() {
      this.shopper = null;
    }

    public Subject(String shopper) {
      this.shopper = shopper;
    }
  }

  public static class Context {
    public final String timestamp;

    public Context() {
      this.timestamp = EVENT_DTF.print(
        new DateTime(DateTimeZone.UTC));                           #4
    }
  }
}
Our abstract Event class is just a helper for modeling an event that conforms to the structure of subject-verb-object. We can extend it to create the AbandonedCartEvent class that our job will use to send well-formed Shopper abandons cart events. Stub a file at this path:
src/main/java/nile/events/AbandonedCartEvent.java
Add in the code in the following listing.
Listing 5.3. AbandonedCartEvent.java
package nile.events;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;

import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.codehaus.jackson.type.TypeReference;

import nile.events.Event;

public class AbandonedCartEvent extends Event {

  public final DirectObject directObject;

  public AbandonedCartEvent(String shopper, String cart) {
    super(shopper, "abandon");
    this.directObject = new DirectObject(cart);
  }

  public static final class DirectObject {
    public final Cart cart;

    public DirectObject(String cart) {
      this.cart = new Cart(cart);
    }

    public static final class Cart {
      private static final int ABANDONED_AFTER_SECS = 1800;        #1

      public List<Map<String, Object>> items =
        new ArrayList<Map<String, Object>>();

      public Cart(String json) {
        if (json != null) {
          try {
            this.items = MAPPER.readValue(json,
              new TypeReference<List<Map<String, Object>>>() {});
          } catch (IOException ioe) {
            throw new RuntimeException("Problem parsing JSON cart", ioe);
          }
        }
      }

      public void addItem(Map<String, Object> item) {              #2
        this.items.add(item);
      }

      public String asJson() {                                     #3
        try {
          return MAPPER.writeValueAsString(this.items);
        } catch (IOException ioe) {
          throw new RuntimeException("Problem writing JSON cart", ioe);
        }
      }

      public static boolean isAbandoned(String timestamp) {        #4
        DateTime ts = EVENT_DTF.parseDateTime(timestamp);
        DateTime cutoff = new DateTime(DateTimeZone.UTC)
          .minusSeconds(ABANDONED_AFTER_SECS);
        return ts.isBefore(cutoff);
      }
    }
  }
}
Our AbandonedCartEvent gives us an easy way of generating a new Shopper abandons cart event, ready for our Samza job to emit whenever an abandoned cart is detected. The most interesting aspect is the Cart inner class, which contains a helper method used to add items to a cart whenever a Shopper adds item to cart event occurs. Meanwhile, the isAbandoned() helper function will tell us whether a given shopping cart has been abandoned.
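The cutoff test inside isAbandoned() is the heart of the detector, so it is worth seeing in isolation. Below is a minimal sketch of the same logic using the standard java.time API instead of the listing’s Joda-Time classes; the class name AbandonmentCheck is ours, and "now" is injected as a parameter so the check is easy to exercise:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch of isAbandoned()'s cutoff test with java.time (the book uses Joda-Time).
// The 1,800-second cutoff and the timestamp format match the listing.
public class AbandonmentCheck {
    static final int ABANDONED_AFTER_SECS = 1800;
    static final DateTimeFormatter FMT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss");

    // True if the last-active timestamp is older than the cutoff,
    // measured against the supplied "now".
    static boolean isAbandoned(String timestamp, Instant now) {
        Instant ts = LocalDateTime.parse(timestamp, FMT).toInstant(ZoneOffset.UTC);
        Instant cutoff = now.minus(Duration.ofSeconds(ABANDONED_AFTER_SECS));
        return ts.isBefore(cutoff);
    }

    public static void main(String[] args) {
        Instant now = LocalDateTime.parse("2018-10-25T12:30:00", FMT)
            .toInstant(ZoneOffset.UTC);
        // 11:56:00 is 34 minutes before 12:30:00 -- past the 30-minute cutoff
        System.out.println(isAbandoned("2018-10-25T11:56:00", now));  // true
        // 12:10:00 is only 20 minutes before -- still within the window
        System.out.println(isAbandoned("2018-10-25T12:10:00", now));  // false
    }
}
```

Injecting the clock is only for illustration; the listing’s version reads the current UTC time internally, which is exactly the behavior the enhancement ideas at the end of the chapter suggest revisiting.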
With our Shopper abandons cart event defined, we are now ready to write the Java StreamTask that makes up the core of our Samza job. Back in your editor, create a file at this path:
src/main/java/nile/tasks/AbandonedCartStreamTask.java
And for the last time in this chapter, add in the code in the following listing.
Listing 5.4. AbandonedCartStreamTask.java
package nile.tasks;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.storage.kv.KeyValueIterator;
import org.apache.samza.storage.kv.Entry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

import nile.events.AbandonedCartEvent;
import nile.events.AbandonedCartEvent.DirectObject.Cart;

public class AbandonedCartStreamTask
  implements StreamTask, InitableTask, WindowableTask {

  private KeyValueStore<String, String> store;

  public void init(Config config, TaskContext context) {
    this.store = (KeyValueStore<String, String>)
      context.getStore("nile-carts");
  }

  @SuppressWarnings("unchecked")
  @Override
  public void process(IncomingMessageEnvelope envelope,
    MessageCollector collector, TaskCoordinator coordinator) {

    Map<String, Object> event =
      (Map<String, Object>) envelope.getMessage();
    String verb = (String) event.get("verb");
    String shopper = (String) ((Map<String, Object>)
      event.get("subject")).get("shopper");

    if (verb.equals("add")) {                                      #1
      String timestamp = (String) ((Map<String, Object>)
        event.get("context")).get("timestamp");
      Map<String, Object> item = (Map<String, Object>)
        ((Map<String, Object>) event.get("directObject")).get("item");
      Cart cart = new Cart(store.get(asCartKey(shopper)));
      cart.addItem(item);
      store.put(asTimestampKey(shopper), timestamp);
      store.put(asCartKey(shopper), cart.asJson());
    } else if (verb.equals("place")) {                             #2
      resetShopper(shopper);
    }
  }

  @Override
  public void window(MessageCollector collector,
    TaskCoordinator coordinator) {

    KeyValueIterator<String, String> entries = store.all();
    while (entries.hasNext()) {                                    #3
      Entry<String, String> entry = entries.next();
      String key = entry.getKey();
      String value = entry.getValue();
      if (isTimestampKey(key) && Cart.isAbandoned(value)) {        #4
        String shopper = extractShopper(key);
        String cart = store.get(asCartKey(shopper));

        AbandonedCartEvent event =
          new AbandonedCartEvent(shopper, cart);
        collector.send(new OutgoingMessageEnvelope(
          new SystemStream("kafka", "derived-events-ch05"), event)); #5

        resetShopper(shopper);
      }
    }
  }

  private static String asTimestampKey(String shopper) {
    return shopper + "-ts";
  }

  private static boolean isTimestampKey(String key) {
    return key.endsWith("-ts");
  }

  private static String extractShopper(String key) {               #6
    return key.substring(0, key.lastIndexOf('-'));
  }

  private static String asCartKey(String shopper) {
    return shopper + "-cart";
  }

  private void resetShopper(String shopper) {
    store.delete(asTimestampKey(shopper));
    store.delete(asCartKey(shopper));
  }
}
There’s a lot to take in with this Samza job, so let’s briefly review how our new job’s process() and window() functions work. To start with process():
- We are interested in only Shopper places order and Shopper adds item to cart events.
- When a Shopper adds item to cart, we update a copy of their cart stored in our key-value store and update our shopper’s last active timestamp.
- When a Shopper places order, we delete all state about our shopper from the key-value store.
Our process() function is responsible for keeping a copy of each shopper’s cart up-to-date based on their add-to-cart events; it is also responsible for understanding how recently the user added something to their cart.
Now let’s briefly recap the window() function:
- Every 30 seconds, we scan the whole key-value store, looking for shoppers who were last active more than 30 minutes ago.
- We generate a Shopper abandons cart event for each shopper we find, detailing the contents of their shopping cart as recorded in our key-value store.
- We send each Shopper abandons cart event to our outbound Kafka stream.
- We delete from the key-value store all values for the shoppers who just abandoned their carts.
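To see the window() logic without the Samza machinery, here is a sketch of the same scan over a TreeMap standing in for the key-value store, with timestamps simplified to seconds-since-epoch strings. All names here are ours, not Samza’s:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Sketch of window()'s scan: find stale "<shopper>-ts" entries,
// report the shopper, and clear both of that shopper's keys.
public class WindowScanSketch {
    static final long ABANDONED_AFTER_SECS = 1800;

    // Returns the shoppers whose last-active timestamp is past the cutoff,
    // deleting their entries from the store (as resetShopper does).
    static List<String> scan(TreeMap<String, String> store, long nowSecs) {
        List<String> abandoned = new ArrayList<>();
        for (String key : new ArrayList<>(store.keySet())) {
            if (key.endsWith("-ts")
                    && Long.parseLong(store.get(key)) < nowSecs - ABANDONED_AFTER_SECS) {
                String shopper = key.substring(0, key.lastIndexOf('-'));
                abandoned.add(shopper);
                store.remove(shopper + "-ts");
                store.remove(shopper + "-cart");
            }
        }
        return abandoned;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("123-ts", "1000");   // last active long ago -- abandoned
        store.put("123-cart", "[{\"product\":\"aabattery\",\"quantity\":12}]");
        store.put("456-ts", "3500");   // active recently -- not abandoned
        System.out.println(scan(store, 4000L));  // [123]
        System.out.println(store.keySet());      // [456-ts]
    }
}
```

In the real job, the cutoff comparison is done on ISO timestamps by Cart.isAbandoned(), and the found carts are turned into AbandonedCartEvents and sent to Kafka rather than returned as a list.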
With the code complete, let’s now compile and package our new Samza job. Still from the project root, run the following:
$ mvn clean package
...
[INFO] Building tar: .../nile-carts/target/nile-carts-0.14.0-dist.tar.gz
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
...
Great—everything compiles, and we have now packaged our first Samza job. We are ready to run our job on a resource-management framework called Apache Hadoop YARN. We’ll cover this in the next section.
Although Samza now supports being embedded into a regular JVM application (like Kafka Streams), the more common way of running a Samza job is via YARN. Unless you have previously worked with Hadoop, it’s unlikely that you will have encountered YARN before—so we will introduce YARN briefly before getting our job running on it.
YARN stands for Yet Another Resource Negotiator—an uncomplimentary backronym for an important piece of technology. YARN is a software system that evolved out of Hadoop 1. The biggest difference between Hadoop 1 and Hadoop 2 is the separation of the cluster management responsibilities into the YARN subproject.
YARN is deployed onto a Hadoop cluster to allocate resources to YARN-aware applications effectively. It has three core components:
- ResourceManager— The central “brain” that tracks servers in the Hadoop cluster and jobs running on those servers, and allocates compute resources to these jobs
- NodeManager— Runs on every server in the Hadoop cluster, monitoring the jobs and reporting back to the ResourceManager
- ApplicationMaster— Runs alongside each application and negotiates the required resources from the ResourceManager, and works with the NodeManager to execute and monitor each task
YARN is somewhat unfashionable these days—the Kubernetes project gets much more attention—but it is tried and tested technology (rather like ZooKeeper is), and it is widely deployed, given the pervasiveness of Hadoop 2 environments. YARN was also designed in a generic-enough way that various stream processing frameworks, completely unrelated to Hadoop, have been able to use it as their job scheduler. Samza falls into this bucket.
That’s enough theory; now we need to install YARN. Fortunately for us, the Hello World project for Samza comes with a script called grid that helps you to set up YARN, as well as Kafka and ZooKeeper. This script will also check out the correct version of Samza and build it. You worked with Kafka and ZooKeeper in the preceding chapter—but grid will set these up too if they are not still running. All the new software will be added into a subdirectory called deploy inside the root folder.
From the project root, run the grid script like so:
$ bin/grid bootstrap
Bootstrapping the system...
EXECUTING: stop kafka
...
kafka has started
You can now navigate to the YARN UI in your web browser:
http://localhost:8088
The list of All Applications will be empty. We will change this in the next section, by submitting our Samza job to YARN.
Make sure that you are still in the root folder of your project, and then run the following:
$ mkdir -p deploy/samza
$ tar -xvf ./target/nile-carts-0.14.0-dist.tar.gz -C deploy/samza
We can now submit our Samza job to YARN:
$ deploy/samza/bin/run-app.sh \
  --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory \
  --config-path=file://$PWD/deploy/samza/config/nile-carts.properties
...
2018-10-15 17:25:30.434 [main] JobRunner [INFO] job started successfully - Running
2018-10-15 17:25:30.434 [main] JobRunner [INFO] exiting
Jl hvp eturrn kr rvg BCAO QJ, beq sholud qx hcfk er xxz tehp Samza igax nuinnrg, as jn figure 5.10:
http://localhost:8088
Now let’s put our new job through its paces.
To test our job, we will send e-commerce events into a new Kafka topic, raw-events-ch05, and then look for events written by our Samza job into a new stream, this time called derived-events-ch05. If our Samza job is working, this output stream will receive fully formed subject-verb-object events.
Let’s start by tailing our output stream. From the root of the project folder, run this command:
$ deploy/kafka/bin/kafka-console-consumer.sh \
  --topic derived-events-ch05 --from-beginning \
  --bootstrap-server localhost:9092
Jn s praetsea niraetml, rfo’a tstar s prtisc rurs aofr gc nvqz events xrjn tkb Kafka oiptc xlt nocmgini events, chiwh wx fwfj cfzf raw-events-ch05. Samza csy luioltaamaytc tceader rjqz Kafka ctpio etl ah, sakthn re zrj eontinm jn drx Samza ikp gianofituncor. Srcrt yb bkr enetv orreducp jvof ax:
$ deploy/kafka/bin/kafka-console-producer.sh --topic raw-events-ch05 \
  --broker-list localhost:9092
{ "subject": { "shopper": "123" }, "verb": "add", "indirectObject": "cart", "directObject": { "item": { "product": "aabattery", "quantity": 12 } }, "context": { "timestamp": "2018-10-25T11:56:00" } }
{ "subject": { "shopper": "456" }, "verb": "add", "indirectObject": "cart", "directObject": { "item": { "product": "macbook", "quantity": 1 } }, "context": { "timestamp": "2018-10-25T11:56:12" } }
Now here’s the thing: by the time you run these commands, these add-to-basket events will be long in the past—far more than 30 minutes ago. If you switch back to your derived-events-ch05 stream, you should see your first generated events very soon:
{"subject":{"shopper":"123"},"verb":"abandon","context":{"timestamp":"2018-10-25T11:56:00"},"direct-object":{"cart":{"items":[{"product":"aabattery","quantity":12}]}}}
{"subject":{"shopper":"456"},"verb":"abandon","context":{"timestamp":"2018-10-25T11:56:12"},"direct-object":{"cart":{"items":[{"product":"macbook","quantity":1}]}}}
So far, so good; next let’s try an add-to-basket followed quickly by a checkout event:
{ "subject": { "shopper": "789" }, "verb": "add", "indirectObject": "cart", "directObject": { "item": { "product": "skyfall", "quantity": 1 } }, "context": { "timestamp": "2018-10-25T12:00:00" } }
{ "subject": { "shopper": "789" }, "verb": "place", "directObject": { "order": { "id": "123", "value": 12.99, "items": [ { "product": "skyfall", "quantity": 1 } ] } }, "context": { "timestamp": "2018-10-25T12:01:00" } }
Assuming you managed to send in both events within the same 30-second window, you should expect no Shopper abandons cart event in derived-events-ch05. This is because shopper 789 placed their order within the allowed time.
And that’s it: our Samza job is monitoring a stream of incoming events to detect abandoned shopping carts. You can try it out with a few more events of your own creation—just be careful to follow the existing schema.
From the project root, you can now run the following command in order to stop all processes:
$ bin/grid stop all
EXECUTING: stop all
EXECUTING: stop kafka
EXECUTING: stop yarn
stopping resourcemanager
stopping nodemanager
EXECUTING: stop zookeeper
This Samza job is a good first stab at an abandoned cart detector. If you have time, you could enhance and extend it in a variety of ways:
- De-duplicate items in the cart recorded in Samza’s key-value store. If a shopper adds one copy of the Skyfall DVD to their basket twice, this is rationalized to one item with a quantity of 2.
- Use the timestamps of incoming events (from the event context) to determine when the shopper was last active, rather than basing it on the time when process() is running, as now.
- Define a new Shopper removes item from cart event, and use these events to remove items from the shopper’s cart as stored in Samza’s key-value store.
- Base each shopper’s last-active timestamp on all events for that shopper, not just Shopper adds item to cart events.
- Explore more-sophisticated ways of determining whether a cart is abandoned, instead of the strict 30-minute cutoff. You could try varying the cutoff based on the shopper’s previously observed behavior.
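For the first of these ideas, the following sketch shows one way the de-duplication could work: merging cart items by product and summing quantities. Items are modeled as Map<String, Object>, as in the Cart class, but the dedupe helper itself is hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of cart de-duplication: collapse items that share a "product"
// into one line item whose "quantity" is the sum of the duplicates.
public class CartDedupe {
    static List<Map<String, Object>> dedupe(List<Map<String, Object>> items) {
        Map<String, Map<String, Object>> byProduct = new LinkedHashMap<>();
        for (Map<String, Object> item : items) {
            String product = (String) item.get("product");
            Map<String, Object> existing = byProduct.get(product);
            if (existing == null) {
                // First sighting: copy so we never mutate the caller's item
                byProduct.put(product, new LinkedHashMap<>(item));
            } else {
                int qty = ((Number) existing.get("quantity")).intValue()
                        + ((Number) item.get("quantity")).intValue();
                existing.put("quantity", qty);
            }
        }
        return new ArrayList<>(byProduct.values());
    }

    public static void main(String[] args) {
        List<Map<String, Object>> items = new ArrayList<>();
        Map<String, Object> a = new LinkedHashMap<>();
        a.put("product", "skyfall"); a.put("quantity", 1);
        Map<String, Object> b = new LinkedHashMap<>();
        b.put("product", "skyfall"); b.put("quantity", 1);
        items.add(a); items.add(b);
        System.out.println(dedupe(items));  // [{product=skyfall, quantity=2}]
    }
}
```

One natural place to apply such a helper would be inside Cart.addItem(), so the stored JSON always stays normalized.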
- Processing multiple events from a stream requires state. This state allows the app to “remember” important attributes about individual events, across many events.
- State for event stream processing can be kept in-memory (transient), stored locally on the processing instance, or written to a remote database. A stream processing framework can help us to manage this process.
- Stream processing frameworks also help us to apply time windows to our streams, have delivery guarantees for our events, distribute our stream processing across multiple servers, and tolerate failures in those individual servers and processes.
- Popular stream-processing frameworks include Apache Storm, Apache Samza, Spark Streaming, Kafka Streams, and Apache Flink.
- Samza is a stateful stream-processing framework with a relatively low-level API, making it the “Hadoop MapReduce of stream processing.”
- Samza’s process() function lets us update values in our key-value store every time we get a new event from our incoming event stream in Kafka.
- Samza’s window() function lets us regularly review what has changed in our application.
- By clever data modeling in our key-value store (for example, with composite keys), we can detect sophisticated patterns and behaviors, such as shopping-cart abandonment.