Chapter 4. Event stream processing with Amazon Kinesis

This chapter covers

  • Amazon Kinesis, a fully managed unified log service
  • Systems monitoring as a unified log use case
  • Using the AWS CLI tools to work with Kinesis
  • Building simple Kinesis producers and consumers in Python

So far in this book, we have worked exclusively with Apache Kafka as our unified log. Because it is an open source technology, we have had to set up and configure Kafka and its dependencies (such as ZooKeeper) ourselves. This has given us great insight into how a unified log works “under the hood,” but some of you may be wondering whether there is an alternative that is not so operationally demanding. Can we outsource the operation of our unified log to a third party, without losing the great qualities of the unified log?

The answer is a qualified yes. This chapter introduces Amazon Kinesis (https://aws.amazon.com/kinesis/), a hosted unified log service available as part of Amazon Web Services. Developed internally at Amazon to solve its own challenges around log collection at scale, Kinesis has extremely similar semantics to Kafka—along with subtle differences that we will tease out in this chapter.

Before kicking off this chapter, you might want to jump to the appendix for a brief AWS primer that will get you up to speed on the Amazon Web Services platform, unless you already know your way around the AWS ecosystem. Once your AWS account is set up, this chapter will show you how to use the AWS command-line interface (CLI) tools to create your first event stream in Kinesis, write some events to it, and read them back.

We will then dive into a new use case for the unified log: using it for systems monitoring. We will create a simple long-running agent in Python that emits a steady stream of readings from our server, writing these events to Kinesis by using the AWS Python SDK. Once we have these events in Kinesis, we will write another Python application that monitors our agent’s events, looking for potential problems. Again, this Python monitoring application will be built using the AWS Python SDK, also known as boto.

Please note that, as Amazon Kinesis Data Streams is not currently available in the AWS Free Tier, the procedures in this book will necessarily involve creating live resources in your Amazon Web Services account, which can incur some charges.[1] Don’t worry—I will tell you as soon as you can safely delete a given resource. In addition, you can set alerts on your spending in order to be notified whenever the charges go above a certain threshold.[2]

1You can find more information about the pricing of AWS Kinesis Data Streams at https://aws.amazon.com/kinesis/streams/pricing/.

4.1. Writing events to Kinesis

Our AWS account is set up, and we have the AWS CLI primed for action: we’re ready to introduce the project we’ll be working through! In previous chapters, we focused on events related to end-user behavior. In this chapter, we’ll take a different tack and generate a simple stream of events related to systems monitoring.

4.1.1. Systems monitoring and the unified log

Let’s imagine that our company has a server that keeps running out of space. It hosts a particularly chatty application that keeps generating lots of log files. Our systems administrator wants to receive a warning whenever the server’s disk reaches 80% full, so that he can go in and manually archive and remove the excess log files.

Nl seuocr, teher ja z jtsq, emutar eesocymts le systems monitoring tools rsrb locdu xkrm rzju mqreieenrut. Slimpifnygi swheaomt,[3] eseth tools aicylylpt xbz vkn kl rwv ogioinmtrn rhiracctetseu:

  • Push-based monitoring—An agent running on each monitored system periodically sends data (often called metrics) into a centralized system. Push-based monitoring systems include Ganglia, Graphite, collectd, and StatsD.
  • Pull-based monitoring—The centralized system periodically “scrapes” metrics from each monitored system. Pull-based monitoring systems include JMX, librit, and WMI.

Figure 4.1 depicts both the push and pull approaches. Sometimes a systems monitoring tool will provide both approaches. For example, Zabbix and Prometheus are predominantly pull-based systems with some push support.

Figure 4.1. In push-based systems monitoring, an agent pushes metrics at regular intervals into a centralized system. By contrast, in pull-based architectures, the centralized system regularly scrapes metrics from endpoints available on the servers.

In the push-based model, we have agents generating events and submitting them to a centralized system; the centralized system then analyzes the event stream obtained from all agents. Does this sound familiar? We can transplant this approach directly into our unified log, as per figure 4.2. The building blocks are the same, as you saw in earlier chapters: event producers, a unified log, and a set of event consumers. But there are two main differences from our previous unified log experiences:

  • Instead of adding event tracking to an existing application like HelloCalculator, we will be creating a dedicated agent that exists only to send events to our unified log.
  • Rather than the subject of our events being an end user, the subject will now be our agent, because it is this agent that is actively taking readings from the server.
Figure 4.2. We can implement push-based systems monitoring on top of our unified log. The agents running on our servers will emit events that are written to our unified log, ready for further processing.

Jr ooksl, rnuk, xxjf jr oslduh hk dafgrthatrriows xr rkkm tkg tsemsys natardisrimto’c iigntnrmoo tsunermeeqir bp sgiun s unified log qzcd cs Apache Kafka vt Amazon Kinesis. Mk fwjf yx siung Kinesis tle jqar etprhca, ce feebro wv rbk dtsrate, wk’ff ersv s birfe fevv rs rgo metlnyooirg effnrecidse eenwbte Kafka nhz Kinesis.

4.1.2. Terminology differences from Kafka

Amazon Kinesis has extremely similar semantics to Apache Kafka. But the two platforms diverge a little in the descriptive language that they use. Figure 4.3 sets out the key differences: essentially, Kinesis uses streams, whereas Kafka uses topics. Kinesis streams consist of one or more shards, whereas Kafka topics contain partitions. Personally, I prefer the Kinesis terms to Kafka’s: they are a little less ambiguous and have less “message queue” baggage.

Figure 4.3. The equivalent of a Kafka topic in Kinesis is a stream. A stream consists of one or more shards, whereas a Kafka topic consists of partitions.

Differences of language aside, the fact that Kinesis offers the same key building blocks as Kafka is encouraging: it suggests that almost everything we could do in Kafka, we can do in Kinesis. To be sure, we will come across differences of approach and capability through this chapter; I will make sure to highlight these as they come up. For now, let’s get started with Kinesis.

4.1.3. Setting up our stream

First, we need a Kinesis stream to send our systems monitoring events to. Most commands in the AWS CLI follow this format:

$ aws [service] [command] options...

In our case, all of our commands will start with aws kinesis. You can find a full reference of all available AWS CLI commands for Kinesis here:

https://docs.aws.amazon.com/cli/latest/reference/kinesis/index.html

We can create our new stream by using the AWS CLI like so:

$ aws kinesis create-stream --stream-name events \
  --shard-count 2 --profile ulp

Press Enter, and then switch back to the AWS web interface, and click Amazon Kinesis. If you are quick enough, you should see the new stream listed with its status set to CREATING, and with a count of 0 shards, as in figure 4.4. After the stream is created, Kinesis will report the stream status as ACTIVE and display the correct number of shards. We can write events to and read events from only ACTIVE streams.

Figure 4.4. Our first Amazon Kinesis stream is being created. After a few more seconds, we will see a status of ACTIVE and the correct shard count.

We created our stream with two shards; events that are sent to the event stream will be written to one or other of the two shards. Any stream processing apps that we write will have to make sure to read events from all shards. At the time of writing, Amazon enforces a few limits around shards:[4]

4AWS Kinesis Data Streams limits are described at https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html.

  • You are allowed 200 shards per AWS region by default, except for the following AWS regions, which allow up to 500 shards: US East (N. Virginia), US West (Oregon), and EU (Ireland).
  • Each shard supports up to five read transactions per second. Each transaction can provide up to 10,000 records, with an upper limit of 10 MB per transaction.
  • Each shard supports writing up to 1 MB of record data per second, and up to 1,000 records per second.

Don’t worry—we won’t be hitting any of these limits; we could have happily made do with just one shard in our stream.
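For intuition, here is a quick back-of-envelope calculation of our two-shard stream’s total capacity, using the per-shard limits just listed (plain arithmetic only, no AWS calls):

```python
# Back-of-envelope capacity for our two-shard stream,
# using the per-shard limits quoted above.
SHARDS = 2
write_mb_per_sec = SHARDS * 1           # 1 MB of record data per shard
write_records_per_sec = SHARDS * 1000   # 1,000 records per shard
read_transactions_per_sec = SHARDS * 5  # 5 read transactions per shard

# Our agent will write one small event every 10 seconds:
# a tiny fraction of this capacity.
print(write_mb_per_sec, write_records_per_sec, read_transactions_per_sec)  # 2 2000 10
```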

So, at this point, we have our Kinesis stream ready and waiting to receive events. In the next section, let’s model those events.

4.1.4. Modeling our events

Remember that our systems administrator wants to receive a warning whenever the troublesome server’s disk reaches 80% full. To support this monitoring, the agent running on the server will need to regularly read filesystem metrics and send those metrics into our unified log for further analysis. We can model these metrics readings as events by using the grammatical structure introduced in chapter 2:

  • Our agent is the subject of the event.
  • Read (“take a reading”) is the verb of the event.
  • Filesystem metrics are the direct object of the event.
  • The reading takes place on our server, a prepositional object.
  • The reading takes place at a specific time, another prepositional object.

Putting these together, we can sketch out the event model that we’ll need to assemble, as in figure 4.5.

Figure 4.5. Our systems monitoring events involve an agent reading filesystem metrics on a given server at a specific point in time.

Now that you know what our events should look like, let’s write our agent.

4.1.5. Writing our agent

We are going to write our agent in Python, making use of the excellent boto3 library, which is the official Python SDK for AWS.[5] All the official language-specific SDKs for AWS support writing events to Kinesis, which seems fair: for Kinesis to be a truly unified log, we need to be able to send events to it from all our various client applications, whatever language they are written in.

5You can download the Python SDK for AWS from https://aws.amazon.com/sdk-for-python/.

Let’s get started. We are going to build up our systems monitoring agent piece by piece, using the Python interactive interpreter. Start it up by logging in to your Vagrant guest and typing python at the command prompt:

Python 2.7.12 (default, Dec  4 2017, 14:50:18)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>>

First let’s define and test a function that gives us the filesystem metrics that we need. Paste the following into your Python interpreter, being careful to keep the whitespace intact:

import os
def get_filesystem_metrics(path):
  stats = os.statvfs(path)
  block_size = stats.f_frsize
  return (block_size * stats.f_blocks, # Filesystem size in bytes
    block_size * stats.f_bfree,        # Free bytes
    block_size * stats.f_bavail)       # Free bytes excl. reserved space

s, f, a = get_filesystem_metrics("/")
print("size: {}, free: {}, available: {}".format(s, f, a))

You should see something like the following output; the exact numbers will depend on your computer:

size: 499046809600, free: 104127823872, available: 103865679872
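Looking ahead to the 80% alert we ultimately want to raise, these three numbers are already enough to tell us how full the disk is. The following helper is a sketch of that calculation (the function name disk_used_pct is mine; it is not part of the agent yet):

```python
def disk_used_pct(size, free):
    """Percentage of the filesystem currently in use,
    given total and free bytes."""
    return 100.0 * (size - free) / size

# With the sample numbers above, the disk is roughly 79% full:
print(round(disk_used_pct(499046809600, 104127823872), 1))  # 79.1
```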

Good—now you know how to retrieve the information we need about the filesystem. Next, let’s create all the metadata that we need for our event. Paste in the following:

import datetime, socket, uuid
def get_agent_version():
  return "0.1.0"

def get_hostname():
  return socket.gethostname()

def get_event_time():
  return datetime.datetime.now().isoformat()

def get_event_id():
  return str(uuid.uuid4())

print("agent: {}, hostname: {}, time: {}, id: {}".format(
  get_agent_version(), get_hostname(), get_event_time(), get_event_id()))

You should now see something a little like this:

agent: 0.1.0, hostname: Alexanders-MacBook-Pro.local, time:
2018-11-01T09:00:34.515459, id: 42432ebe-40a5-4407-a066-a1361fc31319

Note that we are uniquely identifying each event by attaching a freshly minted version 4 UUID as its event ID.[6] We will explore event IDs in much more detail in chapter 10.

6Wikipedia provides a good description of universally unique identifiers: https://en.wikipedia.org/wiki/Universally_unique_identifier.

Let’s put this all together with a function that creates our event as a Python dictionary. Type in the following at the interpreter:

def create_event():
  size, free, avail = get_filesystem_metrics("/")
  event_id = get_event_id()
  return (event_id, {
    "id": event_id,
    "subject": {
      "agent": {
        "version": get_agent_version()
      }
    },
    "verb": "read",
    "direct_object": {
      "filesystem_metrics": {
        "size": size,
        "free": free,
        "available": avail
      }
    },
    "at": get_event_time(),
    "on": {
      "server": {
        "hostname": get_hostname()
      }
    }
  })

print(create_event())

It’s a little verbose, but the intent should be clear from the Python interpreter’s output, which should be something like this:

('60f4ead5-8a1f-41f5-8e6a-805bbdd1d3f2', {'on': {'server': {'hostname':
 'ulp'}}, 'direct_object': {'filesystem_metrics': {'available':
 37267378176, 'free': 39044952064, 'size': 42241163264}}, 'verb':
 'read', 'at': '2018-11-01T09:02:31.675773', 'id':
 '60f4ead5-8a1f-41f5-8e6a-805bbdd1d3f2', 'subject': {'agent':
 {'version': '0.1.0'}}})
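Before we send this dictionary anywhere, it is worth confirming that it serializes cleanly to JSON and back, because a JSON string is exactly what we will be writing to Kinesis. A quick self-contained check (the metrics values are hardcoded from the sample output above):

```python
import json

payload = {
    "id": "60f4ead5-8a1f-41f5-8e6a-805bbdd1d3f2",
    "subject": {"agent": {"version": "0.1.0"}},
    "verb": "read",
    "direct_object": {"filesystem_metrics": {
        "size": 42241163264, "free": 39044952064, "available": 37267378176}},
    "at": "2018-11-01T09:02:31.675773",
    "on": {"server": {"hostname": "ulp"}},
}

event_json = json.dumps(payload)    # the string we will put to Kinesis
roundtrip = json.loads(event_json)  # what a downstream consumer decodes
assert roundtrip == payload
print(roundtrip["direct_object"]["filesystem_metrics"]["size"])  # 42241163264
```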

We have now constructed our first well-structured systems monitoring event! How do we send it to our Kinesis stream? It should be as simple as this:

import json

def write_event(conn, stream_name):
  event_id, event_payload = create_event()
  event_json = json.dumps(event_payload)
  conn.put_record(StreamName=stream_name, Data=event_json,
    PartitionKey=event_id)

The key method to understand here is conn.put_record, which takes three required arguments:

  • The name of the stream to write to
  • The data (sometimes called the body or payload) of the event. We are sending this data as a Python string containing our JSON.
  • The partition key for the event. This determines which shard the event is written to.

Now we just need to connect to Kinesis and try writing an event. This is as simple as the following:

import boto3

session = boto3.Session(profile_name="ulp")
conn = session.client("kinesis", region_name="eu-west-1")

write_event(conn, "events")

Part of the reason that this code is so simple is that the AWS CLI tool that you configured earlier uses boto, the AWS SDK for Python, under the hood. Therefore, boto can access the AWS credentials that you set up earlier in the AWS CLI without any trouble.

Press Enter on the preceding code and you should be greeted with ... silence! Although, in this case, no news is good news, it would still be nice to get some visual feedback. This can be arranged: next, put the sending of our event into an infinite loop in the Python interpreter, like so:

while True:
  write_event(conn, "events")

Leave this running for a couple of minutes, and then head back into the Kinesis section of the AWS web interface, and click your events stream to bring up the Stream Details view. At the bottom of this view, in the Monitoring tab, you should be able to see the beginnings of lines on some of the charts, as in figure 4.6.

Figure 4.6. The Monitoring tab in the Stream Details view lets you review the current and historical performance of a given Kinesis stream.

Unfortunately, this is the only visual confirmation we can get that we are successfully writing to our Kinesis stream—at least until we write some kind of stream consumer. We will do that soon, but first let’s wrap up our systems monitoring agent. We won’t need the Python interpreter anymore, so you can kill the infinite loop with Ctrl-C, and then exit the interpreter with Ctrl-D.

Let’s consolidate all of our work at the interpreter into a single file to run our agent’s core monitoring loop. Create a file called agent.py and populate it with the contents of the following listing.

Listing 4.1. agent.py
#!/usr/bin/env python

import os, datetime, socket, json, uuid, time, boto3

def get_filesystem_metrics(path):
  stats = os.statvfs(path)
  block_size = stats.f_frsize
  return (block_size * stats.f_blocks, # Filesystem size in bytes
    block_size * stats.f_bfree,        # Free bytes
    block_size * stats.f_bavail)       # Free bytes excluding reserved space

def get_agent_version():
  return "0.1.0"

def get_hostname():
  return socket.gethostname()

def get_event_time():
  return datetime.datetime.now().isoformat()

def get_event_id():
  return str(uuid.uuid4())

def create_event():
  size, free, avail = get_filesystem_metrics("/")
  event_id = get_event_id()
  return (event_id, {
    "id": event_id,
    "subject": {
      "agent": {
        "version": get_agent_version()
      }
    },
    "verb": "read",
    "direct_object": {
      "filesystem_metrics": {
        "size": size,
        "free": free,
        "available": avail
      }
    },
    "at": get_event_time(),
    "on": {
      "server": {
        "hostname": get_hostname()
      }
    }
  })

def write_event(conn, stream_name):
  event_id, event_payload = create_event()
  event_json = json.dumps(event_payload)
  conn.put_record(StreamName=stream_name, Data=event_json,
    PartitionKey=event_id)
  return event_id

if __name__ == '__main__':                                   #1
  session = boto3.Session(profile_name="ulp")
  conn = session.client("kinesis", region_name="eu-west-1")
  while True:                                                #2
    event_id = write_event(conn, "events")
    print("Wrote event: {}".format(event_id))
    time.sleep(10)                                           #3

Make the agent.py file executable and run it:

chmod +x agent.py
./agent.py
Wrote event: 481d142d-60f1-4d68-9bd6-d69eec5ff6c0
Wrote event: 3486558d-163d-4e42-8d6f-c0fb91a9e7ec
Wrote event: c3cd28b8-9ddc-4505-a1ce-193514c28b57
Wrote event: f055a8bb-290c-4258-90f0-9ad3a817b26b
...

Our agent is running! Check out figure 4.7 for a visualization of what we have created.

Figure 4.7. Our systems monitoring agent is now emitting an event containing filesystem statistics every 10 seconds.

Leave our Python agent running in a terminal. It will continue to write an Agent read filesystem metrics event every 10 seconds. Now we need to write our stream processing app to consume this stream of events and notify us if our server’s disk reaches the dreaded 80% full.

4.2. Reading from Kinesis

A variety of frameworks and SDKs can read events from a Kinesis stream. Let’s review these briefly, and then use two of these tools to implement basic monitoring of our event stream.

4.2.1. Kinesis frameworks and SDKs

For such a young platform, Kinesis already supports a slightly bewildering array of frameworks and SDKs for event stream processing. Here are the main ones:

  • The AWS CLI introduced earlier has a small set of commands to help with reading records from a given shard within a Kinesis stream.[7]

    7See the AWS CLI Command Reference for a list of these commands: https://docs.aws.amazon.com/cli/latest/reference/kinesis/index.html#cli-aws-kinesis.

  • Each official AWS SDK has support for reading records from a Kinesis stream. Your calling application is expected to run a thread for each shard within the stream, and you are responsible for keeping track of your current processing position within each shard.
  • A higher-level framework, again written by the AWS team, is called the Kinesis Client Library (KCL) for Java.[8] This uses the Amazon DynamoDB database to keep track of your calling application’s processing positions within each shard. It also handles the “division of labor” between multiple instances of a KCL-powered application running on separate servers, which is useful for horizontal scaling.

    8You can download the official Kinesis Client Library for Java from GitHub at https://github.com/awslabs/amazon-kinesis-client.

  • The KCL for Java includes a MultiLangDaemon, which enables Kinesis Client Library applications to be written in other languages. At the time of writing, there is only a Kinesis Client Library for Python built using this.[9]

    9You can download the official Kinesis Client Library for Python from https://github.com/awslabs/amazon-kinesis-client-python.

  • AWS Lambda is a fully managed stream processing platform running on a Node.js cluster. You write and upload JavaScript functions and assign them to be run for every event in an AWS-native event stream such as Kinesis. Functions must be short-lived (completing in no more than 15 minutes) and cannot use local state. It is also possible to write functions in Java, Python, Go, Ruby, and C#.
  • Apache Storm has a Kinesis Storm Spout, created by the AWS team, which retrieves data records from Amazon Kinesis and emits them as tuples, ready for processing in Storm.
  • Apache Spark Streaming, which is Spark’s microbatch processing framework, can convert a Kinesis stream into an InputDStream ready for further processing. This functionality is built on top of the KCL for Java.

A surprising omission from the preceding list is Apache Samza—surprising because, as you have seen, the semantics of Amazon Kinesis and Apache Kafka are extremely similar. Given how well Samza works with Kafka (see chapter 5), we might expect it to work well with Kinesis too.

4.2.2. Reading events with the AWS CLI

Let’s start with the “rawest” tool for processing a Kinesis stream: the AWS CLI. You would likely choose a higher-level framework to build a production application, but the AWS CLI will give you familiarity with the key building blocks underpinning those frameworks.

First, we’ll use the describe-stream command to review the exact contents of our Kinesis stream:

$ aws kinesis describe-stream --stream-name events --profile ulp
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "170141183460469231731687303715884105727"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber":
     "49589726466290061031074327390112813890652759903239667714"
                }
            },
            {
                "ShardId": "shardId-000000000001",
                "HashKeyRange": {
                    "StartingHashKey": "170141183460469231731687303715884105728",
                    "EndingHashKey": "340282366920938463463374607431768211455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber":
     "49589726466312361776272858013254349608925408264745648146"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:eu-west-1:089010284850:stream/events",
        "StreamName": "events",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1541061618.0
    }
}

The response is a JSON structure containing an array of Shards, which in turn contains definitions for our stream’s two shards. Each shard is identified by a unique ShardId, and contains metadata:

  • The HashKeyRange, which determines which events will end up in which shard, depending on the hashed value of the event’s partition key. This is illustrated in figure 4.8.
  • The SequenceNumberRange, which records the sequence number of the first event in the shard. You will see an upper bound on this range only if the shard has been closed and no further events will be added.
Figure 4.8. By applying an MD5 hash to our shard key (which is our event ID), we can determine which shard Kinesis will store the event in, based on the shards’ individual hash key ranges.
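We can reproduce this routing logic ourselves. The following sketch hashes a partition key with MD5 and checks which of our two hash key ranges (copied from the describe-stream output above) contains the resulting 128-bit integer. This mirrors what Kinesis does internally, although of course the real service is authoritative:

```python
import hashlib

# Hash key ranges for our two shards, from the describe-stream output above.
SHARD_RANGES = {
    "shardId-000000000000": (0, 170141183460469231731687303715884105727),
    "shardId-000000000001": (170141183460469231731687303715884105728,
                             340282366920938463463374607431768211455),
}

def shard_for(partition_key):
    """Pick the shard whose hash key range contains the MD5 hash
    of the partition key, interpreted as a 128-bit integer."""
    hashed = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    for shard_id, (low, high) in SHARD_RANGES.items():
        if low <= hashed <= high:
            return shard_id

# One of our event IDs from earlier lands in one of the two shards:
print(shard_for("b5ed136d-c879-4f2e-a9ce-a43313ce13c6"))
```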

Before we can read events from our stream, we need to retrieve what Amazon calls a shard iterator for each shard in the stream. A shard iterator is a slightly abstract concept. You can think of it as something like a short-lived (five-minute) file handle on a given shard. Each shard iterator defines a cursor position for a set of sequential reads from the shard, where the cursor position takes the form of the sequence number of the first event to read. Let’s create a shard iterator now:

$ aws kinesis get-shard-iterator --stream-name=events \
  --shard-id=shardId-000000000000 --shard-iterator-type=TRIM_HORIZON \
  --profile=ulp
{
    "ShardIterator": "AAAAAAAAAAFVbPjgjXyjJOsE5r4/MmA8rntidIRFxTSs8rKLXSs8
kfyqcz2KxyHs3V9Ch4WFWVQvzj+xO1yWZ1rNWNjn7a5R3u0aGkMjl1U2pemcJHfjkDmQKcQDwB
1qbjTdN1DzRLmYuI3u1yNDIfbG+veKBRLlodMkZOqnMEOY3bJhluDaFlOKUrynTnZ3oNA2/4zE
7uE="
}

We specified that the shard iterator should be of type TRIM_HORIZON. This is AWS jargon for the oldest events in the shard that have not yet been trimmed—expired for being too old. At the time of writing, records are trimmed from a Kinesis stream after a fixed period of 24 hours. This period can be increased up to 168 hours but will incur an additional cost. There are three other shard iterator types:

  • LATEST—This returns the “most recent” data from the shard.
  • AT_SEQUENCE_NUMBER—This lets us read a shard starting from the event with the specified sequence number.
  • AFTER_SEQUENCE_NUMBER—This lets us read a shard starting from the event immediately after the one with the specified sequence number.
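To make the distinction between these iterator types concrete, here is a toy model of a shard as a list of (sequence number, event) pairs. This only illustrates the semantics of each type; it is not how Kinesis is implemented:

```python
# A toy shard: (sequence number, event) pairs in arrival order.
shard = [(100, "a"), (101, "b"), (102, "c")]

def events_from(shard, iterator_type, seq=None):
    """Return the events a fresh iterator of the given type would read."""
    if iterator_type == "TRIM_HORIZON":
        return [e for _, e in shard]             # oldest untrimmed onwards
    if iterator_type == "LATEST":
        return []                                # only events arriving later
    if iterator_type == "AT_SEQUENCE_NUMBER":
        return [e for s, e in shard if s >= seq]
    if iterator_type == "AFTER_SEQUENCE_NUMBER":
        return [e for s, e in shard if s > seq]

print(events_from(shard, "AT_SEQUENCE_NUMBER", 101))     # ['b', 'c']
print(events_from(shard, "AFTER_SEQUENCE_NUMBER", 101))  # ['c']
```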

Figure 4.9 illustrates the various shard iterator options.

Figure 4.9. The four configuration options for a Kinesis shard iterator allow us to start reading a shard of our event stream from various points.

Now we are ready to try reading some records by using our new shard iterator:

$ aws kinesis get-records --shard-iterator
     "AAAAAAAAAAFVbPjgjXyjJOsE5r4/MmA8rntidIRFxTSs8rKLXSs8kfyqcz2KxyHs3V9Ch4WFW
VQvzj+xO1yWZ1rNWNjn7a5R3u0aGkMjl1U2pemcJHfjkDmQKcQDwB1qbjTdN1DzRLmYuI3u1yN
DIfbG+veKBRLlodMkZOqnMEOY3bJhluDaFlOKUrynTnZ3oNA2/4zE7uE=" --profile=ulp
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAHQ8eRw4sduIDKhasXSpZtpkI4/uMBsZ1+ZrgT8
/Xg0KQ5GwUqFMIf9ooaUicRpfDVfqWRMUQ4rzYAtDIHuxdJSeMcBYX0RbBeqvc2AIRJH6BOXC6
nqZm9qJBGFIYvqb7QUAWhEFz56cnO/HLWAF1x+HUd/xT21iE3dgAFszY5H5aInXJCw+vfid4Yn
O9PZpCU="
}

An empty array of records! That’s okay; this simply means that this initial portion of the shard doesn’t contain any events. The AWS documentation warns us that we might have to cycle through a few shard iterators before we reach any events. Fortunately, the NextShardIterator gives us the next “file handle” to use, so let’s plug this into our next get-records call:

$ aws kinesis get-records --shard-iterator
"AAAAAAAAAAEXsqVd9FvzqV7/M6+Dbz989dpSBkaAbn6/cESUTbKHNejQ3C3BmjKfRR57jQuQb
Vhlh+uN6xCOdJ+KIruWvqoITKQk9JsHa96VzJVGuLMY8sPy8Rh/LGfNSRmKO7CkyaMSbEqGNDi
gtjz7q0S41O4KL5BFHeOvGce6bJK7SJRA4BPXBITh2S1rGI62N4z9qnw=" --profile=ulp
{
    "Records": [
        {
            "PartitionKey": "b5ed136d-c879-4f2e-a9ce-a43313ce13c6",
            "Data": "eyJvbiI6IHsic2VydmVyIjogeyJob3N0bmFtZSI6ICJ1bHAifX0sI
CJkaXJlY3Rfb2JqZWN0IjogeyJmaWxlc3lzdGVtX21ldHJpY3MiOiB7ImF2YWlsYWJsZSI6IDM
3MjY2OTY4NTc2LCAiZnJlZSI6IDM5MDQ0NTQyNDY0LCAic2l6ZSI6IDQyMjQxMTYzMjY0fX0sI
CJ2ZXJiIjogInJlYWQiLCAiYXQiOiAiMjAxNS0wMy0xMFQyMjo1MTo1Ny4wNjY3MzUiLCAiaWQ
IOiAiYjVlZDEzNmQtYzg3OS00ZjJlLWE5Y2UtYTQzMzEzY2UxM2M2IiwgInN1YmplY3QiOiB7I
mFnZW50IjogeyJ2ZXJzaW9uIjogIjAuMS4wIn19fQ==",
            "SequenceNumber": "4954852586058767917223324843693285354050539
8606492073986"
        },
        ...
    ],
    "NextShardIterator":"AAAAAAAAAAHBdaV/lN3TN2LcaXhd9yYb45IPOc8mR/ceD5vpw
uUG0Ql5pj9UsjlXikidqP4J9HUrgGa1iPLNGm+DoTH0Y8zitlf9ryiBNueeCMmhZQ6jX22yani
YKz4nbxDTKcBXga5CYDPpmj9Xb9k9A4d53bIMmIPF8JATorzwgoEilw/rbiK1a6XRdb0vDj5VH
fwzSYQ="
}

I have elided the output, but this time the AWS CLI returned 24 records, along with a NextShardIterator for us to fetch further events from our shard. Let’s just check that the Base64-decoded contents of an event are as we expect. Again, in the Python interpreter, type in the following:

import base64
base64.b64decode("eyJvbiI6IHsic2VydmVyIjogeyJob3N0bmFtZSI6ICJ1bHAifX0sICJk
aXJlY3Rfb2JqZWN0IjogeyJmaWxlc3lzdGVtX21ldHJpY3MiOiB7ImF2YWlsYWJsZSI6IDM3Mj
Y2OTY4NTc2LCAiZnJlZSI6IDM5MDQ0NTQyNDY0LCAic2l6ZSI6IDQyMjQxMTYzMjY0fX0sICJ2
ZXJiIjogInJlYWQiLCAiYXQiOiAiMjAxNS0wMy0xMFQyMjo1MTo1Ny4wNjY3MzUiLCAiaWQiOi
AiYjVlZDEzNmQtYzg3OS00ZjJlLWE5Y2UtYTQzMzEzY2UxM2M2IiwgInN1YmplY3QiOiB7ImFn
ZW50IjogeyJ2ZXJzaW9uIjogIjAuMS4wIn19fQ==")

And you should see this:

{"on": {"server": {"hostname": "ulp"}}, "direct_object":
     {"filesystem_metrics": {"available": 37266968576, "free":
 39044542464, "size": 42241163264}}, "verb": "read", "at":
 "2015-03-10T22:51:57.066735", "id":
 "b5ed136d-c879-4f2e-a9ce-a43313ce13c6", "subject": {"agent":
 {"version": "0.1.0"}}}

Ogkk—wk san yllnifa fnroimc drrz yet systems monitoring eantg zgs utlfhaflyi eodrerdc ptk entev sennttoc jn Kinesis gunsi boto.

Another thing to stress is that, just as in Kafka, each of these records is still stored in the Kinesis stream, available for other applications to consume. It’s not like the act of reading has “popped” these events off the shard forever. We can demonstrate this quickly by creating an all-new shard iterator, this time laser-focused on this same event:

$ aws kinesis get-shard-iterator --stream-name=events \
  --shard-id=shardId-000000000000 \
  --shard-iterator-type=AT_SEQUENCE_NUMBER \
  --starting-sequence-
     number=49548525860587679172233248436932853540505398606492073986 \
  --profile=ulp
{
    "ShardIterator":"AAAAAAAAAAE+WN9BdSD2AoDrKCJBjX7buEixAm6FdEkHHMTYl3MgrpsmU
UOp8Q0/yd0x5zPombuawVhr6t/14zsavYqpXo8PGlex6bkvvGhRYLVeP1BxUfP91JVJicfpKQP
3Drxf0dxYeTfw6izIMUN6QCvxEluR6Ca3t0INFzpvXDIm6y36EIGpxrYmxUD0fgXbHPRdL/s="
}

We then request a single event (--limit=1) from our shard by using the new shard iterator:

$ aws kinesis get-records --limit=1 --shard-iterator
     "AAAAAAAAAAE+WN9BdSD2AoDrKCJBjX7buEixAm6FdEkHHMTYl3MgrpsmUUOp8Q0/yd0x5zP
     om
BuawVhr6t/14zsavYqpXo8PGlex6bkvvGhRYLVeP1BxUfP91JVJicfpKQP3Drxf0dxYeTfw6iz
IMUN6QCvxEluR6Ca3t0INFzpvXDIm6y36EIGpxrYmxUD0fgXbHPRdL/s=" --profile=ulp
{
    "Records": [
        {
            "PartitionKey": "b5ed136d-c879-4f2e-a9ce-a43313ce13c6",
            "Data":"eyJvbiI6IHsic2VydmVyIjogeyJob3N0bmFtZSI6ICJ1bHAifX0sIC
JkaXJlY3Rfb2JqZWN0IjogeyJmaWxlc3lzdGVtX21ldHJpY3MiOiB7ImF2YWlsYWJsZSI6IDM3
MjY2OTY4NTc2LCAiZnJlZSI6IDM5MDQ0NTQyNDY0LCAic2l6ZSI6IDQyMjQxMTYzMjY0fX0sIC
J2ZXJiIjogInJlYWQiLCAiYXQiOiAiMjAxNS0wMy0xMFQyMjo1MTo1Ny4wNjY3MzUiLCAiaWQi
OiAiYjVlZDEzNmQtYzg3OS00ZjJlLWE5Y2UtYTQzMzEzY2UxM2M2IiwgInN1YmplY3QiOiB7Im
FnZW50IjogeyJ2ZXJzaW9uIjogIjAuMS4wIn19fQ==",
            "SequenceNumber":
     "49548525860587679172233248436932853540505398606492073986"
        }
    ],
    "NextShardIterator":"AAAAAAAAAAFqCzzLKNkxsGFGhqUlmMHTXq/Z/xsIDu6gP+LVd
4s+KZtiPSib0mqXRiNPSEyshvmdHrV4bEwYPvxNYKLIr3xCH4T3IeSS9hdGiQsLgjJQ1yTUTe+
0qg+UJSzba/xRB7AtQURMj0xZe3sCSEjas3pzhw48uDSLyQsZu5ewqcBLja50ykJkXHOmGnCXI
oxtYMs="
}

This is the same PartitionKey (our event ID), SequenceNumber, and indeed Base64-encoded data as before; we have successfully retrieved the same event twice!

No doubt, this section has been a lot to take in. Let's summarize before we move on:

  • The AWS CLI lets us read the events in a Kinesis stream.
  • We require a shard iterator to read events from a single shard in the stream. Think of this as a temporary stream handle defining our cursor position in the stream.
  • We use the shard iterator to read a batch of events from the shard.
  • Along with the batch of events, we receive back the next shard iterator, which we use to read the next batch.

Figure 4.10 illustrates this process.

Figure 4.10. After we have retrieved our first shard iterator, each request for records returns a new shard iterator we can use for the next request.

4.2.3. Monitoring our stream with boto

Now that you understand the basics of reading events from a Kinesis stream, let's return to the task at hand: monitoring our agent's event stream in order to check whether our server is running low on disk space. After getting our hands dirty with the AWS CLI, we're now going back to the AWS Python SDK, boto. The AWS CLI and boto expose the exact same primitives for stream processing, which is unsurprising, given that the AWS CLI is built on boto! The main difference is that the AWS Python SDK will let us use all the power of Python in our stream processing application.

The stream processing application we'll build in this section is illustrated in figure 4.11. It follows a simple algorithm:

  • Read each event from each shard of our events stream
  • Check the event's filesystem metrics to see whether our server's disk is more than 80% full
  • If the disk is more than 80% full, print out an alert to the console
Figure 4.11. Our monitoring application will read events from our Kinesis stream, check whether the reported disk usage has reached 80%, and raise an alert if so.

As before, we are going to build up our application piece by piece in the Python interpreter. This time, however, we'll start with the stream reading mechanics and then work our way back to the business logic (the event monitoring).

First, we need to create a thread for each of the shards in our event stream. Although we know that our stream currently has two shards, this could change in the future, so we'll use boto to check the number of shards the stream has right now. Type the following at your Python prompt:

import boto3

session = boto3.Session(profile_name="ulp")
conn = session.client("kinesis", region_name="eu-west-1")

stream = conn.describe_stream(StreamName='events')
shards = stream['StreamDescription']['Shards']
print(f'Shard count: {len(shards)}')

Press Enter and you should see this:

Shard count: 2

This is all the information we need to create a thread for each shard:

from threading import Thread, current_thread

def ready(shard_id):
  name = current_thread().name
  print(f'{name} ready to process shard {shard_id}')

for shard in shards:
  thread = Thread(target=ready, args=(shard['ShardId'],))
  thread.start()

Press Enter again and you should see this:

Thread-1 ready to process shard shardId-000000000000
Thread-2 ready to process shard shardId-000000000001

Your threads' numbers may be different. This is the basic pattern for the rest of our monitoring application: we will start a thread to process each shard, and each thread will run our monitoring code. One thing to note is that our application would have to be restarted if more shards were added (or indeed taken away), to create threads for the new shards. This is a limitation of working with Kinesis at such a low level. If we were using one of the Kinesis Client Libraries, changes in our stream's population of shards would be handled for us transparently.
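A low-tech mitigation is to call describe_stream periodically and diff the shard IDs between snapshots, restarting readers when anything changes. A minimal sketch, where detect_reshard is our own hypothetical helper (not part of boto), and where we gloss over the fact that split or merged shards linger in a closed state for the retention period:

```python
def detect_reshard(previous_ids, current_ids):
    """Compare two snapshots of shard IDs (e.g. from successive
    describe_stream calls) and report what changed."""
    prev, curr = set(previous_ids), set(current_ids)
    added = sorted(curr - prev)    # shards created by a split
    removed = sorted(prev - curr)  # shards retired by a merge
    return added, removed

# Example: one shard retired, one new shard appeared
added, removed = detect_reshard(
    ['shardId-000000000000', 'shardId-000000000001'],
    ['shardId-000000000001', 'shardId-000000000002'])
print(added, removed)
```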

Now we need a loop in each thread to handle reading events from each shard via shard iterators. The loop will be broadly the same as that of figure 4.11:

  • Get an initial shard iterator for the given shard
  • Use the shard iterator to read a batch of events from the shard
  • Use the returned next shard iterator to read the next batch of events

We'll build this loop inside a Python class, imaginatively called ShardReader, which will run on its supplied thread. Type the following at the Python interpreter:

import time

class ShardReader(Thread):
  def __init__(self, name, stream_name, shard_id):
    super(ShardReader, self).__init__(None, name)
    self.name = name
    self.stream_name = stream_name
    self.shard_id = shard_id

  def run(self):
    try:
      next_iterator = conn.get_shard_iterator(StreamName=self.stream_name,
        ShardId=self.shard_id,
        ShardIteratorType='TRIM_HORIZON')['ShardIterator']
      while True:
        response = conn.get_records(ShardIterator=next_iterator, Limit=10)
        for event in response['Records']:
          print(f"{self.name} read event {event['PartitionKey']}")
        next_iterator = response['NextShardIterator']
        time.sleep(5)
    except conn.exceptions.ProvisionedThroughputExceededException as ptee:
      print(f'Caught: {ptee}')
      time.sleep(5)

We'll kick off the threads in a way similar to before:

for shard in shards:
  shard_id = shard['ShardId']
  reader_name = f'Reader-{shard_id}'
  reader = ShardReader(reader_name, 'events', shard_id)
  reader.start()

Press Enter and, assuming that your agent from section 4.1 is still running in another console, you should see something like the following:

Reader-shardId-000000000000 read event 481d142d-60f1-4d68-9bd6-d69eec5ff6c0
Reader-shardId-000000000001 read event 3486558d-163d-4e42-8d6f-c0fb91a9e7ec
Reader-shardId-000000000001 read event c3cd28b8-9ddc-4505-a1ce-193514c28b57
Reader-shardId-000000000000 read event f055a8bb-290c-4258-90f0-9ad3a817b26b

There will be a burst of events to start with. This is because both threads are catching up with the historic contents of each shard: all events dating from the so-called TRIM_HORIZON onward. After the backlog is cleared, the output should settle down to an event every 10 seconds or so, matching the rate at which our agent is producing readings. Note the Reader-shardId- prefixes on each of the output messages: these tell us which ShardReader each event is being read by. The allocation is random because each event is partitioned based on its event ID, which as a UUID is essentially random.
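Under the hood, Kinesis routes each record by taking the MD5 hash of its partition key and matching the resulting 128-bit number against each shard's hash key range. A rough sketch of that routing, assuming the ranges are equal slices (shard_for_key is illustrative, not an AWS API):

```python
import hashlib

def shard_for_key(partition_key, num_shards):
    """Approximate Kinesis routing: MD5-hash the partition key and
    place it in one of num_shards equal hash-range slices."""
    h = int(hashlib.md5(partition_key.encode('utf-8')).hexdigest(), 16)
    return h * num_shards // 2 ** 128
```

With UUID partition keys, the hash values scatter uniformly across the range, which is why the output from the two readers interleaves randomly.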

Let's see what happens if we stop the loop and then paste the same code in to start processing again:

Reader-shardId-000000000001 read event 3486558d-163d-4e42-8d6f-c0fb91a9e7ec
Reader-shardId-000000000000 read event 481d142d-60f1-4d68-9bd6-d69eec5ff6c0
Reader-shardId-000000000001 read event c3cd28b8-9ddc-4505-a1ce-193514c28b57
Reader-shardId-000000000000 read event f055a8bb-290c-4258-90f0-9ad3a817b26b

Compare the partition keys (remember, these are the event IDs) to the previous run, and you'll see that processing has restarted from the beginning of the shard. Our processing app is a goldfish; it has no memory of which events it has seen on previous runs. Again, this is something that higher-level frameworks like the Kinesis Client Library handle: they allow you to "checkpoint" your progress against a stream by using persistent storage such as Amazon's DynamoDB.
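To sketch what checkpointing buys us: if we persisted each record's SequenceNumber somewhere durable, a restarted reader could build its iterator request from the saved value instead of always starting at TRIM_HORIZON. The iterator_params helper below is our own hypothetical sketch (the real KCL manages this state in DynamoDB), though AFTER_SEQUENCE_NUMBER and StartingSequenceNumber are genuine Kinesis API parameters:

```python
def iterator_params(stream_name, shard_id, checkpoint=None):
    """Build kwargs for get_shard_iterator: resume after the last
    checkpointed sequence number if we have one, else start from the
    oldest record still in the shard."""
    params = {'StreamName': stream_name, 'ShardId': shard_id}
    if checkpoint is None:
        params['ShardIteratorType'] = 'TRIM_HORIZON'
    else:
        params['ShardIteratorType'] = 'AFTER_SEQUENCE_NUMBER'
        params['StartingSequenceNumber'] = checkpoint
    return params
```

A reader would load its checkpoint at startup, pass the resulting kwargs to conn.get_shard_iterator(**params), and save the latest SequenceNumber after each batch.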

Now that we have a stream processing framework in place, all that is left is to check the available disk space as reported in each event and detect if the available space drops below 20%. We need a function that takes a reading from our agent and generates an incident as required. Here is a function that does exactly that:

def detect_incident(event):
  decoded = json.loads(event)
  passed = None, None
  try:
    server = decoded['on']['server']['hostname']
    metrics = decoded['direct_object']['filesystem_metrics']
    pct_avail = metrics['available'] * 100 / metrics['size']
    return (server, pct_avail) if pct_avail <= 20 else passed
  except KeyError:
    return passed

Our function checks whether the proportion of available disk space is 20% or less, and if so, it returns the server's hostname and the proportion of available disk space in a tuple. If the check passes, we return the (None, None) tuple, indicating that there is no action to take. We also tolerate any KeyError by returning (None, None), in case other event types are added to this stream in the future.

Let's test our new function in the Python interpreter with an empty event and a valid event:

detect_incident('{}')
(None, None)
detect_incident('{"on": {"server": {"hostname": "ulp"}},
 "direct_object": {"filesystem_metrics": {"available": 150, "free":
 100, "size": 1000}}, "verb": "read", "at": "2018-11-01T09:02:31.675773",
 "id": "b5ed136d-c879-4f2e-a9ce-a43313ce13c6", "subject": {"agent":
 {"version": "0.1.0"}}}')
('ulp', 15.0)

Note: the first call to detect_incident returned a (None, None) tuple, while the second call successfully detected that the server with hostname ulp has only 15% disk space available.

That's all the code that we need for our monitoring application. In the following listing, we consolidate everything into a single file, monitor.py.

Listing 4.2. monitor.py
#!/usr/bin/env python

import json, time, boto3
from threading import Thread

class ShardReader(Thread):
  def __init__(self, name, stream_name, shard_id):
    super(ShardReader, self).__init__(None, name)
    self.name = name
    self.stream_name = stream_name
    self.shard_id = shard_id

  @staticmethod
  def detect_incident(event):
    decoded = json.loads(event)
    passed = None, None
    try:
      server = decoded['on']['server']['hostname']
      metrics = decoded['direct_object']['filesystem_metrics']
      pct_avail = metrics['available'] * 100 / metrics['size']
      return (server, pct_avail) if pct_avail <= 20 else passed
    except KeyError:
      return passed

  def run(self):
    try:
      next_iterator = conn.get_shard_iterator(StreamName=self.stream_name,
        ShardId=self.shard_id,
        ShardIteratorType='TRIM_HORIZON')['ShardIterator']
      while True:
        response = conn.get_records(ShardIterator=next_iterator, Limit=10)
        for event in response['Records']:
          print(f"{self.name} read event {event['PartitionKey']}")
          s, a = self.detect_incident(event['Data'])               #1
          if a:
            print(f'{s} has only {a}% disk available!')            #2
        next_iterator = response['NextShardIterator']
        time.sleep(5)
    except conn.exceptions.ProvisionedThroughputExceededException as ptee:
      print(f'Caught: {ptee}')
      time.sleep(5)

if __name__ == '__main__':
  session = boto3.Session(profile_name="ulp")
  conn = session.client("kinesis", region_name="eu-west-1")
  stream = conn.describe_stream(StreamName='events')
  shards = stream['StreamDescription']['Shards']

  threads = []                                                     #3
  for shard in shards:
    shard_id = shard['ShardId']
    reader_name = f'Reader-{shard_id}'
    reader = ShardReader(reader_name, 'events', shard_id)
    reader.start()
    threads.append(reader)                                         #3

  for thread in threads:                                           #4
    thread.join()

Make the monitor.py file executable and run it:

chmod +x monitor.py
./monitor.py
Reader-shardId-000000000001 read event 3486558d-163d-4e42-8d6f-c0fb91a9e7ec
Reader-shardId-000000000000 read event 481d142d-60f1-4d68-9bd6-d69eec5ff6c0
Reader-shardId-000000000001 read event c3cd28b8-9ddc-4505-a1ce-193514c28b57
Reader-shardId-000000000000 read event f055a8bb-290c-4258-90f0-9ad3a817b26b
...

Our monitoring application is now running, reading each event in both shards of our events stream and reporting back on our server's disk usage. Unless you have a hard drive that's as cluttered as mine, chances are that the disk usage alert isn't firing. We can change that by temporarily creating an arbitrarily large file on our hard drive, using the fallocate command.

A quick filesystem check in my Vagrant virtual machine suggests that I have around 35 gigabytes available:

$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        40G    5G 34.8G  12% /

I created a temporary file sized at 30 gigabytes:

$ fallocate -l 30G /tmp/ulp.filler
$ df -h
Filesystem      Size  Used Avail Use% Mounted on
/dev/sda1        40G   35G  4.8G  88% /

And then I switch back to the terminal running monitor.py and see this:

Reader-shardId-000000000000 read event 097370ea-23bd-4225-ae39-fd227216e7d4
Reader-shardId-000000000001 read event e00ecc7b-1950-4e1a-98cf-49ac5c0f74b5
ulp has only 11% disk available!
Reader-shardId-000000000000 read event 8dcfe5ba-e14b-4e60-8547-393c20b2990a
ulp has only 11% disk available!

Great, the alert is firing! Every new filesystem metrics event being emitted by our agent.py is now triggering an alert in monitor.py. We can switch this off just as easily:

$ rm /tmp/ulp.filler

And switch back to our other terminal:

Reader-shardId-000000000001 read event 4afa8f27-3b62-4e23-b0a1-14af2ff1bfe1
ulp has only 11% disk available!
Reader-shardId-000000000000 read event 49b13b61-120d-44c5-8c53-ef5d91cb8795
Reader-shardId-000000000000 read event 8a3bf478-d211-49ab-8504-a0adae5a6a50
Reader-shardId-000000000000 read event 9d9a9b02-dea3-4ba1-adc9-464f4f2b0b31

So, this completes our systems monitoring example. To avoid incurring further AWS costs, you must now delete the events stream from the Kinesis screen in your AWS UI, as in figure 4.12.

Figure 4.12. Deleting our events stream by using the Kinesis UI

To recap: we have written a simple systems monitoring agent in Python that emits a steady stream of filesystem metrics into an Amazon Kinesis stream. We have then written another Python application, again using boto, the Amazon Python SDK, which monitors the agent's event stream in Kinesis, looking for hard drives filling up. Although our systems monitoring example is a rather narrow one, hopefully you can see how this could be extended to a more generalized monitoring approach.

Working with a two-shard Kinesis stream at a low level by using the AWS CLI and boto has given you a good handle on how Kinesis is designed. In the coming chapters, we will work with higher-level tools such as the KCL and Apache Spark Streaming, and your understanding of how Kinesis is designed at this lower level will stand us in good stead for this.

Summary

  • Amazon Kinesis is a fully managed unified log service, available as part of the Amazon Web Services offering.
  • Amazon Kinesis and Apache Kafka have differences in terminology, but the semantics of both technologies are extremely similar.
  • A wide array of stream processing frameworks already support Amazon Kinesis, including Apache Spark Streaming, Apache Storm, and Amazon’s own Kinesis Client Libraries (KCLs) for Java and Python.
  • We can set up an identity and access management (IAM) user in AWS, and assign a managed policy to that user to give full permissions on Kinesis.
  • Using the AWS CLI, we can start writing events to a Kinesis stream, and read those same events out by using an abstraction called a shard iterator.
  • Systems monitoring is a new use case for unified log processing. We can model systems monitoring events by using our standard grammatical approach, and create agents that run on servers and push monitoring events into our unified log.
  • We can build a simple systems monitoring agent in Python by using the Python SDK for AWS, called boto. This agent runs in an infinite loop, regularly emitting events that consist of readings of filesystem statistics from the local server.
  • We can build a low-level monitoring application in Python by using Python threads and boto. Each thread reads all events from a single Kinesis shard by using shard iterators, monitors the reported disk usage, and raises an incident if disk usage reaches 80%.
  • Higher-level tools such as the KCL will handle the basics of Kinesis stream processing for us, including distribution of work across multiple servers and checkpointing in a database our progress against each shard.