Chapter 3. Event stream processing with Apache Kafka

This chapter covers

  • Introducing event stream processing
  • Writing applications that process individual events
  • Validating and enriching events
  • Writing enriched events to output streams

In the preceding chapter, we focused on getting a stream of well-structured events from Nile, our fictitious e-commerce retailer, into our unified log, Apache Kafka. Now that we have a continuous stream of events flowing into our unified log, what can we do with these events? We can process them.

At its simplest, event processing involves reading one or more events from an event stream and doing something to those events. That processing operation could be filtering an event from the stream, validating the event against a schema, or enriching the event with additional information. Or we could be processing multiple events at a time, perhaps with a view to reordering them or creating some kind of summary or aggregate of those events.

This chapter introduces event stream processing briefly before jumping into a concrete example, processing the Nile event stream. Our new stream-processing application will treat Nile’s raw event stream as its own input stream, and it will then generate an output event stream based on those incoming events. You’ll see how, by reading one stream from Kafka and writing another stream back into Kafka, we are able to use our unified log as a kind of “superglue” between our business’s different apps.

We’ll keep our stream-processing application simple: we’ll stick to validating Nile’s incoming raw events and enriching the valid events. Enriching means adding interesting extra information to an event. For a relatively simple example in this chapter, we will enrich the events with the customer’s geographical location, using the MaxMind geolocation database.

Let’s get started.

3.1. Event stream processing 101

In chapter 2, we defined an initial event type for Nile, set up Apache Kafka locally, and then went about sending those events into Kafka. All of this initial plumbing was a means to an end; the end is processing the Nile event stream now available in Kafka. But event stream processing is not a widely known term, so let’s take a brief look at why we process event streams, and what that entails.

3.1.1. Why process event streams?

Your business might want to process event streams for a variety of reasons. Perhaps you want to do one of the following:

  • Back up the events to long-term storage such as HDFS or Amazon S3
  • Monitor the event stream for patterns or abnormalities and send alerts when these are detected
  • “Drip-feed” the events into databases such as Amazon Redshift, Vertica, Elasticsearch, or Apache Cassandra
  • Derive new event streams from the original event stream—for example, filtered, aggregated, or enriched versions of the original event stream

Figure 3.1 illustrates all four of these use cases.

Figure 3.1. Our original event stream is being processed by four applications, one of which is generating a new event stream of its own. Note that each of our four applications can have a different current offset, or cursor position, in the original stream; this is a feature of unified logs.

The general term for what all of these example applications are doing is event stream processing. We can say that any program or algorithm that understands the time-ordered, append-only nature of a continuous event stream and can consume events from this stream in a meaningful way is able to process this stream.

Fundamentally, only two types of processing can be performed on a single continuous event stream:

  • Single-event processing—A single event in the event stream will produce zero or more output data points or events.
  • Multiple-event processing—Multiple events from the event stream will collectively produce zero or more output data points or events.
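
To make the distinction concrete, here is a minimal Java sketch of the two shapes. These interface names are purely illustrative; they are not part of Kafka’s API, nor of the code we build later in this chapter.

import java.util.List;

// Illustrative shapes only, not part of any real API:
interface SingleEventProcessor<E> {
  List<E> process(E event);        // one event in, zero or more out
}

interface MultipleEventProcessor<E> {
  List<E> process(List<E> events); // many events in, zero or more out
}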
Complex event processing

You may hear people talk about complex event processing (CEP) and wonder how this relates to event stream processing as described in this chapter. In fact, Manning has a 2011 book on CEP called Event Processing in Action by Opher Etzion and Peter Niblett.

As far as I can tell, CEP emphasizes the derivation of “complex events” from simpler input events, although this is an important use case for our event stream processing approaches as well. A more significant difference is that CEP thinking predates unified log technologies like Apache Kafka, so CEP systems tend to work on much smaller (and potentially unordered) event streams than the ones we will look at.

Another difference is that the CEP ecosystem seems to be dominated by commercial applications with drag-and-drop graphical user interfaces and/or declarative event query languages. By contrast, event stream processing as we define it is much more programmer-focused, with most algorithms being hand-rolled in Java, Scala, Python, or similar.

Of the various CEP products introduced in Event Processing in Action, the only one I have encountered being used in a modern event stream processing context is Esper (www.espertech.com/esper), an open-source CEP tool with its own event query language.

Figure 3.2 illustrates both types. We distinguish between these two types of stream processing because they differ hugely from each other in terms of complexity. Let’s look at each in turn.

Figure 3.2. Our single-event processing app works on only one event from the source stream at a time. By contrast, the application at the bottom is reading three events from the source stream in order to generate a single output event.

3.1.2. Single-event processing

The first case, single-event processing, is straightforward to implement: we read the next event from our continuous event stream and apply some sort of transformation to it. We can apply many transformations, and common ones include the following:

  • Validating the event—Checking, for example, “Does this event contain all the required fields?”
  • Enriching the event—Looking up, for example, “Where is this IP address located?”
  • Filtering the event—Asking, for example, “Is this error critical?”

We could also apply a combination of these. Many of these possible transformations would generate either zero or one data points or events, but equally they could produce multiple data points or events. For example, a process could produce a stream of validation warnings as well as a stream of enriched events, and filter out some events entirely. This is illustrated in figure 3.3.

Figure 3.3. Here the stream processing application is validating, enriching, and filtering an incoming raw stream. Events that make it through the whole transformation are added to our processed stream. Events that fail validation are added to our errors stream. Transformation warnings are added to our warnings stream.
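
To sketch what such a combined transformation might look like in code, here is a hypothetical single-event transform that can emit a warning, pass an event through, or drop it entirely. The event-type names and checks are invented for illustration only.

import java.util.ArrayList;
import java.util.List;

// Hypothetical combined transform: validate, enrich, and filter in one pass.
public class CombinedTransform {
  public List<String> process(String event) {
    List<String> out = new ArrayList<>();
    if (!event.contains("\"event\":")) {
      out.add("{\"warning\": \"missing event type\"}"); // validation warning
    } else if (!event.contains("INTERNAL_HEARTBEAT")) {
      out.add(event); // enrichment would happen here before pass-through
    }
    // Hypothetical heartbeat events fall through with zero outputs (filtered)
    return out;
  }
}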

Regardless of the transformation we attempt, in the single-event case, any stream processing is conceptually simple, because we have to act on only a single event at a time. Our stream processing application can have the memory of a goldfish: no event matters except the one being read right now.

3.1.3. Multiple-event processing

In multiple-event processing, we have to read multiple events from the event stream in order to generate some kind of output. Plenty of algorithms and queries fit into this pattern, including these:

  • Aggregating—Applying aggregate functions such as minimum, maximum, sum, count, or average on multiple events
  • Pattern matching—Looking for patterns or otherwise summarizing the sequence, co-occurrence, or frequency of multiple events
  • Sorting—Reordering events based on a sort key
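
As a small taste of the first bullet, the following minimal Java sketch counts events per product SKU. The class and method names are invented for illustration; the point is that, unlike single-event processing, the processor must hold state that spans many events. Chapter 4 treats this properly.

import java.util.HashMap;
import java.util.Map;

// Illustrative only: counts events per product SKU across the stream so far.
public class SkuCounter {
  private final Map<String, Long> counts = new HashMap<>();

  public void onEvent(String sku) {
    counts.merge(sku, 1L, Long::sum); // state accumulated across events
  }

  public long countFor(String sku) {
    return counts.getOrDefault(sku, 0L);
  }
}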

Figure 3.4 features a trio of stream processing apps; each is working on multiple events and applying multiple-event algorithms and queries.

Figure 3.4. We have three apps processing multiple events at a time: an Aggregation app, which is counting events; a Pattern Matching app, which is looking for event patterns indicative of credit card fraud; and, finally, a Sorting app, which is reordering our event stream based on a property of each event.

Processing multiple events at a time is significantly more complex conceptually and technically than processing single events. Chapter 4 explores the processing of multiple events at a time in much more detail.

3.2. Designing our first stream-processing app

Let’s return to Nile, our fictitious e-commerce retailer. Chapter 2 introduced Nile’s three event types, and we represented one of these event types, Shopper views product, in JSON. We wrote these JSON events into a Kafka topic called raw-events, and then read these events out again from the same topic. In this chapter, we will go further and start to do some single-event processing on this event stream. Let’s get started!

3.2.1. Using Kafka as our company’s glue

The data scientists at Nile want to start by analyzing one of our three event types: the Shopper views product events. There is just one problem: the Data Science team at Nile is split by geography, and each subteam wants to analyze only shopper behavior from specific countries.

We have been asked to build a stream processing application to do the following:

  1. Read the raw-events topic in Kafka
  2. Figure out where each shopper is located
  3. Write the events, now with the country and city attached, out to another Kafka topic

Figure 3.5 illustrates this flow.

Figure 3.5. Our first stream-processing app will read events from the raw-events topic in Apache Kafka and write enriched events back to a new topic in Kafka. As our unified log, Kafka is the glue between multiple applications.

This example shows how we can start to use Apache Kafka, our company’s unified log, as the “glue” between systems without getting, well, stuck. By simply processing the incoming stream as requested and writing the new events back to further Kafka topics, we don’t have to know anything about how either team of data scientists will work with the events.

After we have agreed on a format for the events with the data scientists, we can then leave them to work with the new event stream however they want. They can write their own stream processing applications, or store all the events in an analytics database, or archive the events in Hadoop and run machine learning or graph algorithms on them; it doesn’t matter to us. To overload this part of the book with metaphors, our unified log is acting as the Esperanto for our different applications and users.

3.2.2. Locking down our requirements

Tefeor writing snp ezog, ow gnvx kr tomtbo ebr urk reqnetsemiur tlk tyv master eigcnropss dcy. Ceemmbre rcdr rqo Shopper views product events uorcc jn dro rpohsep’z hwk rwsorbe pnc kct aleyrde rx Kafka zej amkk enjp lx HCXZ-abdes veten coltcoerl. Axy events vzt derteac jn sn ennronetvim eotusid gtx idtecr olcrotn, ax gro ifrts vrgz aj rx validate rbrz sqax tevne ofdun jn raw-events zyc dro teepexdc ettcrursu. Mx nwrz er otrptec kdr Ujfk rzsu ssicttiens klmt bcn iefceevtd events; prvp tsk dzqj rvk hmzd er epdns rehti kmjr cgneainl qh pzh szhr!

After we have validated our events, we need to identify where each event originated geographically. How can we determine where our Nile shoppers are located? Let’s look back at the data points in each incoming Shopper views product event:

{"event": "SHOPPER_VIEWED_PRODUCT", "shopper": {"id": "123",
 "name": "Jane", "ipAddress": "70.46.123.145"}, "product": {"sku":
 "aapl-001", "name": "iPad"}, "timestamp": "2018-10-15T12:01:35Z" }

We are in luck: each of our events includes the IP address of the computer that our shopper is using. A company called MaxMind (www.maxmind.com) provides a free-to-use database that maps IP addresses to geographical locations. We can look up each shopper’s IP address in the MaxMind geo-IP database to determine where the shopper is located at that point in time. When we use algorithms or external databases to add extra data points to an event, we typically say that we are enriching the event.
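
As a preview of how this lookup works, here is a small, hypothetical standalone sketch using MaxMind’s geoip2 Java client, the same library our processor will use later in this chapter. The database filename is a placeholder; downloading the free GeoLite2 City database is covered in section 3.4.3.

import java.io.File;
import java.net.InetAddress;

import com.maxmind.geoip2.DatabaseReader;
import com.maxmind.geoip2.model.CityResponse;

public class GeoLookupSketch {
  public static void main(String[] args) throws Exception {
    // Open the MaxMind database (path is a placeholder)
    DatabaseReader reader = new DatabaseReader.Builder(
      new File("GeoLite2-City.mmdb")).build();

    // Look up the IP address from our example event
    CityResponse resp = reader.city(InetAddress.getByName("70.46.123.145"));
    System.out.println(resp.getCountry().getName()); // e.g. "United States"
    System.out.println(resp.getCity().getName());    // e.g. "Greenville"
  }
}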

So far, we are validating the incoming event and then enriching it. The final step will be to write out the validated, enriched events to a new Kafka topic: enriched-events. Our work is then done: the Nile data science teams will read the events from these topics and perform whatever analysis they want.

Putting it together, we need to create a stream processing application that does the following:

  • Reads individual events from our Kafka topic raw-events
  • Validates each event’s IP address, sending any validation failures to a dedicated Kafka topic, called bad-events
  • Enriches the validated events with the geographical location of the shopper by using the MaxMind geo-IP database
  • Writes our validated, enriched events to the enriched-events Kafka topic

We can now put together a more detailed diagram for the stream processing application we are going to build. Figure 3.6 provides the specifics.

Figure 3.6. Our single-event processing app is going to read events from the raw-events topic, validate the incoming events, enrich the valid events with the geographical location, and route the enriched events to enriched-events. Any errors will be written out to bad-events.

We are now ready to start building our stream processing app!

3.3. Writing a simple Kafka worker

To keep things simple, we will make two passes through our stream processing app:

  1. We will create a simple Kafka worker, which can read from our raw-events topic in Kafka and write all events to a new topic.
  2. We will evolve our Kafka worker into a complete single-event processor, which handles validation, enrichment, and routing as per Nile’s requirements.

Let’s get started on the first pass.

3.3.1. Setting up our development environment

Our first cut of the stream processing app will get us comfortable reading and writing to Kafka topics from Java, without Nile’s pesky business logic getting in the way. We chose Java (version 8) because it has first-class support for Kafka and should be a familiar programming language for most readers; however, you won’t need to be a Java guru to follow along. For our build tool, we will use Gradle, which is growing in popularity as a friendlier and less verbose (but still powerful) alternative to Ant and Maven.

Let’s set up our development environment. First, you should download and install the latest Java SE 8 JDK from here:

www.oracle.com/technetwork/java/javase/downloads/index.html

Next you need to download and install Gradle from here:

www.gradle.org/downloads

In order to avoid the process of manually installing every library and framework required for the examples in this book, we will take advantage of Vagrant, which provides an easy-to-configure, reproducible, and portable work environment. Using Vagrant, you can quickly install and manage a virtual machine environment with everything you need to run the examples. We selected Vagrant because it requires little effort to set up and use. In production, you might choose another tool, such as Docker, that could serve a similar purpose. If you’re unfamiliar with Vagrant, you can visit www.vagrantup.com to get started.

For Vagrant users

If you are a Vagrant user (or would like to become one), you are in luck: we have created a Vagrant-based development environment for this book, using Ansible to install all required dependencies into a 64-bit Ubuntu 16.04.5 LTS (Xenial Xerus).

If you haven’t already, install Vagrant (www.vagrantup.com) and VirtualBox (www.virtualbox.org).

You can then check the environment out of GitHub like so:

$ git clone https://github.com/alexanderdean/Unified-Log-Processing.git

Now start the environment:

$ cd Unified-Log-Processing
$ vagrant up && vagrant ssh

And that’s it! You can now browse to a specific chapter’s example code and build it:

$ cd ch03/3.3
$ gradle jar

Be aware that the Ansible step within vagrant up will take a long time.

Finally, let’s check that everything is installed where it should be:

$ java -version
java version "1.8.0_181"
...
$ gradle -v
...
Gradle 4.10.2
...

All present and correct? Now we are ready to create our application.

3.3.2. Configuring our application

Let’s create our project, which we will call StreamApp, using Gradle. First, create a directory called nile. Then switch to that directory and run the following:

$ gradle init --type java-library
...
BUILD SUCCESSFUL
...

Gradle will create a skeleton project in that directory, containing a couple of Java source files for stub classes called Library.java and LibraryTest.java, as per figure 3.7. You can delete these two files; we’ll be writing our own code shortly.

Figure 3.7. Delete the files Library.java and LibraryTest.java from your generated Gradle project.

Next let’s prepare our Gradle project build file. Edit the file build.gradle and replace its current contents with the following listing.

Listing 3.1. build.gradle
plugins {
  // Apply the java-library plugin to add support for Java Library
  id 'java'
  id 'java-library'
  id 'application'
}

sourceCompatibility = '1.8'                       #1

mainClassName = 'nile.StreamApp'

version = '0.1.0'

dependencies {                                    #2
  compile 'org.apache.kafka:kafka-clients:2.0.0'
  compile 'com.maxmind.geoip2:geoip2:2.12.0'
  compile 'com.fasterxml.jackson.core:jackson-databind:2.9.7'
  compile 'org.slf4j:slf4j-api:1.7.25'
}

repositories {
  jcenter()
}

jar {                                             #3
  manifest {
    attributes 'Main-Class': mainClassName
  }

  from {
    configurations.compile.collect {
      it.isDirectory() ? it : zipTree(it)
    }
  } {
    exclude "META-INF/*.SF"
    exclude "META-INF/*.DSA"
    exclude "META-INF/*.RSA"
  }
}

Note the library dependencies we have added to our app:

  • kafka-clients, for reading from and writing to Kafka
  • jackson-databind, which is a library for parsing and manipulating JSON
  • geoip2, which we will use for our MaxMind geo-IP enrichment

Let’s just check that we can build our new StreamApp project without issue (this may take two or three minutes):

$ gradle compileJava
...
BUILD SUCCESSFUL
...

Great—we are ready for the next step: building our Kafka event consumer.

3.3.3. Reading from Kafka

As a first step, we need to read individual raw events from our Kafka topic raw-events. In Kafka parlance, we need to write a consumer. Remember that in the preceding chapter, we depended on the Kafka command-line tools to write events to a topic, and to read events back out of that topic. In this chapter, we will write our own consumer in Java, using the Kafka Java client library.

Writing a simple Kafka consumer is not particularly difficult. Let’s create a file for it, called src/main/java/nile/Consumer.java. Add in the code in the following listing.

Listing 3.2. Consumer.java
package nile;

import java.util.*;

import org.apache.kafka.clients.consumer.*;

public class Consumer {

  private final KafkaConsumer<String, String> consumer;               #1
  private final String topic;

  public Consumer(String servers, String groupId, String topic) {
    this.consumer = new KafkaConsumer<String, String>(
      createConfig(servers, groupId));
    this.topic = topic;
  }

  public void run(IProducer producer) {
    this.consumer.subscribe(Arrays.asList(this.topic));               #2
    while (true) {
      ConsumerRecords<String, String> records = consumer.poll(100);   #3
      for (ConsumerRecord<String, String> record : records) {
        producer.process(record.value());                             #4
      }
    }
  }

  private static Properties createConfig(String servers, String groupId) {
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("group.id", groupId);                                   #5
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "earliest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");    #1
    props.put("value.deserializer",
      "org.apache.kafka.common.serialization.StringDeserializer");    #1
    return props;
  }
}

So far, so good; we have defined a consumer that will read all the records from a given Kafka topic and hand them over to the process method of the supplied producer. We don’t need to worry about most of the consumer’s configuration properties, but note the group.id, which lets us associate this app with a specific Kafka consumer group. We could run multiple instances of our app all with the same group.id to share out the topic’s events across all of our instances; by contrast, if each instance had a different group.id, each instance would get all of Nile’s raw-events.
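
To make the consumer group behavior concrete, here is a hypothetical sketch using the Consumer class we just wrote; in reality, each instance would run in its own process or on its own server.

public class ConsumerGroupSketch {
  public static void main(String[] args) {
    // Two instances sharing a group.id split the topic's events between
    // them, so each event is handed to only one of the pair:
    Consumer workerA = new Consumer("localhost:9092", "shared-group", "raw-events");
    Consumer workerB = new Consumer("localhost:9092", "shared-group", "raw-events");

    // An instance with its own group.id maintains an independent offset,
    // and therefore receives every event in the topic:
    Consumer auditor = new Consumer("localhost:9092", "audit-group", "raw-events");

    // Each would then be started with its own producer, for example:
    // workerA.run(new PassthruProducer("localhost:9092", "enriched-events"));
  }
}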

3.3.4. Writing to Kafka

See how our consumer is going to run the IProducer.process() method for each incoming event? To keep things flexible, the two producers we write in this chapter will both conform to the IProducer interface, letting us easily swap out one for the other. Let’s now define this interface in another file, called src/main/java/nile/IProducer.java. Add in the code in the following listing.

Listing 3.3. IProducer.java
package nile;

import java.util.Properties;

import org.apache.kafka.clients.producer.*;

public interface IProducer {

  public void process(String message);                             #1

  public static void write(KafkaProducer<String, String> producer,
    String topic, String message) {                                #2
    ProducerRecord<String, String> pr = new ProducerRecord(
      topic, message);
    producer.send(pr);
  }

  public static Properties createConfig(String servers) {          #3
    Properties props = new Properties();
    props.put("bootstrap.servers", servers);
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 1000);
    props.put("linger.ms", 1);
    props.put("key.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer",
      "org.apache.kafka.common.serialization.StringSerializer");
    return props;
  }
}

This is a great start, but for this to be useful, we need a concrete implementation of IProducer. Remember that this section of the chapter is just a warm-up: we want to pass the incoming raw-events into a second topic with the events themselves untouched. We now know enough to implement a simple pass-through producer, by adding the code in the following listing into a new file called src/main/java/nile/PassthruProducer.java.

Listing 3.4. PassthruProducer.java
package nile;

import org.apache.kafka.clients.producer.*;

public class PassthruProducer implements IProducer {

  private final KafkaProducer<String, String> producer;
  private final String topic;

  public PassthruProducer(String servers, String topic) {
    this.producer = new KafkaProducer(
      IProducer.createConfig(servers));                    #1
    this.topic = topic;
  }

  public void process(String message) {
    IProducer.write(this.producer, this.topic, message);   #2
  }
}

The PassthruProducer implementation should be fairly self-explanatory; it simply writes out each supplied message to a new Kafka topic.

3.3.5. Stitching it all together

All that’s left is to stitch these three files together via a new StreamApp class containing our main method. Create a new file called src/main/java/nile/StreamApp.java and populate it with the contents of the following listing.

Listing 3.5. StreamApp.java
package nile;

public class StreamApp {

  public static void main(String[] args){
    String servers   = args[0];
    String groupId   = args[1];
    String inTopic   = args[2];
    String goodTopic = args[3];

    Consumer consumer = new Consumer(servers, groupId, inTopic);
    PassthruProducer producer = new PassthruProducer(
      servers, goodTopic);
    consumer.run(producer);
  }
}

We will pass four arguments into our StreamApp on the command line:

  • servers specifies the host and port for talking to Kafka.
  • groupId identifies our code as belonging to a specific Kafka consumer group.
  • inTopic is the Kafka topic we will read from.
  • goodTopic is the Kafka topic we will write all events to.

Let’s build our stream processing app now. From the project root, the nile folder, run this:

$ gradle jar
...
BUILD SUCCESSFUL
Total time: 25.532 secs

Great—we are now ready to test our stream processing app.

3.3.6. Testing

To test out our new application, we are going to need five terminal windows. Figure 3.8 sets out what we’ll be running in each of these terminals.

Figure 3.8. The five terminals we need to run to test our initial Kafka worker include ZooKeeper, Kafka, one topic producer, one consumer, and the app itself.

Our first four terminal windows will each run a shell script from inside our Kafka installation directory:

$ cd ~/kafka_2.12-2.0.0

In our first terminal, we start up ZooKeeper:

$ bin/zookeeper-server-start.sh config/zookeeper.properties

In our second terminal, we start up Kafka:

$ bin/kafka-server-start.sh config/server.properties

In our third terminal, let’s start a script that lets us send events into our raw-events Kafka topic. We’ll call this topic raw-events-ch03 to prevent any clashes with our work in chapter 2:

$ bin/kafka-console-producer.sh --topic raw-events-ch03 \
  --broker-list localhost:9092

Let’s now give this producer some events, by pasting these into the same terminal:

{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "123",
 "name": "Jane", "ipAddress": "70.46.123.145" }, "product": { "sku":
 "aapl-001", "name": "iPad" }, "timestamp": "2018-10-15T12:01:35Z" }
{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "456",
 "name": "Mo", "ipAddress": "89.92.213.32" }, "product": { "sku":
 "sony-072", "name": "Widescreen TV" }, "timestamp":
 "2018-10-15T12:03:45Z" }
{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "789",
 "name": "Justin", "ipAddress": "97.107.137.164" }, "product": {
 "sku": "ms-003", "name": "XBox One" }, "timestamp":
 "2018-10-15T12:05:05Z" }

Note that you need a newline between each event to send it into the Kafka topic. Next, in our fourth terminal, we’ll start a script to “tail” our outbound Kafka topic:

$ bin/kafka-console-consumer.sh --topic enriched-events --from-beginning \
  --bootstrap-server localhost:9092

Phew! We are finally ready to start up our new stream processing application. In a fifth terminal, head back to your project root, the nile folder, and run this:

$ cd ~/nile
$ java -jar ./build/libs/nile-0.1.0.jar localhost:9092 ulp-ch03-3.3 \
  raw-events-ch03 enriched-events

This has kicked off our app, which will now read all events from raw-events-ch03 and mirror them directly to enriched-events. Check back in the fourth terminal (the console-consumer), and you should see our three events appearing in the enriched-events Kafka topic:

{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "123",
 "name": "Jane", "ipAddress": "70.46.123.145" }, "product": { "sku":
 "aapl-001", "name": "iPad" }, "timestamp": "2018-10-15T12:01:35Z" }
{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "456",
 "name": "Mo", "ipAddress": "89.92.213.32" }, "product": { "sku":
 "sony-072", "name": "Widescreen TV" }, "timestamp":
 "2018-10-15T12:03:45Z" }
{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "789",
 "name": "Justin", "ipAddress": "97.107.137.164" }, "product": {
 "sku": "ms-003", "name": "XBox One" }, "timestamp":
 "2018-10-15T12:05:05Z" }

Good news: our simple pass-through stream processing app is working a treat. Now we can move onto the more complex version involving event validation and enrichment. Shut down the stream processing app with Ctrl-Z and then type kill %%, but make sure to leave that terminal and the other terminal windows open for the next section.

3.4. Writing a single-event processor

The next step involves developing our simple Kafka worker into a complete single-event processor, which handles validation, enrichment, and routing as per Nile’s requirements. Let’s get started.

3.4.1. Writing our event processor

Mo txc nj ezhf: useecba vw ltuib vtd lppieein nj section 3.3 uaonrd s Java nafretiec, leladc IProducer, kw nss whzc dxr qkt xegtinis PassthruProducer rwjq s mkkt tdithpcisesoa etenv pcoosrsre djrw c inummim le bcal. Vor’c ednirm eoleurssv first kl dwcr gvt Ojvf bsoess nzwr jcrd tnvee sorepcsro xr ku:

  • Read events from our Kafka topic raw-events
  • Validate the events, writing any validation failures to the bad-events Kafka topic
  • Enrich the validated events with the geographical location of the shopper by using the MaxMind geo-IP database
  • Write the validated, enriched events to the enriched-events Kafka topic

In the interests of simplicity, we will use a simple definition of a valid event—namely, an event that does the following:

  • Contains a shopper.ipAddress property, which is a string
  • Allows us to add a shopper.country property, which is also a string, without throwing an exception

If these conditions are not met, we will generate an error message, again in JSON format, and write this to the bad-events topic in Kafka. Our error messages will be simple:

{ "error": "Something went wrong" }

For this section, as you prefer, you can either make a full copy of the nile codebase from section 3.3, or make changes in situ in that codebase. Either way, you will first need to create a file, src/main/java/nile/FullProducer.java, and paste in the contents of the following listing.

Listing 3.6. FullProducer.java
package nile;

import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.node.ObjectNode;

import java.net.InetAddress;
import org.apache.kafka.clients.producer.*;

import com.maxmind.geoip2.*;
import com.maxmind.geoip2.model.*;

public class FullProducer implements IProducer {

  private final KafkaProducer<String, String> producer;
  private final String goodTopic;
  private final String badTopic;
  private final DatabaseReader maxmind;

  protected static final ObjectMapper MAPPER = new ObjectMapper();

  public FullProducer(String servers, String goodTopic,
    String badTopic, DatabaseReader maxmind) {                         #1
    this.producer = new KafkaProducer(
      IProducer.createConfig(servers));
    this.goodTopic = goodTopic;
    this.badTopic = badTopic;
    this.maxmind = maxmind;
  }

  public void process(String message) {

    try {
      JsonNode root = MAPPER.readTree(message);
      JsonNode ipNode = root.path("shopper").path("ipAddress");        #2
      if (ipNode.isMissingNode()) {
        IProducer.write(this.producer, this.badTopic,
          "{\"error\": \"shopper.ipAddress missing\"}");               #3
      } else {
        InetAddress ip = InetAddress.getByName(ipNode.textValue());
        CityResponse resp = maxmind.city(ip);                          #4
        ((ObjectNode)root).with("shopper").put(
          "country", resp.getCountry().getName());                     #5
        ((ObjectNode)root).with("shopper").put(
          "city", resp.getCity().getName());                           #6
        IProducer.write(this.producer, this.goodTopic,
          MAPPER.writeValueAsString(root));                            #6
      }
    } catch (Exception e) {
      IProducer.write(this.producer, this.badTopic, "{\"error\": \"" +
        e.getClass().getSimpleName() + ": " + e.getMessage() + "\"}"); #3
    }
  }
}

There’s quite a lot to take in here. The control flow is perhaps better visualized in a diagram, as shown in figure 3.9. The important thing to understand is that we are looking up the shopper’s IP address in MaxMind, and if it’s found, we are attaching the shopper’s country and city to the outgoing enriched event. If anything goes wrong on the way, we write that error message out to the “bad” topic.

Figure 3.9. Our single-event processor attempts to enrich the raw event with the geolocation as looked up in the MaxMind database; if anything goes wrong, an error is written out instead.

As you’ve probably guessed, we will need to make some tweaks to our app’s main function to support the new MaxMind functionality. Let’s do that now.

3.4.2. Updating our main function

Head back to src/main/java/nile/StreamApp.java and make the additions set out in the following listing.

Listing 3.7. StreamApp.java
package nile;

import java.io.*;                                                #1

import com.maxmind.geoip2.DatabaseReader;

public class StreamApp {

  public static void main(String[] args) throws IOException {    #2
    String servers     = args[0];
    String groupId     = args[1];
    String inTopic     = args[2];
    String goodTopic   = args[3];
    String badTopic    = args[4];                                #2
    String maxmindFile = args[5];                                #2

    Consumer consumer = new Consumer(servers, groupId, inTopic);
    DatabaseReader maxmind = new DatabaseReader.Builder(new
      File(maxmindFile)).build();                                #3
    FullProducer producer = new FullProducer(
      servers, goodTopic, badTopic, maxmind);                    #4
    consumer.run(producer);
  }
}

Note the two new arguments in our StreamApp:

  • badTopic is the Kafka topic we will write errors to.
  • maxmindFile is the full path to the MaxMind geo-IP database.

Yeeorf vw idubl uor paoliitancp, xdvn kyr ubild.rlgead flxj nj ogr tevr bzn gacehn

version = '0.1.0'

to

version = '0.2.0'

Let’s rebuild our stream processing app now. From the project root, the nile folder, run this:

$ gradle jar
...
BUILD SUCCESSFUL

Total time: 25.532 secs

Great—now we can test this!

3.4.3. Testing, redux

Before we can run our app, we need to download a free copy of the MaxMind geo-IP database. You can do this like so:

$ wget \
  "https://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz"
$ tar xzf GeoLite2-City.tar.gz

To run our event processor, type in the following:

$ java -jar ./build/libs/nile-0.2.0.jar localhost:9092 ulp-ch03-3.4 \
  raw-events-ch03 enriched-events bad-events \
  ./GeoLite2-City_<yyyyMMdd>/GeoLite2-City.mmdb

Great—our app is now running! Note that we configured it with a different consumer group from the previous app: ulp-ch03-3.4 versus ulp-ch03-3.3. Therefore, this app will process events right back from the start of the raw-events-ch03 topic (our consumer configuration sets auto.offset.reset to earliest, so a brand-new consumer group starts from the beginning). If you’ve left everything running from section 3.3, our events should be flowing through our single-event processor now.

Check back in the fourth terminal (the console-consumer) and you should see our original three raw events appearing in the enriched-events Kafka topic, but this time with the geolocation data attached—namely, the country and city fields:

{"event":"SHOPPER_VIEWED_PRODUCT","shopper":{"id":"123","name":"Jane",
 "ipAddress":"70.46.123.145","country":"United States", "city":
 "Greenville"}, "product":{"sku":"aapl-001", "name":"iPad"},
 "timestamp": "2018-10-15T12:01:35Z"}
{"event":"SHOPPER_VIEWED_PRODUCT","shopper":{"id":"456","name":"Mo",
 "ipAddress":"89.92.213.32","country":"France","city": "Rueil-malmaison"},
 "product":{"sku":"sony-072","name":"Widescreen TV"},"timestamp":
 "2018-10-15T12:03:45Z"}
{"event":"SHOPPER_VIEWED_PRODUCT","shopper":{"id":"789","name": "Justin",
 "ipAddress":"97.107.137.164","country":"United States","city":
 "Absecon"}, "product":{"sku":"ms-003","name":"XBox One"}, "timestamp":
 "2018-10-15T12:05:05Z"}

This looks great! We are successfully enriching our incoming events, adding useful geographical context to these events for the Nile data scientists. There’s just one more thing to check—namely, that our single-event processor handles corrupted or otherwise invalid events correctly. Let’s send some in. Switch back to the third console, which is running the following:

$ bin/kafka-console-producer.sh --topic raw-events-ch03 \
  --broker-list localhost:9092

Let’s now feed our stream processing app corrupted events, by pasting the following into the same terminal:

not json
{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": { "id": "456", "name":
 "Mo", "ipAddress": "not an ip address" }, "product": { "sku": "sony-072",
 "name": "Widescreen TV" }, "timestamp": "2018-10-15T12:03:45Z" }
{ "event": "SHOPPER_VIEWED_PRODUCT", "shopper": {}, "timestamp":
 "2018-10-15T12:05:05Z" }

Note that you need a newline between each event to send it into the Kafka topic. To test this, we are going to need one additional terminal. This will tail the bad-events Kafka topic, which will contain our event validation failures. Let’s start the consumer script like so:

$ bin/kafka-console-consumer.sh --topic bad-events --from-beginning \
  --bootstrap-server localhost:9092

By way of a sanity check, you should now have a six-pane terminal layout, as per figure 3.10.

Figure 3.10. Our six terminals consist of the same five as before, plus a second consumer, or “tail”—this time for the bad-events topic.

Wait a few seconds and you should start to see the validation failures stream into the bad-events topic:

{"error": "JsonParseException: Unrecognized token 'not':
 was expecting 'null', 'true', 'false' or NaN
 at [Source: not json; line: 1, column: 4]"}
{"error": "NullPointerException: null"}
{"error": "shopper.ipAddress missing"}

This completes our testing. We now have our single-event processor successfully validating the incoming events, enriching them, and routing the output to the appropriate channel.

Summary

  • We set up a unified log like Kafka and feed events into it so that we can process those event streams.
  • Event stream processing can include backing up the stream, monitoring it, loading it into a database, or creating aggregates.
  • Processing single events is much less complex than processing batches or windows of multiple events at a time.
  • When stream processing, we can write our results out to another stream, so our unified log acts as the “superglue” between our company’s systems.
  • We created a simple Java app for Nile that ran as a “Kafka worker,” reading events from one Kafka topic and passing them through to another Kafka topic unchanged.
  • We extended this Java app into a single-event processor, which validated the incoming events, attempted to enrich them with a geographical location, and then wrote the output to either a “good” or “bad” Kafka topic as appropriate.
  • Geographical location is a great example of information about a raw event that can be derived from the raw event and attached as additional context.