Chapter 8. Railway-oriented processing

This chapter covers

  • Handling failure within Unix programs, Java exceptions, and error logging
  • Designing for failure inside and across stream processing applications
  • Composing failures inside work steps with the Scalaz Validation
  • Failing fast across work step boundaries with Scala’s map and flatMap

So far, we have focused on what you might call the happy path within our unified log. On the happy path, events successfully validate against their schemas, inputs are never accidentally null, and Java exceptions are so rare that we don’t mind them crashing our application.

The problem with focusing exclusively on the happy path is that failures do happen. More than this: if you implement a unified log across your department or company, failures will happen extremely frequently, because of the sheer volume of events flowing through, and the complexity of your event stream processing. Linus’s law states, “Given enough eyeballs, all bugs are shallow.”[1] Adapting this, a law of unified log processing might be as follows:

[1] Linus’s law is further explained at Wikipedia, https://en.wikipedia.org/wiki/Linus%27s_Law.

Given enough events, all bugs are inevitable.

If we can expect and design for inevitable failure inside our stream processing applications, we can build a much more robust unified log, one that hopefully won’t page us regularly at 2 a.m. because it crashed with Yet Another NullPointerException (YANPE). This chapter, then, is all about designing for failure, using an overarching approach that we will call railway-oriented processing. We have been using this approach at Snowplow throughout our event pipeline since the start of 2013, so we know that it can enable the processing of billions of events daily with a minimum of disruption.


For reasons that should become clear as we delve deeper into this topic, this chapter will be the first one where we work predominantly in Scala. Scala is a strongly typed, hybrid object-oriented and functional language that runs on the Java virtual machine. Scala has great support for railway-oriented processing, with capabilities that even Java 8 lacks.

So, all aboard the railway-oriented programming express, and let’s get started.


8.1. Leaving the happy path

Two roads diverged in a wood, and I—

I took the one less traveled by,

And that has made all the difference.

Robert Frost, “The Road Not Taken” (1916)

Before diving into railway-oriented processing, let’s look at how failure is handled in two distinct environments that many readers will be familiar with: Unix programs and Java. We’ll then follow this up with general thoughts on error logging as it is practiced today.

8.1.1. Failure and Unix programs

Unix is designed around the idea that failures will happen. Any process that runs in a Unix shell will return an exit code when it finishes. The convention is to return zero for success, and an integer value higher than zero in the case of failure.

This exit, or return, code isn’t the only communication channel available to a Unix program; each program also has access to three standard streams (aka I/O file descriptors): stdin, stdout, and stderr. Table 8.1 provides the properties of these three streams.

Table 8.1. The three standard streams supported by Unix programs

Short name | Long name      | File descriptor | Description
stdin      | Standard in    | 0               | The input stream of data going into a Unix program
stdout     | Standard out   | 1               | The output stream where a Unix program writes data related to successful operation
stderr     | Standard error | 2               | The output stream where a Unix program writes data related to failed operation

Putting the exit codes and the three standard streams together, we can represent a Unix program’s happy and failure paths, as shown in figure 8.1.

Figure 8.1. A Unix program reads from standard in and can respond with exit codes and output streams along a happy path or a failure path.

It’s only correct to note, however, that things are not always as clear-cut as figure 8.1 implies:

  • A Unix process that ends up failing with a nonzero exit code may well also write output to stdout before it fails.
  • Likewise, a chatty Unix process may write warnings or diagnostic output to stderr before ultimately returning with a zero code indicating success.

How composable is failure handling in Unix programs? By composable, we mean that we can combine multiple successes and failures, and the combined result will still make sense. Let’s see: if you have a Unix terminal handy, type in the following:

$ false | false | true; echo $?
0

If you are not familiar with false and true, these are super-simple Unix programs that return exit codes of 1 (failure) and 0 (success), respectively. In this example, we are combining two false programs with a true program, using the vertical bar character | (also known as a pipe) to form a Unix pipeline.

As you can see, the first two failures do not cause the pipeline to fail, and the ultimate return code of the pipeline is the return code of the last program run.

In some Unix shells (ksh, zsh, bash), we can do a little better than this by using the built-in pipefail option:

$ alias info='>&2 echo'
$ set -o pipefail; info s1 | false | info s3 | true; echo $?
s3
s1
1

The pipefail option sets the exit code to the exit code of the last program to exit nonzero, or returns 0 if all exited successfully. This is an improvement, but the s3 output shows that the pipeline is still not short-circuiting, or failing fast, after an individual component within it fails. This is because a Unix pipeline is chaining input and output streams together, not exit codes, as demonstrated in figure 8.2.

Figure 8.2. Three Unix programs form a single Unix pipeline by piping the stdout of the first program into the stdin of the next program. Many shells support an option to pipe both stdout and stderr into stdin, but in both cases the program’s exit codes are ignored.

If we do want to fail fast, we have to put our commands in a shell script that uses the set -e option, which will terminate the script as soon as any command within the script returns a nonzero error code. If you run the code in the following listing, you will see the following output; notice that the second echo in the shell script is never reached:

$ ./fail-fast.bash; echo $?
s1
1
Listing 8.1. fail-fast.bash
#!/bin/bash
set -e

echo "s1"
false
echo "s3"        # never reached: set -e exits the script after false

In sum: Unix programs and the Unix pipeline have powerful yet easy-to-understand features for dealing with failure. But they are not as composable as we would like.

8.1.2. Failure and Java

Let’s take a look now at how Java deals with failure. Java depends heavily on exceptions for handling failures, making a distinction between two types of exceptions:

  • Unchecked exceptions: RuntimeException, Error, and their subclasses. Unchecked exceptions represent bugs in your code that a caller cannot be expected to recover from: the dreaded NullPointerException is an unchecked exception.
  • Checked exceptions: Exception and its subclasses, except RuntimeException (which, strangely, is a subclass of Exception). In Java, every method must declare any uncaught checked exceptions that can be thrown within its scope, passing the responsibility on to the caller to handle them as they decide.
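
The compiler-level difference between the two kinds can be seen in a few lines of Java. The following is a minimal sketch and is not part of the HelloCalculator app; the names ExceptionKinds, parsePositive, requireNonEmpty, and describe are hypothetical and exist only for illustration.

```java
import java.io.IOException;

class ExceptionKinds {

  // Unchecked: no throws clause is needed, and callers are not forced to catch.
  static int parsePositive(String raw) {
    int n = Integer.parseInt(raw);  // may throw NumberFormatException (unchecked)
    if (n <= 0) {
      throw new IllegalArgumentException("not positive: " + raw);
    }
    return n;
  }

  // Checked: the compiler rejects this method without the throws clause.
  static String requireNonEmpty(String line) throws IOException {
    if (line.isEmpty()) {
      throw new IOException("empty line");
    }
    return line;
  }

  // The caller must either catch the checked exception or redeclare it.
  static String describe(String line) {
    try {
      return "ok: " + requireNonEmpty(line);
    } catch (IOException ioe) {
      return "failed: " + ioe.getMessage();
    }
  }
}
```

Removing the try/catch from describe turns the omission into a compile error, whereas nothing forces a caller of parsePositive to think about failure at all.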

Let’s look at the following HelloCalculator app, where we employ both unchecked and checked exceptions. The following listing contains the HelloCalculator.java code, annotated to show the use of both exception types.

Listing 8.2. HelloCalculator.java
package hellocalculator;

import java.util.Arrays;

public class HelloCalculator {

  public static void main(String[] args) {
    if (args.length < 2) {
      String err = "too few inputs (" + args.length + ")";
      // Unchecked exception: terminates the program
      throw new IllegalArgumentException(err);
    } else {
      try {
        Integer sum = sum(args);
        System.out.println("SUM: " + sum);
      } catch (NumberFormatException nfe) {
        // Catch the parse failure raised by sum() and rethrow it as an unchecked exception
        String err = "not all inputs parseable to Integers";
        throw new IllegalArgumentException(err);
      }
    }
  }

  // The throws clause tells callers which exception to expect
  static Integer sum(String[] args) throws NumberFormatException {
    return Arrays.asList(args)
      .stream()
      // Integer.parseInt throws NumberFormatException on non-numeric input
      .mapToInt(str -> Integer.parseInt(str))
      .sum();
  }
}

Java’s built-in failure handling is broadly designed around two failure scenarios:

  • We have an unrecoverable bug and we want to terminate the program.
  • We have a potentially recoverable issue and we want to try to recover from it (and if we can’t recover from it, we terminate the program).

But what if we cannot recover from a failure but don’t want to terminate our overall program? This might sound counterintuitive: how can our program keep going with an unrecoverable failure? To be sure, many programs cannot, but some can, especially if they consist of processing many much smaller units of work, for example:

  • A web server, responding to thousands of HTTP requests a minute. Each request-response exchange is a unit of work.
  • A stream processing job in our unified log, tasked with enriching many millions of individual events. The enrichment of each incoming event is a unit of work.
  • A web-scraping bot, parsing thousands of web pages to look for product price information. The parsing of each web page is a unit of work.

In each of these scenarios, the programmer may prefer to route the unrecoverable unit of work to a failure path rather than terminate the whole program, as shown in figure 8.3.

Figure 8.3. Our Java program is processing four items of input, one of which is somehow corrupted. Our Java program performs the unit of work on each of the four inputs: three are successfully output along the happy path, but the corrupted input throws an error and ends up on the failure path.

As with many other languages, Java does not have any built-in tools for routing failing units of work to a failure path. In situations like these, a Java programmer will often fall back to simply logging the failure as an error and skipping to the next unit of work. To demonstrate this, let’s imagine an updated version of the original HelloCalculator code: instead of summing all arguments together, our new version will increment (add 1 to) any numeric arguments, and log an error message for any arguments that are not numeric. This is illustrated in the following listing.

Listing 8.3. HelloIncrementor.java
package hellocalculator;

import java.util.Arrays;
import java.util.logging.Logger;  // Java's built-in logging framework

public class HelloIncrementor {

  private final static Logger LOGGER =
    Logger.getLogger(HelloIncrementor.class.getName());

  public static void main(String[] args) {
    Arrays.asList(args)
      .stream()
      .forEach((arg) -> {  // Each argument is one unit of work
        try {
          Integer i = incr(arg);
          // Happy path: print the incremented value
          System.out.println("INCREMENTED TO: " + i);
        } catch (NumberFormatException nfe) {
          String err = "input not parseable to Integer: " + arg;
          // Failure path: log the error and move on to the next argument
          LOGGER.severe(err);
        }
      });
  }

  static Integer incr(String s) throws NumberFormatException {
    return Integer.parseInt(s) + 1;
  }
}

Copy the code from listing 8.3 and save it into a file, like so:

hellocalculator/HelloIncrementor.java

Next, we will compile our code and run it, supplying three valid and one invalid (non-numeric) argument:

$ javac hellocalculator/HelloIncrementor.java
$ java hellocalculator.HelloIncrementor 23 52 a 1
INCREMENTED TO: 24
INCREMENTED TO: 53
Nov 13, 2018 5:42:30 PM hellocalculator.HelloIncrementor lambda$main$0
SEVERE: input not parseable to Integer: a
INCREMENTED TO: 2

See how our failure is buried in the middle of our successes? We have to look carefully at the program’s output to discern the failure output.

Worse, we have just outsourced this program’s failure path to our logging framework to handle. We can say that the failure path is now out-of-band: the failures are no longer present in the source code or influencing the program’s control flow. Incidentally, many experienced programmers criticize the concept of exceptions for a similar reason: they create a secondary control flow in a program, one that exists outside the standard imperative flow, and thus is difficult to reason about.[2]

[2] Joel Spolsky’s essay on why exceptions are not always a good thing: https://www.joelonsoftware.com/2003/10/13/13/

Whatever the criticisms of exceptions, error logging is surely worse: it is not just outside the program’s main control flow, but also outside the program itself. The challenges of dealing with out-of-band error logging have fueled a whole software industry, which we will briefly look at next.

8.1.3. Failure and the log-industrial complex

A Java program, a Node.js program, and a Ruby program all walk into a bar. Each logs an error on the way in. The joke is on the barman, because they are all speaking different languages:

ERROR  2018-11-13 06:50:14,125 [Log_main]  "com.acme.Log": Error from Java
Error from node.js
E, [851000 #0] ERROR -- : Error from Ruby

Amazingly, no common format exists for program logging: different languages and frameworks all have their own logging levels (error, warning, and so forth) and log message formats. Combine this with the fact that log messages are written using human language, and you are left with logs that can often be analyzed using only plain-text search.

Various logistical issues also are associated with outsourcing your failure path to a logging framework:

  • What if we run out of space on our server to store logs?
  • In a world of transient virtual servers and ephemeral stateless containers, how do we ensure that we collect our logs before the server itself is terminated?
  • How do we collect logs from client devices that we don’t control?

Collectively, dealing with these issues around logging has spawned what we might call, only half-jokingly, the log-industrial complex, consisting of the following:

  • Logging frameworks and facades: Java alone has Log4j, SLF4J, Logback, java.util.logging, tinylog, and others.
  • Log collection agents and frameworks: These include Apache Flume, Logstash, Filebeat, Facebook’s now-shuttered Scribe project, and Fluentd.
  • Log storage and analytics tools: These include Splunk, Elasticsearch plus Kibana, and Sawmill.
  • Error collection services: These include Sentry, Airbrake, and Rollbar. These services are often focused on collecting errors from client devices.

Even with all of this tooling, in a unified log context we still have another unsolved problem: how can we reprocess a failed unit of work (for example, an event) if and when the underlying issue that caused the failure is fixed?

In a unified log world, there has to be a better way of working with our failure path. We will explore this next.


8.2. Failure and the unified log

Reports that say that something hasn’t happened are always interesting to me, because as we know, there are known knowns; there are things we know we know. We also know there are known unknowns; that is to say, we know there are some things we do not know. But there are also unknown unknowns—the ones we don’t know we don’t know.

US Secretary of Defense Donald H. Rumsfeld (February 12, 2002)

In this section, we will set out a simple pattern for unified log processing that accounts for the failure path as well as the happy path. For our treatment of the failure path, we will borrow from the better ideas of section 8.1 and throw in some new ideas of our own.

8.2.1. A design for failure

How should we be handling failure in our unified log processing? First, let’s propose some rules governing program termination. We should terminate our job only if one of the following occurs:

  • We encounter an unrecoverable error during the initialization phase of our job.
  • We encounter a novel error while processing a unit of work inside our job.

A novel error means one that we haven’t seen before: a Rumsfeld-esque unknown unknown. Although it might be tempting to keep processing in this case to minimize disruption, terminating the job forces us to evaluate any novel error as soon as we encounter it. We can then determine how this new error should be handled in the future: for example, can it be recovered from without failing the unit of work?

Next, how do we handle an unrecoverable but not-unexpected error within a unit of work? Here there is no way around it: we have to move this unit of work into our failure path, while making sure to follow a few important rules:

  • Our failure path must not be out-of-band. We don’t need to rely on third-party logging tools; we are implementing a unified log, so let’s use it!
  • Entries in our failure path must contain the reason or reasons for the failure in a well-structured form that both humans and machines can read.
  • Entries in our failure path must contain the original input data (for example, the processed event), so that the unit of work can potentially be replayed if and when the underlying issue can be fixed.

Let’s make this a little more concrete with the example of a simple stream-processing job that is enriching individual events, as shown in figure 8.4.

Figure 8.4. Our enrichment job processes events from the input event stream, writes enriched events to our happy path event stream, and writes input events that failed enrichment, plus the reasons why they failed enrichment, to our failure path event stream.

Does the flow in figure 8.4 look familiar? It shares a lot in common with Unix’s concept of three standard streams: our stream processing job will read from one event stream, and write to two event streams, one for the happy path and the other for individual failures. At the same time, we have improved some of the failure-handling techniques of section 8.1:

  • We have done away with exit values. The success or failure of a given unit of work is reflected in whether output was written to the happy event stream or the failure event stream.
  • We have removed any ambiguity in the outputs. A unit of work results in output either to the happy stream or to the failure stream, never both, nor neither. Three input events mean a total of three output events.
  • We are using the same in-band tools to work with both our successes and our failures. Failures will end up as well-structured entries in one event stream; successes will end up as well-structured entries in the other event stream.
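
These properties can be sketched in plain Java. This is an illustration under assumptions rather than the enrichment job itself: two in-memory lists stand in for the happy and failure event streams, the "enrichment" is simply the increment from listing 8.3, and the names EnrichmentRouting and Failure are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class EnrichmentRouting {

  // Hypothetical failure entry: the reason plus the untouched original input
  static final class Failure {
    final String reason;
    final String originalInput;

    Failure(String reason, String originalInput) {
      this.reason = reason;
      this.originalInput = originalInput;
    }
  }

  // In-memory stand-ins for the two output event streams
  final List<Integer> happy = new ArrayList<>();
  final List<Failure> failures = new ArrayList<>();

  // One unit of work writes exactly one entry, to exactly one output.
  // Any other (novel) exception propagates and terminates the job.
  void process(String input) {
    try {
      happy.add(Integer.parseInt(input) + 1);
    } catch (NumberFormatException nfe) {
      failures.add(new Failure("input not parseable to Integer", input));
    }
  }

  static EnrichmentRouting run(List<String> inputs) {
    EnrichmentRouting job = new EnrichmentRouting();
    for (String input : inputs) {
      job.process(input);
    }
    return job;
  }
}
```

Because every input lands in exactly one of the two lists, happy.size() + failures.size() always equals the number of inputs, and a consumer never has to scan log output to find out what went wrong.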

This is a great start, but what do we mean when we say that our failures will end up as “well-structured entries” in an event stream? Let’s explore this next.

8.2.2. Modeling failures as events

How could we describe our stream processing job’s failure to enrich an event read from an input stream? Perhaps something like this:

At 12:24:07 in our production environment, SimpleEnrich v1 failed to enrich Inbound Event 428 because it failed JSON Schema validation.

Does this look familiar? It contains all the same grammatical components as the events we introduced in chapter 2:

  • Subject: In this case, our stream processing job, SimpleEnrich v1, is the entity carrying out the action of this event.
  • Verb: The action being done by the subject is, in this case, “failed to enrich.”
  • Direct object: The entity to which the action is being done is Inbound Event 428.
  • Timestamp: This tells us exactly when this failure occurred.

We have another piece of context besides the timestamp: the environment in which the failure happened. Finally, we also have a prepositional object, namely the reason that the enrichment failed. We call this prepositional because it is associated with the event via a prepositional phrase (in this case, “because of”). Figure 8.5 clarifies the relationship between our event’s constituent parts: subject, verb, direct object, prepositional object (the reason for the failure), and context.

Figure 8.5. We are representing our enrichment as an event: the subject is the enrichment job itself, the verb is “failed to enrich,” and the direct object is the event we failed to enrich.
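
To make the grammar concrete, here is a minimal Java sketch of a failure event carrying these parts as fields. The class name FailureEvent, its field names, and the describe method are assumptions made for illustration; a real unified log would more likely serialize such an event against a schema than render it as a sentence.

```java
import java.time.Instant;

final class FailureEvent {
  final String subject;              // who acted, e.g. "SimpleEnrich v1"
  final String verb;                 // what happened, e.g. "failed to enrich"
  final String directObject;         // the original input event, carried along for replay
  final String prepositionalObject;  // the reason for the failure
  final String context;              // the environment in which the failure happened
  final Instant timestamp;           // when the failure occurred

  FailureEvent(String subject, String verb, String directObject,
               String prepositionalObject, String context, Instant timestamp) {
    this.subject = subject;
    this.verb = verb;
    this.directObject = directObject;
    this.prepositionalObject = prepositionalObject;
    this.context = context;
    this.timestamp = timestamp;
  }

  // Renders the event back into a human-readable sentence
  String describe() {
    return "At " + timestamp + " in our " + context + " environment, "
      + subject + " " + verb + " " + directObject
      + " because " + prepositionalObject + ".";
  }
}
```

Keeping the reason and the original input as separate fields is what lets a downstream job filter failures by cause and replay the inputs without parsing free text.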

It may seem a little strange to have the direct object of our failure event be an event: specifically, the inbound event that failed enrichment. But this “turtles all the way down” design is powerful: it means that our failure event contains within it all the information required to retry the failed processing in the future. Without this design, we would have to manually correlate the failure messages with the original input events if we wanted to attempt reprocessing of failed events in the future.

This might sound a little theoretical; after all, the events have failed processing with an unrecoverable error once. What makes us think that this error might be recoverable in the future? In fact, there are lots of reasons that we shouldn’t be binning this failed event just yet:

  • Perhaps the enrichment failed because of a problem with a third-party service that has since been fixed. For example, the third-party service had an outage due to a denial-of-service attack, which has since been resolved.
  • Perhaps some events failed enrichment because they were serialized with a version of a schema that somebody had forgotten to upload into our schema repository. Once this has been fixed and the schema uploaded, the events can be reprocessed.
  • Perhaps a whole set of events failed enrichment because the source system had corrupted them all in a systemic way; for example, the source system had accidentally URL-encoded the customers’ email addresses. We could write a quick stream processing job to read the corrupted events, fix them, and write them back to the original stream, ready for a second attempt at enrichment, as in figure 8.6.

Figure 8.6. Our enrichment job reads six input events; three fail enrichment and are written out to our failure stream. We then feed those three failures into a cleaning job that attempts to fix the corrupted events. The job manages to fix two events, which are fed back into the original enrichment job.
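
A cleaning job of this kind can be very small. The sketch below assumes the URL-encoded email corruption described above and treats each failed input as a bare email string; the class CleaningJob, its methods, and the simple "looks like an email" pattern are hypothetical, and a real job would read from the failure stream and write back to the input stream rather than work on lists.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

class CleaningJob {

  // Tries to undo an accidental URL-encoding of an email address.
  // Returns null when the input cannot be repaired.
  static String fix(String corruptedEmail) {
    try {
      String decoded = URLDecoder.decode(corruptedEmail, "UTF-8");
      // Deliberately loose check, assumed for this sketch only
      return decoded.matches("[^@\\s]+@[^@\\s]+") ? decoded : null;
    } catch (UnsupportedEncodingException | IllegalArgumentException e) {
      // IllegalArgumentException signals a malformed % escape
      return null;
    }
  }

  // Returns only the repaired inputs, ready to be fed back for a second attempt
  static List<String> clean(List<String> failedInputs) {
    List<String> repaired = new ArrayList<>();
    for (String input : failedInputs) {
      String fixed = fix(input);
      if (fixed != null) {
        repaired.add(fixed);
      }
    }
    return repaired;
  }
}
```

Inputs that cannot be repaired are simply left out here; in keeping with the rest of the design, a fuller version would emit its own failure events for them.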

This last example is starting to hint at a key aspect of this design: its composability across multiple event streams within our unified log. We’ll look at this next.

8.2.3. Composing our happy path across jobs

If our stream processing jobs follow the simple architecture set out previously, we can create a happy path composed of multiple stream processing jobs: the happy path output of one job is fed in as the input of the next job. Figure 8.7 illustrates this process.

Figure 8.7. We have composed a happy path by chaining together the happy output event stream of each stream processing job as the input stream of the next job.
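
The chaining can be sketched in Java with in-memory lists standing in for event streams. Everything here is an assumption made for illustration: the JobChain class, the two toy steps, and the convention that a step signals an anticipated failure by throwing IllegalArgumentException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class JobChain {

  // The two outputs of one job: stand-ins for its happy and failure streams
  static final class Output {
    final List<String> happy = new ArrayList<>();
    final List<String> failures = new ArrayList<>();
  }

  // Runs one job over its input, routing each unit of work to one output
  static Output runJob(String name, List<String> inputs, Function<String, String> step) {
    Output out = new Output();
    for (String input : inputs) {
      try {
        out.happy.add(step.apply(input));
      } catch (IllegalArgumentException iae) {
        out.failures.add(name + " failed on '" + input + "': " + iae.getMessage());
      }
    }
    return out;
  }

  static String requireNonBlank(String raw) {
    String trimmed = raw.trim();
    if (trimmed.isEmpty()) {
      throw new IllegalArgumentException("blank input");
    }
    return trimmed;
  }

  // NumberFormatException is a subclass of IllegalArgumentException
  static String increment(String raw) {
    return String.valueOf(Integer.parseInt(raw) + 1);
  }

  // The happy output of the first job is the input of the second
  static List<Output> pipeline(List<String> inputs) {
    Output validated = runJob("validate", inputs, JobChain::requireNonBlank);
    Output incremented = runJob("increment", validated.happy, JobChain::increment);
    return Arrays.asList(validated, incremented);
  }
}
```

Each job keeps its own failure output, so the question of what to do with those failures can be answered per job, independently of how the happy path is wired together.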

Figure 8.7 deliberately elides how we handle the failure paths of our individual jobs. How we handle the failure events emitted by a given job will depend on a few factors:

  • Do we expect to be able to recover from a job’s failures in the future, and do we care enough to attempt recovery?
  • How will we monitor error rates, and what constitutes an acceptable error rate for a given job?
  • Where will we archive our failures?

These kinds of questions may well have different answers for different stream-processing jobs. For example, a job auditing customer refunds might take individual failures much more seriously than a job providing vanity metrics for a display panel in reception. But by modeling our failures as events, we should be able to create highly reusable failure-handling jobs, because they speak our lingua franca of well-structured events.


8.3. Failure composition with Scalaz

If you fail to plan, you are planning to fail!

Benjamin Franklin

So far, we have looked at the concept of the failure path at an architectural level, but how do we implement these patterns inside our stream processing jobs? It’s time to add a new tag team to our unified log arsenal: Scala and Scalaz.

8.3.1. Planning for failure

Imagine that we are working at an online retailer that sells products in three currencies (euros, dollars, and pounds), but does all of its financial reporting in euros, its base currency. Our retailer already has all customer orders available in a stream in its unified log, so the managers want us to write a stream processing job that does the following:

  • Reads customer orders from the incoming event stream
  • Converts all customer orders into euros
  • Writes the updated customer orders to a new event stream

The currency conversion will be done using the live (current) exchange rate, to keep things simple. Our job can look up the exchange rate by making an API call to a third-party service called Open Exchange Rates (https://openexchangerates.org/).

This sounds simple. What could go wrong with the operation of our job? In fact, a few things:

  • Perhaps Open Exchange Rates is having an unplanned outage or planned downtime to facilitate an upgrade.
  • Perhaps test data made it into the production flow, and the customer order is in a currency other than one of the three allowed ones.
  • Perhaps a hacker has managed to transact an order with a non-numeric order value, getting themselves a huge flat screen TV for f¥33r.

An interesting thing about these failures is that they are, as a management consultant might say, the opposite of mutually exclusive, collectively exhaustive (MECE):

  • They are not mutually exclusive. We could have the order in the wrong currency and the order value be non-numeric.
  • They are not collectively exhaustive. We could always experience a novel error. If we experience a novel error, we will add it to our failure handling, but there can always be novel errors.
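
Because these failures are not mutually exclusive, a check that stops at the first problem would under-report what is wrong with an order. The following minimal Java sketch collects every anticipated failure instead. It assumes a hypothetical order made up of just a currency string and an amount string; OrderCheck and validate are illustrative names, and the lookup against Open Exchange Rates is left out entirely.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

class OrderCheck {

  static final List<String> SUPPORTED = Arrays.asList("EUR", "USD", "GBP");

  // Returns every anticipated failure found; an empty list means the order is valid
  static List<String> validate(String currency, String amount) {
    List<String> failures = new ArrayList<>();

    if (!SUPPORTED.contains(currency.toUpperCase(Locale.ENGLISH))) {
      failures.add("Currency must be USD/EUR/GBP, not " + currency);
    }

    try {
      new BigDecimal(amount);  // throws NumberFormatException if not numeric
    } catch (NumberFormatException nfe) {
      failures.add("Order value must be numeric, not " + amount);
    }

    return failures;
  }
}
```

Both checks always run, so an order in the wrong currency with a non-numeric value yields two failures. Anything not anticipated here, such as a null field, surfaces as an ordinary exception, in line with the rule that novel errors should terminate the job.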

Putting all this together, it is clear that our stream processing job will need to plan for failure: in addition to the output stream of orders in our base currency (our happy path), we will need a second output stream to report our currency conversion failures (our failure path). Figure 8.8 illustrates the overall job flow.

Figure 8.8. Our input event stream contains two customer order events. The valid one is successfully converted into euros and written to our happy stream. The second event is invalid and is written to our failure stream; our failure event records the failures encountered as well as the original event.

We are not particularly interested in the “plumbing” of this job; we have explored the mechanics of reading and writing to event streams in detail in previous chapters. This chapter is all about planning for failure, so the important thing is learning how to assemble the internal logic of this job in a way that can cope with all these possible failures.

For these purposes, we can work with a stripped-down Scala application: just a few functions, unit tests, and a simple command-line interface.

8.3.2. Setting up our Scala project

Let’s get started. We are going to create our Scala application by using Gradle. Scala Build Tool (SBT) is the more popular build tool for Scala, but we are familiar with Gradle and it works fine with Scala, so let’s stick with that.

First, create a directory called forex, and then switch to that directory and run the following:

$ gradle init --type scala-library
...
BUILD SUCCESSFUL
...

As we did in chapter 3, we’ll now delete the stubbed Scala files that Gradle created:

$ rm -rf src/*/scala/*

The default build.gradle file in the project root isn’t quite what we need either, so replace it with the following code.

Listing 8.4. build.gradle
apply plugin: 'scala'                       // Build Scala sources with Gradle's Scala plugin

repositories {
  mavenCentral()
  maven {                                   // Additional Maven repository
    url 'http://oss.sonatype.org/content/repositories/releases'
  }
}

version = '0.1.0'

// Scala compiler options
ScalaCompileOptions.metaClass.daemonServer = true
ScalaCompileOptions.metaClass.fork = true
ScalaCompileOptions.metaClass.useAnt = true
ScalaCompileOptions.metaClass.useCompileDaemon = false

dependencies {                              // Scala, Scalaz, and Specs2 test libraries
  runtime 'org.scala-lang:scala-compiler:2.12.7'
  compile 'org.scala-lang:scala-library:2.12.7'
  compile 'org.scalaz:scalaz-core_2.12:7.2.27'
  testCompile 'org.specs2:specs2_2.12:3.8.9'
  testCompile 'org.typelevel:scalaz-specs2_2.12:0.5.2'
}

task repl(type:JavaExec) {                  // Starts the Scala REPL via "gradle repl"
    main = "scala.tools.nsc.MainGenericRunner"
    classpath = sourceSets.main.runtimeClasspath
    standardInput System.in
    args '-usejavacp'
}

With that updated, let’s check that everything is still functional:

$ gradle build
...
BUILD SUCCESSFUL
...

Okay, good—now let’s write some Scala code!

8.3.3. From Java to Scala

Remember that we need to check whether our customer order is in one of our three supported currencies (euros, dollars, or pounds).

If we were still working in Java, we might write a function that threw a checked exception if the currency was not one of our supported currencies. In the following listing, we have a function that parses the currency string:

  • If it is a supported currency, the function returns our currency as a Java enum.
  • Otherwise, it throws a checked exception, UnsupportedCurrencyException.

Listing 8.5. CurrencyValidator.java
package forex;

import java.util.Locale;

public class CurrencyValidator {
  // The three supported currencies
  public enum Currency {USD, GBP, EUR}

  // Checked exception, because it extends Exception
  public static class UnsupportedCurrencyException
    extends Exception {
    public UnsupportedCurrencyException(String raw) {
      super("Currency must be USD/EUR/GBP, not " + raw);
    }
  }

  // Callers must catch or redeclare the checked exception
  public static Currency validateCurrency(String raw)
    throws UnsupportedCurrencyException {

    String rawUpper = raw.toUpperCase(Locale.ENGLISH);
    try {
      return Currency.valueOf(rawUpper);
    } catch (IllegalArgumentException iae) {
      // valueOf throws IllegalArgumentException for an unknown name
      throw new UnsupportedCurrencyException(raw);
    }
  }
}

One of the nice aspects of Scala for recovering Java programmers is that it’s possible to port existing Java code to Scala code (albeit unidiomatic Scala code) with only cosmetic syntactical changes. The following listing contains just such a direct, unidiomatic port of our existing CurrencyValidator class to a Scala object.

Listing 8.6. currency.scala
package forex

// Scala's Enumeration stands in for the Java enum
object Currency extends Enumeration {
  type Currency = Value
  val Usd = Value("USD")
  val Gbp = Value("GBP")
  val Eur = Value("EUR")
}

// The exception is a case class extending Exception
case class UnsupportedCurrencyException(raw: String)
  extends Exception("Currency must be USD/EUR/GBP, not " + raw)

// A singleton object replaces the Java class with a static method
object CurrencyValidator1 {

  @throws(classOf[UnsupportedCurrencyException])
  def validateCurrency(raw: String): Currency.Value = {

    val rawUpper = raw.toUpperCase(java.util.Locale.ENGLISH)
    try {
      // Look up the enumeration value by name
      Currency.withName(rawUpper)
    } catch {
      // withName throws NoSuchElementException for an unknown name
      case nsee: NoSuchElementException =>
        throw new UnsupportedCurrencyException(raw)
    }
  }
}

If you are completely new to Scala, here are a few immediate things to note about this code compared to Java:

  • A single Scala file can contain multiple independent classes and objects, in which case we give the file a descriptive name starting with a lowercase letter.
  • No need for semicolons at the end of each line, though they are still useful to separate multiple statements on the same line.
  • We use val to assign what in Java would be variables. The internal state of a val can be modified, but a val cannot be reassigned, so no val a = 0; a = a + 1.
  • In place of a class containing a static method, we now have a Scala object containing a method. An object in Scala is a singleton, a unique instance of a class.
  • A function definition starts with the keyword def.
  • Scala has type inference, which means that it has the ability to figure out the types that are left off.
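The points in this list can be seen together in a few lines. The following is only an illustrative sketch: the object ScalaBasicsSketch and its members are hypothetical names that are not part of the forex project.

```scala
// Illustrative only: none of these names exist in the forex project.
object ScalaBasicsSketch {                 // an object is a singleton, so no static keyword

  val greeting = "hello"                   // the type String is inferred

  // A val fixes the reference, but the buffer it points at can still change.
  val amounts = scala.collection.mutable.ArrayBuffer(1.0, 2.5)

  // Function definitions start with def; no semicolons are needed.
  def total(xs: Iterable[Double]): Double = xs.sum

  def addAmount(x: Double): Unit = {
    amounts += x                           // allowed: modifies internal state
    // amounts = null                      // would not compile: reassignment to val
  }
}
```

Uncommenting the reassignment line should produce a compile error, which is the difference between a val and a Java variable that the list describes.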

There is an interesting difference in the behavior of our validateCurrency function too: our function now has the @throws annotation recording the exception that it may throw. Scala doesn’t distinguish between checked and unchecked exceptions, so this annotation is optional, and useful only if some Java code is going to call this function.

Another nice difference between Scala and Java is the interactive console, or read-eval-print loop (REPL), available with Scala. Let’s use this to put our new Scala code through its paces. From the project root, start up the REPL with this command:

$ gradle repl --console plain
...
scala>

The Scala prompt is waiting for you to type in a Scala statement to execute. Let’s try this one:

scala> forex.CurrencyValidator1.validateCurrency("USD")
res0: forex.Currency.Value = USD

The second line shows you the output from the validateCurrency function: it’s returning a USD-flavored value from our Currency enumeration. Let’s check that the case-insensitivity is working too:

scala> forex.CurrencyValidator1.validateCurrency("eur")
res1: forex.Currency.Value = EUR

That seems to be working. Now let’s see what happens if we pass in an invalid value:

scala> forex.CurrencyValidator1.validateCurrency("dogecoin")
forex.UnsupportedCurrencyException: Currency must be USD/EUR/GBP, not dogecoin
  at forex.CurrencyValidator1$.validateCurrency(currency.scala:23)
  ... 28 elided

That looks correct. Trying to validate dogecoin as a currency is throwing an UnsupportedCurrencyException. We have ported our Java currency validator to Scala, but so far we have only tweaked syntax; the semantics of our failure handling are the same. We can do better, as you’ll see in the next iteration.
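To make the cost of the exception-based design concrete, here is a minimal sketch of what a caller has to do. ThrowingCallerSketch is a hypothetical stand-in that mimics the shape of CurrencyValidator1 with a simplified body (it returns a String and throws IllegalArgumentException), so treat it as an illustration rather than the project’s code.

```scala
// Hypothetical stand-in for CurrencyValidator1: same shape, simplified body.
object ThrowingCallerSketch {

  private val supported = Set("USD", "GBP", "EUR")

  def validateCurrency(raw: String): String = {
    val upper = raw.toUpperCase(java.util.Locale.ENGLISH)
    if (supported.contains(upper)) upper
    else throw new IllegalArgumentException(
      "Currency must be USD/EUR/GBP, not " + raw)
  }

  // The return type above says nothing about failure, so each caller
  // has to remember to guard the call with its own try/catch.
  def describe(raw: String): String =
    try "ok: " + validateCurrency(raw)
    catch {
      case e: IllegalArgumentException => "failed: " + e.getMessage
    }
}
```

A caller that forgets the try/catch still compiles, and the failure only surfaces at runtime.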

8.3.4. Better failure handling through Scalaz

Let’s take a second pass at our currency validator. You can see the updated object, CurrencyValidator2, in the following listing. We haven’t changed our Currency enumeration, and are no longer using the UnsupportedCurrencyException, so these are left out.

Listing 8.7. CurrencyValidator2.scala
package forex

import scalaz._                                                     // Scalaz types such as Validation
import Scalaz._                                                     // Scalaz syntax extensions

object CurrencyValidator2 {


  def validateCurrency(raw: String):
    Validation[String, Currency.Value] = {                          // failure type first, success type second

    val rawUpper = raw.toUpperCase(java.util.Locale.ENGLISH)
    try {
      Success(Currency.withName(rawUpper))                          // happy path: box the currency
    } catch {
      case nsee: NoSuchElementException =>
        Failure("Currency must be USD/EUR/GBP and not " + raw)      // failure path: box the error message
    }
  }
}

Again, here are a few notes to make the preceding listing a little easier to digest:

  • This time, our file contains only a single object, so our filename matches the object.
  • We no longer throw an exception, so we have no further need for the @throws annotation.
  • Scala uses [] for specifying generics, whereas Java uses <>.

Most important, in place of throwing exceptions, we have moved our failure path into the return value of our function: this complex-looking Validation type, provided by Scalaz. Let’s see how this Validation type behaves in the REPL; note that we have to quit and restart the console to force a recompile:

<Ctrl-D>
$ gradle repl --console plain
...
scala> forex.CurrencyValidator2.validateCurrency("eur")
res0: scalaz.Validation[String,forex.Currency.Value] = Success(EUR)

scala> forex.CurrencyValidator2.validateCurrency("dogecoin")
res1: scalaz.Validation[String,forex.Currency.Value] =
 Failure(Currency must be USD/EUR/GBP and not dogecoin)

You can think of Validation as a kind of box, or context, that describes whether the value inside it is on our happy path (called Success in Scalaz) or our failure path (called Failure). Even neater, the value inside the Validation box can have a different type on the Success side versus on the Failure side:

Validation[String, Currency.Value]

The first type given, String, is the type that we will use for our failure path; the second type, Currency.Value, is the type that we will use for our success path. Figure 8.9 shows how Validation is working, using the metaphor of cardboard boxes and paths.

Figure 8.9. For the happy path, our function returns a Currency boxed inside a Success. For the failure path, our function returns an error String boxed inside a Failure. Success and Failure are the two modes of the Scalaz Validation type.

This switch from using exceptions to using Scalaz’s Validation to box either success or failure might not seem like a big change, but it’s going to be our key building block for working with failure in Scala.
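The box metaphor can be sketched without any library at all. The toy Validation type below is an assumption made for illustration: it imitates the two-case shape described above but is not Scalaz’s implementation, and the currency is kept as a plain String for brevity.

```scala
// A toy two-case box imitating the shape of Scalaz's Validation.
// This is an illustration, not the library's code.
object ValidationBoxSketch {

  sealed trait Validation[F, S]
  final case class Success[F, S](value: S) extends Validation[F, S]
  final case class Failure[F, S](error: F) extends Validation[F, S]

  private val supported = Set("USD", "GBP", "EUR")

  // Failure type (String) first, success type (String here) second.
  def validateCurrency(raw: String): Validation[String, String] = {
    val upper = raw.toUpperCase(java.util.Locale.ENGLISH)
    if (supported.contains(upper)) Success(upper)
    else Failure("Currency must be USD/EUR/GBP and not " + raw)
  }

  // The caller has to open the box and handle both cases explicitly.
  def describe(v: Validation[String, String]): String = v match {
    case Success(c)   => "happy path: " + c
    case Failure(msg) => "failure path: " + msg
  }
}
```

Because the failure now travels in the return type, a caller cannot reach the currency without also deciding what to do with the error.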

8.3.5. Composing failures

We’re now happy with our function to validate that the incoming currency is one of our three supported currencies. But remember, another possible bug remains in our event stream:

Perhaps a hacker has managed to transact an order with a non-numeric order value, getting themselves a huge flat-screen TV for ¥l33t.

We can put a stop to this with an AmountValidator object containing another validation function: one that checks that the incoming stringly typed order amount can be parsed to a Scala Double. This is set out in the following listing.

Listing 8.8. AmountValidator.scala
package forex

import scalaz._
import Scalaz._

object AmountValidator {

  def validateAmount(raw: String): Validation[String, Double] = {     // error String or parsed Double

    try {
      Success(raw.toDouble)                                           // happy path: box the parsed amount
    } catch {
      case nfe: NumberFormatException =>
        Failure("Amount must be parseable to Double and not " + raw)  // failure path: box the error message
    }
  }
}

By now, the pattern should be familiar: our function uses a Validation to represent our return value being either on the happy path (with a Success), or on the failure path (with a Failure). As before, we are returning different types of values inside our Success and our Failure: in this case, a Double or a String, respectively.
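One caveat with parsing via toDouble: it delegates to Java’s Double parsing, which also accepts inputs such as "NaN" and "Infinity". The following sketch shows one possible tightening. It is an assumption layered on top of the chapter’s approach, not something the listing does, and it uses the standard library’s Either in place of Validation so that it runs without Scalaz.

```scala
// A possible stricter variant; Either stands in for Validation here.
object StrictAmountSketch {

  def validateAmount(raw: String): Either[String, Double] =
    try {
      val d = raw.toDouble
      // "NaN" and "Infinity" parse successfully, so reject them explicitly.
      if (d.isNaN || d.isInfinity)
        Left("Amount must be a finite number and not " + raw)
      else
        Right(d)
    } catch {
      case _: NumberFormatException =>
        Left("Amount must be parseable to Double and not " + raw)
    }
}
```

Whether negative or zero amounts should also be rejected is a business decision that this sketch leaves open.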

A quick check to make sure validateAmount is working in the Scala REPL:

<Ctrl-D>
$ gradle repl --console plain
...
scala> forex.AmountValidator.validateAmount("31.98")
res2: scalaz.Validation[String,Double] = Success(31.98)

scala> forex.AmountValidator.validateAmount("L33T")
res3: scalaz.Validation[String,Double] = Failure(Amount must be
 parseable to Double and not L33T)

Great! So we now have two validation functions:

  • CurrencyValidator2.validateCurrency
  • AmountValidator.validateAmount

Both functions would have to return a Success in order for us to assemble a valid order total. If we get a Failure, or indeed two Failures, then we will find ourselves squarely on the failure path. Let’s build a function now that attempts to construct an order total by running both of our validation functions. See the following listing for the code.

Listing 8.9. OrderTotal.scala
package forex

import scalaz._
import Scalaz._

case class OrderTotal(currency: Currency.Value, amount: Double)    // the validated order total

object OrderTotal {

  def parse(rawCurrency: String, rawAmount: String):
    Validation[NonEmptyList[String], OrderTotal] = {               // failures are a list of error Strings

    val c = CurrencyValidator2.validateCurrency(rawCurrency)       // validate the currency
    val a = AmountValidator.validateAmount(rawAmount)              // validate the amount

    (c.toValidationNel |@| a.toValidationNel) {                    // compose both results
      OrderTotal(_, _)
    }
  }
}

A lot of new things are certainly happening in a small amount of code, but don’t worry, we will go through this listing carefully in a short while. Before we do that, let’s return to the Scala REPL and see how this parse function performs, first if both the currency and amount are valid:

<Ctrl-D>
$ gradle repl --console plain
...
scala> forex.OrderTotal.parse("eur", "31.98")
res0: scalaz.Validation[scalaz.NonEmptyList[String],forex.OrderTotal]
 = Success(OrderTotal(EUR,31.98))

This result makes sense: because both validations passed, we are staying on the happy path, and our OrderTotal is boxed in a Success to reflect this. Now let’s see what happens if one or the other of our validations fails:

scala> forex.OrderTotal.parse("dogecoin", "31.98")
res2: scalaz.Validation[scalaz.NonEmptyList[String],forex.OrderTotal]
 = Failure(NonEmpty[Currency must be USD/EUR/GBP and not dogecoin])

scala> forex.OrderTotal.parse("eur", "L33T")
res3: scalaz.Validation[scalaz.NonEmptyList[String],forex.OrderTotal]
 = Failure(NonEmpty[Amount must be parseable to Double and not L33T])

In both cases, we end up with a Failure containing a NonEmptyList, which in turn contains our error message as a String. In fact, NonEmptyList is another Scalaz type. It’s similar to a standard Java or Scala List, except that a NonEmptyList (sometimes called a Nel, or NEL, for short) cannot be, well, empty. This suits our purposes well; if we have a Failure, we know at least one cause of it, and so our list of error messages will never be empty.

Why do we need a NEL on the Failure side in the first place? Hopefully, this next test should make the reason clear:

scala> forex.OrderTotal.parse("dogecoin", "L33T")
res4: scalaz.Validation[scalaz.NonEmptyList[String],forex.OrderTotal]
 = Failure(NonEmpty[Currency must be USD/EUR/GBP and not dogecoin,
 Amount must be parseable to Double and not L33T])

It’s a little bit long-winded, but you can see that the value returned from the parse function is now a Failure dutifully recording both error messages in String form; this approach is similar to the way you might see multiple validation errors (Phone Is Missing Country Code, and so forth) on a website form when you click the Submit button.

Now that you understand what the function does, let’s return to the code itself:

(c.toValidationNel |@| a.toValidationNel) {
  OrderTotal(_, _)
}

What exactly is going on here, and what is that mysterious |@| operator doing? The first thing to explain is the toValidationNel method calls. There is not much mystery here: this is a method available to any Scalaz Validation that will promote the Failure value into a NEL. Here’s a quick demonstration in the Scala REPL:

scala> import scalaz._
import scalaz._

scala> import Scalaz._
import Scalaz._

scala> val failure = Failure("OH NO!")
failure: scalaz.Failure[String] = Failure(OH NO!)

scala> failure.toValidationNel
res8: scalaz.ValidationNel[String,Nothing] = Failure(NonEmpty[OH NO!])

See how the error message is now inside a NEL? This happens only in the case of Failure. If we have a Success, the value inside is untouched, although the overall type of the Validation will still change:

scala> val success = Success("WIN!")
success: scalaz.Success[String] = Success(WIN!)

scala> success.toValidationNel
res6: scalaz.ValidationNel[Nothing,String] = Success(WIN!)

Now onto the |@| operator. Unfortunately, there is no official Scalaz documentation about this, but if you dig into the code, you will find it referred to as follows:

[a] DSL for constructing Applicative expressions

The |@| operator is sometimes referred to as the Home Alone or Chelsea bun operator, but we prefer to call it the Scream operator after the Munch painting. Regardless, this operator is doing something clever: it is composing our two input Validations into a new output Validation, intelligently determining whether the output Validation should be a Success or a Failure based on the inputs. Figure 8.10 shows the different options.

Figure 8.10. Applying the Scream operator to two Scalaz Validations gives us a matrix of possible outputs, depending on whether each input Validation is a Success or Failure. Note that we get an output of Success only if both inputs are Success.

As you can see in figure 8.10, the Scream operator allows us to compose multiple Validation inputs into a single Validation output. The example uses two inputs just to keep things simple: we could just as easily compose nine or twenty Validation inputs into a single Validation output. If we composed twenty Validation inputs with the Scream operator, all of our twenty inputs must be a Success for us to end up with a Success output. On the other hand, if all of our twenty inputs were Failures containing a single error String, then our output would be a Failure containing a NonEmptyList of twenty error Strings.
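The matrix in figure 8.10 can be written out by hand for the two-input case. In this sketch, combine2 is a hypothetical helper that plays the role the Scream operator plays in the listing, the Validation type is a toy stand-in rather than Scalaz’s, and a plain List[String] stands in for NonEmptyList[String].

```scala
// Hand-rolled two-input accumulation; illustrative names throughout.
object AccumulateSketch {

  sealed trait Validation[F, S]
  final case class Success[F, S](value: S) extends Validation[F, S]
  final case class Failure[F, S](error: F) extends Validation[F, S]

  type Errors = List[String]   // stand-in for NonEmptyList[String]

  // Success only if both inputs are Success; otherwise keep every error.
  def combine2[A, B, C](a: Validation[Errors, A], b: Validation[Errors, B])
                       (f: (A, B) => C): Validation[Errors, C] =
    (a, b) match {
      case (Success(x), Success(y))   => Success(f(x, y))
      case (Failure(e1), Failure(e2)) => Failure(e1 ++ e2)
      case (Failure(e), _)            => Failure(e)
      case (_, Failure(e))            => Failure(e)
    }
}
```

The second case is what distinguishes this style from fail-fast chaining: both error lists survive, which is why the failure side needs a list type in the first place.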

You have seen how to handle failure inside a single processing step: we can perform multiple pieces of validation in parallel, and then compose the successes or failures into a single output. If this is failure processing in the small, how do we start to handle failures between different processing steps inside the same job? We’ll look at this next. Feel free to grab a cup of coffee before you apply what you’ve learned so far on our unified log.


8.4. Implementing railway-oriented processing

Everything has an end, and you get to it if you only keep all on.

E. Nesbit, The Railway Children

A common theme running through this chapter has been the idea of our code embarking on a happy path and dropping down to a failure path if something (or multiple things) goes wrong. We have looked at this pattern at a large scale, across multiple stream processing jobs, as well as at a small scale, at the level of composing multiple failures inside a single processing step. Now let’s look at the middle ground: handling failure across multiple processing steps inside a single job.

8.4.1. Introducing railway-oriented processing

In the preceding section, the Scream operator let us compose an output from separate pieces of processing into a single unified output. Specifically, we composed the Validation outputs from our currency and amount parsing into a single Validation output. Where do we go from here?

Remembering back to our original brief, we also need the current exchange rate between our order’s currency and our employer’s base currency. Again, this requirement could also take us onto the failure path: fetching the exchange rate could fail for various reasons. We have several processing steps now, so let’s sketch out an end-to-end happy path that incorporates all of this processing. Figure 8.11 presents two separate versions of this happy path:

  • The idealized happy path: This expresses the dependencies between each piece of work. Specifically, we need a successfully validated currency to look up the currency, but we don’t need the validated amount until we are trying to perform the conversion.
  • The pragmatic happy path: We look up the currency only if both the currency and the amount validate successfully.

Figure 8.11. The idealized and pragmatic happy paths vary based on how late the exchange rate lookup is performed. In the pragmatic happy path, we validate as much as we can before attempting the exchange rate lookup.

The pragmatic happy path is so-called because looking up a currency from a third-party service’s API over HTTP is an expensive operation, whereas validating our order amount is relatively cheap. We could be performing this processing job for many millions of events, so even small unnecessary delays will add up; therefore, if we can fail fast and save ourselves some pointless currency lookups, we should.

Figure 8.11 shows us only the end-to-end happy path, resulting in a customer order successfully converted into the retailer’s base currency. Figure 8.12 provides the next level of detail, imagining our processing job as a railway line (or conveyor belt), with two tracks:

  • The happy track: This shows the type transformations that occur as we successfully validate first the currency and amount (one processing step), then successfully look up the exchange rate (our second processing step), and finally convert the order amount.
  • The failure track: We can be switched onto this track if any of our processing steps fail.

This is not my own metaphor: railway-oriented programming was coined by functional programmer Scott Wlaschin in his eponymous blog post (https://fsharpforfunandprofit.com/posts/recipe-part2/). Scott’s blog post uses the railway metaphor with happy and failure paths to introduce compositional failure handling in the F# programming language. Another functional language, F# enables a similar approach to failure processing to our Scala-plus-Scalaz combination. We strongly recommend reading Scott’s post after you have finished this chapter; in the meantime, here is railway-oriented programming in Scott’s own words:

What we want to do is connect the Success output of one to the input of the next, but somehow bypass the second function in case of a Failure output....There is a great analogy for doing this—something you are probably already familiar with. Railways! Railways have switches (“points” in the UK) for directing trains onto a different track. We can think of these “Success/Failure” functions as railway switches....We will have a series of black-box functions that appear to be straddling a two-track railway, each function processing data and passing it down the track to the next function....Note that once we get on the failure path, we never (normally) get back onto the happy path. We just bypass the rest of the functions until we reach the end.

The railway-oriented approach has important properties that might not be immediately obvious but affect how we use it:

  • It composes failures within an individual processing step, but it fails fast across multiple steps. This makes sense: if we have an invalid currency code, we can validate the order amount as well (same step), but we can’t proceed to getting an exchange rate for the corrupted code (the next step).
  • The type inside our Success can change between each processing step. In figure 8.12, our Success box contains first an OrderTotal, then an exchange rate, and then finally an OrderTotal again.
  • The type inside our Failure must stay the same between each processing step. Therefore, we have to choose a type to record our failures and stick to it. We have used a NonEmptyList[String] in this chapter because it is simple and flexible.

Figure 8.12. Processing of our raw event proceeds down the happy track, but a failure at any given step will switch us onto the failure track. When we are on the failure track, we stay there, bypassing any further happy-track processing.

Enough of the theory for now; let’s build our railway in Scala.

8.4.2. Building the railway

First, we need a function representing our exchange-rate lookup, which could fail. Because our focus in this chapter is on failure handling, our function will contain some hardcoded exchange rates plus some hardwired “random failure.” If you are interested in looking up real exchange rates in Open Exchange Rates, you can always check out Snowplow’s scala-forex project (https://github.com/snowplow/scala-forex/).

The code for our new function, lookup, is in the following listing. The function consists of a single pattern-match expression that returns the following:

  • On our happy track, an exchange rate Double boxed in a Success
  • On the failure track, an error String boxed in a Failure

Listing 8.10. ExchangeRateLookup.scala
package forex

import util.Random

import scalaz._
import Scalaz._

object ExchangeRateLookup {

  def lookup(currency: Currency.Value):
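    Validation[String, Double] = {                               // error String or exchange rate Double

    currency match {
      case Currency.Eur     => 1D.success                        // base currency: matched first, so it never fails
      case _ if isUnlucky() => "Network error".failure           // guard simulating a network failure
      case Currency.Usd     => 0.85D.success
      case Currency.Gbp     => 1.29D.success
      case _                => "Unsupported currency".failure    // catch-all keeps the match exhaustive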
    }
  }

  private def isUnlucky(): Boolean =
    Random.nextInt(5) == 4
}

If you haven’t seen a pattern match before, you can think of it as similar to a C- or Java-like switch statement, but much more powerful. We can match against specific cases such as Currency.Eur, and we can also perform wildcard matches using _. The if isUnlucky() is called a guard; it makes sure that we match this pattern only if the condition is met. As you can see from the definition of the isUnlucky function, we are simulating a network failure that should happen 20% of the time when we are looking up the exchange rate over the network.

An important point on the final _ wildcard match: we include this to ensure that our pattern match is exhaustive, meaning that all possible patterns that could occur are handled. Without this final wildcard match, the code would still compile, but we are sitting on a potential problem if the following occur:

  1. A new currency (for example, Yen) is added to the Currency enumeration.
  2. Support for the new currency is added to the validateCurrency function.
  3. But we forget to add this new currency into this pattern-match statement.

In this case, the pattern match would throw a runtime MatchError on every yen lookup, causing our stream processing job to crash. We want to avoid this, so our final _ wildcard match defends against this pre-bug, a bug that has not happened yet.
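The pre-bug described above is easy to reproduce in isolation. In the sketch below, the Currency enumeration is a cut-down copy with a hypothetical Yen value added, and fragile and defensive are illustrative names for a match without and with the final wildcard.

```scala
// Illustrative reproduction of the missing-case problem.
object ExhaustiveSketch {

  object Currency extends Enumeration {
    val Usd = Value("USD")
    val Yen = Value("YEN")   // hypothetical newly added currency
  }

  // No case for Yen and no wildcard: this compiles, but fails at runtime.
  def fragile(c: Currency.Value): String = c match {
    case Currency.Usd => "0.85"
  }

  // The final wildcard turns the crash into an ordinary failure value.
  def defensive(c: Currency.Value): String = c match {
    case Currency.Usd => "0.85"
    case _            => "Unsupported currency"
  }
}
```

Calling fragile with Currency.Yen throws a scala.MatchError, whereas defensive returns the "Unsupported currency" message. The compiler cannot flag the gap because Enumeration values are not a sealed hierarchy.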

Let’s put our new lookup function through its paces in the Scala REPL:

<Ctrl-D>
$ gradle repl --console plain
...
scala> import forex.{ExchangeRateLookup, Currency}
import forex.{ExchangeRateLookup, Currency}

scala> ExchangeRateLookup.lookup(Currency.Usd)
res2: scalaz.Validation[String,Double] = Success(0.85)

scala> ExchangeRateLookup.lookup(Currency.Gbp)
res2: scalaz.Validation[String,Double] = Success(1.29)

Great, that’s all working fine. Our network connection is also suitably unreliable; every so often when looking up a rate, you should see an error like this:

scala> ExchangeRateLookup.lookup(Currency.Gbp)
res1: scalaz.Validation[String,Double] = Failure(Network error)

Now, remembering back to section 8.3, we have two functions that represent the two distinct steps in our processing, either of which could fail. Here are the signatures of both functions:

  • OrderTotal.parse(rawCurrency: String, rawAmount: String): Validation[NonEmptyList[String], OrderTotal]
  • ExchangeRateLookup.lookup(currency: Currency.Value): Validation[String, Double]

Note that the types on our failure track are not quite the same: a NonEmptyList of Strings, versus a singular String. In theory, this sounds like it breaks the rule that “every Failure must contain the same type,” but in practice it is easy to promote a single String to be a NonEmptyList of Strings.

Remember that we want to try to generate an order total first, and only then look up our exchange rate. Let’s create a new function, OrderTotalConverter1.convert, which captures all of the processing we need to do in this job. The signature of this function should look like this:

convert(rawCurrency: String, rawAmount: String): ValidationNel[String,
     OrderTotal]

You haven’t seen ValidationNel[A, B] before. This is Scalaz shorthand for Validation[NonEmptyList[A], B]. Putting this all together: we will attempt to generate an OrderTotal in our base currency from an incoming raw currency and order total. If we somehow end up on the failure path, we will return a NonEmptyList of the error Strings that we encountered. Let’s see an implementation of this function in the following listing.

Listing 8.11. OrderTotalConverter1.scala
package forex

import scalaz._
import Scalaz._

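import forex.{OrderTotal => OT}                             // short aliases for readability
import forex.{ExchangeRateLookup => ERL}

object OrderTotalConverter1 {

  def convert(rawCurrency: String, rawAmount: String):
    ValidationNel[String, OrderTotal] = {                   // Validation[NonEmptyList[String], OrderTotal]

    for {
      total <- OT.parse(rawCurrency, rawAmount)             // step 1: validate currency and amount
      rate  <- ERL.lookup(total.currency).toValidationNel   // step 2: look up the rate, promoting its error to a NEL
      base   = OT(Currency.Eur, total.amount * rate)        // step 3: convert into the base currency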
    } yield base
  }
}

There is some interesting new syntax here; that for {} yield is clearly not your grandfather’s for loop! Before we dive into this, let’s fire up the Scala REPL one last time for this chapter and put this function through its paces. First let’s stick to the happy path (assuming our unreliable network lets us):

<Ctrl-D>
$ gradle repl --console plain
...
scala> forex.OrderTotalConverter1.convert("usd", "12.99")
res1: scalaz.ValidationNel[String,forex.OrderTotal]
 = Success(OrderTotal(EUR,11.0415))

scala> forex.OrderTotalConverter1.convert("EUR", "28.98")
res2: scalaz.ValidationNel[String,forex.OrderTotal]
 = Success(OrderTotal(EUR,28.98))

Now let’s see about the failure path:

scala> forex.OrderTotalConverter1.convert("yen", "l33t")
res3: scalaz.ValidationNel[String,forex.OrderTotal]
 = Failure(NonEmptyList(Currency must be USD/EUR/GBP and not yen,
 Amount must be parseable to Double and not l33t))

scala> forex.OrderTotalConverter1.convert("gbp", "49.99")
res56: scalaz.ValidationNel[String,forex.OrderTotal]
 = Failure(NonEmptyList(Network error))

You might have to repeat the second conversion a few times to see the Network Error Failure caused by the exchange rate lookup. Also, remember that you will see the network error only if the raw currency and amount are valid: if either is invalid, we have already failed fast, and the exchange rate will not be looked up.

So our convert() function is working, but how is it working? The key is to understand the for {} yield construct. Wherever you see the for keyword in Scala, you are looking at a Scala for comprehension, which is syntactic sugar over a set of Scala’s functional operations: foreach, map, flatMap, filter, or withFilter.[3] This is modeled on the do notation found in Haskell, a pure functional language.

[3] More information on how the Scala yield keyword works can be found at https://docs.scala-lang.org/tutorials/FAQ/yield.html.

Scala will translate our for {} yield into a set of flatMap and map operations, as shown in listing 8.12. The code in OrderTotalConverter2 is functionally identical to our previous OrderTotalConverter1; you can test it at the Scala REPL if you like:

<Ctrl-D>
$ gradle repl --console plain
...
scala> forex.OrderTotalConverter2.convert("yen", "l33t")
res3: scalaz.ValidationNel[String,forex.OrderTotal]
 = Failure(NonEmpty[Currency must be USD/EUR/GBP and not yen,
 Amount must be parseable to Double and not l33t])

Listing 8.12. OrderTotalConverter2.scala
package forex

import scalaz._
import Scalaz._
import scalaz.Validation.FlatMap._

import forex.{OrderTotal => OT}
import forex.{ExchangeRateLookup => ERL}

object OrderTotalConverter2 {

  def convert(rawCurrency: String, rawAmount: String):
    ValidationNel[String, OrderTotal] = {

    OT.parse(rawCurrency, rawAmount).flatMap(total =>                   // step 1, chained with flatMap
      ERL.lookup(total.currency).toValidationNel.map((rate: Double) =>  // step 2, transformed with map
        OT(Currency.Eur, total.amount * rate)))                         // build the base-currency total
  }
}

Putting them side by side, OrderTotalConverter1 is much more readable than OrderTotalConverter2, but this second version gives us a better starting point for understanding how we have implemented this fail-fast approach across multiple processing steps. This is the last piece of the railway-oriented processing puzzle.

Also note that OrderTotalConverter1 will work only when compiled against Scala 2.11, as Scala 2.12 has introduced enhanced type checking that made type inference inside for comprehensions more difficult to achieve.

flatMap and map are the last two pieces in our toolkit for this chapter, so we’ll go through these in some detail. We’ll start with map as it’s simpler to understand. Here is a simplified definition for the map method on a Validation[F, S] called self:

def map(aFunc: S => T): Validation[F, T] = self match {
  case Success(aValue) => Success(aFunc(aValue))
  case Failure(fValue) => Failure(fValue)
}

Because this is Scala, we are able to define map in terms of a single pattern-match expression. The way to read aFunc: S => T is as a function that takes an argument of type S and returns a value of type T. If our Validation is a Success, we apply the function supplied to map to the value contained inside the Success, resulting in a new value, possibly of a new type, but still safely wrapped in our Validation container. To put it another way: if we start with Success[S] and apply a function S => T to the value inside Success, we end up with Success[T]. On the other hand, if our Validation is a Failure, we leave this as is: a Failure[F] stays a Failure[F], containing the exact same value.

Mapping over a Validation is visualized in figure 8.13, for both a Success and a Failure.

Figure 8.13. If we map a simple function (adding 1 to a number) to a Success-boxed 23 and a Failure-boxed error String, the Failure box is untouched, but the Success box now contains 24.
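The simplified map definition can be made runnable by attaching it to a toy Validation type. This is a sketch for experimentation under the assumption that a two-case stand-in is close enough to illustrate the behavior; it is not Scalaz’s implementation.

```scala
// Toy Validation carrying the simplified map from the text.
object MapSketch {

  sealed trait Validation[F, S] {
    def map[T](aFunc: S => T): Validation[F, T] = this match {
      case Success(aValue) => Success(aFunc(aValue))   // apply inside the box
      case Failure(fValue) => Failure(fValue)          // leave the failure as is
    }
  }
  final case class Success[F, S](value: S) extends Validation[F, S]
  final case class Failure[F, S](error: F) extends Validation[F, S]

  // The two boxes from figure 8.13.
  val boxed: Validation[String, Int]  = Success(23)
  val failed: Validation[String, Int] = Failure("error")
}
```

Mapping _ + 1 over boxed gives Success(24), while mapping it over failed returns the same Failure untouched, which mirrors the figure.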

Now let’s take a look at flatMap. Like map, flatMap takes a function, applies it to a Validation[F, S], and returns a Validation[F, T]. The difference is in the function that flatMap takes as its argument: the function has the type S => Validation[F, T]. In other words, flatMap expects a function that takes a value and produces a new value inside a new Validation container. You may be thinking, hang on a moment: why does a flatMap, like a map, return this

Validation[F, T]

and not this:

Validation[F, Validation[F, T]], given the supplied function

The answer to this question lies in the flat in the name: flatMap flattens the two Validation containers into one. flatMap is the secret sauce of our railway-oriented approach: it allows us to chain multiple processing steps together into one stream processing job without accumulating another layer of Validation boxing at every stage. Figure 8.14 depicts the unworkable alternative.

Figure 8.14. If our flatMap did not flatten, we would end up with a Matryoshka-doll-like collection of Validations inside other Validations. It would be extremely difficult to work with this nested type correctly.

The simplified definition for flatMap on a Validation[F, S] called self should feel familiar after map:

def flatMap(aFunc: S => Validation[F, T]): Validation[F, T] = self match {
  case Success(aValue) => aFunc(aValue)
  case Failure(fValue) => Failure(fValue)
}

The two differences from map are as follows:

  • The function passed to flatMap returns a Validation[F, T], not just a T.
  • On Success, we remove the original Success container and leave it to aFunc to determine whether T ends up in a new Success or Failure.
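The two differences listed above show up clearly when map and flatMap are given the same Validation-returning function. The sketch below reuses a toy Validation type (an assumption for illustration, not Scalaz’s code) and a hypothetical half function that fails on odd numbers.

```scala
// Toy Validation with both simplified methods from the text.
object FlatMapSketch {

  sealed trait Validation[F, S] {
    def map[T](aFunc: S => T): Validation[F, T] = this match {
      case Success(aValue) => Success(aFunc(aValue))
      case Failure(fValue) => Failure(fValue)
    }
    def flatMap[T](aFunc: S => Validation[F, T]): Validation[F, T] = this match {
      case Success(aValue) => aFunc(aValue)     // aFunc decides Success or Failure
      case Failure(fValue) => Failure(fValue)   // bypass aFunc entirely
    }
  }
  final case class Success[F, S](value: S) extends Validation[F, S]
  final case class Failure[F, S](error: F) extends Validation[F, S]

  // Hypothetical step that can fail: halve even numbers only.
  def half(n: Int): Validation[String, Int] =
    if (n % 2 == 0) Success(n / 2) else Failure(s"$n is odd")

  val start: Validation[String, Int] = Success(12)
}
```

Here start.flatMap(half).flatMap(half) stays flat and yields Success(3), and a third flatMap(half) switches to Failure("3 is odd"). By contrast, start.map(half) produces the nested Success(Success(6)) that figure 8.14 warns about.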

Putting this all together, figure 8.15 attempts to visualize how our convert function is working.

Figure 8.15. This visualization of our convert function shows how our now-familiar Validation boxes interact first with a flatMap and then with a map. flatMap allows us to chain together multiple steps that each might result in Failure, without nesting Validations inside other Validations. Both flatMap and map are respecting the fail-fast requirement of the pipeline, with no further processing after a Failure has occurred.

The key point to understand here is that flatMap and map support our railway-oriented processing approach across multiple work steps:

  • We can compose Validations that are peers of each other inside an individual work step by using the Scream operator introduced in section 8.3.
  • We can then compose multiple sequential steps that depend on the previous steps succeeding by using flatMap and map as required.
  • If we are lucky enough to stay on the happy path, we can transform the value and type, safely boxed in our Success, as we move from work step to work step.
  • Conversely, as soon as we encounter a Failure at the boundary between one work step and the next, we can fail fast, with our Failure-boxed error or errors becoming the final result of our processing flow.
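The whole railway can be sketched end to end without Scalaz. Everything below is an illustrative assumption: the Validation type is a toy stand-in, List[String] replaces NonEmptyList[String], the rates are hardcoded with no simulated network failure, and the lookups counter exists only to show that the second step is skipped after a failure.

```scala
// End-to-end toy railway: accumulate inside a step, fail fast across steps.
object RailwaySketch {

  type Errors = List[String]   // stand-in for NonEmptyList[String]

  sealed trait Validation[F, S] {
    def map[T](f: S => T): Validation[F, T] = this match {
      case Success(v) => Success(f(v))
      case Failure(e) => Failure(e)
    }
    def flatMap[T](f: S => Validation[F, T]): Validation[F, T] = this match {
      case Success(v) => f(v)
      case Failure(e) => Failure(e)
    }
  }
  final case class Success[F, S](value: S) extends Validation[F, S]
  final case class Failure[F, S](error: F) extends Validation[F, S]

  def currency(raw: String): Validation[Errors, String] = {
    val upper = raw.toUpperCase(java.util.Locale.ENGLISH)
    if (Set("USD", "EUR").contains(upper)) Success(upper)
    else Failure(List("bad currency " + raw))
  }

  def amount(raw: String): Validation[Errors, Double] =
    try Success(raw.toDouble)
    catch { case _: NumberFormatException => Failure(List("bad amount " + raw)) }

  // Step 1 (compose): both checks always run and their errors accumulate.
  def parse(c: String, a: String): Validation[Errors, (String, Double)] =
    (currency(c), amount(a)) match {
      case (Success(x), Success(y))   => Success((x, y))
      case (Failure(e1), Failure(e2)) => Failure(e1 ++ e2)
      case (Failure(e), _)            => Failure(e)
      case (_, Failure(e))            => Failure(e)
    }

  var lookups = 0   // counts how often step 2 actually runs

  // Step 2: hardcoded rates into EUR.
  def rate(cur: String): Validation[Errors, Double] = {
    lookups += 1
    if (cur == "EUR") Success(1.0) else Success(0.85)
  }

  // Fail fast: rate is reached only if parse produced a Success.
  def convert(c: String, a: String): Validation[Errors, Double] =
    for {
      total <- parse(c, a)
      r     <- rate(total._1)
    } yield total._2 * r
}
```

Converting ("eur", "10") runs both steps, while ("yen", "l33t") returns both validation errors and leaves the lookups counter unchanged, because flatMap never calls rate on a Failure.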

This completes our look at failure handling in the unified log. If we take a moment’s breath and look back over the previous three sections, we see an interesting pattern emerging, one that you could call the compose, fail-fast, compose sandwich (or burger, if you prefer). To explain myself:

  • Composition in the large: We compose complex event-stream-processing workflows out of multiple stream-processing jobs, each of which outputs a happy stream and a failure stream.
  • Fail-fast as the filling or patty: If a stream processing job consists of multiple work steps that have a dependency chain between them, we must fail fast as soon as we encounter a step that did not succeed. Scala’s flatMap and map help us to do this.
  • Composition in the small: If we have a work step inside our job that contains multiple independent tasks, we can perform all of these and then compose these into a final verdict on whether the step was a Success or a Failure. Scalaz’s Scream operator, |@|, helps us to do this.
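
The three layers of the sandwich can be sketched in one small job. Everything here is an assumption made for illustration: the Validation type is a stand-in rather than the Scalaz type, both is a hand-rolled substitute for the Scream operator that accumulates errors in a List, and Order, FailedEvent, and the "customer,amount" event format are hypothetical. The sketch assumes Scala 2.13 or later for toIntOption.

```scala
// A tiny stand-in for Scalaz's Validation, written for this example only.
sealed trait Validation[+F, +S] { self =>
  def flatMap[FF >: F, T](aFunc: S => Validation[FF, T]): Validation[FF, T] = self match {
    case Success(aValue) => aFunc(aValue)
    case Failure(fValue) => Failure(fValue)
  }
}
final case class Success[S](value: S) extends Validation[Nothing, S]
final case class Failure[F](value: F) extends Validation[F, Nothing]

final case class Order(customer: String, amount: Int)                  // hypothetical happy-path output
final case class FailedEvent(original: String, errors: List[String])   // hypothetical failure event

object SandwichDemo {
  type V[A] = Validation[List[String], A]

  // Stand-in for the Scream operator: combine two independent results, keeping every error.
  def both[A, B, C](va: V[A], vb: V[B])(f: (A, B) => C): V[C] = (va, vb) match {
    case (Success(a), Success(b))   => Success(f(a, b))
    case (Failure(e1), Failure(e2)) => Failure(e1 ++ e2)
    case (Failure(e), _)            => Failure(e)
    case (_, Failure(e))            => Failure(e)
  }

  // Work step 1: the next step depends on it, so a Failure here ends the flow.
  def splitFields(raw: String): V[(String, String)] = raw.split(",", -1) match {
    case Array(c, a) => Success((c, a))
    case _           => Failure(List("expected exactly two comma-separated fields"))
  }

  // Work step 2: two independent tasks, composed into a single verdict.
  def checkCustomer(c: String): V[String] =
    if (c.trim.nonEmpty) Success(c.trim) else Failure(List("customer is empty"))
  def checkAmount(a: String): V[Int] = a.trim.toIntOption match {
    case Some(n) => Success(n)
    case None    => Failure(List(s"amount '$a' is not a whole number"))
  }
  def toOrder(fields: (String, String)): V[Order] =
    both(checkCustomer(fields._1), checkAmount(fields._2))(Order(_, _))

  // Fail fast between the two work steps.
  def process(raw: String): V[Order] = splitFields(raw).flatMap(toOrder)

  // Composition in the large: one happy output and one failure output per job.
  def runJob(events: List[String]): (List[Order], List[FailedEvent]) = {
    val results = events.map(raw => raw -> process(raw))
    (results.collect { case (_, Success(order)) => order },
     results.collect { case (raw, Failure(errors)) => FailedEvent(raw, errors) })
  }
}
```

Running runJob on the events "alice,30", ",abc", and "oops" puts one Order in the happy output and two FailedEvents in the failure output. The event ",abc" carries both of its errors because the two checks are independent, whereas "oops" carries only the error from the first work step because the flow failed fast before the second step ran. Each FailedEvent keeps the original event, so another job could reprocess it.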

Summary

  • Unix’s concept of three standard streams, one for input and one each for good and bad output, is a powerful idea that we can apply to our own unified log processing.
  • Java uses exceptions for program termination or recovery but lacks tools to elegantly address failure at the level of an individual unit of work. Many programmers lean on error logging to address this, which has helped to spawn the log-industrial complex.
  • In the unified log, we can model failures as events themselves. These events should contain all of the causes of the failure, and should also contain the original event, to enable reprocessing in another stream-processing job.
  • Stream processing jobs should echo the Unix approach and write successes to one stream, and failures to another stream. This allows us to compose complex processing flows out of multiple jobs.
  • As a strongly typed functional language, Scala provides tools such as flatMap, map, and for {} yield syntactic sugar, which help us to keep our failure path integrated into our core code flow, versus exception-throwing or error-logging approaches.
  • The Scalaz Validation is a kind of container that can represent either Success or Failure and can contain different types for each. Again, this helps us to keep our failure path co-situated with our happy path.
  • The Scream operator, |@|, can be applied to multiple Scalaz Validations to compose an output Validation. This lets us compose multiple tasks inside a single step, into a final verdict on whether that step succeeded or failed.
  • Across multiple work steps inside a single job, we want to fail fast as soon as we encounter a step that did not succeed. Scala’s flatMap and map work with the Validation type to let us do this; Scala’s for {} yield syntax makes our code much more readable.
  • If we lean back and squint a little, we can see the overall methodology as a compose, fail-fast, compose approach, where we compose multiple jobs, fail fast between multiple steps inside the same job, and again compose between multiple tasks within the same job step.