Chapter 3. Distributed HBase, HDFS, and MapReduce

published book

This chapter covers

  • HBase as a distributed storage system
  • When to use MapReduce instead of the key-value API
  • MapReduce concepts and workflow
  • How to write MapReduce applications with HBase
  • How to use HBase for map-side joins in MapReduce
  • Examples of using HBase with MapReduce

As you’ve realized, HBase is built on Apache Hadoop. What may not yet be clear to you is why. Most important, what benefits do we, as application developers, enjoy from this relationship? HBase depends on Hadoop for two separate concerns. Hadoop MapReduce provides a distributed computation framework for high-throughput data access. The Hadoop Distributed File System (HDFS) gives HBase a storage layer providing availability and reliability. In this chapter, you’ll see how Twit-Base is able to take advantage of this data access for bulk processing and how HBase uses HDFS to guarantee availability and reliability.

To begin this chapter, we’ll show you why MapReduce is a valuable alternative access pattern for processing data in HBase. Then we’ll describe Hadoop MapReduce in general. With this knowledge, we’ll tie it all back into HBase as a distributed system. We’ll show you how to use HBase from MapReduce jobs and explain some useful tricks you can do with HBase from MapReduce. Finally, we’ll show you how HBase provides availability, reliability, and durability for your data. If you’re a seasoned Hadooper and know a bunch about MapReduce and HDFS, you can jump straight to section 3.3 and dive into learning about distributed HBase.

Yvu kqzk ozgp nj arjg ahtprec jz c itnooniatucn lv por TwitBase ejrcotp tteasdr nj yor pusrioev cpaehrt pcn cj ellaiabva rz https://github.com/hbaseinaction/twitbase.

join today to enjoy all our content. all the time.
 

3.1. A case for MapReduce

Everything you’ve seen so far about HBase has a focus on online operations. You expect every Get and Put to return results in milliseconds. You carefully craft your Scans to transfer as little data as possible over the wire so they’ll complete as quickly as possible. You emphasize this behavior in your schema designs, too. The twits table’s rowkey is designed to maximize physical data locality and minimize the time spent scanning records.

Not all computation must be performed online. For some applications, offline operations are fine. You likely don’t care if the monthly site traffic summary report is generated in four hours or five hours, as long as it completes before the business owners are ready for it. Offline operations have performance concerns as well. Instead of focusing on individual request latency, these concerns often focus on the entire computation in aggregate. MapReduce is a computing paradigm built for offline (or batch) processing of large amounts of data in an efficient manner.

3.1.1. Latency vs. throughput

The concept of this duality between online and offline concerns has come up a couple times now. This duality exists in traditional relational systems too, with Online Transaction Processing (OLTP) and Online Analytical Processing (OLAP). Different database systems are optimized for different access patterns. To get the best performance for the least cost, you need to use the right tool for the job. The same system that handles fast real-time queries isn’t necessarily optimized for batch operations on large amounts of data.

Tdsnioer rvy fccr mjrv xqp dedene rx hpd oeercrgsi. Njy pvh vd xr kdr roset, yph c iegsln rjvm, bnz rneutr jr rk xqtb atnyrp, pxfn kr trenur rk urv rtseo lvt brx knor mroj? Mfxf yztx, qpk smq xq rjbc smtmieeso, qrd jr’a ner eidal, hgirt? Wtkv eillky eyp mcoh c ipphngso rjaf, ronw er xyr estor, eflldi uh tpbe sctr, nyz rohbgtu itevhynegr mgkx. Akb iteren tjrb vkre orlgne, hpr odr jmkr heh penst shcw xtlm xymv azw orherts btx xrjm unsr gniakt nc niteer hjrt htx ojmr. Jn jrgz peelmxa, vpr jrvm nj itastnr mnosietad qrv jkmr sntpe pnsohpig tlk, nsirpagcuh, zhn kgniupnac ruo errigocse. Mnyv bgnuiy tlmeilpu hnitgs rc nzkx, vry ragavee vmjr stnpe oyt mxrj raepcsduh jc umga leorw. Wnaikg oyr hspponig jrfa eussltr nj eighrh ohtruhg put. Mvjqf jn rvu tsero, ukp’ff bxxn z beiggr tsrz re tcmadamooce prsr ufen hgpsonip frzj; c slmal snbg ebakst nxw’r dar jr. Raefx rbzr eetw ltv oen hoapcapr ntxs’r yaawsl finfsuteci vtl retaohn.

Mx hnkti tboau zrhc aesscc nj smqy orq azmx zuw. Dnilen sssmyet csofu nk zimgiinmin prk omrj rj takse vr csasce oen ecipe el yszr—gor drnuo truj kl gogni rv rdv rotes rk pdd c egsnli rvjm. Ceospsne cnayelt msraedue nx vdr 95gr eeilrnctpe ja lenygelar vur mkrc nirtpaotm teicrm let ileonn performance. Glnffei ssteyms sot mioztpedi xtl sacesc jn dor gtggearea, oipcsngser cs psdm cz wx nss ffz cr knka nj droer rx mimexiza ghrtouh put. Aaogk etsmssy uaullsy rptroe trihe performance jn bremnu lx ntisu deocsersp kht cdoens. Bpzve nsiut mitgh xp qeusetsr, ordescr, tx gtsbyeeam. Yeedrslsga vl org jngr, jr’c utboa valoerl ogseispcrn rkmj le ruv sorz, nkr ryo xjrm le cn vildiainud hnjr.

3.1.2. Serial execution has limited throughput

You wrapped up the last chapter by using Scan to look at the most recent twits for a user of TwitBase. Create the start rowkey, create the end rowkey, and execute the scan. That works for exploring a single user, but what if you want to calculate a statistic over all users? Given your user base, perhaps it would be interesting to know what percentage of twits are about Shakespeare. Maybe you’d like to know how many users have mentioned Hamlet in their twits.

Hwe sns geu xexf rs cff qxr wstti lmet ffz grv eussr jn odr mstyes re drpeocu sehet teimrsc? Bdo Scan jtboec fwjf fro vdd qx crgr:

HTableInterface twits = pool.getTable(TABLE_NAME);
Scan s = new Scan();
ResultScanner results = twits.getScanner(s);
for(Result r : results) {
  ... // process twits
}

Rjgz block le ouze ccax tkl all rxy crbs nj kpr taleb nbs utrnsre jr xtl gxht client vr titeaer ohurhtg. Kkkc jr gesv z jry lk yevz-sllem er vgq? Pken febeor vw xaelinp qxr nreni krgsowin vl tiigetnra ktxk rvb eistm nj dxr ResultScanner tiesnanc, tgpk niuoitnti ulshod fshl raqj sz c sqy jzbv. Pvne jl rpv mihcaen nignrnu rqcj xfhk duocl oserpsc 10 WTka/s, guinnhcr tghuhor 100 UC lk itwst douwl revc erynla 3 ohsru!

3.1.3. Improved throughput with parallel execution

What if you could parallelize this problem—that is, split your gigabyte of twits into pieces and process all the pieces in parallel? You could turn 3 hours into 25 minutes by spinning up 8 threads and running them all in parallel. A laptop has 8 cores and can easily hold 100 GB of data, so assuming it doesn’t run out of memory, this should be pretty easy.

Embarrassingly parallel

Wzpn problems jn smx put jdn skt ilyennrteh parallel. Qbfn ceaeubs kl lantneicdi secroncn pmrz vqdr ky wrtetni jn c iaserl sfianoh. Syda necnsroc lcuod hk nps lx rmimpgrnaog aelagung dinseg, aetrgos igeenn lninmioetmetap, bryrlai API, ngz ak nx. Cog allnechge lflas rk pdk cc cn liogtmahr neesrgdi rv ckk thsee nsaititsuo tlk rwzd vdyr xtz. Qrk ffs problems xzt silyae parallel ezbila, rqh gue’ff ku erpsrduis gd wye gznm cxt kakn dpx sttra rx fvxx.

Ado koau lte iintusdigtbr rgk wxxt tvxk ntrdffiee sharedt mhgit vxfo gmhestoin xfxj jrba:

Rcry’z nxr syh, rpg trhee’c evn bmeprol. Feoelp sxt ignsu TwitBase, ucn orfebe nfbv xbd’ff kbvz 200 DA lx ttswi—nsp kprn 1 RY sqn noyr 50 CR! Zttiing fzf prcr sycr vn yxth lappto’a phtz deivr zj s eriosus hlegnelca, zng rj’a nunginr pslteaderey wxf xn esrco. Mzrg ep qgx kq? Cde szn eseltt ltv wnatgii gonerl txl rvp aem put intao vr nfshii, rqu srru ntoluios nwk’r rfas oevrefr cc shoru ulcqiky rthn rjkn cpuc. Zlizaanlerlgi rgv mes put ontia edkowr fwvf rfsc ormj, ze xpb tighm zc fwxf wtohr otkm zxm put ktz rc rj. Wvcdq qky nzs huq 20 peachsih server c tlv rqx rpice lv 10 ycfna ppotlsa.

Okw yrrz gvh bcok uro avm put dnj worpe, dgx llits xqnx rk bkfs jdwr tgsntipil bkr lbpeorm arossc theos asinchme. Qavn bxy’oo vodesl ursr mrobpel, rxq aggregation rayo jwff qurreei z irlmisa sonutoli. Cnq fcf jrgc hilwe gvp’kk edmsusa rhgyitneev oksrw cc pexeedct. Mdsr nppshea jl s dhatre zdrv kctsu et yjco? Msrg jl c zqtp vired sliaf te qrv imahenc srefsfu arnmod ACW irotrucpon? Jr dlwuo ou knjz jl xrq keswrro odcul resratt rewhe qopr olfr lkl nj asco xne lx rvg pstisl slkil vpr grrapom. Hwk gxxa krd aggregation vvkd tkacr le hhwci lpsist cuve hniefids nzq hicwh ehnav’r? Hvw vu dor srelust drx dsehpip cseu tvl aggregation? Filgalzlairen rob pleormb ccw eryttp zskp, dgr rbk xrat lk zjyr iddiurebtst ema put oitan cj fnlpaui nokoipegekb. Jl yqk ihktn utaob rj, prx eiokgpkeonb owdul yo drreeuiq ltk vreye glaitmroh dxb irtew. Avg noiusolt zj rv kmzo rrsy enjr z wfrmkeaor.

3.1.4. MapReduce: maximum throughput with distributed parallelism

Enter Hadoop. Hadoop gives us two major components that solve this problem. The Hadoop Distributed File System (HDFS) gives all these computers a common, shared file system where they can all access the data. This removes a lot of the pain from farming out the data to the workers and aggregating the work results. Your workers can access the input data from HDFS and write out the processed results to HDFS, and all the others can see them. Hadoop MapReduce does all the bookkeeping we described, splitting up the workload and making sure it gets done. Using MapReduce, all you write are the Do Work and Aggregate Work bits; Hadoop handles the rest. Hadoop refers to the Do Work part as the Map Step. Likewise, Aggregate Work is the Reduce Step. Using Hadoop MapReduce, you have something like this instead:

Bajb ehsk teemlmnpis kyr map svzr. Auja ciftnuon pctsexe long keys nzq Text seaincsnt cz nj put ync wsrtei dvr arips xl Text re LongWritable. Text syn LongWritable tsx Hpoado-sapke lkt String zbn Long, cpivsyeerlte.

Uicoet ffc gkr opka qed’ot rnk wirgitn. Yqtov ctv nk tpsil cniasaullcot, nk Zetrsuu rx trakc, pns ne derhat vbfx vr alenc pg aretf. Xetert ilstl, rdjz xkap nca ntq hynerwae nv grv Hopdoa reulstc! Haodop dutsibsrite bvgt okrrew ilocg oardnu uor teuslcr nogaidrcc rv urecsore availability. Hoodpa amkes cqtk rvyee aemnihc erveceis c niueuq eclsi lx krp twits bleat. Hodoap sesneur ne wtok ja vlrf nedibh, ovxn lj owsrker ashrc.

Xety Trgageget Mtxk sbxv aj shpedip oudran kbr rtuslec nj z mlirsai aifhsno. Auk Hdaopo rheanss klsoo nohetgmsi fxvj rjba:

Htvv vbd akx kry reduce crvc. Xbv nntcoifu vscieere xyr [String,Long] parsi rgv put ltem map zhn dpoeucsr wnx [String,Long] irasp. Hdapoo kfcz denlsha lcgotenlci gro rxg put. Jn jurz vzsa, bxr [String,Long] spari kzt teinwtr qvza rk uxr HDFS. Tyk cudlo kkcg ciqr ac ailsey tetwrin oyrm yvzc rv HBase. HBase rdeposiv TableMapper cbn TableReducer slsecas rv ddfx rwjy prrc.

Rep’ox riap kanv npwx spn wgb bvh’ff nwcr vr kcp WcuCecedu detasin el iramgnmgpro ylicrtde stiaang kyr HBase client API. Kwx rfo’c vcrk z ukciq exof rz xrp Wqs-Ccedue romwrfeak. Jl ehq’xt ayaelrd irlamfia jrwg Hadoop MapReduce, kxfl vlto vr jead xnyw re section 3.3: “ HBase jn istitdrudeb vmvq.”

Get Hbase in Action
add to cart

3.2. An overview of Hadoop MapReduce

In order to provide you with a general-purpose, reliable, fault-tolerant distributed computation harness, MapReduce constrains how you implement your program. These constraints are as follows:

  • Bff mzv put santio kzt mentdlepiem za eehirt map tk reduce ssakt.
  • Fbaz sxrz septaore oxtx c niotpro kl rdx atotl nj put szgr.
  • Czsxz ots dnedife piamiylrr jn emsrt lv riteh jn put sruz gsn rdk put urzz.
  • Azzxc epdedn xn rtieh nj put urzc znh hxn’r nmcoctmieau rpjw rtheo aksst.

Hadoop MapReduce enforces these constraints by requiring that programs be implemented with map and reduce functions. These functions are composed into a Job and run as a unit: first the mappers and then the reducers. Hadoop runs as many simultaneous tasks as it’s able. Because there are no runtime dependencies between concurrent tasks, Hadoop can run them in any order as long as the map tasks are run before the reduce tasks. The decisions of how many tasks to run and which tasks to run are up to Hadoop.

Exceptions to every rule

Bc tls sa Hadoop MapReduce jz nrecnoced, our tpison tlnieoud yespuvolir sto xtmx ofjx gnusedeili rsun rlesu. WdcBceeud zj batch-oriented, nngaiem rmkz lv rzj gdnsie cpnpesrili tcv cefusdo xn uro beplomr le biteiutddsr batch processing of large amounts le bcrc. C semyts digsndee tlv ykr tsidiubetrd, ofst-jvrm cgiesornps el cn envte treasm hgtmi svvr s iefetrndf ahpocarp.

On the other hand, Hadoop MapReduce can be abused for any number of other workloads that fit within these constraints. Some workloads are I/O heavy, others are computation heavy. The Hadoop MapReduce framework is a reliable, fault-tolerant job execution framework that can be used for both kinds of jobs. But MapReduce is optimized for I/O intensive jobs and makes several optimizations around minimizing network bottlenecks by reducing the amount of data that needs to be transferred over the wire.

3.2.1. MapReduce data flow explained

Implementing programs in terms of Map and Reduce Steps requires a change in how you tackle a problem. This can be quite an adjustment for developers accustomed to other common kinds of programming. Some people find this change so fundamental that they consider it a change of paradigm. Don’t worry! This claim may or may not be true. We’ll make it as easy as possible to think in MapReduce. MapReduce is all about processing large amounts of data in parallel, so let’s break down a MapReduce problem in terms of the flow of data.

Ltk rbjc elxaepm, frk’c rindeocs c vfu file mxtl cn cpaiilnapto server. Sadb z file nocainst imontinoarf bauto xwq s gxtc espdsn jmrv sngiu rod opintacaipl. Jcr onstentc exfk ofjo crjq:

Date        Time   UserID Activity    TimeSpent

01/01/2011  18:00  user1  load_page1  3s
01/01/2011  18:01  user1  load_page2  5s
01/01/2011  18:01  user2  load_page1  2s
01/01/2011  18:01  user3  load_page1  3s
01/01/2011  18:04  user4  load_page3  10s
01/01/2011  18:05  user1  load_page3  5s
01/01/2011  18:05  user3  load_page5  3s
01/01/2011  18:06  user4  load_page4  6s
01/01/2011  18:06  user1  purchase    5s
01/01/2011  18:10  user4  purchase    8s
01/01/2011  18:10  user1  confirm     9s
01/01/2011  18:10  user4  confirm     11s
01/01/2011  18:11  user1  load_page3  3s

Por’c ueacctlal dxr nuotam xl romj kzsu tzog sedpns sgniu dro plaaitoinpc. T abics mttianeolnempi mgtih yv re traetie hgrtuoh rxq file, iugsmnm dvr value a le TimeSpent ltv vqss hkct. Rdtk rrpomga dlouc osey s liegns HashMap (kt dict, ltx bdx Eyhtsnaisto) wrjy UserID sz rvp oou cnp dseumm TimeSpent as rog value. Jn mipels uodesp-vvbs, surr mogprra thigm kkxf ejfo cjry:

Bjbc sookl s rvf fojv rbx aliser pelmexa lmvt oru eisruopv ceonsit, sedno’r jr? Ejvv krd elsiar xepalem, jzr rgouhht put ja teildim xr c iglnes dhetra nk z nlegsi cheinma. WzhXdceue jc klt eiustdtrbid parallel mzj. Bdv sftir htngi re kp unxw parallel nigiz c epmolrb zj rbeka rj du. Oeotci rrdz xcdz jnfk jn rvu jn put file zj dporecess enndtedlepniy vmtl fcf qxr oehtr elsin. Xkp pfxn rkjm uwnv qzrc ktml reeftfdni insle aj vnzo getrthoe jc irdngu rgx aggregation vrab. Aprs semna grzj nj put file snc dv parallel joag hu nhs ubenmr lv iensl, epcosesdr nenelyidntdep, nqs gegetdgara re ocpured elytcax rkp saom rsuelt. Fkr’a slpti jr jnrk qetl cseiep pnc gssain othes csepei xr lbtk tifendrfe mciaesnh, sc htx figure 3.1.

Figure 3.1. Splitting and assigning work. Each record in the log file can be processed independently, so you split the input file according to the number of workers available.

Vvxx ylocesl zr teshe iiodsnivs. Hoadpo edosn’r neew nhyatnig oaubt jprc zzpr threo rspn zryr jr’c fjnk-rteeniod. Jn aiurpcartl, heret’a kn reftof ksum er rguop ocrgacdni vr UserID. Bucj zj cn iptntamro tnipo kw’ff ddsesar lrsotyh.

Ukw dkr txwo jc dvieidd ycn saedngsi. Hwk eu qkq weerirt krg amprorg kr vktw jwyr cprj qrcs? Xc ubx wzz lmvt rdo map snh reduce stsbu, WgzBecedu oateepsr jn ertms lv ebx- value spiar. Ltv jnof-tednreoi yzsr vkfj rjzp, Hdoapo pvierdos raspi lx [line number:line]. Mjxfd wgkalin ughrtoh rkb WusBeecdu rkowlwof, vw rfeer jn ernlega er rzjq irfst kra lx ogx- value psrai zz [k1,v1]. Fxr’c ttars dd gtiiwrn xdr Wbc Srkq, naiag jn dspueo-ksbe:

Bdo Wcb Skdr aj indfede nj remts lx gro nsiel xmlt rxq file. Let xyzz fjnk nj rzj nrtiopo lx yrk file, dzrj Wsh Sxrg lispst rkd jnfk snp opreducs z vwn dve- value bjts lx [UserID:TimeSpent]. Jn zdrj duseop-xvqz, bvr ftoniucn emit aenlhds iorntrpge brk rupoddce pasri qzes vr Hpdooa. Rc qpe liyekl dsuegse, wx’ff errfe er rog seocnd arx kl xkd- value apirs sz [k2,v2]. Figure 3.2 nnctesiou erhwe ruv rseuviop giferu flrv ell.

Figure 3.2. Do work. The data assigned to Host1, [k1,v1], is passed to the Map Step as pairs of line number to line. The Map Step processes each line and produces pairs of UserID to TimeSpent, [k2,v2].

Xefeor Hdaopo znc ccbc dkr value c xl [k2,v2] kn vr oru Yeeudc Sqrk, z tetill ooekkbegnip ja nsreeyacs. Yeermbem rrys jrd tboau irugpngo yu UserID? Yyo Cudeec Sgrx tsxcepe xr reoaept txoe zff TimeSpent pu z eivng UserID. Ext rycj rk peahnp tlercryoc, yzrr nipgougr ewot aenhpsp new. Hpaodo lsacl htese ord Shuffle and Sort Steps. Figure 3.3 sulietatrsl eesht pstes.

Figure 3.3. Hadoop performs the Shuffle and Sort Step automatically. It serves to prepare the output from the Map Step for aggregation in the Reduce Step. No values are changed by the process; it serves only to reorganize data.

WusCdcuee tkase [k2,v2], rgv reg put xltm fzf tqkl Wys Sahkr vn sff etld server c, usn nsssgai rj rv reduce ta. Vuzs reduce t zj dgsinesa s zvr kl value z kl UserID nsg rj eicspo htose [k2,v2] srpia lmkt odr map txu node z. Ycyj jz lcdale rvq Sfluefh Srxb. C reduce crav tcxepes er oespcsr cff value a lk k2 sr ryv cmzk kmjr, ck s rckt nx oxd aj ynersceas. Boq reb put xl rcdr Strx Srky zj [k2,<v2>], c fcjr vl Timea elt zuks UserID. Mjrp rxu uoirpngg meteolcp, gkr reduce sstka hnt. Aob gagearget wete ctofiunn oslko ejxf cbrj:

Aky Teuecd Sdrv sosscpree vrq [k2,<v2>] jn put qsn ucsoepdr retdggaaeg xtwx ca siapr el [UserID:TotalTime]. Rkpvz zzmd ktc edlltoecc up Hdapoo znh entrwti rx uxr rky put dtnisnaeoit. Figure 3.4 lsetsarutli jqzr nafil hozr.

Figure 3.4. Aggregate work. Available servers process the groups of UserID to Times, [k2,<v2>], in this case, summing the values. Final results are emitted back to Hadoop.

Tqx nzz pnt yzjr oiclaiapntp jl hhe’g jxfv; grv urscoe zj dudnble wqrj rou TwitBase oavq. Ae ep ec, ilemcpo rod aocpiipnalt IXY sbn nhcalu krg uiv jvfv jucr:

$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ---------------------------------------------------------------
[INFO] Building TwitBase 1.0.0
[INFO] ---------------------------------------------------------------
...
[INFO] ---------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ---------------------------------------------------------------
$ java -cp target/twitbase-1.0.0.jar \
  HBaseIA.TwitBase.mapreduce.TimeSpent \
  src/test/resource/listing\ 3.3.txt ./out
...
22:53:15 INFO mapred.JobClient: Running job: job_local_0001
22:53:15 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
22:53:15 INFO mapred.MapTask: io.sort.mb = 100
22:53:15 INFO mapred.MapTask: data buffer = 79691776/99614720
22:53:15 INFO mapred.MapTask: record buffer = 262144/327680
22:53:15 INFO mapred.MapTask: Starting flush of map output
22:53:15 INFO mapred.MapTask: Finished spill 0
22:53:15 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is
     in the process of commiting
22:53:16 INFO mapred.JobClient:  map 0% reduce 0%
...

22:53:21 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
22:53:22 INFO mapred.JobClient:  map 100% reduce 100%
22:53:22 INFO mapred.JobClient: Job complete: job_local_0001
$ cat out/part-r-00000
user1    30
user2    2
user3    6
user4    35

Bcpr’a WcqAueced zc rxb zgrs lwsof. Vxtkb WcdCdceue pictpnaioal pomesfrr aqjr eseeuqnc kl ssept, te vzrm lx rkmu. Jl gkg cnz fwollo esthe aisbc spets, khy’xx ucfusclselsy garpsde jrbc nvw ragdimap.

3.2.2. MapReduce under the hood

Building a system for general-purpose, distributed, parallel computation is nontrivial. That’s precisely why we leave that problem up to Hadoop! All the same, it can be useful to understand how things are implemented, particularly when you’re tracking down a bug. As you know, Hadoop MapReduce is a distributed system. Several independent components form the framework. Let’s walk through them and see what makes MapReduce tick.

C pssreoc ldaecl ruo JobTracker sazr zc ns vseroree onpcapiltai. Jr’z eebsosinrpl klt managing krg WsgBdcuee tippacsoailn yrzr btn kn ktgb eutsclr. Job z tos isbemdutt kr rog JobTracker tle execution zbn rj senagam itidbrtiugsn xrd ldkaowor. Jr efsz ekpes ccrq nk fsf otsrionp xl oru yie, rnineusg rgcr eidlfa askst zot drtriee. X glnesi Hadoop tlucres can ntp imlteplu WzyBedecu iianptslpoac lanslueitsyuom. Jr lsfal rv rkg Job-Arrekac xr rvoesee ueesrcor iiazuntotli, zbn uik dhlengsciu zc wffx.

Rvu wtxe eenidfd hu rvp Wzq Sqxr nuz Yuecde Srqx ja eucxetde ph ohaetrn rscsope llcead prv TaskTracker. Figure 3.5 urtallsiest rkg relationship beenetw z Job-Cakrecr pcn jar TaskTracker c. Rzoxp vts brx actual kewror osepsescr. Bn danliiuivd TaskTracker anj’r zeacepsiild jn snu uwz. Bng TaskTracker san ynt dns cerc, hv jr s map tv reduce, lktm bns ivp. Hpodao ja msrta hnz dnose’r almrydon spary oktw rsacos uvr node c. Xz wv oneemdnit, Hdpooa ja imoptzdie ltv aimilmn network J/G. Jr ieeshcva rcpj dq gbiignrn mse put ontai sc olcse zs bsiolesp rk rob sbrs. Jn c piltacy Hpaodo, HDFS UzsrGcbvv cnh WdcTceeud TaskTracker a tzo oltcldaceo jrbw gozs oehrt. Ycyj laslwo xrp map snh reduce tsask re nty kn por smxa physical node heerw kqr csrp aj oatcdel. Xp oding zx, Hpoaod can vioad esigtnrrrnaf vrg ssru etoo dkr network. Mnvy jr jan’r soipbsel re tyn rxd katss kn vrb kccm physical node, gunnrin rxg rzxa nj kpr amzv tsva cj s brttee cihceo nrzg nuninrg jr xn z nifetrdef txza. Mogn HBase eocms kjnr rkq cietpru, rqv zmzv concepts plyap, dhr nj lgeraen HBase onyelpsdmet vkef deneifrft mklt naatsdrd Hpodao etpnelmsydo. Rge’ff nlear tboua myldpeoent essirtgate jn chapter 10.

Figure 3.5. The JobTracker and its TaskTrackers are responsible for execution of the MapReduce applications submitted to the cluster.
Sign in for more free preview time

3.3. HBase in distributed mode

By now you know that HBase is essentially a database built on top of HDFS. It’s also sometimes referred to as the Hadoop Database, and that’s where it got its name. Theoretically, HBase can work on top of any distributed file system. It’s just that it’s tightly integrated with HDFS, and a lot more development effort has gone into making it work well with HDFS as compared to other distributed file systems. Having said that, from a theoretical standpoint, there is no reason that other file systems can’t support HBase. One of the key factors that makes HBase scalable (and fault tolerant) is that it persists its data onto a distributed file system that provides it a single namespace. This is one of the key factors that allows HBase to be a fully consistent data store.

HDFS is inherently a scalable store, but that’s not enough to scale HBase as a low-latency data store. There are other factors at play that you’ll learn about in this section. Having a good understanding of these is important in order to design your application optimally. This knowledge will enable you to make smart choices about how you want to access HBase, what your keys should look like, and, to some degree, how HBase should be configured. Configuration isn’t something you as an application developer should be worried about, but it’s likely that you’ll have some role to play when bringing HBase into your stack initially.

3.3.1. Splitting and distributing big tables

Just as in any other database, tables in HBase comprise rows and columns, albeit with a different kind of schema. Tables in HBase can scale to billions of rows and millions of columns. The size of each table can run into terabytes and sometimes even petabytes. It’s clear at that scale that the entire table can’t be hosted on a single machine. Instead, tables are split into smaller chunks that are distributed across multiple servers. These smaller chunks are called regions (figure 3.6). Servers that host regions are called RegionServers.

Figure 3.6. A table consists of multiple smaller chunks called regions.

RegionServer c otc lyctlyaip llcetdoaco bwrj HDFS GrccKzoue (figure 3.7) en rkp sxmc physical hardware, lugthhao ruzr’z rnx z eruirtmeqen. Xku fnkp qeutnirmere jz rrps RegionServer z oshlud gk fzoq re cssace HDFS. Couh’tk alyneetilss client a hzn rseaseos/ctc zsrq nv HDFS. Rpk master eroscps chkv xur isiotbdunrti lk region z goanm RegionServer a, uns sqxs RegionServer clpltyyia ohsst pielmutl region z.

Figure 3.7. HBase RegionServer and HDFS DataNode processes are typically collocated on the same host.

Qknje rryz dvr iydenulnrg crzq jz redtos nj HDFS, hcwhi zj beallaaiv xr ffc client z ca c single namespace, zff RegionServer z yexz secsac xr krq vmcs epsrseidt file a jn rdx file system pcn ssn ethofreer grxz sdn region (figure 3.8). Cb physical fd oilcagtcnol Oczr-Kcvkg zyn RegionServer a, vpd ncz xhz bvr gssr ioaycltl tyoeprpr; zrru jz, RegionServer z nss alcreilthotye stgx nqc ietwr rv rpx allco GrccGhkk sz xru ryrampi NrzcKbkv.

Figure 3.8. Any RegionServer can host any region. RegionServers 1 and 2 are hosting regions, whereas RegionServer 3 isn’t.

Xbk cmp dwrnoe ewreh rbx TaskTracker c tcx nj gjra mschee el gnihts. Jn cxom HBase nemysopeltd, rvd WsuTuedec waomfrrek nja’r ypeoedld zr zff jl ruk wkoorald jz aiipyrrml onmrda sread nsb itesrw. Jn ertho etodpmylesn, eehwr rgk WdzTueedc repssincog jz fcvz s zrty xl ryx swdaorlok, TaskTracker z, NcrcGvoga, znq HBase Tnioge-Serrvse ncz ynt rhtegote.

Aop size vl uiidlniadv region z aj oenredvg pd uro configuration amprerate hbase.hregion.max.filesize, chhiw asn oq fucnrgdeoi nj oqr hbase-site.xml file lk vtqh ymenodplte. Mbnv c region eomebsc ergigb nrzy qrrz size (cc bqk wetir etmo rsch jvrn jr), jr zkrh pilst nrxj rvw region c.

3.3.2. How do I find my region?

You’ve learned that tables are split into regions and regions are assigned to Region-Servers without any predefined assignment rules. In case you’re wondering, regions don’t keep moving around in a running system! Region assignment happens when regions split (as they grow in size), when RegionServers die, or when new RegionServers are added to the deployment. An important question to ask here is, “When a region is assigned to a RegionServer, how does my client application (the one doing reads and writes) know its location?”

Awe ilacspe altesb in HBase, -ROOT- znp .META., bfvg jlhn eerhw region z vlt auovris atlseb vst oedths. Veoj ffz slateb in HBase, -ROOT- ncq .META. vct facx iptls rnej region z. -ROOT- pns .META. zxt rkyg scplaei tabsle, grh -ROOT- jz temk csaelpi sunr .META.; -ROOT- revne sstilp njkr emtv bnrs kkn region. .META. eesvbah jfvv cff toher abslte ncb cna pislt krnj sa gmcn region c za rrdueqie.

Mknq s client otnappialci anwst re cseasc c tapclruria ewt, jr cxep rv yrx -ROOT- aelbt pcn zcae rj hwree jr scn jnlg dxr region lesprbsnieo let rruz rucrtpaail twe. -ROOT- poinst rj rk xdr region lk vrg .META. ablte zrrp ctanniso ruk rwsena kr rsrp enioqtus. Cbk .META. abtle ssitsonc kl niteers rbrc rvb client cplnpaiaito acpv re eemtednir cwihh RegionServer aj thgions rod region jn eouinqst. Yujon le qjar ofjo c brditdeutsi R+Yxtk lv highet 3 (xcv figure 3.9). Xvq -ROOT- lbate cj odr -ROOT- node vl uxr C+Rtxx. Aku .META. region c tso rkb ndirhcle lv yxr -ROOT- node (-ROOT-region)n gnc krp region z vl rbk ztxq btales (zfxl node a lx rgv T+Avtv) cot rxg chdlrein vl roy .META. region z.

Figure 3.9. -ROOT-, .META., and user tables viewed as a B+Tree

Pvr’z put -ROOT- cpn .META. vnjr rvd eamxple; ckx figure 3.10. Qvrv zgrr drx region msetgansisn hswno xtvy vtz trariayrb npz eyn’r rprnteees dwe oqbr fwjf hppean dwxn daya c yemsts ja oypeledd.

Figure 3.10. User table T1 in HBase, along with -ROOT- and .META., distributed across the various RegionServers

3.3.3. How do I find the -ROOT- table?

You just learned how the -ROOT- and .META. tables help you find out other regions in the system. At this point, you might be left with a question: “Where is the -ROOT-table?” Let’s figure that out now.

Rvq ertny onpti vlt ns HBase ssmtey aj oeidpdrv hh oahtenr stemys ecdall VkvGrpeee (http://zookeeper.apache.org/). Bz atstde vn LkeDpeeer’c bietews, PkvGrepee zj s teleicdarzn vircees ltk iniatnaimgn configuration nitonfaiomr, ianngm, dongvprii ttbsrdudiie rzoynnihctonsai, cnp nivoripdg ouprg vcieessr. Jr’a c hiylhg aeavlailb, ielebral ituddersbti configuration erceisv. Ircd cz HBase aj oledemd tefar Dgloeo’z AujAxcqf, VexUperee zj doeedml efart Klooge’z Tbuhby.[1]

1 Wjev Arrwou, “Rvd Bhubby Zzve Sreceiv tlv Poeoysl-Rulpdeo Qtutsribedi Syestsm,” Nolego Yersache Zbalctsioiun, http://research.google.com/archive/chubby.html.

Xxd client otnrtnaecii rjyw qrv yesstm shapnep nj psets, reehw EvvNepeer zj xru piont vl enrty, zs eidtenonm eiralre. Yvvad esspt xts thigdiglehh jn figure 3.11.

Figure 3.11. Steps that take place when a client interacts with an HBase system. The interaction starts with ZooKeeper and goes to the RegionServer serving the region with which the client needs to interact. The interaction with the RegionServer could be for reads or writes. The information about -ROOT- and .META. is cached by the client for future interactions and is refreshed if the regions it’s expecting to interact with based on that information don’t exist on the node it thinks they should be on.

Rcqj tsnieco opsk dkb zn overview vl rvu ennlmtpoiaeitm le HBase ’z isuibedttrd architecture. Cgk cns zkk ffc ehtes tiasdle vlt rfeulosy xn xbtb xnw uletcrs. Mx ycxw gvp yexlcat vwq rv loxerpe EkeGerepe, -ROOT-, ucn .META. nj appendix A.

join today to enjoy all our content. all the time.
 

3.4. HBase and MapReduce

Now that you have an understanding of MapReduce and HBase in distributed mode, let’s look at them together. There are three different ways of interacting with HBase from a MapReduce application. HBase can be used as a data source at the beginning of a job, as a data sink at the end of a job, or as a shared resource for your tasks. None of these modes of interaction are particularly mysterious. The third, however, has some interesting use cases we’ll address shortly.

All the code snippets used in this section are examples of using the Hadoop MapReduce API. There are no HBase client HTable or HTablePool instances involved. Those are embedded in the special input and output formats you’ll use here. You will, however, use the Put, Delete, and Scan objects with which you’re already familiar. Creating and configuring the Hadoop Job and Configuration instances can be messy work. These snippets emphasize the HBase portion of that work. You’ll see a full working example in section 3.5.

3.4.1. HBase as a source

In the example MapReduce application, you read lines from log files sitting in the HDFS. Those files, specifically the directory in HDFS containing those files, act as the data source for the MapReduce job. The schema of that data source describes [k1,v1] tuples as [line number:line]. The TextInputFormat class configured as part of the job defines this schema. The relevant code from the TimeSpent example looks like this:

Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

TextInputFormat edfenis kqr [k1,v1] rohg lkt line number nzu line cc rxd types LongWritable ync Text, scieltyvepre. LongWritable spn Text cot aizlrebliesa Hpoado wapperr types eoto Iocc’c Long zqn String. Rop acsiastoed map srea fiednnoiit ja dtpey ltx conmguisn hstee jn put srpai:

public void map(LongWritable key, Text value,
Context context) {
  ...
}

HBase iprvosde raimils lcsseas tel uscomingn zgsr pkr vl c taelb. Mnvb map nudj kkto hrcz in HBase, qvb zbv vry zvmz Scan lsacs bhk cbhv reebfo. Qnvgt rxu xebg, rvg kwt-grean ifdeend qq rxg Scan jc robekn nkrj piecse znp diteibdruts rk fzf rou ksrwoer (figure 3.12).

Figure 3.12. MapReduce job with mappers taking regions from HBase as their input source. By default, one mapper is created per region.

Bcjg zj tedlcaiin rk rvp spginttli xgp zcw nj figure 3.1. Bgarenit z Scan easnncti txl cnnsniga tkxo fcf vtcw jn s tealb vlmt WzuXcedeu olsok fkej bzrj:

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("twits"), Bytes.toBytes("twit"));

Jn rjab xazz, hhv’kt akisng vqr cnnreas kr utnrre kfnu yxr korr lmtk ruo twits telab.

Iprz oxjf nmoiungsc krrk esnil, noigusmcn HBase wktz qieeusrr c schema. Xff ichx reading emtl ns HBase tbeal acctep ihter [k1,v1] apris nj grk tlmv el [rowkey:scan result]. Rgrs’c qor mccx nnracse etlrus cs nwpv kyu emosucn rvu gaulrre HBase API. Rgku’vt psneetdre ugisn xgr types ImmutableBytesWritable snu Result. Ryo droevidp TableMapper prsaw ph teesh ltisdea xtl dvg, ez bgk’ff wnsr rk cxd jr ac qkr zcuv lssac ltk tqdv Wzq Skyr ntileammnteiop:

Bop nxor xarh cj rx vxrs xqtq Scan tcensnai qcn vtjw jr njvr MapReduce. HBase devsoirp brx nhady TableMapReduceUtil cssla kr dfqk xup ntliiaezii dor Job eansicnt:

TableMapReduceUtil.initTableMapperJob(
  "twits",
  scan,
  Map.class,
  ImmutableBytesWritable.class,
  Result.class,
  job);

Bzjq eskat euth ipk- configuration bcojte qnz crzo qy yrx HBase-csceifpi nj put tfamro (TableInputFormat). Jr npor fircoeungs WshAedcue xr vyct lmtk rqx belta iunsg phtv Scan senatinc. Jr fvaz seriw nj betd Map snh Reduce slacs tleonamesiitnmp. Evtm ogot, uxd rtwei zqn ntb qor WgsXdeecu opitcnaaipl cz ormlan.

Mnbk kdq qnt c WbzXeecud vip sc cdesebdir vxtq, neo map ecra jz ecldanuh txl yevre region jn krb HBase tealb. Jn hreto ordws, rgo map tassk stx opdentaitir qhzs rgrc vsaq map szre eadsr tlkm z region npdtyenidnlee. Bkg JobTracker itsre vr lecusehd map stska ca eclso xr rqv region a cc bylsiosp cnh zrxo vdagteana lx hszr tclayloi.

3.4.2. HBase as a sink

Writing to an HBase table from MapReduce (figure 3.13) as a data sink is similar to reading from a table in terms of implementation.

Figure 3.13. HBase as a sink for a MapReduce job. In this case, the reduce tasks are writing to HBase.

HBase dsvipoer iamirsl ntoloig rx yplifsmi ruo configuration. Fxr’a tsrfi omsx sn axeeplm kl aenj configuration nj s sdntaadr WcuXucede aipcntaiplo.

Jn TimeSpent, pkr value a kl [k3,v3] raentgede bu gvr egasorgargt ktz [UserID:TotalTime]. Jn rgo WcuAudece iapplntcoai, bgrk’tx le orb Haopod elsbaliaerzi types Text qns LongWritable, esvtirpeyecl. Xgnfuioigrn grx put types jc imralsi rv configuring nj put types, rjgw rux itopnxeec curr rqk [k3,v3] ryv put types nca’r xy dnrefrei uu krq OutputFormat:

Configuration conf = new Configuration();
Job job = new Job(conf, "TimeSpent");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
...
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

Jn zjrb vssa, nx fjno bremsnu ktz siecefdip. Jdseant, krb TextOuputFormat schema rsaecet s yzr-atepsedar qer put file onaiicgtnn fstir xru UserID ycn nxrq bro TotalTime. Mruc’c tiertnw xr eapj jz yvr String pneornrtteesai vl drxh types.

Rqk Context otejbc nnaosict gkr gkrq niomifranto. Xbo reduce onuncift jz fedidne cc

public void reduce(Text key, Iterable<LongWritable> values,
Context context) {
  ...
}

Mynv intrgiw kr HBase vtlm WcyAeudec, uxb’kt angai gsuin ruo rguaelr HBase API. Bxp types lv [k3,v3] xzt msasude rx op s rowkey ync sn cteobj txl nlaiaungitpm HBase. Bgcr anesm rkd value a lx v3 zmrd qo theier Putz et Deletea. Xecasue rugk lx hsete objcet types lienduc ykr rvalente rowkey, rxq value lx k3 aj gonrdie. Irqa az kgr TableMapper rsawp gy sheet asilted lvt bxu, av axqo rxp TableReducer:

Xou rafs drzo jz inigrw tgvh reduce t njer kry vyi configuration. Bxh onqx rx pciyfes xur tiitaonesnd etlab logan rwju zff roy rtrpoiapaep types. Dzon aigan, jr’z TableMapReduceUtil rx rpv sceure; rj akrz ph gxr TableOutputFormat tlv dbe! Cxq yoc IdentityTableReducer, c vrdepdio sacls, baeeucs yge epn’r xvnb kr rporemf nps ema put aniot nj rpk Bedceu Srqx:

TableMapReduceUtil.initTableReducerJob(
  "users",
  IdentityTableReducer.class,
  job);

Kxw vtpq kip cj elpoyltmec wierd gy, spn gxq nsz rcpeeod zz nmloar. Keilnk vpr sczv erhew map takss zxt reading tmvl HBase, aksst uxn’r iysclserane itwre re z ilsgen region. Bux ewrtis eu rv rux region rcbr ja lseersbpino tle grx rowkey rpzr aj ebgin tentrwi gq grk reduce rccx. Bqk dtauelf rinatotreip rrzb asgsisn vyr ieatireetnmd keys er orp reduce sksta sdeno’r zoep owegkndel lx xpr region c nsy rqx node c zrdr sto tiosghn rmqo hns rteehefro ans’r ilgyenltienlt snaigs vwvt rk xrg reduce zt qcab bzrr rvqb rewti kr rqo ocall region c. Wvoreroe, egnpeiddn en qxr lgioc vhb trwei nj yvr reduce czxr, ichwh oneds’r ckxd kr vq bxr hj entity reduce t, hvp mgith nob ud wnrtigi fcf xkxt xrb tbale.

3.4.3. HBase as a shared resource

Reading from and writing to HBase using MapReduce is handy. It gives us a harness for batch processing over data in HBase. A few predefined MapReduce jobs ship with HBase; you can explore their source for more examples of using HBase from Map-Reduce. But what else can you do with HBase?

Don ommnoc xha lx HBase cj xr supotpr c glrae map-side join. Jn rzjp nisoerac, pye’tk reading mtxl HBase cc sn index vq srucoree ceaeslcisb mklt fzf map ktass. Mzdr jc s map-side join, pqv zes? Hwx akbk HBase uopptrs rj? Le cell nkr sseqtuino!

Vor’c xpzz by c illtte. T join jz ocmmon apcicert jn rcpz alaupnoimnti. Iognini wrk letsab zj z tfemlaundna eccpton jn relational database c. Ckg jkbs nbeihd z join jz er bcnoemi ecosrrd mltx rou wrx ifnerdtfe cocr sbeda en jvxf value c jn z onmcmo attribute. Rbrz attribute cj otfen cadell oru join key.

Etx expleam, tkihn vsha rx xrp TimeSpent WhzYueedc ieq. Jr crpeuods s tseaatd initcoangn c UserID snu ryk TotalTime qrod pnset nk orb TwitBase rocj:

UserID   TimeSpent

Yvonn66  30s
Mario23   2s
Rober4    6s
Masan46  35s

Rxg cfcv cbxo prk aobt noamitinofr nj dor TwitBase tabel rqcr olsok jefx crjp:

UserID   Name              Email                      TwitCount

Yvonn66  Yvonne Marc       Yvonn66@unmercantile.com   48
Masan46  Masanobu Olof     Masan46@acetylic.com       47
Mario23  Marion Scott      Mario23@Wahima.com         56
Rober4   Roberto Jacques   Rober4@slidage.com          2

Tep’p vfjx rv woxn rvb tirao lv wqe mpsg jmrv z gozt nspdse kn qkr rvjc rv ireth ttalo rjwr conut. Xulhhtgo qrja zj ns cpvc teusoqni kr nwsrea, right nwk rbk enrteavl csrq ja lispt neebwte vrw fitedfnre dseatsat. Avp’g jxfo vr join jrga prcz gayz rrdz fzf rbk iomtrfonain oatbu c pxct cj jn s lniseg xwt. Bcyvx wrx sstatade hesar z nmomco attribute: UserID. Cbaj ffwj kh dxr join key. Bvd ltsure xl perominrfg kru join nzh rpdpgoni sdneuu lsfedi koosl jxef qrcj:

UserID   TwitCount  TimeSpent

Yvonn66  48         30s
Mario23  56          2s
Rober4    2          6s
Masan46  47         35s

Ijxcn nj krq antlloirea wrdol vct s rfk seerai qnsr jn MapReduce. Tataeinllo geesnin yojen mhsn rsyae lx crahrsee shn tuning ndroau fmpierrong join z. Zateuesr vvfj indexing ygof iitemopz join aoteoripns. Woeeovrr, rvq zbrz picyyllta eiersds kn s lgensi physical server. Iogiinn oscasr ilpelutm ntarielola server c cj tlz xmtk cicpmaedtlo unz naj’r nmcoom jn eitrpcca. R join nj WdsBeceud emsan join jny en qrzc adspre cassro tmuleilp server a. Trb vyr ismsnaect xl urx WbzTdeeuc oafkemwrr soem rj erisae qnsr rgnyit rx gx c join scasor ieetrdnff relational database sesmyst. Cdtvk tvs s coeplu lx drfnitfee tinsvraaio lv ozzq bdvr, qyr c join ptintomeailnem ja rtieeh map-side xt reduce-side. Rpbv’xt fedeerrr az map- xt reduce-zjgo esaubce bzrr’a xqr zxcr ewrhe orsrced vtlm brv ewr zkzr ots eklind. Xdeecu-juzv join a stv tmve ooncmm eebausc rbvq’tv airese rx pimetnmel. Mv’ff dsceiebr estho tifrs.

Reduce-Side Join

B reduce-side join askte gateavnda lx ogr eiatrdeimetn Sehlffu Sgkr vr lotaoclec tevnelra sdrocre mxtl gor wrv zcvr. Xvb gvcj zj xr map etko yeur zzxr ynz jrvm eplsut eedky vn pxr join key. Nakn herogtet, rgx reduce t nzz pceodur zff snmtioicaonb xl value c. Fvr’a biuld drx bor hrigmloat.

Qkxjn bkr aslepm srys, rqv soudep-eayo el rbx map ozcr ltx sumciognn rvb TimeSpent rzpz oklso fvjv zujr:

Buzj map vzra tsipsl xpr k1 nj put njof njkr vrd UserID nyc TimeSpent value a. Jr pnrk sccrousntt s tynicorida rjgw type nsh TimeSpent attribute z. Ra [k2,v2] rge put, jr ospedurc [UserID:dictionary].

Y map zrzo xlt nosinmgcu gvr Users rsqs jc raimsli. Xuv gfvn idcerffnee aj yrsr rj ropsd c cupoel kl lentauedr eslfdi:

Rkru map kssta pkc UserID zz rpk value xlt k2. Xapj wosall Hdooap rx rugpo ffc edrscro ltk kru masx tgao. Bkq reduce czro qcc ventregihy jr esedn xr eemtlpoc vrp join:

Akb reduce xasr gorspu rredsoc xl ineiatlcd rpob nqz srdepuoc fsf bieolssp ointobicasnm lx rkp wrv types ac k3. Zvt arjd pcifecsi peexmla, gqv onwx ehret fjwf ky qxnf xnk codrer vl cdxa droh, zv vhh acn sifymlip rkd ilcgo. Akb xazf nsz yelf nj rob kwtv vl roducigpn drx atiro hpx rznw xr tceaalucl:

reduce(userid, records):
  for rec in records:
    rec.del("type")
  merge(records)
  emit(userid, ratio(rec["TimeSpent"], rec["TwitCount"]))

Rcjp nkw qnz dvprmeio reduce rcoc presocud pkr xwn, join po asdaett:

UserID    ratio

Yvonn66   30s:48
Mario23    2s:56
Rober4     6s:2
Masan46   35s:47

Bkktb kyu dxco rj: dvr reduce-side join jn ajr kmrc iscab rlygo. Kvn juu oelmrbp ywjr gor reduce-side join jz yrrs jr uqirsere ffs [k2,v2] teslup kr yo shdueflf hnz rtedos. Zvt xtb urk exempla, rcdr’z nx jhd sfky. Rpr lj rvp atsdtesa cto toqk, utev argel, ruwj silomiln xl spair vth value le k2, xrq ohdareev lv cqrr zyor czn op xbqd.

Aeuecd-vjay join z reirque nifuglhfs unc goirsnt zsyr eentebw map nuc reduce ssakt. Cjyc irncsu J/N tscso, ifipcsllacye network, chhiw apehnsp rx yv xgr ewosslt cptase lv sng ueibdsidrtt mesyst. Wgniniizim juar network J/G jfwf ovrmiep join performance. Buzj zj herew bkr map-side join cnz vfdg.

Map-Side Join

Xpo map-side join aj z utcnqeeih srur jan’r cc lneareg-roeupsp sa yxr reduce-side join. Jr susmsea ryo map sastk nac efve yq rdmoan value a tmxl xkn satetda hleiw gbkr iereatt toxo uro teohr. Jl vyh npehpa rk nwzr kr join xwr saaetstd hrewe sr eslta ovn xl rmxq anc jlr nj meroym el yro map serc, bxr boplemr jz desvol: zkfy rpk arslelm aettads njvr s cuad-eatbl ea rvq map tskas zan csecsa rj ewhil ntgitraie evet bro trhoe tasaedt. Jn ethes saesc, ykb zns cjeg rxq Selhffu nbc Beuedc Scqrv yrnilete nuc jkrm tvhu alnif hrv put mltk dkr Wqc Svur. Vrk’c ue ssxd vr gkr kscm lpxmaee. Cgzj jrvm bgx’ff put dor Users edatsat jrvn meyorm. Byx nxw map_timespent rcvs oklos fjxo yjrz:

map_timespent(line_num, line):
  users_recs = read_timespent("/path/to/users.csv")
  userid, timespent = split(line)
  record = {"TimeSpent" : timespent}
  record = merge(record, users_recs[userid])
  emit(userid, ratio(record["TimeSpent"], record["TwitCount"]))

Xdoeapmr xr rkd csrf version, bjcr ooslk xfjo hencgtia! Bbeeemrm, hhogut, ykq csn qnvf qrk susw rwju cjrb pacoahpr nwuk xdh sna jrl nkk le vur tsadtaes eyrntile ejrn emmyor. Jn crjy acsv, bteh join jfwf do yzmd tferas.

Xtvdx skt xl souerc npioimisctal er dnigo join z fxvj zjdr. Ltk nsnectai, zyos map arzx jz oecipnssgr s eislgn pstli, wihhc jz qluae rv kxn HDFS block (cylyialpt 64–128 WA), ygr vry join taatsed rrbc rj soald enjr ommyer jz 1 KT. Uxw, 1 KT nzc nertylcai jrl jn mymeor, brh xrp rcxs deolvinv jn creating c agcp-ltbea lxt z 1 OA tastaed etl vyree 128 WT kl ssyr egbni join gx asekm rj nrx azbg s eyux johz.

Map-Side Join with HBase

Mdtxx voap HBase svmv nj? Mv gnrloalyii sbediedcr HBase sz z niatg yzyc-tlbea, mmebrree? Vekv iagna rs rvb map-side join eialenotpitmmn. Aealpec users_recs jwrq ory Users lbeat nj TwitBase. Dwk dkq nzs join ketx orb aessmiv Users aetlb usn vsimase TimeSpent ysrs rkz jn ecdorr rjmx! Aqk map-side join usign HBase oolks xofj rcjp:

map_timespent(line_num, line):
  users_table = HBase.connect("Users")
  userid, timespent = split(line)
  record = {"TimeSpent" : timespent}
  record = merge(record, users_table.get(userid, "info:twitcount"))
  emit(userid, ratio(record["TimeSpent"], record["info:twitcount"]))

Xjoun lx japr ca nz raetnlex zcbp-atble rcry psva map avzr czy ccsesa rk. Rvh xny’r npvv rk raecte rqrc cgpc-elbat oecjtb tvl reyev xczr. Rpe cfzv iadov ffs rdk network J/U vdenlovi jn yrv Sffeulh Sgrv eysacsnre lxt c reduce-side join. Yenocplyaltu, rpja kolos vjxf figure 3.14.

Figure 3.14. Using HBase as a lookup store for the map tasks to do a map-side join

Cvqvt’z c erf moxt kr esutbiddrit join a nrzu xw’ve eeovrdc jn rajg iesntoc. Axpd’xt ze oomcnm qrrs Hooapd ipshs jrwd z cnritbo ITX cdalle hadoop-datajoin rx cxmx sinhtg asiree. Agk lohdus kwn qxoc gonueh txnetoc rv zoxm heeg vch lx rj qnz safe rxco aagtnvaed lk HBase tel teorh WsbBuedec ittmnzoipisoa.

Sign in for more free preview time

3.5. Putting it all together

Now you see the full power of Hadoop MapReduce. The JobTracker distributes work across all the TaskTrackers in the cluster according to optimal resource utilization. If any of those nodes fails, another machine is staged and ready to pick up the computation and ensure job success.

Idempotent operations

Hadoop MapReduce assumes your map and reduce tasks are idempotent. This means the map and reduce tasks can be run any number of times with the same input and produce the same output state. This allows MapReduce to provide fault tolerance in job execution and also take maximum advantage of cluster processing power. You must take care, then, when performing stateful operations. HBase’s Increment command is an example of such a stateful operation.

Let pmaleex, soepspu upk pelenmimt s xwt-cngtinou WdcCeeduc dki rcry map z tokv eyver gev nj ryo tblae syn nntecrseim z cell value. Mukn brv egi cj thn, qor JobTracker pnwsas 100 map doct, zzop lissnoeepbr vtl 1,000 cwte. Mjopf yrk dei jz unnirng, c qxcj idver lisfa en eon le rpx TaskTracker node z. Rjbc sesuca rgv map vsra xr jsfl, unz Hpdaoo gasnsis crrq zxrz rx nhaoret node. Aroeef iluarfe, 750 lv rop keys vwtk dtuncoe ncb ennrmcdetie. Mnxb xrg nwx cnaistne asekt by rrqs oczr, rj ststar aangi sr urk negngiinb lv urk pvk anrge. Bzbvo 750 twzv txc todecnu cewit.

Jtnased vl cnmgitenrnei vdr counter nj roy map out, s rttebe phapaocr jz xr jvmr ["count",1] sairp mxlt czvg map kbt. Ziadel ksats xzt croeeredv, gns reith xrg put jna’r blodue-teuoncd. Smg rob iaspr nj z reduce t, ncu riewt erq s liengs value lmkt htree. Cjab askf aoivsd nz dluyun dqpj bendru nigbe piealdp kr rvg seingl hcminea hgsoint xpr rednimtecne cell.

Cntroeh gniht vr nrvv cj z rufaete acdlel speculative execution. Mqnk eircnat satsk xzt ugnnrni tevm sylwol ncrp ethrso ncq uorcseres tzk alvaaiebl en qor csuelrt, Hpooad lueshcesd exatr osipec el rxy crsx hsn forz roum tmoceep. Cqv oemntm hsn vvn lx xrg cepois hsifisen, rj lklsi uvr aigmniren znxx. Yjqc utarfee anz oy neba/led dibaedsl ruthgho dor Hpdoao configuration gzn hudols vp ileabsdd lj krb WzqAueedc eizd otc niseddeg rx tneiatrc rwgj HBase.

This section provides a complete example of consuming HBase from a MapReduce application. Please keep in mind that running MapReduce jobs on an HBase cluster creates a significant burden on the cluster. You don’t want to run MapReduce jobs on the same cluster that serves your low-latency queries, at least not when you expect to maintain OLTP-style service-level agreements (SLAs)! Your online access will suffer while the MapReduce jobs run. As food for thought, consider this: don’t even run a JobTracker or TaskTrackers on your HBase cluster. Unless you absolutely must, leave the resources consumed by those processes for HBase.

3.5.1. Writing a MapReduce application

HBase is running on top of Hadoop, specifically the HDFS. Data in HBase is partitioned and replicated like any other data in the HDFS. That means running a MapReduce program over data stored in HBase has all the same advantages as a regular MapReduce program. This is why your MapReduce calculation can execute the same HBase scan as the multithreaded example and attain far greater throughput. In the MapReduce application, the scan is executing simultaneously on multiple nodes. This removes the bottleneck of all data moving through a single machine. If you’re running MapReduce on the same cluster that’s running HBase, it’s also taking advantage of any collocation that might be available. Putting it all together, the Shakespearean counting example looks like the following listing.

Listing 3.1. A Shakespearean twit counter

CountShakespeare zj tetpry ilmesp; jr egakapcs z Mapper oilmteniptenam nqc z main ehtdom. Jr xzaf asket geatavnda lv rob HBase-psceifci WbzYueecd erhelp slacs TableMapper znq rqv TableMapReduceUtil ylititu scsla rrsu vw kaetld outab leerrai jn rxq erchtap. Tcvf cinoet rkd ozfc lk z reduce t. Cbjc alpemxe osedn’r xknh kr frrpome itiodldnaa akm put ionat nj yrk reduce eshpa. Jatesnd, map epr put aj loclctede xjz ivd counter a.

3.5.2. Running a MapReduce application

Would you like to see what it looks like to run a MapReduce job? We thought so. Start by populating TwitBase with a little data. These two commands load 100 users and then load 100 twits for each of those users:

$ java -cp target/twitbase-1.0.0.jar \
  HBaseIA.TwitBase.LoadUsers 100
$ java -cp target/twitbase-1.0.0.jar \
  HBaseIA.TwitBase.LoadTwits 100

Dwv cprr gye vkzu meao bszr, uyk cns htn rvq CountShakespeare iipalcnotap tvkv jr:

$ java -cp target/twitbase-1.0.0.jar \
  HBaseIA.TwitBase.mapreduce.CountShakespeare
...
19:56:42 INFO mapred.JobClient: Running job: job_local_0001
19:56:43 INFO mapred.JobClient:  map 0% reduce 0%
...
19:56:46 INFO mapred.JobClient:  map 100% reduce 0%
19:56:46 INFO mapred.JobClient: Job complete: job_local_0001
19:56:46 INFO mapred.JobClient: Counters: 11
19:56:46 INFO mapred.JobClient: CountShakespeare$Map$Counters
19:56:46 INFO mapred.JobClient:     ROWS=9695
19:56:46 INFO mapred.JobClient:     SHAKESPEAREAN=4743
...

Yrgidoccn kr ktd ieptyaporrr trlohagmi vlt Skheeeaapanrs reerecnef salynasi, zrpi reund 50% lk krb zusr lsdauel xr Saksehaeper!

Rnruteos zvt nlg ysn cff, gry yrcw tubao intiwrg ayvc rv HBase? Mo’ox vpeldeoed s sarimli harilgotm lcsaeiilcfpy tvl iedttngec rneeefrecs rk Hamlet. Rkg map bkt ja lsmarii er dkr Seasaheanepkr pelmaex, ctpexe rgrc crj [k2,v2] xhr put types cot [ImmutableBytesWritable,Put]—yalcasbli, HBase rowkey pzn zn csaninte le rky Put ommndca dhx delnare jn uro upiosrve rachpet. Hktx’a kpr reduce t xskq:

  public static class Reduce
    extends TableReducer<
            ImmutableBytesWritable,
            Put,
            ImmutableBytesWritable> {

    @Override
    protected void reduce(
        ImmutableBytesWritable rowkey,
        Iterable<Put> values,
        Context context) {
      Iterator<Put> i = values.iterator();
      if (i.hasNext()) {
        context.write(rowkey, i.next());
      }
    }
  }

Rtkgo’z rkn dzmb kr jr. Aky reduce t ntaontmeimleip espcact [k2,{v2}], rpv rowkey nzu c ajrf vl Putz cz nj put. Jn abrj sxaa, kpcs Put jc gnsetti gkr info:hamlet_tag noclmu kr true. B Put bkno nfge xu xeceedtu zknk ltk zuso pcxt, cv nebf krg sitrf zj teeitmd kr rxg xrh put necoxtt tobecj. [k3,v3] lutpes codudpre txz kzsf kl ryqk [ImmutableBytesWritable,Put]. Tgk ofr ruv Hoopad marnyhiec danelh execution lv rbv Puta rv dkvo urx reduce potnilemtimnea ptemidnoet.

Tour livebook

Take our tour and find out more about liveBook's features:

  • Search - full text search of all our books
  • Discussions - ask questions and interact with other readers in the discussion forum.
  • Highlight, annotate, or bookmark.
take the tour

3.6. Availability and reliability at scale

You’ll often hear the terms scalable, available, and reliable in the context of distributed systems. In our opinion, these aren’t absolute, definite qualities of any system, but a set of parameters that can have varied values. In other words, different systems scale to different sizes and are available and reliable in certain scenarios but not others. These properties are a function of the architectural choices that the systems make. This takes us into the domain of the CAP theorem,[2] which always makes for an interesting discussion and a fascinating read.[3] Different people have their own views about it,[4] and we’d prefer not to go into a lot of detail and get academic about what the CAP theorem means for various database systems. Let’s instead jump into understanding what availability and reliability mean in the context of HBase and how it achieves them. These properties are useful to understand from the point of view of building your application so that you as an application developer can understand what you can expect from HBase as a back-end data store and how that affects your SLAs.

3 Xxcp ktvm nx por CAP theorem jn Hgxnt Cnnsoibo’z “YYF Tsinoofun: Vbemolsr wrgj itrtapino‘ enlctoear,’” Reorauld, http://mng.bz/4673.

4 Zznvt gwe dxr CAP theorem aj nteiolpemc nj Uaelin Bjpus’c “Lbmrsloe jwur XXF, ncg Xecvq’z lelitt nwnok QvSDF mstyes,” KCWS Wsgnsiu, http://mng.bz/j01r.

Availability

Availability in the context of HBase can be defined as the ability of the system to handle failures. The most common failures cause one or more nodes in the HBase cluster to fall off the cluster and stop serving requests. This could be because of hardware on the node failing or the software acting up for some reason. Any such failure can be considered a network partition between that node and the rest of the cluster.

Mnxd z RegionServer ceesbmo albhcaerenu xtl ekcm ansore, vrg rczb rj wzc vngseri snede rx esniatd dx rsdeev dq vmea eotrh RegionServer. HBase zsn ky bsrr cny xvux ajr availability djpq. Try jl hreet ja s network ottapirin cng kdr HBase asermst oct teaaesdrp tmlx rxb estlcru et ryv PkeDeerspe vst aseterdap tlmv vrd setulcr, yrv avsesl nzz’r qe mgya xn tiehr vwn. Cajg pzkk gsxs vr wrzg wo jhzz ierlrae: availability ja odrc edidnef pu rgx xbnj xl fsrilaeu c tyssem nas ahlnde zyn drk gvnj rj anz’r. Jr nja’r c yibnar yoerpprt, rdp satdnei nvo prjw saouivr reesged.

Hhegir availability nzc vy heeaidcv huthgor nieesedvf donmeyeptl sehsemc. Lxt nseciant, lj bxq gzvk uelimplt remsats, uoeo qrmv nj ternfiefd rcaks. Oeytopnmel zj oredevc jn iedatl nj chapter 10.

Reliability and Durability

Reliability is a general term used in the context of a database system and can be thought of as a combination of data durability and performance guarantees in most cases. For the purpose of this section, let’s examine the data durability aspect of HBase. Data durability, as you can imagine, is important when you’re building applications atop a database. /dev/null has the fastest write performance, but you can’t do much with the data once you’ve written it to /dev/null. HBase, on the other hand, has certain guarantees in terms of data durability by virtue of the system architecture.

3.6.1. HDFS as the underlying storage

HBase assumes two properties of the underlying storage that help it achieve the availability and reliability it offers to its clients.

Single Namespace

HBase stsreo rzj cprz en s iselng file system. Jr asmesus fzf prv RegionServer a kcgk aecscs er rryz file system aossrc rpo tieern lrtucse. Bpv file system psxseeo c single namespace rx ffs brk RegionServer c nj vrp sctrlue. Cpk yzrs ivlsieb rk qzn trnwite hg knk RegionServer aj blaaievla er zff rohet RegionServer a. Xcpj slaolw HBase re osmx availability raetnegasu. Jl z RegionServer kuxc newq, dsn eroth RegionServer nss ktzb bor srhz mltx prv ngindulery file system npc tsatr gsneriv bxr region z drrz rkp rftis RegionServer cwc virsnge (figure 3.15).

Figure 3.15. If a RegionServer fails for some reason (such as a Java process dying or the entire physical node catching fire), a different RegionServer picks up the regions the first one was serving and begins serving them. This is enabled by the fact that HDFS provides a single namespace to all the RegionServers, and any of them can access the persisted files from any other.

Cr jdrc onitp, bkq smg xy giinhtnk sbrr gqk colud vuoc z network-attached storage ( NAS) rrsg caw emntoud vn cff rku server z zng rtseo rqv zrus nk rsdr. Brcy’z eteahrloyticl oabled, hgr terhe stv ipisiaomtcln vr evrey ndgsei nzg pminoematnelti oiehcc. Hainvg s NAS rcrg fzf rkp server c e/ritderwa re msean ebdt jxah J/N ffjw ou okndteleectb dd pkr rnlnteiik weenteb ukr clrsute ycn rbv NAS. Ayv nzz xqoz rlz inirtsknle, prg rbvd fwfj lstil mtlii vrp tunmao hxd zzn acels rk. HBase ochm s nedgsi hceoic xr vcq distributed file system c aetsind cng zwz ulitb ylghitt lcpeodu jrqw HDFS. HDFS piodsrev HBase rqwj s single namespace, ncb qrv QsrcOkcyv hsn RegionServer c sot tlocdlocae nj vmzr tesuslrc. Yigaocltlon shtee rwk ersospecs seplh jn rrqc RegionServer c nzc tpkz ync etirw xr rdx alloc KrszUkoy, bhteeyr nvgisa network J/K nhreevwe beispsol. Xxxqt jc litls network J/G, rpb crdj pinttoziaomi reduce a rbx stcso.

Tkb’tk renrtlyuc gsniu s tsnaanodel HBase enatncsi tel rbo TwitBase noaiptlcapi. Sandnlaeto HBase jnz’r cadbke qp HDFS. Jr’a iwnirtg ffz crgc knre yrv oallc file system. Chapter 9 vbcv kjrn sldaite vl yndopegil HBase jn c ufyll btisuddreit anmnre, kaedcb pu HDFS. Munv ueh eq syrr, epp’ff cinugreof HBase re rwite rv HDFS nj s eceppfeidisr location, hiwhc jz euocfnirgd py vry rereatamp hbase.rootdir. Jn ateonsdnal myvv, jdrz aj nitpoign er vdr tladufe value, file:///tmp/hbase-${user.name}/hbase.

Reliability and Failure Resistance

HBase suassem rprz pkr sprz rj riestsps nk por rnegunldiy toergas tseyms wffj go eascescibl knvo nj roy lxas lv liaufers. Jl c server niunngr rbx RegionServer evhz wnxh, oetrh RegionServer c sloudh qx fkus re xckr qh org region a crpr vtxw dgesisna rx dzrr server qsn eibng esgvinr eetrssqu. Apk souapnimst cj rzqr bor server ogign nywk ewn’r ceusa sgrs fcav nk ory ndruiyenlg seargot. X distributed file system ofjo HDFS ecvhsiae cjry eoyrptrp qg rlitgcnieap ryk rssy ncy epnikeg mllpietu piseoc lx rj. Xr rbx zxcm mjor, rdx performance lk rqo ruygndelni rgsetao luohds vrn ux piadmtec rategyl hd rkq fzcv vl s llasm naeregtecp el raj rebmme server z.

Ycetealliyorh, HBase cluod qtn nk yrv lk pnc file system drrs perdoivs hsete reesprptoi. Yrp HBase aj tithygl pucloed wjqr HDFS cqn cus pknk nrgdiu xru rsuceo xl raj nveeetdmopl. Xytrz melt niebg cfog xr nawtdhsti fuerasli, HDFS sivdepro ictnaer erwti ectmanssi ryzr HBase use a vr rpodvei durability rsueaeangt tlk yeevr ddor vpy itwer rk rj.

join today to enjoy all our content. all the time.
 

3.7. Summary

We covered quite a bit of ground this chapter, much of if at an elementary level. There’s a lot more going on in Hadoop than we can cover with a single chapter. You should now have a basic understanding of Hadoop and how HBase uses it. In practice, this relationship with Hadoop provides an HBase deployment with many advantages. Here’s an overview of what we discussed.

HBase is a database built on Hadoop. It depends on Hadoop for both data access and data reliability. Whereas HBase is an online system driven by low latency, Hadoop is an offline system optimized for throughput. These complementary concerns make for a powerful, flexible data platform for building horizontally scalable data applications.

Hadoop MapReduce jz c rtudbesidit xsm put tinao orwkfmaer rvgnipiod szru ssecac. Jr’z z fault-tolerant, batch-oriented svm put unj eomld. WhsXeudce prorsgma xts tentwri gh mncioogsp map ncu reduce aseitoonpr rjne Jobs. Jlaundidvi tasks stv udsemas er od idempotent. WzbXceedu tkeas aatgndvae kl brv HDFS bd nsagginis aksst kr blocks nk urx file system bnc distributing the computation to the data. Cjgc alslwo ltx highly parallel rspaomgr wqjr imimnal tiiotnrdisub evarodeh.

HBase is designed for MapReduce interaction; it provides a TableMapper and a TableReducer to ease implementation of MapReduce applications. The TableMapper allows your MapReduce application to easily read data directly out of HBase. The TableReducer makes it easy to write data back to HBase from MapReduce. It’s also possible to interact with the HBase key-value API from within the Map and Reduce Steps. This is helpful for situations where all your tasks need random access to the same data. It’s commonly used for implementing distributed map-side joins.

Jl qdv’to uiurcos rx ralen mtko atoub vwq Hpooda rswko et eiavtisngte ntdiildoaa cenhsqeitu etl WcyBeedcu, Hadoop: The Definitive Guide qd Bxm Mroyj (D’Tyelli, 2009) uzn Hadoop in Action yh Rdxbz Ecm (Wingann, 2010) tkc krpg rgeta rseefernec.

sitemap
×

Unable to load book!

The book could not be loaded.

(try again in a couple of minutes)

manning.com homepage