Chapter 11. Analytics-on-write

This chapter covers

  • Simple algorithms for analytics-on-write on event streams
  • Modeling operational reporting as a DynamoDB table
  • Writing an AWS Lambda function for analytics-on-write
  • Deploying and testing an AWS Lambda function

In the previous chapter, we implemented a simple analytics-on-read strategy for OOPS, our fictitious package-delivery company, using Amazon Redshift. The focus was on storing our event stream in Redshift in such a way as to support as many analyses as possible “after the fact.” We modeled a fat event table, widened it further with dimension lookups for key entities including drivers and trucks, and then tried out a few analyses on the data in SQL.

For the purposes of this chapter, we will assume that some time has passed at OOPS, during which the BI team has grown comfortable with writing SQL queries against the OOPS event stream as stored in Redshift. Meanwhile, rumblings are coming from various stakeholders at OOPS who want to see analyses that are not well suited to Redshift. For example, they are interested in the following:

  • Low-latency operational reporting: This must be fed from the incoming event streams in as close to real time as possible.
  • Dashboards to support thousands of simultaneous users: For example, a parcel tracker on the website for OOPS customers.

In this chapter, we will explore techniques for delivering these kinds of analytics, which broadly fall under the term analytics-on-write. Analytics-on-write has an up-front cost: we have to decide on the analysis we want to perform ahead of time and put this analysis live on our event stream. In return for this constraint, we get some benefits: our queries are low latency, can serve many simultaneous users, and are simple to operate.

To implement analytics-on-write for OOPS, we are going to need a database for storing our aggregates, and a stream processing framework to turn our events into aggregates. For the database, we will use Amazon DynamoDB, which is a hosted, highly scalable key-value store with a relatively simple query API. For the stream processing piece, we are going to try out AWS Lambda, which is an innovative platform for single-event processing, again fully hosted by Amazon Web Services.

Let’s get started!


11.1. Back to OOPS

For this chapter, we will be returning to OOPS, the scene of our analytics-on-read victory, to implement analytics-on-write. Before we can get started, we need to get Kinesis set up and find out exactly what analytics our OOPS bosses are expecting.

11.1.1. Kinesis setup

In the preceding chapter, we interacted with the OOPS unified log through an archive of events stored in Amazon S3. This archive contained the five types of OOPS events, all stored in JSON, as recapped in figure 11.1. This event archive suited our analytics-on-read requirements fine, but for analytics-on-write, we need something much "fresher": we need access to the OOPS event stream as it flows through Amazon Kinesis in near real-time.

Figure 11.1. The five types of events generated at OOPS are unchanged since their introduction in the previous chapter.

Our OOPS colleagues have not yet given us access to the "firehose" of live events in Kinesis, but they have shared a Python script that can generate valid OOPS events and write them directly to a Kinesis stream for testing. You can find this script in the GitHub repository:

ch11/11.1/generate.py

Before we can use this script, we must first set up a new Kinesis stream to write the events to. As in chapter 4, we can do this by using the AWS CLI tools:

$ aws kinesis create-stream --stream-name oops-events \
  --shard-count 2 --region=us-east-1 --profile=ulp

That command doesn't give any output, so let's follow up by describing the stream:

$ aws kinesis describe-stream --stream-name oops-events \
  --region=us-east-1 --profile=ulp
{
    "StreamDescription": {
        "StreamStatus": "ACTIVE",
        "StreamName": "oops-events",
        "StreamARN": "arn:aws:kinesis:us-east-1:719197435995:stream/
 oops-events",
...

We are going to need the stream's ARN later, so let's put it into an environment variable:

$ stream_arn=arn:aws:kinesis:us-east-1:719197435995:stream/oops-events

Now let’s try out the event generator:

$ /vagrant/ch11/11.1/generate.py
Wrote TruckArrivesEvent with timestamp 2018-01-01 00:14:00
Wrote DriverMissesCustomer with timestamp 2018-01-01 02:14:00
Wrote DriverDeliversPackage with timestamp 2018-01-01 04:03:00

Great, we can successfully write events to our Kinesis stream. Press Ctrl-C to cancel the generator.
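
If you want to double-check what the generator wrote, you can read a few raw records back from the stream. The following is a minimal sketch using the AWS Java SDK for Kinesis (which we add to our Scala build later in this chapter); the object name StreamPeek and the record limit are illustrative only, and it assumes your AWS credentials and region are already configured:

import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder
import com.amazonaws.services.kinesis.model.GetRecordsRequest
import scala.collection.JavaConverters._

object StreamPeek extends App {
  val kinesis = AmazonKinesisClientBuilder.standard.withRegion("us-east-1").build
  // Take the first shard of the oops-events stream and start from its oldest record
  val shardId = kinesis.describeStream("oops-events")
    .getStreamDescription.getShards.asScala.head.getShardId
  val iterator = kinesis.getShardIterator("oops-events", shardId, "TRIM_HORIZON")
    .getShardIterator
  // Fetch up to five records and print their JSON payloads
  val records = kinesis.getRecords(
    new GetRecordsRequest().withShardIterator(iterator).withLimit(5)).getRecords.asScala
  records.foreach(r => println(new String(r.getData.array, "UTF-8")))
}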

11.1.2. Requirements gathering

With analytics-on-read, our focus was on devising the most flexible way of storing our event stream in Redshift, so that we could support as great a variety of post facto analyses as possible. The structure of our event storage was optimized for future flexibility, rather than being tied to any specific analysis that OOPS might want to perform later.

With analytics-on-write, the opposite is true: we must understand exactly what analysis OOPS wants to see, so that we can build that analysis to run in near real-time on our Kinesis event stream. And, ideally, we would get this right the first time; remember that a Kinesis stream stores only the last 24 hours of events (configurable up to one week), after which the events auto-expire. If there are any mistakes in our analytics-on-write processing, at best we will be able to rerun for only the last 24 hours (or one week). This is visualized in figure 11.2.

Figure 11.2. In the case of a bug in our analytics-on-write implementation, we have to fix the bug and redeploy the analytics against our event stream. At best, we can recover the last 24 hours of missing data, as depicted here. At worst, our output is corrupted, and we have to restart our analytics from scratch.

This means that we need to sit down with the bigwigs at OOPS and find out exactly what near-real-time analytics they want to see. When we do this, we learn that the priority is around operational reporting about the delivery trucks. In particular, OOPS wants a near-real-time feed that tells them the following:

  • The location of each delivery truck
  • The number of miles each delivery truck has driven since its last oil change

This sounds like a straightforward analysis to generate. We can sketch out a simple table structure that holds all of the relevant data, as shown in table 11.1.

Table 11.1. The status of OOPS's delivery trucks

Truck VIN           Latitude    Longitude   Miles since oil change
1HGCM82633A004352   51.5208046  -0.1592323  35
JH4TB2H26CC000000   51.4972997  -0.0955459  167
19UYA31581L000000   51.4704679  -0.1176902  78

Now we have our five event types flowing into Kinesis, and we understand the analysis that our OOPS coworkers are looking for. In the next section, let's define the analytics-on-write algorithm that will connect the dots.

11.1.3. Our analytics-on-write algorithm

We need to create an algorithm that will use the OOPS event stream to populate our four-column table and keep it up-to-date. Broadly, this algorithm needs to do the following:

  1. Read each event from Kinesis.
  2. If the event involves a delivery truck, use its current mileage to update the count of miles since the last oil change.
  3. If the event is an oil change, reset the count of miles since the last oil change.
  4. If the event associates a delivery truck with a specific location, update the table row for the given truck with the event's latitude and longitude.

Our approach will have similarities to the stateful event processing that we implemented in chapter 5. In this case, we are not interested in processing multiple events at a time, but we do need to make sure that we are always updating our table with a truck's most recent location. There is a risk that if our algorithm ingests an older event after a newer event, it could incorrectly overwrite the "latest" latitude and longitude with the older one. This danger is visualized in figure 11.3.

Figure 11.3. If an older event is processed after a newer event, a naïve algorithm could accidentally update our truck’s “current” location with the older event’s location.

We can prevent this by storing an additional piece of state in our table, as shown in table 11.2. The new column, Location timestamp, records the timestamp of the event from which we have taken each truck's current latitude and longitude. Now when we process an event from Kinesis containing a truck's location, we check this event's timestamp against the existing Location timestamp in our table. Only if the new event's timestamp "beats" the existing timestamp (it's more recent) do we update the truck's latitude and longitude.

Table 11.2. Adding the Location timestamp column to our table

Truck VIN   Latitude    Longitude   Location timestamp     Miles since oil change
1HGCM8...   51.5208046  -0.1592323  2018-08-02T21:50:49Z   35
JH4TB2...   51.4972997  -0.0955459  2018-08-01T22:46:12Z   167
19UYA3...   51.4704679  -0.1176902  2018-08-02T18:14:45Z   78
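
The guard itself is a one-line comparison. Here is a minimal sketch of the check (illustrative only, using Joda-Time as in the listings later in this chapter): if the table has no Location timestamp yet, or the incoming event's timestamp is after it, we update.

import org.joda.time.DateTime

// Update the location only when the table has no timestamp yet,
// or the incoming event's timestamp is strictly newer
def shouldUpdateLocation(existing: Option[DateTime], incoming: DateTime): Boolean =
  existing.forall(ts => incoming.isAfter(ts))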

This leaves only our last metric, Miles since oil change. This one is slightly more complicated to calculate. The trick is to realize that we should not be storing this metric in our table. Instead, we should be storing the two inputs that go into calculating this metric:

  • Current (latest) mileage
  • Mileage at the time of the last oil change

With these two metrics stored in our table, we can calculate Miles since oil change whenever we need it, like so:

miles since oil change = current mileage – mileage at last oil change

Etv rbx kcsr kl s ilepms aanlciouctl sr niesgrv morj, wx ogkz s mzdh ieesar-xr-amiantni teabl, zs kyh jfwf akv nj rkb nxer sneciot. Table 11.3 swhos rvq nflai ernvosi lv ktb aletb utrecsurt, gjrw ryx wxn Mileage npz Mileage at oil change lmcouns.

Table 11.3. Replacing our Miles since oil change metric with Mileage and Mileage at oil change

Truck VIN   Latitude  Longitude  Location timestamp     Mileage  Mileage at oil change
1HGCM8...   51.5...   -0.15...   2018-08-02T21:50:49Z   12453    12418
JH4TB2...   51.4...   -0.09...   2018-08-01T22:46:12Z   19090    18923
19UYA3...   51.4...   -0.11...   2018-08-02T18:14:45Z   8407     8329
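
The derived metric then becomes a one-liner at read time. A minimal sketch, using the first row of table 11.3 as the worked example:

// Never stored in the table, always derived when we need it
def milesSinceOilChange(mileage: Int, mileageAtOilChange: Int): Int =
  mileage - mileageAtOilChange

milesSinceOilChange(12453, 12418)   // 35, matching the first row of table 11.1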

Remember that for this table to be accurate, we always want to be recording the truck's most recent mileage, and the truck's mileage at its last-known oil change. Again, how do we handle the situation where an older event arrives after a more recent event? This time, we have something in our favor, which is that a truck's mileage monotonically increases over time, as per figure 11.4. Given two mileages for the same truck, the higher mileage is always the more recent one, and we can use this rule to unambiguously discard the mileages of older events.

Figure 11.4. The mileage recorded on a given truck’s odometer monotonically increases over time. We can see periods where the mileage is flat, but it never decreases as time progresses.

Putting all this together, we can now fully specify our analytics-on-write algorithm in pseudocode. First of all, we know that all of our analytics relate to trucks, so we should start with a filter that excludes any events that don't include a vehicle entity:

let e = event to process
if e.event not in ("TRUCK_ARRIVES", "TRUCK_DEPARTS",
  "MECHANIC_CHANGES_OIL") then
  skip event
end if

Now let’s process our vehicle’s mileage:

let vin = e.vehicle.vin
let event_mi = e.vehicle.mileage
let current_mi = table[vin].mileage
if current_mi is null or current_mi < event_mi then
    set table[vin].mileage = event_mi
end if

Most of this pseudocode should be self-explanatory. The trickiest part is the table[vin].mileage syntax, which references the value of a column for a given truck in our table. The important thing to understand here is that we update a truck's mileage in our table only if the event being processed reports a higher mileage than the one already in our table for this truck.

Let's move on to the truck's location, which is captured only in events relating to a truck arriving or departing:

if e.event == "TRUCK_ARRIVES" or "TRUCK_DEPARTS" then
    let ts = e.timestamp
    if ts > table[vin].location_timestamp then
       set table[vin].location_timestamp = ts
       set table[vin].latitude = e.location.latitude
       set table[vin].longitude = e.location.longitude
    end if
end if

The logic here is fairly simple. Again, we have an if statement that makes updating the location conditional on the event's timestamp being newer than the one currently found in our table. This is to ensure that we don't accidentally overwrite our table with a stale location.

Finally, we need to handle any oil-change events:

if e.event == "MECHANIC_CHANGES_OIL" then
    let current_maoc = table[vin].mileage_at_oil_change
    if current_maoc is null or event_mi > current_maoc then
        set table[vin].mileage_at_oil_change = event_mi
    end if
end if

Again, we have a "guard" in the form of an if statement, which makes sure that we update the Mileage at oil change for this truck only if this oil-change event is newer than any event previously fed into the table. Putting it all together, we can see that this guard-heavy approach is safe even in the case of out-of-order events, such as when an oil-change event is processed after a subsequent truck-departs event. This is shown in figure 11.5.

Figure 11.5. The if statements in our analytics-on-write algorithm protect us from out-of-order events inadvertently overwriting current metrics in our table with stale metrics.

If OOPS were a real company, it would be crucial to get this algorithm reviewed and signed off before implementing the stream processing job. With analytics-on-read, if we make a mistake in a query, we just cancel it and run the revised query. With analytics-on-write, a mistake in our algorithm could mean having to start over from scratch. Happily, in our case, we can move swiftly on to the implementation!

11.2. Building our Lambda function

Enough theory and pseudocode—in this section, we will implement our analytics-on-write by using AWS Lambda, populating our delivery-truck statuses to a table in DynamoDB. Let’s get started.

11.2.1. Setting up DynamoDB

We are going to write a stream processing job that will read OOPS events, cross-check them against existing values in our table, and then update rows in that table accordingly. Figure 11.6 shows this flow.

Figure 11.6. Our AWS Lambda function will read individual events from OOPS, check whether our table in DynamoDB can be updated with the event’s values, and update the row in DynamoDB if so. This approach uses a DynamoDB feature called conditional writes.

We already have a good idea of the algorithm we are going to implement, but rather than jumping into that, let's work backward from the table we need to populate. This table is going to live in Amazon's DynamoDB, a highly scalable hosted key-value store that is a great fit for analytics-on-write on AWS. We can create our table in DynamoDB by using the AWS CLI tools like so:

$ aws dynamodb create-table --table-name oops-trucks \
  --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
  --attribute-definitions AttributeName=vin,AttributeType=S \
  --key-schema AttributeName=vin,KeyType=HASH --profile=ulp \
  --region=us-east-1
{
    "TableDescription": {
        "TableArn": "arn:aws:dynamodb:us-east-1:719197435995:table/
 oops-trucks",
...

When creating a DynamoDB table, we have to specify only a few properties up-front:

  • The --provisioned-throughput tells AWS how much throughput we want to reserve for reading and writing data from the table. For the purposes of this chapter, low values are fine for both.
  • The --attribute-definitions defines an attribute for the vehicle identification number, which is a string called vin.
  • The --key-schema specifies that the vin attribute will be our table's primary key.

Our table has now been created. We can move on to creating our AWS Lambda function.
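
If you prefer to check the new table from code rather than the AWS console, here is a small sketch using the DynamoDB client from the AWS Java SDK; the object name TableCheck is illustrative only, and it assumes your AWS credentials are configured for the us-east-1 region:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder

object TableCheck extends App {
  val ddb = AmazonDynamoDBClientBuilder.standard.withRegion("us-east-1").build
  // Should print: oops-trucks is ACTIVE (once provisioning has finished)
  val desc = ddb.describeTable("oops-trucks").getTable
  println(s"${desc.getTableName} is ${desc.getTableStatus}")
}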

11.2.2. Introduction to AWS Lambda

The central idea of AWS Lambda is that developers should be writing functions, not servers. With Lambda, we write self-contained functions to process events, and then we publish those functions to Lambda to run. We don't worry about developing, deploying, or managing servers. Instead, Lambda takes care of scaling out our functions to meet the incoming event volumes. Of course, Lambda functions run on servers under the hood, but those servers are abstracted away from us.

At the time of writing, we can write our Lambda functions to run either on Node.js or on the JVM. In both cases, the function has a similar API, in pseudocode:

def recordHandler(events: List[Event])

This function signature has some interesting features:

  • The function operates on a list or array of events, rather than a single one. Under the hood, Lambda is collecting a microbatch of events before invoking our function.
  • The function does not return anything. It exists only to produce side effects, such as writing to DynamoDB or creating new Kinesis events.

Figure 11.7 illustrates these two features of a Lambda function.

Does this function API look familiar? Remember back in chapter 5 when we wrote event-processing jobs using Apache Samza, the API looked like this:

public void process(IncomingMessageEnvelope envelope,
    MessageCollector collector, TaskCoordinator coordinator)
Figure 11.7. AWS Lambda detects records posted to a Kinesis stream, collects a microbatch of those records, and then submits that microbatch to the specified Lambda function.

In both cases, we are implementing a side-effecting function that is invoked for incoming events, or in Samza's case, a single event. And just as a Lambda function is run on AWS Lambda, so our Samza jobs were run on Apache YARN: we can say that Lambda and YARN are the execution environments for our functions. The similarities and differences of Samza and Lambda are explored further in table 11.4; this table shows more similarities than we might expect.

Table 11.4. Comparing features of Apache Samza and AWS Lambda

Feature               Apache Samza               AWS Lambda
Function invoked for  A single event             A microbatch of events
Executed on           Apache YARN (open source)  AWS Lambda (proprietary)
Events supported      Apache Kafka               Kafka and Kinesis records, S3 events, DynamoDB events, SNS notifications
Write function in     Java                       JavaScript, Java 8, Scala (so far)
Store state           Locally or remotely        Remotely

We are going to write our Lambda function in Scala, which is supported for JVM-based Lambdas alongside Java. Let's get started.

11.2.3. Lambda setup and event modeling

We need to build our Lambda function as a single fat jar file that contains all of our dependencies and can be uploaded to the Lambda service. As before, we will use Scala Build Tool, with the sbt-assembly plugin to build our fat jar. Let's start by creating a folder to work in, ideally inside the book's Vagrant virtual machine:

$ mkdir ~/aow-lambda

Let's add a file in the root of this folder called build.sbt, with the contents as shown in the following listing.

Listing 11.1. build.sbt
javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")       #1

lazy val root = (project in file(".")).
  settings(
    name := "aow-lambda",
    version := "0.1.0",
    scalaVersion := "2.12.7",                                            #1
    retrieveManaged := true,
    libraryDependencies += "com.amazonaws" % "aws-lambda-java-core" %
  "1.2.0",
    libraryDependencies += "com.amazonaws" % "aws-lambda-java-events" %
  "2.2.4",
    libraryDependencies += "com.amazonaws" % "aws-java-sdk" % "1.11.473"
  % "provided",                                                        #2
    libraryDependencies += "com.amazonaws" % "aws-java-sdk-core" %
  "1.11.473" % "provided",                                             #2
    libraryDependencies += "com.amazonaws" % "aws-java-sdk-kinesis" %
  "1.11.473" % "compile",                                              #2
    libraryDependencies += "com.fasterxml.jackson.module" %
  "jackson-module-scala_2.12" % "2.8.4",
    libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.6.2",
    libraryDependencies += "org.json4s" %% "json4s-ext" % "3.6.2",
    libraryDependencies += "com.github.seratch" %% "awscala" % "0.8.+"
  )

mergeStrategy in assembly := {                                           #3
    case PathList("META-INF", xs @ _*) => MergeStrategy.discard
    case x => MergeStrategy.first
}

jarName in assembly := { s"${name.value}-${version.value}" }

To handle the assembly of our fat jar, we need to create a project subfolder with a plugins.sbt file within it. Create it like so:

$ mkdir project
$ echo 'addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.9")' > \
  project/plugins.sbt

If you are running this from the book's Vagrant virtual machine, you should have Java 8, Scala, and SBT installed already. Check that our build is set up correctly:

$ sbt assembly
...
[info] Packaging /vagrant/ch11/11.2/aow-lambda/target/scala-2.12/
 aow-lambda-0.1.0 ...
[info] Done packaging.
[success] Total time: 860 s, completed 20-Dec-2018 18:33:57

Now we can move on to our event-handling code. If you look back at our analytics-on-write algorithm in section 11.1.3, you'll see that the logic is largely driven by which OOPS event type we are currently processing. So, let's first write some code to deserialize the incoming event into an appropriate Scala case class. Create the following file:

src/main/scala/aowlambda/events.scala

This file should be populated with the contents of the following listing.

Listing 11.2. events.scala
package aowlambda

import java.util.UUID, org.joda.time.DateTime
import org.json4s._, org.json4s.jackson.JsonMethods._

case class EventSniffer(event: String)                                    #1
case class Employee(id: UUID, jobRole: String)
case class Vehicle(vin: String, mileage: Int)
case class Location(latitude: Double, longitude: Double, elevation: Int)
case class Package(id: UUID)
case class Customer(id: UUID, isVip: Boolean)

sealed trait Event                                                        #2
case class TruckArrives(timestamp: DateTime, vehicle: Vehicle,
  location: Location) extends Event
case class TruckDeparts(timestamp: DateTime, vehicle: Vehicle,
  location: Location) extends Event
case class MechanicChangesOil(timestamp: DateTime, employee: Employee,
  vehicle: Vehicle) extends Event
case class DriverDeliversPackage(timestamp: DateTime, employee: Employee,
  `package`: Package, customer: Customer, location: Location) extends Event
case class DriverMissesCustomer(timestamp: DateTime, employee: Employee,
  `package`: Package, customer: Customer, location: Location) extends Event

object Event {

  def fromBytes(byteArray: Array[Byte]): Event = {
    implicit val formats = DefaultFormats ++ ext.JodaTimeSerializers.all ++
      ext.JavaTypesSerializers.all
    val raw = parse(new String(byteArray, "UTF-8"))
    raw.extract[EventSniffer].event match {                               #3
      case "TRUCK_ARRIVES" => raw.extract[TruckArrives]
      case "TRUCK_DEPARTS" => raw.extract[TruckDeparts]
      case "MECHANIC_CHANGES_OIL" => raw.extract[MechanicChangesOil]
      case "DRIVER_DELIVERS_PACKAGE" => raw.extract[DriverDeliversPackage]
      case "DRIVER_MISSES_CUSTOMER" => raw.extract[DriverMissesCustomer]
      case e => throw new RuntimeException("Didn't expect " + e)          #4
    }
  }
}

Let's use the Scala console, or REPL, to check that this code is working correctly:

$ sbt console
scala> val bytes = """{"event":"TRUCK_ARRIVES", "location": {"elevation":7,
  "latitude":51.522834, "longitude":-0.081813},
  "timestamp": "2018-01-12T12:42:00Z", "vehicle": {"mileage":33207,
  "vin":"1HGCM82633A004352"}}""".getBytes("UTF-8")
bytes: Array[Byte] = Array(123, ...
scala> aowlambda.Event.fromBytes(bytes)
res0: aowlambda.Event = TruckArrives(2018-01-12T12:42:00.000Z,
 Vehicle(1HGCM82633A004352,33207),Location(51.522834,-0.081813,7))

Great, we can see that a byte array representing a Truck arrives event in JSON format is being correctly deserialized into our TruckArrives case class in Scala. Now we can move on to implementing our analytics-on-write algorithm.

11.2.4. Revisiting our analytics-on-write algorithm

Section 11.1.3 laid out the algorithm for our OOPS analytics-on-write by using simple pseudocode. This pseudocode was intended to process a single event at a time, but we have since learned that our Lambda function will be given a microbatch of up to 100 events at a time. Of course, we could apply our algorithm to each event in the microbatch, as shown in figure 11.8.

Figure 11.8. A naïve approach to updating our DynamoDB table from our microbatch of events would involve a conditional write for every single event.

Unfortunately, this approach is wasteful whenever a single microbatch contains multiple events relating to the same OOPS truck, as could easily happen. Remember how we want to update our DynamoDB table with a given truck's most recent (highest) mileage? If our microbatch contains 10 events relating to Truck 123, the naïve implementation of our algorithm from figure 11.8 would require 10 conditional writes to the DynamoDB table just to update the Truck 123 row. Reading and writing to remote databases is an expensive operation, so it's something that we should aim to minimize in our Lambda function, especially given that each Lambda function call has to complete within 60 seconds (configurable up to 15 minutes).

The solution is to pre-aggregate our microbatch of events inside our Lambda function. With 10 events in the microbatch relating to Truck 123, our first step should be to find the highest mileage across these 10 events. This highest mileage figure is the only one that we need to attempt to write to DynamoDB; there is no point bothering DynamoDB with the nine lower mileages at all. Figure 11.9 depicts this pre-aggregation technique.
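
Here is a toy illustration of that first step, assuming the events have already been parsed down to (VIN, mileage) pairs; the values are invented for the example:

// Keep only the highest mileage seen per VIN within the microbatch
val mileages = List(("123", 12441), ("123", 12447), ("123", 12453), ("456", 19090))
val highestPerVin = mileages
  .groupBy { case (vin, _) => vin }
  .map { case (vin, readings) => vin -> readings.map(_._2).max }
// Map("123" -> 12453, "456" -> 19090): one candidate write per truck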

Figure 11.9. By applying a pre-aggregation to our microbatch of events, we can reduce the required number of conditional writes down to one per OOPS truck found in our microbatch.

Of course, the pre-aggregation step may not always bear fruit. For example, if all 100 events in the microbatch relate to different OOPS trucks, there is nothing to reduce. This said, the pre-aggregation is a relatively cheap in-memory process, so it's worth always attempting it, even if it prevents only a handful of unnecessary DynamoDB operations.

What format should our pre-aggregation step output? Exactly the same as the format of our data in DynamoDB! The pre-aggregation step and the in-DynamoDB aggregation differ from each other in terms of their scope (100 events versus all-history) and their storage mechanism (local in-memory versus remote database), but they represent the same analytics algorithm, and a different intermediate format would only confuse our OOPS colleagues. Table 11.5 reminds us of the format we need to populate in our Lambda function.

Table 11.5. Our DynamoDB row layout dictates the format of our pre-aggregated row in our Lambda function.

Truck VIN   Latitude  Longitude  Location timestamp     Mileage  Mileage at oil change
1HGCM8...   51.5...   -0.15...   2018-08-02T21:50:49Z   12453    12418

We should be ready now to create our first analytics-on-write code for this AWS Lambda. Create the following file:

src/main/scala/aowlambda/aggregator.scala

This file should be populated with the contents of the following listing.

Listing 11.3. aggregator.scala
package aowlambda

import org.joda.time.DateTime, aowlambda.{TruckArrives => TA},
  aowlambda.{TruckDeparts => TD}, aowlambda.{MechanicChangesOil => MCO}

case class Row(vin: String, mileage: Int, mileageAtOilChange: Option[Int],
  locationTs: Option[(Location, DateTime)])                                #1

object Aggregator {

  def map(event: Event): Option[Row] = event match {                       #2
    case TA(ts, v, loc) => Some(Row(v.vin, v.mileage, None, Some(loc, ts)))
    case TD(ts, v, loc) => Some(Row(v.vin, v.mileage, None, Some(loc, ts)))
    case MCO(ts, _, v)  => Some(Row(v.vin, v.mileage, Some(v.mileage), None))
    case _              => None
  }


  def reduce(events: List[Option[Row]]): List[Row] =                       #3
    events
      .collect { case Some(r) => r }
      .groupBy(_.vin)
      .values
      .toList
      .map(_.reduceLeft(merge))

  private val merge: (Row, Row) => Row = (a, b) => {                       #4

    val m = math.max(a.mileage, b.mileage)
    val maoc = (a.mileageAtOilChange, b.mileageAtOilChange) match {
      case (l @ Some(_), None) => l
      case (l @ Some(lMaoc), Some(rMaoc)) if lMaoc > rMaoc => l
      case (_, r) => r
    }
    val locTs = (a.locationTs, b.locationTs) match {
      case (l @ Some(_), None) => l
      case (l @ Some((_, lTs)), Some((_, rTs))) if lTs.isAfter(rTs) => l
      case (_, r) => r
    }
    Row(a.vin, m, maoc, locTs)
  }
}

There's quite a lot to unpack in our aggregator.scala file; let's make sure you understand what it's doing before moving on. First, we have the imports, including aliases for our event types to make the subsequent code fit on one line. Then we introduce a case class called Row; this is a lightly adapted version of the data held in table 11.5. Figure 11.10 shows the relationship between table 11.5 and our new Row case class; we will be using instances of this Row to drive our updates to DynamoDB.

Figure 11.10. The Row case class in our Lambda function will contain the same data points as found in our DynamoDB table. The dotted lines indicate that a given Row instance may not contain these data points, because the related event types were not found in this microbatch for this OOPS truck.

Moving on to our Aggregator object, the first function we see is called map. This function transforms any incoming OOPS event into either Some(Row) or None, depending on the event type. We use a Scala pattern match on the event type to determine how to transform the event:

  • The three event types with relevant data for our analytics are used to populate different slots in a new Row instance.
  • Any other event types simply output a None, which will be filtered out later.

If you are unfamiliar with the Option boxing of the Row, you can find more on these techniques in chapter 8.

The second public function in the Aggregator is called reduce. This takes a list of Option-boxed Rows and squashes them down into a hopefully smaller list of Rows. It does this by doing the following:

  • Filtering out any Nones from the list
  • Grouping the Rows by the VIN of the truck described in that Row
  • Squashing down each group of Rows for a given truck into a single Row representing every potential update from the microbatch for this truck

Phew! If this seems complicated, it's because it is complicated. We are rolling our own map-reduce algorithm to run against each microbatch of events in this Lambda. With the sophisticated query languages offered by technologies such as Apache Hive, Amazon Redshift, and Apache Spark, it's easy to forget that, until relatively recently, coding MapReduce algorithms for Hadoop in Java was state-of-the-art. You could say that with our Lambda, we are getting back to basics in writing our own bespoke map-reduce code.

Hwe qv vw uqhass s rgpuo le Rowz lvt c evnig crktu vnuw rv s esnilg Row? Xyjz jz hdendla pd egt reduceLeft, hciwh teaks rsiap lk Rowa zng iespalp det merge tonfnuic rx asku jtds iaeyrlivtte uiltn fhne oxn Row cj vlrf. Xxu merge iutfncno ktsea wkr Rowc nch otsutup z ilnegs Row, gmiionbcn rkp armv ertnce rcpz-otinsp klmt cpkz eocrsu Row. Tmermeeb, roq hleow ptnio lk gajr vth-eagrggantio cj rk einmimzi yrx rnmube lx triwes kr DynamoDB. Figure 11.11 wshos qro nxh-vr-hon bzm-eercud wvfl.

Figure 11.11. First, we map the events from our microbatch into possible Rows, and filter out the Nones. We then group our Rows by the truck’s VIN and reduce each group to a single Row per truck by using a merge. Finally, we perform a conditional write for each Row against our table in DynamoDB.

That completes our map-reduce code for pre-aggregating our microbatch of events prior to updating DynamoDB. In the next section, let's finally write that DynamoDB update code.

11.2.5. Conditional writes to DynamoDB

Thanks to our merge function, we know that the reduced list of Rows represents the most recent data points for each truck from within the current microbatch. But it is possible that another microbatch with more recent events for one of these OOPS trucks has already been processed. Event transmission and collection is notoriously unreliable, and it's possible that our current microbatch contains old events that got delayed somehow, perhaps because an OOPS truck was in a road tunnel, or because the network was down in an OOPS garage. Figure 11.12 illustrates this risk of our microbatches being processed out of order.

Figure 11.12. When our Lambda function processes events for OOPS Truck 123 out of chronological order, it is important that a more recent data point in DynamoDB is not overwritten with a stale data point.

As a result, we cannot blindly overwrite the existing row in DynamoDB with the new row generated by our Lambda function. We could easily overwrite a more recent truck mileage or location with an older one! The solution has been mentioned briefly in previous sections of this chapter; we are going to use a feature of DynamoDB called conditional writes.

When I alluded to conditional writes earlier in this chapter, I suggested that there was some kind of read-check-write loop performed by the Lambda against DynamoDB. In fact, things are simpler than this: all we have to do in our Lambda is send each write request to DynamoDB with a condition attached to it; DynamoDB will then check the condition against the current state of the database, and apply the write only if the condition passes. Figure 11.13 shows the set of conditional writes our Lambda will attempt to perform for each row.

Figure 11.13. For each Row, we attempt three writes against DynamoDB, where each write is dependent on a condition in DynamoDB passing.

We are ready to take the logic from figure 11.13 and implement it in Scala. For this, we will use the AWS Java SDK, which includes a DynamoDB client. To keep our code succinct, we will also use the AWScala project, which is a more Scala-idiomatic domain-specific language (DSL) for working with DynamoDB.

Create this Scala file:

src/main/scala/aowlambda/Writer.scala

Bcjq fjlx odlhus vg leadpupto jwdr rvb eoctnsnt lk ogr ilonwlofg tsilngi.

Listing 11.4. Writer.scala
package aowlambda

import awscala._, dynamodbv2.{AttributeValue => AttrVal, _}
import com.amazonaws.services.dynamodbv2.model._
import scala.collection.JavaConverters._

object Writer {
  private val ddb = DynamoDB.at(Region.US_EAST_1)

  private def updateIf(key: AttributeValue, updExpr: String,
    condExpr: String, values: Map[String, AttributeValue],
    names: Map[String, String]) {                                  #1

    val updateRequest = new UpdateItemRequest()
      .withTableName("oops-trucks")
      .addKeyEntry("vin", key)
      .withUpdateExpression(updExpr)
      .withConditionExpression(condExpr)
      .withExpressionAttributeValues(values.asJava)
      .withExpressionAttributeNames(names.asJava)

    try {
      ddb.updateItem(updateRequest)
    } catch { case ccfe: ConditionalCheckFailedException => }      #2
  }

  def conditionalWrite(row: Row) {
    val vin = AttrVal.toJavaValue(row.vin)

    updateIf(vin, "SET #m = :m",
      "attribute_not_exists(#m) OR #m < :m",
      Map(":m" -> AttrVal.toJavaValue(row.mileage)),
      Map("#m" -> "mileage"))                                      #3

    for (maoc <- row.mileageAtOilChange) {                         #4
      updateIf(vin, "SET #maoc = :maoc",
        "attribute_not_exists(#maoc) OR #maoc < :maoc",
        Map(":maoc" -> AttrVal.toJavaValue(maoc)),
        Map("#maoc" -> "mileage-at-oil-change"))
    }

    for ((loc, ts) <- row.locationTs) {                            #5
      updateIf(vin, "SET #ts = :ts, #lat = :lat, #long = :long",
        "attribute_not_exists(#ts) OR #ts < :ts",
        Map(":ts"   -> AttrVal.toJavaValue(ts.toString),
            ":lat"  -> AttrVal.toJavaValue(loc.latitude),
            ":long" -> AttrVal.toJavaValue(loc.longitude)),
        Map("#ts"   -> "location-timestamp", "#lat" -> "latitude",
            "#long" -> "longitude"))
    }
  }
}

The code for interfacing with DynamoDB is admittedly verbose, but squint at this code a little and you can see that it follows the basic shape of figure 11.13. We have a Writer module that exposes a single method, conditionalWrite, which takes in a Row as its argument. The method does not return anything; it exists only for the side effects it performs, which are conditional writes of the three elements of the Row against the same truck's row in DynamoDB. These conditional writes are performed using the private updateIf function, which helps construct an UpdateItemRequest for the row in DynamoDB.

A few things to note about this code:

  • The updateIf function takes the truck's VIN, the update statement, the update condition (which must pass for the update to be performed), the values required for the update, and attribute aliases for the update.
  • The update statements and update condition are written in DynamoDB's custom expression language,[1] which uses :some-value for attribute values and #some-alias for attribute aliases.

    1 For more information on DynamoDB's expression language, refer to https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Expressions.html.

  • For mileage at oil change and the truck location, those conditional writes are themselves dependent on the corresponding parts of the Row being populated.
  • For brevity, we do not have any logging or error handling. In a production Lambda function, we would include both.

That completes our DynamoDB-specific code!
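
If you want to smoke-test the writer before wiring up the Lambda, you can call it from the Scala console. Note that this is just a sketch: the VIN below is made up, the call will create or update a real item in your oops-trucks table, and it assumes your AWS credentials are configured for us-east-1:

$ sbt console
scala> aowlambda.Writer.conditionalWrite(
  aowlambda.Row("TESTVIN0000000000", 120, Some(100), None))

A second call with a lower mileage (say 110) for the same VIN should then leave the item untouched, because the condition attribute_not_exists(#m) OR #m < :m fails and the ConditionalCheckFailedException is swallowed.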

11.2.6. Finalizing our Lambda

Now we are ready to pull our logic together into the Lambda function definition itself. Create our fourth and final Scala file:

src/main/scala/aowlambda/LambdaFunction.scala

This file should be populated with the contents of the following listing.

Listing 11.5. LambdaFunction.scala
package aowlambda

import com.amazonaws.services.lambda.runtime.events.KinesisEvent
import scala.collection.JavaConverters._

class LambdaFunction {

  def recordHandler(microBatch: KinesisEvent) {

    val allRows = for {                                #1
      recs <- microBatch.getRecords.asScala.toList
      bytes = recs.getKinesis.getData.array
      event = Event.fromBytes(bytes)
      row = Aggregator.map(event)
    } yield row

    val reducedRows = Aggregator.reduce(allRows)       #2

    for (row <- reducedRows) {                         #3
      Writer.conditionalWrite(row)
    }
  }
}

It is this function, aowlambda.LambdaFunction.recordHandler, that will be invoked by AWS Lambda for each incoming microbatch of Kinesis events. The code should be fairly simple:

  1. We convert each Kinesis record in the microbatch into a Row instance.
  2. We run our reduce function to pre-aggregate all of our Rows down to the minimal set of DynamoDB updates to attempt (see listing 11.3).
  3. We loop through the remaining Rows and perform a DynamoDB conditional write for each (see listing 11.4).

Code complete! Let's quickly check that the code compiles fine, like so:

guest$ sbt compile
...
[info] Compiling Scala sources to /vagrant/ch11/11.2/aow-
     lambda/target/scala-2.12/classes...
[success] Total time: 19 s, completed 20-Dec-2018 21:00:45

Kwk kw knop er asebelms c fat jar jgrw zff el ytx Lambda function ’a inepcdeedsne jn rj. Vrx’c axd orb sbt-assembly pugnil lte Ssafz Afhbj Afvk rk vp zrbj:

guest$ sbt assembly
...
[info] Packaging /vagrant/ch11/11.2/aow-lambda/target/scala-2.12/
 aow-lambda-0.1.0 ...
[info] Done packaging.
[success] Total time: 516 s, completed 20-Dec-2018 21:10:04

Orxst—kbt biudl qaz nkw nuvx tlepemocd. Xpja omespctel xgr codgin rdeiueqr ltv ktq Lambda function. Jn xrg nvkr octsnie, wo wffj yor rcjp epeoddly pns urb rj tguhrho crj apcse!

11.3. Running our Lambda function

Much of the appeal of a system like AWS Lambda lies in its “serverless” promise. Of course, servers are involved in running our Lambda, but they are hidden from us, with AWS taking responsibility for operating our function reliably. The price we pay for outsourcing our ops is a relatively involved setup process, which we will go through now.

11.3.1. Deploying our Lambda function

Earlier in this chapter, we set up our Kinesis stream and our DynamoDB table. We now need to deploy our assembled Lambda function and wire it into our stream and table. We will perform all of this by using the AWS CLI tools. There is a lot of ceremony to get through here, so let's get started.

Uploading to S3

First, we need to make our local fat jar accessible to the AWS Lambda service. We do this by uploading the file to Amazon S3, which is straightforward using the AWS CLI. First, we create the bucket:

$ s3_bucket=ulp-ch11-fatjar-${your_first_pets_name}
$ jar=aow-lambda-0.1.0
$ aws s3 mb s3://${s3_bucket} --profile=ulp --region=us-east-1
make_bucket: s3://ulp-ch11-fatjar-little-torty/

And next we upload the jar file to S3:

$ aws s3 cp ./target/scala-2.12/${jar} s3://${s3_bucket}/ --profile=ulp
upload: target/scala-2.12/aow-lambda-0.1.0 to
 s3://ulp-ch11-fatjar-little-torty/aow-lambda-0.1.0

Unfortunately, there is a bug with AWS whereby uploading large files to newly created buckets can hang. If that happens, take a break and try again in an hour or so; you can continue with the rest of the setup in the meantime. Next, we need to configure the necessary permissions for our Lambda function.

Configuring permissions

The permissions required for operating a Lambda function are complex, so we are going to take a shortcut by using a CloudFormation template that we prepared earlier. AWS CloudFormation is a service that lets you spin up various AWS resources by using JSON templates; you can think of a CloudFormation template as the declarative JSON recipe for creating a collection of AWS services configured exactly as you want them. In Amazon parlance, we will use this template to create a stack of AWS resources; in our case, IAM roles to operate our Lambda function.

The pre-prepared CloudFormation template is publicly available in S3 in the ulp-assets bucket:

$ template=https://ulp-assets.s3.amazonaws.com/ch11/cf/aow-lambda.template

Kick off the stack creation like so:

$ aws cloudformation create-stack --stack-name AowLambda \
  --template-url ${template} --capabilities CAPABILITY_IAM \
  --profile=ulp --region=us-east-1
{
    "StackId": "arn:aws:cloudformation:us-east-1:719197435995:
 stack/AowLambda/392e05e0-5963-11e5-aa74-5001ba48c2d2"
}

You can monitor Amazon's progress in creating this stack by using this command:

$ aws cloudformation describe-stacks --stack-name AowLambda \
  --profile=ulp --region=us-east-1

When you see a StackStatus of CREATE_COMPLETE in the returned JSON, we are ready to continue. We will need another piece of this output in the next section: the OutputValue for the ExecutionRole, which should start with arn:aws:iam::. Let's create a new environment variable set to this value:

$ role_arn="arn:aws:iam::719197435995:role/AowLambda-LambdaExecRole
 -1CNLT4WVY6PN4"

When creating this variable, make sure not to include any line breaks in the value.

Creating our Lambda function

The next step is to register our function with AWS Lambda. We can do this with a single AWS CLI command:

$ aws lambda create-function --function-name AowLambda \
  --role ${role_arn} --code S3Bucket=${s3_bucket},S3Key=${jar} \
  --handler aowlambda.LambdaFunction::recordHandler \
  --runtime java8 --timeout 60 --memory-size 1024 \
  --profile=ulp --region=us-east-1
   {
     "FunctionName": "AowLambda",
     "FunctionArn": "arn:aws:lambda:us-east-1:089010284850:function:
 AowLambda",
     "Runtime": "java8",
     "Role": "arn:aws:iam::089010284850:role/AowLambda-LambdaExecRole
 -FUVSBSEC1Y6R",
     "Handler": "aowlambda.LambdaFunction::recordHandler",
     "CodeSize": 27765196,
     "Description": "",
     "Timeout": 60,
     "MemorySize": 1024,
     "LastModified": "2018-12-21T07:44:31.073+0000",
     "CodeSha256": "jRpr4E60rP4hznB1Q/ApO6+fOAnLHMwfyhhT3rU5KWM=",
     "Version": "$LATEST",
     "TracingConfig": {
        "Mode": "PassThrough"
     },
     "RevisionId": "0609f559-fd1f-45c9-aee2-11ee159183b5"
   }

This command defines a function called AowLambda by using our IAM role and our fat jar previously uploaded to S3. The --handler argument tells AWS Lambda exactly which method to invoke inside our fat jar. The next three arguments configure the exact operation of the function: the function should be run against Java 8 (as opposed to Node.js), should time out after 60 seconds, and should be given 1 GB of RAM.

Attaching our function to Kinesis

Btk xw eetrh prx? Ork ueitq. Mx isllt xhnx vr eyiftdni kqr Kinesis trmase wx dacreet lierrea az drk vneet ercosu etl edt Lambda function. Mk ey rdjc bu caneigrt cn event source mapping neeewtb qro Kinesis mersta psn orp oniucftn, aniga iguns ryx AWS CLI:

$ aws lambda create-event-source-mapping \
  --event-source-arn ${stream_arn} \
  --function-name AowLambda --enabled --batch-size 100 \
  --starting-position TRIM_HORIZON --profile=ulp --region=us-east-1
{
    "UUID": "bdf15c0b-a565-4a15-b790-6c2247d9aba3",
    "StateTransitionReason": "User action",
    "LastModified": 1545378677.527,
    "BatchSize": 100,
    "EventSourceArn": "arn:aws:kinesis:us-east-1:719197435995:stream/
 oops-events",
    "FunctionArn": "arn:aws:lambda:us-east-
     1:719197435995:function:AowLambda",
    "State": "Creating",
    "LastProcessingResult": "No records processed"
}

From the returned JSON, you can see that we have successfully connected our Lambda function to our stream of OOPS events. It reports that no records have been processed yet. In the next section, we will re-enable our event generator and check the results.

11.3.2. Testing our Lambda function

Remember that our generator script is available in the GitHub repository:

ch11/11.1/generate.py

Let’s kick off our event generator:

$ /vagrant/ch11/11.1/generate.py
Wrote DriverDeliversPackage with timestamp 2018-01-01 02:31:00
Wrote DriverMissesCustomer with timestamp 2018-01-01 05:53:00
Wrote TruckDepartsEvent with timestamp 2018-01-01 08:21:00

Great! These events are now being sent to our Kinesis stream. Let's now take a look at our DynamoDB table. From the AWS dashboard:

  • Make sure you are in the N. Virginia region by using the top-right drop-down menu.
  • Click DynamoDB.
  • Click the oops-trucks table entry and click the Explore Table button.

You should see three rows with all of the expected fields, as shown in figure 11.14.

Figure 11.14. Our DynamoDB table oops-trucks contains the six fields expected by the OOPS business intelligence team. Note that the Lambda has observed an oil change event for only one of the trucks so far.

Leave our event generator running a little longer and then refresh the table in the DynamoDB interface. You should see the following:

  • We now have values populated for all attributes for all three trucks. In fact, our generator sends events for only three OOPS trucks.
  • Our location timestamps roughly match the most recent timestamps we see printed in the terminal running our generator, showing that our Lambda function is keeping up.
  • The trucks' latitude and longitude have changed. Expect to see the same latitudes and longitudes repeated frequently, as the generator uses only five locations.
  • The mileage-at-oil-change for each truck lags the truck's total mileage.

Figure 11.15 shows this updated view we are expecting to see in DynamoDB.

Figure 11.15. We now have all values populated for each of our three trucks. These values are being constantly updated by our AWS Lambda function in response to new events.

So far, so good: we can see that our Lambda is working well to keep our operational dashboard in DynamoDB up-to-date with the latest stream of events from OOPS trucks.

The last thing we want to check is that our conditional writes are correctly guarding against old events that could arrive out of order. Remember that we don't want to overwrite a fresh data point in DynamoDB with older data just because the older event arrives after the newer event.

To test this, first refresh the DynamoDB table to get the latest data. Then switch back to your generator's terminal window and press Ctrl-C:

Wrote TruckArrivesEvent with timestamp 2018-03-06 00:39:00
^CTraceback (most recent call last):
  File "./generate.py", line 168, in <module>
    time.sleep(1)
KeyboardInterrupt

Finally, we restart the generator with a new command-line argument, backwards:

$ ./generate.py backwards
Wrote DriverDeliversPackage with timestamp 2017-12-31 20:15:00
Wrote TruckArrivesEvent with timestamp 2017-12-31 18:40:00
Wrote MechanicChangesOil with timestamp 2017-12-31 15:45:00

As you can see, the generator is now working backward, creating ever-older events. Leave it a short while, and then head back into the DynamoDB interface and refresh the table. You'll see that all of the data points are unchanged: the old events are not impacting our latest truck statuses in DynamoDB at all.

For a final test, leave the backward-stepping event generator running, open a new terminal, and kick off the forward-stepping generator again:

$ ./generate.py
Wrote DriverDeliversPackage with timestamp 2018-01-01 00:38:00
Wrote TruckArrivesEvent with timestamp 2018-01-01 05:03:00
Wrote MechanicChangesOil with timestamp 2018-01-01 08:41:00

You will have to wait a while until the event timestamps being generated overtake your current location-timestamp values in DynamoDB, but after this happens, you should see your truck data points in DynamoDB starting to refresh again.

And that completes our testing! Hopefully, our colleagues at OOPS are pleased with the result: an analytics-on-write system that can keep a dashboard of OOPS trucks updated in near real-time from a Kinesis stream. A nice evolution for this project would be to add a visualization layer on top of the DynamoDB table, perhaps implemented in D3.js or a similar library.
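
As a very small step in that direction, the following sketch scans the table with the DynamoDB client from the AWS Java SDK and prints a plain-text status board. The object name and output format are illustrative only, and a full Scan is acceptable here only because the table holds a handful of trucks:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.ScanRequest
import scala.collection.JavaConverters._

object TextDashboard extends App {
  val ddb = AmazonDynamoDBClientBuilder.standard.withRegion("us-east-1").build
  val items = ddb.scan(new ScanRequest().withTableName("oops-trucks")).getItems.asScala
  for (item <- items) {
    val vin     = item.get("vin").getS
    val mileage = item.get("mileage").getN.toInt
    // Derive the metric OOPS actually cares about from the two stored inputs
    val sinceOil = Option(item.get("mileage-at-oil-change"))
      .map(maoc => (mileage - maoc.getN.toInt).toString).getOrElse("n/a")
    println(s"$vin  miles since oil change: $sinceOil")
  }
}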

Summary

  • With analytics-on-write, we decide on the analysis we want to perform ahead of time, and then put this analysis live on our event stream.
  • Analytics-on-write is good for low-latency operational reporting and dashboards to support thousands of simultaneous users. Analytics-on-write is a complement to analytics-on-read, not a competitor.
  • To support the latency and access requirements, analytics-on-write systems typically lean heavily on horizontally scalable key-value stores such as Amazon DynamoDB.
  • Applying analytics-on-write retrospectively can be difficult, so it is important to gather requirements and agree on the proposed algorithm up-front.
  • We implemented a simple truck status dashboard for OOPS by using AWS Lambda as the stream processing framework reading OOPS events from Amazon Kinesis.
  • To reduce the number of calls to DynamoDB, we implemented a local map-reduce on the microbatch of events received by the Lambda function, reducing this batch of 100 events down to the minimal number of potential updates to DynamoDB.
  • AWS Lambda is essentially stateless, so we used DynamoDB’s conditional writes to ensure that we were always updating our DynamoDB table with the latest data points relating to each OOPS truck.
  • There was a close mapping between the dashboard required by OOPS, the layout of the table in DynamoDB, and the Scala representation of a row used in our Lambda’s local map-reduce.