Chapter 6. Schemas
This chapter covers
- Event schemas and schema technologies
- Representing events in Apache Avro
- Self-describing events
- Schema registries
In the first part of this book, we took a wide-ranging look at event streams and the unified log, using fictitious online retailer Nile. We looked in depth at adding a unified log to our organization and experimented with different stream-processing frameworks to work with the events in our Kafka topics.
But like fast-food addicts, we didn’t spend a lot of time thinking about the quality of the events that we were feeding into our unified log. This part of the book aims to change this, by looking much more closely at the way we model the events flowing through our unified log, using schemas.
Working for Plum, a fictitious global consumer-electronics manufacturer, we will introduce Plum’s first event, a regular health-check “ping” emitted from each NCX-10 machine on the factory floor. Like every unified log, Plum’s is fundamentally decoupled: consumers and producers of event streams have no particular knowledge of each other. This puts the onus on event schemas to serve as the contract between Plum’s event consumers and producers.
How should we define Plum’s event schemas? The chapter introduces four widely used schema technologies: Apache Avro, JSON Schema, Apache Thrift, and Google’s protocol buffers. More than just data serialization systems, schema technologies like Avro offer schema evolution support, generation of bindings for your programming language, and multiple options for encoding the events on disk.
Adopting Avro for Plum, we will model the NCX-10 health-check event in the Avro schema language, and then auto-generate Plain Old Java Object (POJO) bindings for that event in Java. We’ll then test this with a simple Java app that deserializes a JSON-format Avro health-check event, prints it out as a POJO, and then reserializes it into Avro’s binary format.
With simple Avro parsing under our belt, we’ll continue into design questions around associating events with their schemas. We suggest a couple of possible approaches to making this event-schema association, before arguing strongly for self-describing events. These are events that include a metadata “envelope,” referencing the schema, alongside the event itself. To work with these events, you first read the event’s outer envelope to discover the event’s Avro schema, and then retrieve the schema and use it to deserialize the event’s data payload itself.
Before we know it, we will be proliferating schemas for our employer Plum. And these need to live somewhere—namely, in a schema registry. We will wrap up the chapter with a brief look at the core attributes of a schema registry, before introducing the two most widely used schema registries: Confluent Schema Registry and Snowplow’s own Iglu schema registry. These two registries are broadly similar but have interesting differences of design decisions, which we’ll look at last.
Let’s get started!
To start working with schemas, we’ll first need some events. For this part of the book, we’ll leave Nile behind and work with another fictitious company with a unified log, called Plum. Let’s introduce Plum and its event streams.
Let’s imagine that we are in the Business Intelligence (BI) team at Plum, a global consumer-electronics manufacturer. Plum is in the process of implementing a unified log. For reasons that aren’t important here, its unified log is a hybrid, using Amazon Kinesis for certain streams and Apache Kafka elsewhere; Amazon Kinesis (https://aws.amazon.com/kinesis/) is a hosted unified log service available as part of Amazon Web Services. In reality, using both Kafka and Kinesis is a fairly unusual setup, but it has the advantage for us that we can work with both Kinesis and Kafka in this section of the book.
At the heart of the Plum production line is the NCX-10 machine, which stamps new widgets out of a single block of steel. Plum has 1,000 of these machines in each of its 10 factories. Our corporate overlords at Plum want us to program each machine to emit key metrics in the form of a health check to a Kinesis stream every second, as shown in figure 6.1. A Kinesis stream is similar to a Kafka topic (don’t worry, we’ll explore Kinesis in detail in the next chapter).
Figure 6.1. All of the NCX-10 machines in Plum’s factories should emit a standard health-check event to a Kinesis stream every second.
Across Plum’s 10 factories, we have 10,000 machines. With each machine emitting a health check every second, that’s 36 million health checks landing in the Kinesis stream each hour, a substantial first event source for Plum’s unified log.
We sit down with the plant maintenance team at Plum to find out what health-check information we can retrieve from the NCX-10 machines. We discover a few interesting data points:
- The name of the factory that the machine is installed in.
- The machine’s serial number, which is a string.
- The machine’s current status, which can be one of STARTING, RUNNING, or SHUTTING_DOWN.
- The time when the machine was last started, which is a Unix timestamp accurate to milliseconds.
- The machine’s current temperature, in Celsius.
- Whether or not the machine has been earmarked as being end-of-life, meaning that it should be scrapped soon.
- If the factory has multiple floors, the machine knows which floor it is situated on.
In part 1 of this book, we used JSON as a way of encoding all of Nile’s online shopping-related events. Channeling this expertise for Plum, we can quickly sketch out an example JSON instance representing a hypothetical health check from one of the NCX-10 machines:
{
  "factory": "Factory A",
  "serialNumber": "EU3571",
  "status": "RUNNING",
  "lastStartedAt": 1539598697944,
  "temperature": 34.56,
  "endOfLife": false,
  "floorNumber": 2
}
The plant engineers squint at the preceding example JSON and confirm that this is pretty much everything that an NCX-10 machine can usefully emit.
This JSON is a great start. If this were still part 1 of the book, we would take this JSON format and run with it, designing and building stream-processing applications to read and write these JSON-based events. But this is part 2, and for Plum we can go further: we are going to replace this JSON format with a formal schema for our health-check event. Before we can do this, you need to understand exactly what a schema is, and why it is so useful.
Remember that the powers-that-be at Plum have so far given us a narrow brief: programming each of the NCX-10 machines to emit a regular health-check event. We don’t yet know how Plum wants to use this event, or even which teams within Plum will be tasked with working with these events in Plum’s unified log.
This is a common occurrence: in the real world, someone else, maybe even a different team or department or country, will most often consume the event stream that you create. A unified log is a fundamentally decoupled architecture: consumers and producers of event streams have no particular knowledge of each other. This is in stark contrast to a traditional single monolithic software project, where you can see all parts of the code evolving in lockstep within source control, with a compiler and test suite constantly confirming the integrity of the whole system.[1]
1You can find more information about continuous integration at Wikipedia: https://en.wikipedia.org/wiki/Continuous_integration.
If you think about it, the decoupled unified log approach is analogous to the widgets that Plum’s beloved NCX-10 machines are churning out. Plum doesn’t necessarily know which companies will buy the widgets, or what purposes those customers will put the widgets to. How does Plum guarantee that its widgets are fit-for-purpose for its customers? It does this by signing contracts with its customers that dictate certain standards that its widgets will conform to. Perhaps the widgets will always be SAE steel grade 12L14, pass a recognized quality and safety test, and use the Unified Thread Standard for their screw threads. With these standards enforced in contracts, Plum’s customers can be confident that they can build products that make use of Plum’s widgets.
The consumers of the event streams within our unified log need exactly the same kind of guarantees as the buyers of Plum’s widgets. We can give them these guarantees by using schemas for the events we are storing in our unified log. Schema is the Greek word for shape, and an event schema is simply that: a declaration that some set of events in our unified log will follow a pre-defined shape.
In the absence of formal integrations between systems, our event schemas are the closest we come to a contract between the systems generating an event stream and the systems consuming that stream. Figure 6.2 illustrates this contract.
Figure 6.2. The producer of the event agrees to the schema of the event with the initial known consumer of the event. This acts as a contract between both parties. Then, in the stream, the producing app creates events using the agreed-upon schema, and the consuming app can happily read those events, safe in the knowledge that the events will conform to the schema.
By agreeing to the schema up-front and writing our code in a way that ensures that the events we generate conform to that schema, we can avoid a huge amount of pain for whoever is consuming those events later. But how do we represent our event schemas? The good news is that we have a choice of a variety of schema technologies, often called data serialization systems, which have emerged over the past few years. Before we introduce these, let’s briefly cover the capabilities of these technologies.
In part 1, we built applications that produced and consumed events that were represented using plain JSON. The events we created for online retailer Nile certainly each had a shape, but that shape was not formally documented in a machine-readable way; we could say that Nile’s events had implicit schemas rather than explicit ones.
To be clear, JSON is a data serialization system, but it’s not one that allows us to provide the kinds of contracts that we need for our unified log. The schema technologies that we will look at in the next section all provide a schema language to precisely define the data types of the individual properties of your business entities.
If you have ever worked with a strongly typed programming language, you will be familiar with data types. Simple data types such as string or integer are almost always well represented by these schema languages; different schema technologies will, however, have varying support for the following:
- Compound data types such as arrays and records or objects
- Less common data types such as timestamps, UUIDs, and geographical coordinates
Going further, the schema technologies we will look at all additionally exhibit at least some of these six capabilities (the abbreviations in parentheses refer to table 6.1 in the following subsection):
- Multiple schema languages (LNG)—Some schema technologies provide multiple ways in which you can express the data types for your business entities. For example, you may be able to write your schemas declaratively in JSON, or there may be some form of interface description language (IDL) with a more C-like or Java-like syntax.
- Validation rules (VLD)—Some schema technologies go further than data types, by letting you express validation rules (sometimes called contracts) on the event’s properties. For example, you might express that a longitude is not just a floating-point number, but a floating-point number that can’t be more than 180 or less than –180, and that a latitude must be no more than 90 or less than –90.
- Code generation (GEN)—Whatever syntax the schema is expressed in, we are likely to want to also interact with events represented by the schema in code we write (for example, our stream-processing apps). To facilitate this, schema technologies often support code generation, which will generate idiomatic classes or records in your preferred language from the schema.
- Multiple encodings (ENC)—Some schema technologies support multiple encodings of the data, often a compact binary format and a human-readable format (perhaps JSON-based).
- Schema evolution (EVO)—The properties in our Plum events will likely evolve over time. Some of the more sophisticated schema technologies have built-in support for schema evolution, which makes it easier to consume different versions of the same schema.
- Remote procedure calls (RPC)—This is not a feature that we need, but some data serialization systems also come with a mechanism for building remote procedure calls: distributed functions that use data types to express the functions’ arguments and return values.
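To make the validation-rules capability more concrete, here is a minimal Java sketch of the longitude/latitude contract mentioned above. This is our own illustration of the kind of rule a schema technology might enforce, not code from any particular schema library:

```java
// Illustrative only: the kind of contract a validation rule expresses.
// A longitude must lie in [-180, 180] and a latitude in [-90, 90].
public class CoordinateContract {

    // Returns true only when both values satisfy the contract.
    public static boolean isValid(double longitude, double latitude) {
        return longitude >= -180 && longitude <= 180
            && latitude >= -90 && latitude <= 90;
    }

    public static void main(String[] args) {
        System.out.println(isValid(2.35, 48.85));   // a valid coordinate
        System.out.println(isValid(200.0, 48.85));  // longitude out of range
    }
}
```

A schema technology with validation rules lets you declare this contract once, in the schema, instead of scattering checks like these through every producer and consumer.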
Don’t worry if some of these capabilities sound a little abstract right now. The rest of this chapter will use Plum’s business requirements to make all this much more concrete.
A variety of schema technologies, also known as data serialization systems, have emerged over the past few years, each with subtly different capabilities. Table 6.1 introduces four of the most widely used systems and lays out their designs relative to the six capabilities introduced in the preceding section.
Table 6.1. Examples of schema technologies
Schema tech | LNG | VLD | GEN | ENC | EVO | RPC
---|---|---|---|---|---|---
Apache Avro | JSON, IDL | No | Yes | Binary, JSON | Yes | Yes
Apache Thrift | IDL | No | Yes | Five encodings | Yes | Yes
JSON Schema | JSON | Yes | No | JSON | No | No
Protocol buffers | IDL | No | Yes | Binary, JSON | No | Yes (gRPC)
Let’s go through each of these schema technologies briefly.
Apache Avro (https://avro.apache.org/) is an RPC and data serialization system that was developed as part of the Apache Hadoop project, and shares many of the same authors, including Hadoop pioneer Doug Cutting.
Avro has a declarative JSON-based schema language for describing data types, as well as an alternative language, called Avro IDL, which is more C-like. Avro has two representations for data: a compact binary encoding and a human-readable JSON encoding; the latter follows a few additional rules to represent Avro features that are not natively supported in JSON. The binary encoding is used much more widely than the JSON encoding, and as such the tooling for the binary encoding tends to be more fully featured.
As a software engineer, you can interact with Avro in one of two ways. The first is to use code generation ahead of time, creating representations for the Avro data types in your preferred programming language; you can then round-trip the Avro-encoded data to classes or records in your code, and back again. The second approach involves using the schema at runtime to parse the data in a generic way; this is a good fit for dynamic languages and for situations (common in a unified log) in which the record types to process are not known ahead of time.
With its origins in the Hadoop project, Avro was designed from the outset for use in data processing, and as a result has a sophisticated take on schema evolution. When a consuming application is reading an Avro record, it must have a copy of the schema that was used to write the record, but it can also supply another version of that same schema; these are the writer’s schema and the reader’s schema, respectively. Avro will then transparently use resolution rules between the two schema versions to represent the data in the reader’s schema. This allows an application to happily process an archive of events written by multiple historic versions of a given schema.
Apache Thrift (https://thrift.apache.org/) is pitched as a framework for cross-language services development. As part of this, it includes a definition language that allows you to define data types as well as service interfaces. One of its biggest selling points is its broad language support: Thrift has first-class support for C++, Java, Python, PHP, Ruby, Go, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml, and Delphi.
Unlike the other schema technologies, Thrift is much less opinionated about how the data should be encoded: it offers three binary encodings and two JSON-based encodings. When you define a schema in Thrift, you manually assign each property a tag (1, 2, 3, and so forth), and these tags are stored in the encoded data along with the data type; this is how Thrift supports schema evolution: an unknown property can be skipped, or a renamed property correctly identified.
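As a hypothetical illustration of those manually assigned tags, a Thrift definition for something like Plum’s health check might look as follows (our sketch, not taken from the Thrift documentation):

```thrift
// Hypothetical Thrift sketch of a health check (our illustration).
// The integers 1-3 are the tags stored in the encoded data; renaming
// serialNumber later would not break readers, because the tag stays 1.
struct HealthCheck {
  1: required string serialNumber,
  2: required double temperature,
  3: optional i32 floorNumber
}
```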
Thrift was created at Facebook. The story goes that ex-Googlers there created it because they missed using Google’s own protocol buffers, which hadn’t been open sourced at that point.
JSON Schema (https://json-schema.org/) is slightly different from the other schema technologies described here. JSON Schema is a declarative language, itself written in JSON, for describing your JSON data’s format; it is easily written by humans and makes it possible for computers to validate and parse individual JSON data. If you are familiar with XML, the relationship between JSON and JSON Schema is not dissimilar to that between XML and its document type definitions (DTDs).
JSON Schema doesn’t concern itself with data serialization at all; it’s simply an overlay for defining the shape of JSON data. It also doesn’t depend on (or offer) code generation, although there are community-led initiatives to implement code generation for various languages.
Although not as fully featured as Avro, JSON Schema does have two distinct strengths:
- Rich validation rules— It’s possible to express sophisticated data validation rules in JSON Schema, including minima and maxima for numbers, and regular expressions for strings. Used creatively, this allows you to define your own simple data types.
- It’s just JSON— Because JSON Schema is simply a schema overlay over plain JSON, you don’t have to rewrite your systems to use another data serialization system. If you are comfortable with JSON, JSON Schema is quick to pick up. You can also backfill schemas for existing JSON data after the fact by using tools such as Schema Guru (https://github.com/snowplow/schema-guru).
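As a sketch of what these validation rules could look like for two of the health-check properties, consider the following JSON Schema fragment; the serial-number pattern and the temperature bounds are invented purely for illustration:

```json
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "type": "object",
  "properties": {
    "serialNumber": { "type": "string", "pattern": "^[A-Z]{2}[0-9]{4}$" },
    "temperature": { "type": "number", "minimum": -50, "maximum": 200 }
  },
  "required": ["serialNumber", "temperature"]
}
```

A validator would reject a health check whose serial number didn’t match the pattern or whose temperature fell outside the declared bounds, which is exactly the kind of contract plain JSON cannot express on its own.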
Protocol buffers (https://developers.google.com/protocol-buffers/) are a schema technology and data serialization mechanism from Google, currently on their third major version (and open sourced from their second major version). Protocol buffers are similar to Thrift; they support a protocol definition syntax that lets the user define data structures in .proto files.
Data in protocol buffers is serialized into a binary format; there is also a self-describing ASCII format, but this does not support schema evolution. As with Thrift, integers are used to tag each property within the data structure and to handle schema evolution. One neat aspect of protocol buffers is that arrays are represented with the repeated modifier, and the same encoding is used for repeated, optional, and required properties, making it possible to easily migrate a property from being, for example, optional to being an array.
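As a hypothetical sketch, a proto3 definition using those tags and the repeated modifier might look like this (our illustration only, not Plum’s actual schema):

```proto
// Hypothetical proto3 sketch of a health check (our illustration).
// Tags 1-3 identify properties on the wire; "repeated" shares its
// encoding with singular fields, easing optional-to-array migrations.
syntax = "proto3";

message HealthCheck {
  string serial_number = 1;
  float temperature = 2;
  repeated int32 floor_numbers = 3;
}
```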
Protocol buffers can be used as a standalone schema technology, but they are also closely associated with, and often used with, Google’s RPC framework, called gRPC.
This concludes our whirlwind introduction to four major schema technologies—but how do we choose between these for our bosses at Plum?
Choosing our schema technology is one of the most important decisions we will make for our unified log at Plum. We can adopt different stream-processing frameworks, but we get to adopt only one schema technology. All of the events we write into our unified log will be stored with these schemas; the archive of events from Plum’s unified log (which hopefully will stretch into many years) will be represented in this format too.
Avro, JSON Schema, and protocol buffers are all growing in popularity; interest in Thrift is quite possibly growing too. So how do we choose between them? Here are simple rules that might help:
- If you are using or plan to use gRPC, consider using protocol buffers for your unified log.
- Likewise, if you are using Thrift RPC already, consider using Thrift.
- If you have existing batch- or stream-processing systems that make heavy use of JSON, or you expect a lot of event authoring to be done by developers who prefer JSON, consider JSON Schema.
- Otherwise, use Avro.
In the case of Plum, none of the first three rules are met, so we are going to go with Avro. Enough theory—let’s get started.
From our colleagues on the factory floor, we know what data points our health-check events need to contain, and from the brief review in the previous section, we know that we want to use Avro as our schema technology at Plum. So, we are ready now to model our health-check event in Avro.
Avro schemas can be defined in plain JSON or Avro IDL text files, but the schema file needs to live somewhere, so we’ll create a simple Java app to hold it, called SchemaApp. This will give us a harness to experiment with Avro’s code-generation capabilities, as well as to explore its encodings.
We are going to write the harness app by using Gradle. First, create a directory called plum, and then switch to that directory and run the following:
$ gradle init --type java-library ... BUILD SUCCESSFUL ...
Gradle has created a skeleton project in that directory, containing a couple of Java source files for stub classes called Library.java and LibraryTest.java. Delete these two files, because we will be writing our own code shortly.
Next let’s prepare our Gradle project build file. Edit the file build.gradle and replace its current contents with the following listing.
Listing 6.1. build.gradle
plugins {                                                      #1
  id "com.commercehub.gradle.plugin.avro" version "0.9.1"
}
apply plugin: 'java'
apply plugin: 'application'

sourceCompatibility = '1.8'
mainClassName = 'plum.SchemaApp'

repositories {
  mavenCentral()
}

version = '0.1.0'

dependencies {
  compile 'org.apache.avro:avro:1.8.2'                         #2
}

jar {
  manifest {
    attributes 'Main-Class': mainClassName
  }
  from {
    configurations.compile.collect {
      it.isDirectory() ? it : zipTree(it)
    }
  } {
    exclude "META-INF/*.SF"
    exclude "META-INF/*.DSA"
    exclude "META-INF/*.RSA"
  }
}
Let’s confirm that we can build this Gradle project:
$ gradle compileJava ... BUILD SUCCESSFUL ...
Good—now we are ready to work on the schema for our health-check event.
To add our new Avro schema for the NCX-10 machine’s health-check event, create a file at this path:
src/main/resources/avro/check.avsc
Populate this file with the Avro schema in the following listing. Note that we are using Avro’s JSON-based schema syntax rather than the Avro IDL to model this event; this is what the .avsc file extension signifies.
Listing 6.2. check.avsc
{
  "name": "Check",
  "namespace": "plum.avro",
  "type": "record",
  "fields": [
    { "name": "factory", "type": "string" },
    { "name": "serialNumber", "type": "string" },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "namespace": "plum.avro",
        "name": "StatusEnum",
        "symbols": ["STARTING", "RUNNING", "SHUTTING_DOWN"]
      }
    },
    { "name": "lastStartedAt", "type": "long", "logicalType": "timestamp-millis" },
    { "name": "temperature", "type": "float" },
    { "name": "endOfLife", "type": "boolean" },
    { "name": "floorNumber", "type": ["null", "int"] }
  ]
}
This is a densely packed Avro schema file. Let’s break it down: the top-level entity is a record called Check, which belongs in the plum.avro namespace (as would all of our entities). Our Check record contains seven fields, which correspond to the seven data points identified for a health-check event:
- The name of the factory in which the machine is installed is of type string.
- The machine’s serial number is another string.
- The machine’s current status is an Avro enum (short for enumeration), which can be one of STARTING, RUNNING, or SHUTTING_DOWN. An enum is a complex type, so we need to provide it with a namespace (plum.avro) and a name (StatusEnum).
- The time when the machine was last started is a Unix timestamp accurate to milliseconds. This is stored as a long, but Avro also lets us specify a logical type for the field, which gives a hint as to how a parser should handle the underlying type.
- The machine’s current temperature, in Celsius, is a float.
- Whether or not the machine has been earmarked as being end-of-life is a boolean.
- The floor number is stored as a union type of an int or a null. If the factory does not have multiple floors, this field will be set to null. Otherwise, it will be an integer.
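The timestamp-millis logical type can be seen at work with nothing more than the Java standard library. This short sketch (our own, not part of the chapter’s SchemaApp) interprets the lastStartedAt value from our example health-check event:

```java
import java.time.Instant;

// The lastStartedAt field is stored as a long, but the timestamp-millis
// logical type tells parsers to interpret it as a point in time.
public class LogicalTypeDemo {

    public static Instant lastStartedAt(long millis) {
        return Instant.ofEpochMilli(millis);
    }

    public static void main(String[] args) {
        // The value from our example health-check event.
        System.out.println(lastStartedAt(1539598697944L));
        // prints 2018-10-15T10:18:17.944Z
    }
}
```

Without the logical-type hint, a consumer would see only an opaque long; with it, Avro-aware tooling can surface the field as a proper timestamp.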
A minor piece of housekeeping—we need to soft-link the resources/avro subfolder to another location so that the Avro plugin for Gradle can find it:
$ ln -sr src/main/resources/avro src/main
With our Avro schema defined, let’s use the Avro plugin in our build.gradle file to automatically generate Java bindings for our schema:
$ gradle generateAvroJava :generateAvroProtocol UP-TO-DATE :generateAvroJava BUILD SUCCESSFUL Total time: 8.234 secs ...
You will find the generated files inside your Gradle build folder:
$ ls build/generated-main-avro-java/plum/avro/ Check.java StatusEnum.java
These files are too lengthy to reproduce here, but if you open them in your text editor, you will see that the files contain POJOs to represent the one record and the one enumeration that make up our health-check event.
So far, so good: we now have a model in Java for our Avro health-check event. In the next section, we will write simple Java code to work with these health checks.
Remember that Avro has two representations, a human-readable JSON encoding and a more efficient binary encoding. In this section, we will round-trip a health-check event from Avro’s JSON-based encoding into a Java object, and then back again into Avro’s binary encoding. This is not particularly useful for Plum, but it will give you a chance to become familiar with Avro’s two representations and show you how to interact with Avro from a regular programming language such as Java.
Add the following code into a new file called src/main/java/plum/AvroParser.java.
Listing 6.3. AvroParser.java
package plum;

import java.io.*;
import java.util.*;
import java.util.Base64.Encoder;

import org.apache.avro.*;
import org.apache.avro.io.*;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.*;

import plum.avro.Check;                                        #1

public class AvroParser {

  private static Schema schema;
  static {
    try {                                                      #2
      schema = new Schema.Parser()
        .parse(AvroParser.class.getResourceAsStream("/avro/check.avsc"));
    } catch (IOException ioe) {
      throw new ExceptionInInitializerError(ioe);
    }
  }

  private static Encoder base64 = Base64.getEncoder();

  public static Optional<Check> fromJsonAvro(String event) {
    InputStream is = new ByteArrayInputStream(event.getBytes());
    DataInputStream din = new DataInputStream(is);
    try {
      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);
      DatumReader<Check> reader = new SpecificDatumReader<Check>(schema);
      return Optional.of(reader.read(null, decoder));          #3
    } catch (IOException | AvroTypeException e) {
      System.out.println("Error deserializing:" + e.getMessage());
      return Optional.empty();
    }
  }

  public static Optional<String> toBase64(Check check) {
    ByteArrayOutputStream bout = new ByteArrayOutputStream();
    DatumWriter<Check> writer = new SpecificDatumWriter<Check>(schema);
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(bout, null);
    try {
      writer.write(check, encoder);
      encoder.flush();
      return Optional.of(base64.encodeToString(bout.toByteArray()));   #4
    } catch (IOException e) {
      System.out.println("Error serializing:" + e.getMessage());
      return Optional.empty();
    }
  }
}
The AvroParser file is simple; it consists of three parts:
- Initialization code—This gives us a reusable representation in Java of the Avro health-check schema; it also gives us a Base64 encoder, which we’ll use later.
- A static function, fromJsonAvro—This converts an incoming event, which is hopefully a health check in Avro’s JSON format, into a Check POJO. We box the return value in a Java 8 Optional to cover the case where the health check couldn’t be correctly deserialized.
- A static function, toBase64—This converts our Check POJO into an Avro binary record, and then Base64-encodes that byte array to make it more human-readable. Again, we box the return value in an Optional to cover any serialization problems.
We can now stitch these two functions together via a new SchemaApp class containing our main method. Create a new file called src/main/java/plum/SchemaApp.java and populate it with the contents of the following listing.
Listing 6.4. SchemaApp.java
package plum;

import java.util.*;

import plum.avro.Check;

public class SchemaApp {

  public static void main(String[] args) {
    String event = args[0];

    Optional<Check> maybeCheck = AvroParser.fromJsonAvro(event);   #1
    maybeCheck.ifPresent(check -> {                                #2
      System.out.println("Deserialized check event:");
      System.out.println(check);

      Optional<String> maybeBase64 = AvroParser.toBase64(check);   #3
      maybeBase64.ifPresent(base64 -> {                             #4
        System.out.println("Re-serialized check event in Base64:");
        System.out.println(base64);
      });
    });
  }
}
We will pass a single argument into our SchemaApp on the command line: a string hopefully containing a valid NCX-10 health-check event, in Avro’s JSON format. Our code then attempts to deserialize this string into a Check POJO. If this succeeds, we proceed to print out the Check and then convert it back into Avro’s binary representation, Base64-encoded so that we can print it out easily.
Let’s build our app now. From the project root, the plum folder, run this:
$ gradle jar ... BUILD SUCCESSFUL Total time: 25.532 secs
Great—we are now ready to test our new Avro-powered schema app.
Remember back in section 6.1.1, where we introduced Plum and our NCX-10 health-check event, conveniently represented in JSON? Almost the same JSON is valid as an Avro representation of the same event. Here is the Avro representation:
{
  "factory": "Factory A",
  "serialNumber": "EU3571",
  "status": "RUNNING",
  "lastStartedAt": 1539598697944,
  "temperature": 34.56,
  "endOfLife": false,
  "floorNumber": { "int": 2 }
}
The only difference is the {"int": ... } syntax boxing the floorNumber property’s value of 2. Avro requires this because the floor number in our health-check event is optional, and Avro represents optional fields as a union of null and the other type (in this case, an integer). We need to use the {"int": ... } syntax to tell Avro that we want to treat the value of this union type as an integer, rather than a null.
Let’s take this JSON representation of our health-check event and pass it into our freshly built Java app:
$ java -jar ./build/libs/plum-0.1.0.jar "{\"factory\":\"Factory A\", \"serialNumber\":\"EU3571\",\"status\":\"RUNNING\",\"lastStartedAt\": 1539598697944,\"temperature\":34.56,\"endOfLife\":false, \"floorNumber\":{\"int\":2}}" Deserialized check event: {"factory": "Factory A", "serialNumber": "EU3571", "status": "RUNNING", "lastStartedAt": 1539598697944, "temperature": 34.56, "endOfLife": false, "floorNumber": 2} Re-serialized check event in Base64: EkZhY3RvcnkgQQxFVTM1NzEC3L2Zm+dVcT0KQgACBA==
Success! Our Java app has taken the incoming Avro event, successfully deserialized it into a Java POJO, printed that out, and then converted it back into a Base64-encoded binary string. Note how brief the Base64 string is. This representation is much more succinct than the JSON representation, for two reasons:
- The binary representation does not include property names such as factory or status. Instead, properties are identified positionally by using the associated schema.
- The binary representation can represent certain values efficiently—for example, using IDs for enum values, and 1 or 0 for Boolean true or false.
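You can verify this compactness with the Java standard library alone. This short sketch (our own check, independent of the Avro libraries) decodes the Base64 string printed above and peeks at the first field:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Peek inside the Base64-encoded Avro binary record printed by SchemaApp.
public class BinaryPeek {

    public static void main(String[] args) {
        String base64 = "EkZhY3RvcnkgQQxFVTM1NzEC3L2Zm+dVcT0KQgACBA==";
        byte[] bytes = Base64.getDecoder().decode(base64);

        // The whole record is far smaller than its JSON form.
        System.out.println(bytes.length); // 31

        // bytes[0] is the zigzag-encoded string length (9 -> 18),
        // followed by the raw bytes of "Factory A" -- no property name.
        System.out.println(bytes[0]); // 18
        System.out.println(new String(bytes, 1, 9, StandardCharsets.UTF_8));
    }
}
```

The entire binary record is 31 bytes, and the factory name appears immediately after a single length byte: nothing in the payload names the field, because the schema does that job.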
Before we move on, let’s confirm that an invalid event fails processing:
$ java -jar ./build/libs/plum-0.1.0.jar "{\"factory\":\"Factory A\"}" Error deserializing:Expected string. Got END_OBJECT
Good: the event correctly failed to deserialize, because various required properties are missing.
That completes our initial work with Avro for Plum. But the tight interdependency between Avro instances and their schemas raises interesting data-modeling questions, which we'll look at next.
Think about the preceding section: we deserialized an incoming Avro event in JSON format to a Java Check POJO, and then converted it back into a Base64-encoded version of that same event’s binary encoding. But how did we know that the incoming Avro event was an NCX-10 health-check event? How did we know which schema to use to parse that incoming payload?
The simple answer is that we didn’t. We assumed that the incoming string would be a valid NCX-10 health-check event, represented in Avro’s JSON format. If that assumption was incorrect and the event did not match our Avro schema, we simply threw an error.
In this section, we will consider more sophisticated strategies for associating events in a unified log with their schemas.
Let's imagine for a minute that we have implemented Apache Kafka as part of Plum's unified log. How do we write and deploy a Kafka worker that can successfully consume NCX-10 health-check events? Three potential strategies are described next.
In this model, Plum establishes a convention that each distinct event type will be written to its own Kafka topic. This needs to be specified down to the version of the schema and its format (binary or JSON). An example name for this specific Kafka topic might be ncx10_health_check_v1_binary, as illustrated in figure 6.3.
Figure 6.3. A homogeneous Kafka topic, containing only events in the Avro binary representation of version 1 of the NCX-10 health check. A Kafka worker can consume this safely in the knowledge that all records in this topic can be deserialized using the specific Avro schema.
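The naming convention can be captured in a small helper. This is only a sketch (the class and method names are our own, not part of Kafka): it builds a topic name from the event name, schema version, and encoding format.

```java
public class TopicNamer {
    /** Builds a topic name like ncx10_health_check_v1_binary per Plum's convention. */
    static String topicName(String eventName, int schemaVersion, String encoding) {
        return String.format("%s_v%d_%s", eventName, schemaVersion, encoding);
    }

    public static void main(String[] args) {
        System.out.println(topicName("ncx10_health_check", 1, "binary"));
    }
}
```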
The merit of this approach is its simplicity: the Kafka topic is completely homogeneous, containing only NCX-10 health checks. But as a result, we will end up proliferating event streams at Plum: one event stream (aka Kafka topic) for each version of each schema.
Therefore, if we want to do something as simple as parse multiple versions of the NCX-10 health-check event, we would have to write a stateful stream-processing app that unites data across multiple Kafka topics, one per schema version. And an analysis across all of our event types (for example, to count events per hour) is even more painful: we have to join each and every Kafka topic together in a stateful stream processor to achieve this.
Heterogeneous streams that contain multiple event types are much easier to work with. But how do we know which events they contain?
You could call this the brute-force approach: Plum would mix lots of event types into one stream, and then workers would attempt to deserialize events into any given schema that they are interested in. If the event fails to deserialize into the appropriate POJO, the worker ignores that event. Figure 6.4 shows this approach.
Figure 6.4. Our consuming application is interested in only two event types (health check and machine restart) and uses a trial-and-error approach to identify these events in the Kafka topic. The amount of wasteful deserialization increases with the number of event types that our consuming app is concerned with.
Ca cknk sa wv de byenod erh ipnapactlsio, bzjr opcahapr jz yinbrldeci ifetinfinec. Jigaemn zyrr wv kcoq ns ipintaalpoc rurz edsne er weot vn kojl evten tsype. Olsaziineaoiret zj s opymilatunctaol ensvpexei svrc, cyn gtk iolpipacnta wffj aemtttp kr rszlaeidiee bsks veten dp er xlkj istme eforbe igvomn nv xr org nrxo vtnee.
There has to be a better way than this brute-force approach. We cover this next.
In this approach, we again mix many event types into a single stream. The difference this time is that each event has a simple metadata "envelope" attached to it, which tells any consuming application which schema was used to serialize the event. We can call these events self-describing events, because the event carries with it at all times the information about which schema this instance is associated with. Figure 6.5 depicts this approach.
Figure 6.5. In this Kafka topic, we can see three self-describing events: a health check, a machine restart, and another event. Each event consists of a schema describing the event, plus the event itself.
Working with self-describing events is a two-step process:
- Parse the event's metadata envelope to retrieve the identifier for the event's schema.
- Parse the event's data portion against the identified schema.
This is a powerful approach: for the cost of a little more deserialization work, we now have a much more flexible way of defining our events. We can write these events into a single heterogeneous stream, but equally we can then "shred" that stream into sub-streams of single or associated event types. Because the event's schema travels with the event itself, we can send the event on anywhere without losing the metadata necessary to parse the event. This is visualized in figure 6.6.
Figure 6.6. With self-describing events, we can switch between heterogeneous streams and homogeneous streams at will, because a reference to the schema travels with the event. In this example, we have a “splitter app” separating health checks and machine restarts into dedicated Kafka topics.
An important thing to note: with self-describing events, we are always talking about adding some kind of pointer to the event's original schema to the event. This pointer is a reference rather than the schema itself, not least because the schema itself is often huge, larger even than the event payload!
Let's build a self-describing event for Plum in the next section.
How should we represent a self-describing event in Apache Avro? We need a metadata "envelope" to wrap the event and record which schema was used to serialize the event. It makes sense for us to define this in Avro itself, and we've set out a simple proposal in the following listing.
Listing 6.5. self_describing.avsc
{
  "name": "SelfDescribing",
  "namespace": "plum.avro",
  "type": "record",
  "fields": [
    { "name": "schema", "type": "string" },
    { "name": "data", "type": "bytes" }
  ]
}
You can see two fields in the SelfDescribing record:
- schema is a string to identify the schema for the given event.
- data is a sequence of 8-bit unsigned bytes, itself containing the Avro binary representation of the given event.
How would we identify the schema in a string format? Let's use something simple, such as this:
{vendor}/{name}/{version}
This format is a simplified implementation of the schema URIs we use in the Iglu schema registry system at Snowplow (https://github.com/snowplow/iglu). In this implementation are the following:
- vendor tells us which company (or team within the company) authored this schema. This can be helpful for telling us where to find the schema, and helps prevent naming conflicts (for example, if Plum and one of its software partners both define an ad click event).
- name is the name of the event.
- version gives the version of the given event. For simplicity at Plum, we will use an incrementing integer for the version.
In the case of the initial version of our NCX-10 health-check event, the schema string would look like this:
com.plum/ncx10-health-check/1
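A schema string in this format is trivial to take apart. Here is a sketch (the class and method names are our own invention) that splits an identifier into its three components:

```java
public class SchemaKey {
    final String vendor;
    final String name;
    final int version;

    SchemaKey(String vendor, String name, int version) {
        this.vendor = vendor;
        this.name = name;
        this.version = version;
    }

    /** Parses a {vendor}/{name}/{version} schema string. */
    static SchemaKey parse(String uri) {
        String[] parts = uri.split("/");
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected {vendor}/{name}/{version}: " + uri);
        }
        return new SchemaKey(parts[0], parts[1], Integer.parseInt(parts[2]));
    }

    public static void main(String[] args) {
        SchemaKey key = SchemaKey.parse("com.plum/ncx10-health-check/1");
        System.out.println(key.vendor + " / " + key.name + " / v" + key.version);
    }
}
```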
This self-describing envelope is a good fit for Avro's binary representation. But an interesting dilemma crops up when we think about Avro's JSON-based format. We could use the same approach, in which case a self-describing health-check event would look like this:
{ "schema": "com.plum/ncx10-health-check/1", "data": <<BYTE ARRAY>> }
But there isn't a lot of point in this format: it's less compact than the binary representation, the data portion is itself still in the binary representation, and it's still not human-readable. It has little going for it! A better implementation of self-describing Avro for the JSON format would look something like this:
{ "schema": "com.plum/ncx10-health-check/1", "data": { "factory": "Factory A", "serialNumber": "EU3571", "status": "RUNNING", "lastStartedAt": 1539598697944, "temperature": 34.56, "endOfLife": false, "floorNumber": { "int": 2 } } }
This is better: it's significantly less compact than the binary representation, but it is human-readable from top to bottom. The only oddity is that the overall payload is in fact no longer an Avro: it is valid JSON, and the data portion is a valid JSON-format Avro, but the overall payload is not an Avro. This is because there is no Avro schema that models the entire payload; instead, to process an instance of this event, we would do the following:
- Parse the event initially as JSON to extract the schema string.
- Retrieve the Avro schema for the schema string.
- Use the schema string to retrieve the JSON node for the event's data.
- Deserialize the JSON data node into a Check POJO object by using the retrieved Avro schema.
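The first of those steps can be sketched as follows. This deliberately crude version pulls the "schema" property out with plain string operations so that it stays dependency-free; a real implementation would use a proper JSON parser such as Jackson, and the payload here is only an illustration.

```java
public class Envelope {
    /** Crude extraction of the "schema" property from a self-describing JSON payload.
        Sketch only: a real implementation would use a JSON library (e.g. Jackson). */
    static String schemaOf(String json) {
        String marker = "\"schema\":";
        int start = json.indexOf(marker);
        if (start < 0) {
            throw new IllegalArgumentException("No schema property found");
        }
        int firstQuote = json.indexOf('"', start + marker.length());
        int lastQuote  = json.indexOf('"', firstQuote + 1);
        return json.substring(firstQuote + 1, lastQuote);
    }

    public static void main(String[] args) {
        String payload = "{ \"schema\": \"com.plum/ncx10-health-check/1\", \"data\": { } }";
        // With the schema string in hand, we can fetch the Avro schema and
        // deserialize the data node in the remaining steps
        System.out.println(Envelope.schemaOf(payload));
    }
}
```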
Don't worry if the nuances of self-describing Avro seem abstract right now. We will put these ideas into practice in the next chapter; things should soon seem more concrete.
Before we move on, let's find a home for all of our schemas: a schema registry.
In the preceding example, we defined a health-check event for the NCX-10 machine by using the Avro schema language, and then embedded this Avro definition inside our Java application. If we took this to its logical conclusion, we would be embedding this Avro definition file into every app that needed to understand NCX-10 health-check events. Figure 6.7 illustrates this process.
Figure 6.7. We have two Kafka applications that want to process health-check events. Both of these apps contain a definition of the health-check event in Avro; there is no central source of truth for this schema.
This copy-and-paste approach is a bad idea, because we don't have a single source of truth for the definition of an NCX-10 health-check event within Plum. Instead, we have multiple definitions of this event, scattered around multiple codebases. We have to hope that all the definitions are the same, or Plum will experience the following:
- Self-describing events being written to Kafka with contradictory data structures for the same schema
- Runtime failures in our Kafka consuming applications as they expect one event structure and get another
Plum needs a single source of truth for all of its schemas: a single location that registers our schemas, and that all teams can access to understand the definition of each schema. Figure 6.8 visualizes the schema registry for Plum, containing three event schemas.
Figure 6.8. Plum now has a schema registry that contains the master copy of the schema for each type of event. All consuming and producing apps should use these master copies.
At its simplest, a schema registry can be just a shared folder, perhaps on S3, HDFS, or NFS. The schema syntax we defined earlier maps cleanly onto a folder structure, here in Amazon S3:
s3://plum-schemas/com.plum/ncx10-health-check/1
The file at the preceding path would be the Avro definition file for version 1 of the NCX-10 health-check event.
Going beyond a simple shared folder structure, there are two actively developed open source schema registries:
- Confluent Schema Registry (https://github.com/confluentinc/schema-registry)—An integral part of the Confluent Platform for Kafka-based data pipelines. Confluent Schema Registry supports Avro only, with first-class support for Avro's schema evolution. It uses Kafka as the underlying storage mechanism and is a distributed system with a single-master architecture. It assigns a registry-unique ID (monotonically increasing) to each registered schema.
- Iglu (https://github.com/snowplow/iglu)—An integral part of the Snowplow open source event data pipeline. Iglu supports multiple schema technologies, including Avro, JSON Schema, and Thrift. It supports schema resolution across multiple schema registries and uses semantic URIs to address schemas. Iglu is used in Snowplow but is intended to be general-purpose (with Scala and Objective-C client libraries).
Putting either of these schema registries into service for Plum is beyond the scope of this book, but we encourage you to check them out.
- A unified log is a decoupled architecture: consumers and producers of event streams have no particular knowledge of each other.
- The contract between event consumers and producers is represented by the schema of each event.
- Schema technologies we can use include JSON Schema, Apache Avro, Thrift, and protocol buffers. Avro is a good choice.
- We can define our event schemas by using Avro’s JSON-based schema syntax, and then use code-generation to build Java bindings (think POJOs) to the schemas.
- Avro has a JSON representation for data, and a binary representation. We wrote a simple Java app to convert the JSON representation into Java and back into binary.
- We need a way to associate events with their schemas: either one stream per schema, trial and error, or self-describing events.
- We can model a simple self-describing event for Avro by using Avro itself. We need slightly different implementations for the binary- and JSON-based representations.
- A schema registry is a central repository for all of our schemas: this is the source of truth for which schemas are available as part of our company’s unified log.