9 Testing


This chapter covers

  • Testing Airflow tasks in a CI/CD pipeline
  • Structuring a project for testing with pytest
  • Mimicking a DAG run to test tasks that apply templating
  • Faking external system events with mocking
  • Testing behavior in external systems with containers

In all previous chapters, we focused on various parts of developing with Airflow. So how do you ensure the code you’ve written is valid before deploying it into a production system? Testing is an integral part of software development, and nobody wants to write code, take it through a deployment process, and keep their fingers crossed that all will be okay. Developing this way is inefficient and provides no guarantees that the software functions correctly, in either valid or invalid situations.

This chapter will dive into the gray area of testing Airflow, which is often regarded as a tricky subject. This is because of Airflow’s nature of communicating with many external systems and the fact that it’s an orchestration system, which starts and stops tasks performing logic, while Airflow itself (often) does not perform any logic.


9.1 Getting started with testing


Tests can be applied on various levels. Small individual units of work (i.e., single functions) can be tested with unit tests. While such tests might validate the correct behavior, they do not validate the behavior of a system composed of multiple such units altogether. For this purpose, we write integration tests, which validate the behavior of multiple components together. In testing literature, the next used level of testing is acceptance testing (evaluating fit with business requirements), which does not apply to this chapter. Here, we will dive into unit and integration testing.

Throughout this chapter, we demonstrate various code snippets written with pytest (https://pytest.org). While Python has a built-in framework for testing named unittest, pytest is one of the most popular third-party testing frameworks for various features such as fixtures, which we’ll take advantage of in this chapter. No prior knowledge of pytest is assumed.

Since the supporting code with this book lives in GitHub, we’ll demonstrate a CI/CD pipeline running tests with GitHub Actions (https://github.com/features/actions), the CI/CD system that integrates with GitHub. With the ideas and code from the GitHub Actions examples, you should be able to get your CI/CD pipeline running in any system. All popular CI/CD systems, such as GitLab, Bitbucket, CircleCI, Travis CI, and so on, work by defining the pipeline in YAML format in the root of the project directory, which we’ll also do in the GitHub Actions examples.

9.1.1 Integrity testing all DAGs

In the context of Airflow, the first step for testing is generally a DAG integrity test, a term made known by a blog post titled “Data’s Inferno: 7 Circles of Data Testing Hell with Airflow” (http://mng.bz/1rOn). Such a test verifies all your DAGs for their integrity (i.e., the correctness of the DAG, for example, validating that the DAGs do not contain cycles, that the task IDs in the DAG are unique, etc.). The DAG integrity test often filters out simple mistakes. For example, a mistake is often made when generating tasks in a for loop with a fixed task ID instead of a dynamically set task ID, resulting in each generated task having the same ID. Upon loading DAGs, Airflow also performs such checks itself and will display an error if one is found (figure 9.1). To avoid going through a deployment cycle only to discover in the end that your DAG contains a simple mistake, it is wise to perform DAG integrity tests in your test suite.

Figure 9.1 DAG cycle error displayed by Airflow

The following DAG in listing 9.1 would display an error in the UI because there is a cycle between t1 > t2 > t3 > back to t1. This violates the property that a DAG should have finite start and end nodes.

Listing 9.1 Example cycle in DAG, resulting in an error
t1 = DummyOperator(task_id="t1", dag=dag)
t2 = DummyOperator(task_id="t2", dag=dag)
t3 = DummyOperator(task_id="t3", dag=dag)
 
t1 >> t2 >> t3 >> t1
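
To make concrete what a cycle check does, the following is a minimal sketch of a depth-first search over task dependencies. It is independent of Airflow and is not Airflow's implementation; the function name `find_cycle_edge`, the state constants, and the dictionary representation of the dependencies are assumptions made for illustration.

```python
from collections import defaultdict

# Marker values for the depth-first search (names chosen for this sketch).
NOT_VISITED, IN_PROGRESS, DONE = 0, 1, 2


def find_cycle_edge(downstream):
    """Return the first (task, descendant) edge that closes a cycle, or None.

    `downstream` maps each task ID to the list of task IDs that follow it.
    """
    state = defaultdict(int)  # every task starts as NOT_VISITED

    def visit(task_id):
        state[task_id] = IN_PROGRESS
        for descendant_id in downstream.get(task_id, []):
            if state[descendant_id] == IN_PROGRESS:
                # We came back to a task that is still on the current path.
                return (task_id, descendant_id)
            if state[descendant_id] == NOT_VISITED:
                edge = visit(descendant_id)
                if edge is not None:
                    return edge
        state[task_id] = DONE
        return None

    for task_id in list(downstream):
        if state[task_id] == NOT_VISITED:
            edge = visit(task_id)
            if edge is not None:
                return edge
    return None


cyclic = {"t1": ["t2"], "t2": ["t3"], "t3": ["t1"]}  # t1 >> t2 >> t3 >> t1
acyclic = {"t1": ["t2"], "t2": ["t3"], "t3": []}     # t1 >> t2 >> t3

cycle_edge = find_cycle_edge(cyclic)
no_cycle = find_cycle_edge(acyclic)
```

For the dependencies of listing 9.1, the sketch reports the edge from t3 back to t1 as the one that closes the cycle.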

Now let’s catch this error in a DAG integrity test. First, let’s install pytest.

Listing 9.2 Installing pytest
pip install pytest

Collecting pytest
................
Installing collected packages: pytest
Successfully installed pytest-5.2.2

This gives us a pytest CLI utility. To see all available options, run pytest --help. For now, there’s no need to know all the options; knowing you can run tests with pytest [file/directory] (where the directory contains test files) is enough. Let’s create such a file. A convention is to create a tests/ folder at the root of the project that holds all the tests and mirrors the same directory structure as in the rest of the project.1 So, if your project structure is like the one shown in figure 9.2,

Figure 9.2 Example Python package structure
.
├── dags
│   ├── dag1.py
│   ├── dag2.py
│   └── dag3.py
└── mypackage
   ├── airflow
   │   ├── hooks
   │   │   ├── __init__.py
   │   │   └── movielens_hook.py
   │   ├── operators
   │   │   ├── __init__.py
   │   │   └── movielens_operator.py
   │   └── sensors
   │       ├── __init__.py
   │       └── movielens_sensors.py
   └── movielens
       ├── __init__.py
       └── utils.py

then the tests/ directory structure would look like the one shown in figure 9.3:

1. Pytest calls this structure “Tests outside application code.” The other structure supported by pytest is to store test files directly next to your application code, which it calls “tests as part of your application code.”

Figure 9.3 Test directory structure following the structure in figure 9.2
.
├── dags
├── mypackage
└── tests
   ├── dags
   │   └── test_dag_integrity.py
   └── mypackage
       ├── airflow
       │   ├── hooks
       │   │   └── test_movielens_hook.py
       │   ├── operators
       │   │   └── test_movielens_operator.py
       │   └── sensors
       │       └── test_movielens_sensor.py
       └── movielens
           └── test_utils.py

Note that all test files mirror the filenames that are (presumably) being tested, prefixed with test_. Again, while mirroring the name of the file to test is not required, it is an evident convention to tell something about the contents of the file. Tests that overlap multiple files or provide other sorts of tests (such as the DAG integrity test) are conventionally placed in files named according to whatever they’re testing. However, the test_ prefix here is required; pytest scans through the given directories and searches for files prefixed with test_ or suffixed with _test.2 Also, note there are no __init__.py files in the tests/ directory; the directories are not modules, and tests should be able to run independently of each other without importing each other. Pytest scans directories and files and auto-discovers tests; there’s no need to create modules with __init__.py files.

2. Test discovery settings are configurable in pytest if you want to support, for example, test files named check_*.

Let’s create a file named tests/dags/test_dag_integrity.py.

Listing 9.3 DAG integrity test
import glob
import importlib.util
import os
 
import pytest
from airflow.models import DAG
 
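# Glob pattern: two directories up from this test file, then dags/**/*.py.
DAG_PATH = os.path.join(
    os.path.dirname(__file__), "..", "..", "dags/**/*.py"
)
DAG_FILES = glob.glob(DAG_PATH, recursive=True)


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # Load and execute the file, just like Python itself would do.
    module_name, _ = os.path.splitext(dag_file)
    # glob already returns full paths; when dag_file is absolute,
    # os.path.join() returns it unchanged.
    module_path = os.path.join(DAG_PATH, dag_file)
    mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    # Collect all objects of class DAG found in the file.
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]

    # Check 1: the file must define at least one DAG.
    assert dag_objects

    # Check 2: none of the DAGs may contain a cycle.
    for dag in dag_objects:
        dag.test_cycle()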

Here, we see one function named test_dag_integrity, which performs the test. The code might look a little obscure at first sight, so let’s break it down. Remember the folder structure previously explained? There’s a dags/ folder that holds all DAG files, and the file test_dag_integrity.py, which lives in tests/dags/test_dag_integrity.py. This DAG integrity test is pointed to a folder holding all DAG files, in which it then searches recursively for *.py files (figure 9.4).

Figure 9.4 DAG_PATH points to the directory holding all DAG files.

The dirname() returns the directory of test_dag_integrity.py, and then we browse two directories up, first to tests/, second to the root, and from there we search for anything matching the pattern dags/**/*.py. "**" will search recursively, so DAG files in, for example, dags/dir1/dir2/dir3/mydag.py will also be found. Finally, the variable DAG_FILES holds a list of files found in dags/ ending in .py. Next, the decorator @pytest.mark.parametrize runs the test for every Python file found (figure 9.5).

Figure 9.5 A parameterized test runs a test for every dag_file.
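
The recursive search can be tried in isolation. The following is a small sketch that builds a throwaway directory tree and applies the same kind of pattern; the file and directory names are made up for illustration. Each path in the resulting list corresponds to one test case that @pytest.mark.parametrize would generate.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as project_root:
    # Build a tiny project layout: one DAG at the top level, one nested.
    nested_dir = os.path.join(project_root, "dags", "dir1", "dir2")
    os.makedirs(nested_dir)
    for path in (
        os.path.join(project_root, "dags", "dag1.py"),
        os.path.join(nested_dir, "mydag.py"),
        os.path.join(project_root, "dags", "README.txt"),  # not a .py file
    ):
        open(path, "w").close()

    # "**" only matches nested directories when recursive=True is given.
    pattern = os.path.join(project_root, "dags/**/*.py")
    found = sorted(
        os.path.relpath(path, project_root).replace(os.sep, "/")
        for path in glob.glob(pattern, recursive=True)
    )
```

Both the top-level and the nested .py files are picked up, while the text file is ignored.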

The first part of the test is a little obscure. We won’t go into details, but it boils down to loading and executing the file, just like Python itself would do, and extracting the DAG objects from it.

Listing 9.4 The DAG integrity test trying to instantiate every DAG object found
module_name, _ = os.path.splitext(dag_file)                                  #1
module_path = os.path.join(DAG_PATH, dag_file)                               #1
mod_spec = importlib.util.spec_from_file_location(module_name, module_path)  #1
module = importlib.util.module_from_spec(mod_spec)                           #1
mod_spec.loader.exec_module(module)                                          #1
 
dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)] #2
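
The same loading technique can be sketched without Airflow installed. In the example below, types.SimpleNamespace stands in for the DAG class, and the helper name `load_instances` and the file contents are assumptions made for illustration only.

```python
import importlib.util
import os
import tempfile
from types import SimpleNamespace


def load_instances(file_path, wanted_type):
    """Execute a Python file and return its module-level objects of a given type."""
    module_name, _ = os.path.splitext(os.path.basename(file_path))
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level statements
    return [var for var in vars(module).values() if isinstance(var, wanted_type)]


# SimpleNamespace plays the role of DAG in this sketch.
SOURCE_WITH_OBJECT = (
    "from types import SimpleNamespace\n"
    "my_dag = SimpleNamespace(dag_id='example')\n"
    "unrelated = 42\n"
)
SOURCE_WITHOUT_OBJECT = "def helper():\n    return 1\n"

with tempfile.TemporaryDirectory() as tmp:
    dag_file = os.path.join(tmp, "example_dag.py")
    util_file = os.path.join(tmp, "utils.py")
    with open(dag_file, "w") as f:
        f.write(SOURCE_WITH_OBJECT)
    with open(util_file, "w") as f:
        f.write(SOURCE_WITHOUT_OBJECT)

    found = load_instances(dag_file, SimpleNamespace)
    found_in_utils = load_instances(util_file, SimpleNamespace)
```

The file that only defines a helper function yields an empty list, which is the situation an `assert dag_objects` check would flag.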

Now that the DAG objects are extracted from the file, we can apply certain checks on them. In the code, we applied two checks. The first was an assertion: assert dag_objects, checking whether DAG objects were found in the file (making it successful). Adding this assertion validates that all Python files found in dags/ contain at least one DAG object. For example, scripts holding utility functions stored in dags/, in which no DAG objects are instantiated, would therefore fail. Whether this is desirable is up to you, but having one directory holding only DAG files and nothing else does provide a clear separation of duties.

The second check (for dag in dag_objects: dag.test_cycle()) validates that there are no cycles in the DAG objects. This is called explicitly for a reason. Before Airflow 1.10.0, DAGs were checked for cycles with every change to their structure. This check becomes computationally heavier as more and more tasks are added. For DAGs with a large number of tasks, this became a burden, because a DAG cycle check was performed for every new task, causing long reading times. Therefore, the DAG cycle check was moved to the point where DAGs are parsed and cached by Airflow (into a structure called the DagBag), such that the cycle check is performed only once after parsing the complete DAG, reducing reading time. As a result, it’s perfectly fine to declare t1 >> t2 >> t1 and evaluate it. Only once a live running Airflow instance reads your script will it complain about the cycle. So, to avoid going through a deployment cycle, we call test_cycle() explicitly on each DAG found in the test.

These are two example checks, but you can add your own, of course. If, say, you want each DAG name to start with “import” or “export,” you can check the dag_ids:

assert dag.dag_id.startswith(("import", "export"))

Now let’s run the DAG integrity test. On the command line, run pytest (optionally hinting pytest where to search with pytest tests/ to avoid scanning other directories).

Listing 9.5 Output of running pytest
$ pytest tests/
========================= test session starts =========================
....
collected 1 item
 
tests/dags/test_dag_integrity.py F                               [100%]
 
============================== FAILURES ==============================
________________ test_dag_integrity[..../dag_cycle.py] ________________
 
dag_file = '..../dag_cycle.py'
 
    @pytest.mark.parametrize("dag_file", DAG_FILES)
    def test_dag_integrity(dag_file):
        """Import DAG files and check for DAG."""
        module_name, _ = os.path.splitext(dag_file)
        module_path = os.path.join(DAG_PATH, dag_file)
        mod_spec = importlib.util.spec_from_file_location(module_name, module_path)
        module = importlib.util.module_from_spec(mod_spec)
        mod_spec.loader.exec_module(module)
 
        dag_objects = [
            var for var in vars(module).values() if isinstance(var, DAG)
        ]
 
        assert dag_objects
 
        for dag in dag_objects:
            # Test cycles
>           dag.test_cycle()
 
tests/dags/test_dag_integrity.py:29:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.../site-packages/airflow/models/dag.py:1427: in test_cycle
    self._test_cycle_helper(visit_map, task_id)
.../site-packages/airflow/models/dag.py:1449: in _test_cycle_helper
    self._test_cycle_helper(visit_map, descendant_id)
.../site-packages/airflow/models/dag.py:1449: in _test_cycle_helper
    self._test_cycle_helper(visit_map, descendant_id)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
 
self = <DAG: chapter8_dag_cycle>, visit_map = defaultdict(<class 'int'>, {'t1': 1, 't2': 1, 't3': 1}), task_id = 't3'
 
    def _test_cycle_helper(self, visit_map, task_id):
        """
        Checks if a cycle exists from the input task using DFS traversal
        ...
 
        task = self.task_dict[task_id]
        for descendant_id in task.get_direct_relative_ids():
            if visit_map[descendant_id] == DagBag.CYCLE_IN_PROGRESS:
                msg = "Cycle detected in DAG. Faulty task: {0} to {1}".format(
                    task_id, descendant_id)
>               raise AirflowDagCycleException(msg)
E               airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG. Faulty task: t3 to t1
 
..../airflow/models/dag.py:1447: AirflowDagCycleException
========================== 1 failed in 0.21s ==========================

The result of the test is quite lengthy, but typically you search for answers at the top and bottom. Near the top you find which test failed, and at the bottom you find why the test failed.

Listing 9.6 Exception reason found in listing 9.5
airflow.exceptions.AirflowDagCycleException: Cycle detected in DAG.
Faulty task: t3 to t1

This example shows us (as expected) that a cycle was detected from t3 to t1. Upon instantiation of DAGs and operators, several other checks are performed out of the box. Say you are using a BashOperator but forgot to add the (required) bash_command argument. The DAG integrity test will evaluate all statements in the script and fail when evaluating the BashOperator.

Listing 9.7 Faulty instantiation of a BashOperator
BashOperator(task_id="this_should_fail", dag=dag)

The DAG integrity test will encounter an exception and fail.

Listing 9.8 Exception raised by the faulty instantiation in listing 9.7
airflow.exceptions.AirflowException: Argument ['bash_command'] is required

With the DAG integrity test in place, let’s run it automatically in a CI/CD pipeline.

9.1.2 Setting up a CI/CD pipeline

In a one-liner, a CI/CD pipeline is a system that runs predefined scripts when you make a change to your code repository. The continuous integration (CI) denotes checking and validating the changed code to ensure it complies with coding standards and a test suite. For example, upon pushing code, you could check for Flake8 (http://flake8.pycqa.org), Pylint (https://www.pylint.org), and Black (https://github.com/psf/black), and run a series of tests. The continuous deployment (CD) indicates automatically deploying the code into production systems, completely automated and without human interference. The goal is to maximize coding productivity without having to deal with manually validating and deploying it.

There is a wide range of CI/CD systems. In this chapter we will cover GitHub Actions (https://github.com/features/actions); the general ideas should apply to any CI/CD system. Most CI/CD systems start with a YAML configuration file in which a pipeline is defined: a series of steps to execute upon changing code. Each step should complete successfully to make the pipeline successful. In the Git repository, we can then enforce rules such as “only merge to master with a successful pipeline.”

The pipeline definitions typically live in the root of your project; GitHub Actions requires YAML files stored in a directory: .github/workflows. With GitHub Actions, the name of the YAML file doesn’t matter, so we could create a file named airflow-tests.yaml with the following content.

Listing 9.9 Example GitHub Actions pipeline for Airflow project
name: Python static checks and tests
 
on: [push]
 
jobs:
 testing:
   runs-on: ubuntu-18.04
   steps:
     - uses: actions/checkout@v1
     - name: Setup Python
       uses: actions/setup-python@v1
       with:
         python-version: 3.6.9
         architecture: x64
 
     - name: Install Flake8
       run: pip install flake8
     - name: Run Flake8
       run: flake8
 
     - name: Install Pylint
       run: pip install pylint
     - name: Run Pylint
       run: find . -name "*.py" | xargs pylint --output-format=colorized
 
     - name: Install Black
       run: pip install black
     - name: Run Black
       run: find . -name "*.py" | xargs black --check
 
     - name: Install dependencies
       run: pip install apache-airflow pytest
 
     - name: Test DAG integrity
       run: pytest tests/

The keywords shown in this YAML file are unique to GitHub Actions, although the general ideas apply to other CI/CD systems too. Important things to note are the tasks in GitHub Actions defined under “steps.” Each step runs a piece of code. For example, Flake8 performs static code analysis and will fail if any issues are encountered, such as an unused import. On row 3, we state on: [push], which tells GitHub to run the complete CI/CD pipeline every time it receives a push. In a completely automated CD system, it would contain filters for steps on specific branches, such as master, to only run steps and deploy code if the pipeline succeeds on that branch.

9.1.3 Writing unit tests

Now that we have a CI/CD pipeline up and running, which initially checks the validity of all DAGs in the project, it’s time to dive a bit deeper into the Airflow code and start unit testing individual bits and pieces.

Looking at the custom components demonstrated in chapter 8, there are several things we could test to validate correct behavior. The saying goes, “Never trust user input,” so we’d like to be certain our code works correctly in both valid and invalid situations. Take, for example, the MovielensHook from chapter 8, which holds a method, get_ratings(). The method accepts several arguments; one of them is batch_size, which controls the size of batches requested from the API. You can imagine valid input would be any positive number (maybe with some upper limit). But what if the user provides a negative number (e.g., -3)? Maybe the API handles the invalid batch size correctly and returns an HTTP error, such as 400 or 422, but how does the MovielensHook respond to that? Sensible options might be input value handling before even sending the request, or proper error handling if the API returns an error. This behavior is what we want to check.
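
As an illustration of the first option (handling the input value before sending any request), the following is a minimal sketch. The function `validate_batch_size`, its `upper_limit` default, and the small `raises` helper are hypothetical; the source does not show how MovielensHook treats invalid batch sizes.

```python
def validate_batch_size(batch_size, upper_limit=1000):
    """Hypothetical input check that a hook could run before calling the API."""
    # bool is a subclass of int, so reject it explicitly.
    if isinstance(batch_size, bool) or not isinstance(batch_size, int):
        raise TypeError(f"batch_size must be an integer, got {batch_size!r}")
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")
    if batch_size > upper_limit:
        raise ValueError(f"batch_size must be at most {upper_limit}, got {batch_size}")
    return batch_size


def raises(exc_type, func, *args):
    """Return True if calling func(*args) raises exc_type."""
    try:
        func(*args)
    except exc_type:
        return True
    return False


valid = validate_batch_size(100)
negative_rejected = raises(ValueError, validate_batch_size, -3)
too_large_rejected = raises(ValueError, validate_batch_size, 10_000)
wrong_type_rejected = raises(TypeError, validate_batch_size, "100")
```

In a pytest test, the `raises` helper would normally be replaced by the `pytest.raises` context manager; the plain try/except is used here only to keep the sketch free of dependencies.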

Let’s continue with the work of chapter 8 and implement a MovielensPopularityOperator, which is an operator returning the top N popular movies between two given dates.

Listing 9.10 Example operator MovielensPopularityOperator
from collections import Counter, defaultdict

from airflow.models import BaseOperator

# MovielensHook is the custom hook built in chapter 8; import it from
# wherever your project defines it.


class MovielensPopularityOperator(BaseOperator):
    def __init__(
        self,
        conn_id,
        start_date,
        end_date,
        min_ratings=4,
        top_n=5,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date
        self._min_ratings = min_ratings
        self._top_n = top_n

    def execute(self, context):
        with MovielensHook(self._conn_id) as hook:
            # Get the raw ratings.
            ratings = hook.get_ratings(
                start_date=self._start_date,
                end_date=self._end_date,
            )

            # Sum up the ratings and count them per movieId.
            rating_sums = defaultdict(Counter)
            for rating in ratings:
                rating_sums[rating["movieId"]].update(
                    count=1, rating=rating["rating"]
                )

            # Filter on min_ratings and calculate the mean rating per movieId.
            averages = {
                movie_id: (
                    rating_counter["rating"] / rating_counter["count"],
                    rating_counter["count"],
                )
                for movie_id, rating_counter in rating_sums.items()
                if rating_counter["count"] >= self._min_ratings
            }

            # Return the top_n movies, sorted by mean rating and number of ratings.
            return sorted(
                averages.items(), key=lambda x: x[1], reverse=True
            )[: self._top_n]
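
The aggregation inside execute() can also be exercised on its own, without a hook or a running Airflow. The sketch below copies that logic into a standalone function; the name `rank_movies` and the sample ratings are made up for illustration and are not part of the book's code.

```python
from collections import Counter, defaultdict


def rank_movies(ratings, min_ratings=4, top_n=5):
    """Standalone copy of the ranking logic in listing 9.10 (for illustration)."""
    rating_sums = defaultdict(Counter)
    for rating in ratings:
        # Counter.update() with keyword arguments adds to the existing counts.
        rating_sums[rating["movieId"]].update(count=1, rating=rating["rating"])

    averages = {
        movie_id: (counter["rating"] / counter["count"], counter["count"])
        for movie_id, counter in rating_sums.items()
        if counter["count"] >= min_ratings
    }
    return sorted(averages.items(), key=lambda x: x[1], reverse=True)[:top_n]


sample_ratings = [
    {"movieId": 1, "rating": 5},
    {"movieId": 1, "rating": 4},
    {"movieId": 2, "rating": 3},
    {"movieId": 2, "rating": 3},
    {"movieId": 3, "rating": 5},  # only one rating, filtered out below
]

ranking = rank_movies(sample_ratings, min_ratings=2, top_n=2)
```

With min_ratings=2, movie 3 is dropped because it has a single rating, and the remaining movies are ordered by their mean rating.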

How do we test the correctness of this MovielensPopularityOperator? First, we could test it as a whole by simply running the operator with given values and checking whether the result is as expected. To do this, we require a couple of pytest components to run the operator by itself, outside a live Airflow system and inside a unit test. This allows us to run the operator under different circumstances and validate whether it behaves correctly.

9.1.4 Pytest project structure

With pytest, a test script needs to be prefixed with test_. Just like the directory structure, we also mimic the filenames, so a test for code in movielens_operator.py would be stored in a file named test_movielens_operator.py. Inside this file, we create a function to be called as a test.

Listing 9.11 Example test function testing the BashOperator
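# Airflow 1.10 import path, matching the xcom_push argument used below.
from airflow.operators.bash_operator import BashOperator


def test_example():
    task = BashOperator(
        task_id="test",
        bash_command="echo 'hello!'",
        xcom_push=True,
    )
    result = task.execute(context={})
    assert result == "hello!"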

In this example, we instantiate the BashOperator and call the execute() function, given an empty context (empty dict). When Airflow runs your operator in a live setting, several things happen before and after, such as rendering templated variables and setting up the task instance context and providing it to the operator. In this test, we are not running in a live setting but calling the execute() method directly. This is the lowest-level function you can call to run an operator, and it’s the method every operator implements to perform its functionality. We don’t need any task instance context to run the BashOperator; therefore, we provide it with an empty context. In case the test depends on processing something from the task instance context, we could fill it with the required keys and values.3

3. The xcom_push=True argument returns stdout of the bash_command as a string, which we use in this test to fetch and validate the bash_command. In a live Airflow setup, any object returned by an operator is automatically pushed to XCom.

Let’s run this test.

Listing 9.12 Output of running the test in listing 9.11
$ pytest tests/dags/chapter9/custom/test_operators.py::test_example
========================= test session starts =========================
platform darwin -- Python 3.6.7, pytest-5.2.2, py-1.8.0, pluggy-0.13.0
rootdir: .../data-pipelines-with-apache-airflow
collected 1 item                                                                                   
 
tests/dags/chapter9/custom/test_operators.py .

Now let’s apply this to the MovielensPopularityOperator.

Listing 9.13 Example test function testing the MovielensPopularityOperator
def test_movielenspopularityoperator():
   task = MovielensPopularityOperator(
       task_id="test_id",
       start_date="2015-01-01",
       end_date="2015-01-03",
       top_n=5,
   )
   result = task.execute(context={})
   assert len(result) == 5

The first thing that appears is our test telling us the operator is missing a required argument.

Listing 9.14 Output of running the test in listing 9.13
$ pytest tests/dags/chapter9/custom/test_operators.py::test_movielenspopularityoperator
========================= test session starts =========================
platform darwin -- Python 3.6.7, pytest-5.2.2, py-1.8.0, pluggy-0.13.0
rootdir: /.../data-pipelines-with-apache-airflow
collected 1 item                                                                                     
 
tests/dags/chapter9/custom/test_operators.py F                   [100%]
=============================== FAILURES ===============================
___________________ test_movielenspopularityoperator ___________________
 
mocker = <pytest_mock.plugin.MockFixture object at 0x10fb2ea90>
 
    def test_movielenspopularityoperator(mocker: MockFixture):
        task = MovielensPopularityOperator(
>           task_id="test_id", start_date="2015-01-01", end_date="2015-01-03", top_n=5
        )
E       TypeError: __init__() missing 1 required positional argument: 'conn_id'
 
tests/dags/chapter9/custom/test_operators.py:30: TypeError
========================== 1 failed in 0.10s ==========================

Now we see the test failed because we’re missing the required argument conn_id, which points to the connection ID in the metastore. But how do you provide this in a test? Tests should be isolated from each other; they should not be able to influence the results of other tests, so a database shared between tests is not an ideal situation. In this case, mocking comes to the rescue.

Mocking is “faking” certain operations or objects. For example, the call to a database that is expected to exist in a production setting but not while testing could be faked, or mocked, by telling Python to return a certain value instead of making the actual call to the (nonexistent during testing) database. This allows you to develop and run tests without requiring a connection to external systems. It requires insight into the internals of whatever it is you’re testing, and thus sometimes requires you to dive into third-party code.
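
The idea can be shown with Python's built-in unittest.mock alone. In this minimal sketch, the `Database` class and the `greeting` function are hypothetical stand-ins for code that would normally talk to an external system.

```python
from unittest import mock


class Database:
    """Hypothetical client for a database that does not exist while testing."""

    def fetch_user(self, user_id):
        raise ConnectionError("no database available during tests")


def greeting(db, user_id):
    user = db.fetch_user(user_id)
    return f"Hello, {user['name']}!"


# Inside the with block, fetch_user returns the canned value instead of
# attempting a real call.
with mock.patch.object(Database, "fetch_user", return_value={"name": "alice"}):
    message = greeting(Database(), 42)

# Outside the block the original method is restored and fails again.
try:
    greeting(Database(), 42)
    restored = False
except ConnectionError:
    restored = True
```

The patch is only active inside the with block, which keeps tests isolated from each other.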

Pytest has a set of supporting plug-ins (not officially by pytest), which ease the usage of concepts such as mocking. For this, we can install the pytest-mock Python package:

pip install pytest-mock

pytest-mock is a Python package that provides a tiny convenience wrapper around the built-in mock package. To use it, pass an argument named “mocker”4 to your test function, which is the entry point for using anything in the pytest-mock package.

4. If you want to type your arguments, mocker is of type pytest_mock.MockFixture.

Listing 9.15 Mocking an object in a test
def test_movielenspopularityoperator(mocker):
   mocker.patch.object(
       MovielensHook,
       "get_connection",
       return_value=Connection(
           conn_id="test",
           login="airflow",
           password="airflow",
       ),
   )
   task = MovielensPopularityOperator(
       task_id="test_id",
       conn_id="test",
       start_date="2015-01-01",
       end_date="2015-01-03",
       top_n=5,
   )
   result = task.execute(context=None)
   assert len(result) == 5

With this code, the get_connection() call on the MovielensHook is monkey-patched (substituting its functionality at runtime to return the given object instead of querying the Airflow metastore), and executing MovielensHook.get_connection() won’t fail when running the test, since no call to the nonexistent database is made during testing; instead, the predefined, expected connection object is returned.

Listing 9.16 Substituting a call to an external system in a test
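def test_movielenspopularityoperator(mocker):  # mocker is a fixture supplied by pytest-mock
    mock_get = mocker.patch.object(  # patch an attribute on an object with a mock
        MovielensHook,  # the object to patch
        "get_connection",  # the name of the method to patch
        return_value=Connection(  # the value the patched method returns
            conn_id="test", login="airflow", password="airflow"
        ),
    )
    task = MovielensPopularityOperator(...)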

This example shows how to substitute a call to an external system (the Airflow metastore) at test time by returning a predefined Connection object. What if you want to validate that the call is actually made in your test? We can assign the patched object to a variable that holds several properties collected when calling the patched object. Say we would like to ensure the get_connection() method is called once and only once, and the conn_id argument provided to get_connection() holds the same value as provided to the MovielensPopularityOperator.

Listing 9.17 Validating the behavior of a mocked function
mock_get = mocker.patch.object(              #1
    MovielensHook,
    "get_connection",
    return_value=Connection(...),
)
task = MovielensPopularityOperator(..., conn_id="testconn")
task.execute(...)
 
assert mock_get.call_count == 1              #2
mock_get.assert_called_with("testconn")      #3

Assigning the return value of mocker.patch.object to a variable named mock_get will capture all calls made to the mocked object and gives us the possibility of verifying the given input, number of calls, and more. In this example, we assert on call_count to verify that the MovielensPopularityOperator doesn’t accidentally make multiple calls to the Airflow metastore in a live setting. Also, since we provide the conn_id “testconn” to the MovielensPopularityOperator, we expect this conn_id to be requested from the Airflow metastore, which we validate with assert_called_with().5 The mock_get object holds more properties to verify (e.g., a called property to simply assert whether the object was called [any number of times]) (figure 9.6).

5.A convenience method exists for these two asserts named assert_called_once_with().

Figure 9.6 mock_get contains several properties that can be used to validate the behavior. (Screenshot was taken using the Python debugger in PyCharm.)
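
The call-tracking properties can be tried with the built-in unittest.mock, which pytest-mock wraps. The `MetastoreClient` class and `run_task` function in this sketch are hypothetical stand-ins, not Airflow classes.

```python
from unittest import mock


class MetastoreClient:
    """Hypothetical stand-in for something that queries the Airflow metastore."""

    def get_connection(self, conn_id):
        raise ConnectionError("no metastore available during tests")


def run_task(client, conn_id):
    connection = client.get_connection(conn_id)
    return connection["login"]


with mock.patch.object(
    MetastoreClient, "get_connection", return_value={"login": "airflow"}
) as mock_get:
    login = run_task(MetastoreClient(), "testconn")

    call_count = mock_get.call_count              # how often the mock was called
    was_called = mock_get.called                  # True if called at least once
    mock_get.assert_called_once_with("testconn")  # combines both assertions
```

If run_task requested a different connection ID, or requested it more than once, assert_called_once_with() would raise an AssertionError and the test would fail.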

One of the biggest pitfalls with mocking in Python is mocking the incorrect object. In the example code, we are mocking the get_connection() method. This method is called on the MovielensHook, which inherits from the BaseHook (airflow.hooks.base package). The get_connection() method is defined on the BaseHook. Intuitively, you would therefore probably mock BaseHook.get_connection(). However, this is incorrect.

The correct way to mock in Python is to mock the location where it is being called and not where it is defined.6 Let’s illustrate this in code.

6. This is explained in the Python documentation: https://docs.python.org/3/library/unittest.mock.html#where-to-patch. It is also demonstrated in http://alexmarandon.com/articles/python_mock_gotchas.

Listing 9.18 Paying attention to the correct import location when mocking in Python
from airflowbook.operators.movielens_operator import (    #1
   MovielensPopularityOperator,                           #1
   MovielensHook,                                         #1
)                                                         #1
 
 
def test_movielenspopularityoperator(mocker):
   mock_get = mocker.patch.object(
       MovielensHook,
       "get_connection",
       return_value=Connection(...),
   )
   task = MovielensPopularityOperator(...)                #2
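
The effect of the patch location can be reproduced with two throwaway modules. In this sketch, the module names `defining_module` and `using_module` are made up, and the modules are built in memory only to keep the example self-contained; in a real project they would be ordinary files.

```python
import sys
import types
from unittest import mock

# Module in which the function is defined.
defining = types.ModuleType("defining_module")
exec("def get_connection():\n    return 'real'\n", defining.__dict__)
sys.modules["defining_module"] = defining

# Module that imports the function by name and calls it.
using = types.ModuleType("using_module")
exec(
    "from defining_module import get_connection\n"
    "\n"
    "def run():\n"
    "    return get_connection()\n",
    using.__dict__,
)
sys.modules["using_module"] = using

# Patching where the function is defined does not affect using_module,
# which holds its own reference to the original function.
with mock.patch("defining_module.get_connection", return_value="fake"):
    patched_where_defined = using.run()

# Patching the name in the module that calls it does take effect.
with mock.patch("using_module.get_connection", return_value="fake"):
    patched_where_used = using.run()

# Clean up the throwaway modules.
del sys.modules["defining_module"], sys.modules["using_module"]
```

Only the second patch changes what run() returns, because run() looks up get_connection in its own module's namespace.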

9.1.5 Testing with files on disk

Consider an operator that reads one file holding a list of JSONs and writes these to CSV format (figure 9.7).

Figure 9.7 Converting JSON to CSV format

The operator for this operation could look as follows.

Listing 9.19 Example operator using local disk
import csv
import json

from airflow.models import BaseOperator


class JsonToCsvOperator(BaseOperator):
   def __init__(self, input_path, output_path, **kwargs):
       super().__init__(**kwargs)
       self._input_path = input_path
       self._output_path = output_path
 
   def execute(self, context):
       with open(self._input_path, "r") as json_file:
           data = json.load(json_file)
 
       columns = {key for row in data for key in row.keys()}
 
       with open(self._output_path, mode="w") as csv_file:
           writer = csv.DictWriter(csv_file, fieldnames=columns)
           writer.writeheader()
           writer.writerows(data)

This JsonToCsvOperator takes two input arguments: the input (JSON) path and the output (CSV) path. To test this operator, we could store a static file in our test directory to use as input for the test, but where do we store the output file?

In Python, we have the tempfile module for tasks involving temporary storage. It leaves no remainders on your file system since the directory and its contents are wiped after usage. Once again, pytest provides a convenient access point to this module named tmpdir (gives a py.path.local object) and tmp_path (gives a pathlib object). Let’s view an example using tmp_path.

Listing 9.20 Testing using temporary paths
import csv
import json
from pathlib import Path

from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator


def test_json_to_csv_operator(tmp_path: Path):
    # tmp_path is a pytest fixture providing a temporary directory
    input_path = tmp_path / "input.json"
    output_path = tmp_path / "output.csv"

    # Write the input file for the operator to read
    input_data = [
        {"name": "bob", "age": "41", "sex": "M"},
        {"name": "alice", "age": "24", "sex": "F"},
        {"name": "carol", "age": "60", "sex": "F"},
    ]
    with open(input_path, "w") as f:
        f.write(json.dumps(input_data))

    operator = JsonToCsvOperator(
        task_id="test",
        input_path=input_path,
        output_path=output_path,
    )
    # No task instance context is needed for this operator
    operator.execute(context={})

    # Read the CSV output back as a list of dicts
    with open(output_path, "r") as f:
        reader = csv.DictReader(f)
        result = [dict(row) for row in reader]

    # The output should hold the same records as the input
    assert result == input_data
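One detail of the test data is worth a short illustration: the ages are written as strings. The sketch below, which uses an in-memory buffer instead of a file and made-up sample rows, shows that csv.DictReader returns every value as a string, so a comparison against integer input would fail.

```python
import csv
import io

# Sample rows with an integer value (illustrative data only)
rows = [{"name": "bob", "age": 41}]

# Write the rows to an in-memory CSV buffer
buffer = io.StringIO()
writer = csv.DictWriter(buffer, fieldnames=["name", "age"])
writer.writeheader()
writer.writerows(rows)

# Read the CSV back; all values come back as strings
buffer.seek(0)
read_back = [dict(row) for row in csv.DictReader(buffer)]
```

Here read_back holds "41" as a string, which is why a round-trip test is easiest to write with string values in the input.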

Upon starting the test, a temporary directory is created. The tmp_path argument actually refers to a function, which is executed for each test it is called in. In pytest, these are called fixtures (https://docs.pytest.org/en/stable/fixture.html). While fixtures bear some resemblance to unittest’s setUp() and tearDown() methods, they allow for greater flexibility because fixtures can be mixed and matched (e.g., one fixture could initialize a temporary directory for all tests in a class, while another fixture only initializes for a single test).7 The default scope of fixtures is every test function. We can see this by printing the path and running different tests, or even the same test twice:

7.Look up “pytest scope” if you’re interested in learning how to share fixtures across tests.

print(tmp_path.as_posix())

This will print, respectively,

  • tdaer/o/a/rirlvepv/nfs3d/5f6y1i10hkgxkfhngpsgd4w0000bn/At/pstey-kl-e/hsetslknasrtbpaya-19_sotvooet_jnscrs_a/tp_etor0
  • //vdotle/iras/vpafrner3/d5f6y1i10hksgxdhgkpfng4w0000n/uA/sttpye-vl-yapteksrstnbshel/aa-20c/jsetrpoevstt_s__o_ortnao0

There are other fixtures to use, and pytest fixtures have many features that are not demonstrated in this book. If you’re serious about all pytest features, it helps to go over the documentation.
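The clean-up behavior of temporary directories can also be observed outside pytest. The following is a minimal sketch with the standard library's tempfile module; the file name and contents are arbitrary examples.

```python
import tempfile
from pathlib import Path

# The directory exists only for the duration of the with block
with tempfile.TemporaryDirectory() as tmp:
    output_path = Path(tmp) / "output.csv"
    output_path.write_text("name,age\nbob,41\n")
    existed_inside = output_path.exists()

# After the block, the directory and its contents are removed
existed_after = output_path.exists()
```

The tmp_path fixture offers similar convenience per test function, without the explicit with block.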


9.2 Working with DAGs and task context in tests

Some operators require more context (e.g., templating of variables) or usage of the task instance context for execution. We cannot simply run operator.execute(context={}) like we did in the previous examples, because we provide no task context to the operator, which it needs to perform its code.

In these cases, we would like to run the operator in a more realistic scenario, as if Airflow were to actually run a task in a live system, and thus create a task instance context, template all variables, and so on. Figure 9.8 shows the steps that are performed when a task is executed in Airflow.8

8.In TaskInstance, _run_raw_task().

Figure 9.8 Running an operator involves several steps. In section 9.1, we test only step 5 and manually provide runtime task context to operator.execute() if needed.

As you can see, step 5 is the only one we’ve run in the examples so far (listings 9.15, 9.17, and 9.20). If running a live Airflow system, many more steps are performed when executing an operator, some of which we need to execute to test, for example, correct templating.

Say we implemented an operator that pulls movie ratings between two given dates, which the user can provide via templated variables.

Listing 9.21 Example operator using templated variables
import json

from airflow.models import BaseOperator

from airflowbook.hooks.movielens_hook import MovielensHook


class MovielensDownloadOperator(BaseOperator):
    # Attributes listed here are rendered with Jinja before execute() runs
    template_fields = ("_start_date", "_end_date", "_output_path")

    def __init__(
        self,
        conn_id,
        start_date,
        end_date,
        output_path,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date
        self._output_path = output_path

    def execute(self, context):
        with MovielensHook(self._conn_id) as hook:
            ratings = hook.get_ratings(
                start_date=self._start_date,
                end_date=self._end_date,
            )

        with open(self._output_path, "w") as f:
            f.write(json.dumps(ratings))

This operator is not testable as in the previous examples, since it (potentially) requires the task instance context to execute. For example, the output_path argument could be provided as /output/{{ ds }}.json, and the ds variable is not available when testing with operator.execute(context={}).

So, for this, we’ll call the actual method Airflow itself also uses to start a task, which is operator.run() (a method on the BaseOperator class). To use it, the operator must be assigned to a DAG. While the previous example could be run as-is, without creating a DAG for testing purposes, in order to use run() we need to provide a DAG to the operator, because when Airflow runs a task, it refers to the DAG object on several occasions (e.g., when building up the task instance context).

We could define a DAG in our tests as follows.

Listing 9.22 DAG with default arguments for testing purposes
dag = DAG(
   "test_dag",
   default_args={
       "owner": "airflow",
       "start_date": datetime.datetime(2019, 1, 1),
   },
   schedule_interval="@daily",
)

The values we provide to the test DAG don’t matter, but we’ll refer to these while asserting the results of the operator. Next, we can define our task and run it.

Listing 9.23 Testing with a DAG to render templated variables
def test_movielens_operator(tmp_path, mocker):
   mocker.patch.object(
       MovielensHook,
       "get_connection",
       return_value=Connection(
           conn_id="test", login="airflow", password="airflow"
       ),
   )
 
   dag = DAG(
       "test_dag",
       default_args={
           "owner": "airflow",
           "start_date": datetime.datetime(2019, 1, 1),
       },
       schedule_interval="@daily",
   )
 
   task = MovielensDownloadOperator(
       task_id="test",
       conn_id="testconn",
       start_date="{{ prev_ds }}",
       end_date="{{ ds }}",
       output_path=str(tmp_path / "{{ ds }}.json"),
       dag=dag,
   )
 
   task.run(
       start_date=dag.default_args["start_date"],
       end_date=dag.default_args["start_date"],
   )

If you run the test as we’ve defined it now, you will probably encounter an error similar to the following listing.

Listing 9.24 First time running a test including a DAG
.../site-packages/sqlalchemy/engine/default.py:580: OperationalError

The above exception was the direct cause of the following exception:

> task.run(start_date=dag.default_args["start_date"], end_date=dag.default_args["start_date"])

...
cursor = <sqlite3.Cursor object at 0x1110fae30>
statement = 'SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_ins...\nWHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.execution_date = ?\n LIMIT ? OFFSET ?'
parameters = ('test_dag', 'test', '2015-01-01 00:00:00.000000', 1, 0)
...

    def do_execute(self, cursor, statement, parameters, context=None):
>       cursor.execute(statement, parameters)
E       sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such column: task_instance.max_tries
E       [SQL: SELECT task_instance.try_number AS task_instance_try_number, task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.execution_date AS task_instance_execution_date, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config
E       FROM task_instance
E       WHERE task_instance.dag_id = ? AND task_instance.task_id = ? AND task_instance.execution_date = ?
E        LIMIT ? OFFSET ?]
E       [parameters: ('test_dag', 'test', '2015-01-01 00:00:00.000000', 1, 0)]
E       (Background on this error at: http://sqlalche.me/e/e3q8)

As you can tell from the error message, there’s something wrong in the Airflow metastore. To run a task, Airflow queries the database for several pieces of information, such as previous task instances with the same execution date. But if you haven’t initialized the Airflow database (airflow db init) in the path AIRFLOW_HOME is set to (~/airflow if not set), or configured Airflow to a running database, then it will have no database to read or write. So also when testing, we will need a metastore. There are several approaches to deal with the metastore during testing.

First, hypothetically, we could mock every single database call, as shown before, when querying for connection credentials. While this is possible, it is very cumbersome. A more practical approach is to run a real metastore that Airflow can query while running the tests.

To do this, you run airflow db init, which initializes the database. Without any configuration, the database will be a SQLite database, stored in ~/airflow/airflow.db. If you set the AIRFLOW_HOME environment variable, Airflow will store the database in that given directory. Ensure that while running tests you provide the same AIRFLOW_HOME value so that Airflow can find your metastore.9

9.To ensure your tests run isolated from anything else, a Docker container with an empty initialized Airflow database can be convenient.
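The lookup rule described above (AIRFLOW_HOME if set, otherwise ~/airflow, with a SQLite file named airflow.db when nothing else is configured) can be written down as a small sketch. The helper default_sqlite_path is hypothetical and only mirrors the rule as stated in the text; it does not read Airflow's configuration and ignores any explicitly configured database connection.

```python
import os
from pathlib import Path


def default_sqlite_path(environ):
    """Hypothetical helper: where the default SQLite metastore is expected.

    Mirrors the rule from the text only: AIRFLOW_HOME if set, else ~/airflow.
    """
    home = environ.get("AIRFLOW_HOME", "~/airflow")
    return Path(home).expanduser() / "airflow.db"


# The same value must be visible both when initializing the database
# and when running the tests
expected = default_sqlite_path({"AIRFLOW_HOME": "/tmp/airflow_test"})
current = default_sqlite_path(os.environ)
```

A check like this at the start of a test session can make a mismatched AIRFLOW_HOME easier to spot than the SQL error in listing 9.24.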

Now, once you’ve set up a metastore for Airflow to query, we can run the test and see it succeed. Also, we can now see a row was written to the Airflow metastore during the test (figure 9.9).10

10.DBeaver is a free SQLite database browser.

Figure 9.9 Calling task.run() results in task run details stored in the database.

There are two things to point out in this test. If you have multiple tests using a DAG, there is a neat way to reuse it with pytest. We’ve covered pytest fixtures, and these can be reused over multiple files in (sub)directories using a file named conftest.py. This file can hold a fixture for instantiating a DAG.

Listing 9.25 Example pytest fixture to reuse DAG throughout tests
import datetime
 
import pytest
from airflow.models import DAG
 
 
@pytest.fixture
def test_dag():
   return DAG(
       "test_dag",
       default_args={
           "owner": "airflow",
           "start_date": datetime.datetime(2019, 1, 1),
       },
       schedule_interval="@daily",
   )

Now every test requiring a DAG object can simply instantiate it by adding test_dag as an argument to the test, which executes the test_dag() function at the start of the test.

Listing 9.26 Creating required objects by including fixtures with a test
def test_movielens_operator(tmp_path, mocker, test_dag):
   mocker.patch.object(
       MovielensHook,
       "get_connection",
       return_value=Connection(
           conn_id="test",
           login="airflow",
           password="airflow",
       ),
   )
 
   task = MovielensDownloadOperator(
       task_id="test",
       conn_id="testconn",
       start_date="{{ prev_ds }}",
       end_date="{{ ds }}",
       output_path=str(tmp_path / "{{ ds }}.json"),
       dag=test_dag,
   )
 
   task.run(
       start_date=test_dag.default_args["start_date"],
       end_date=test_dag.default_args["start_date"],
   )

task.run() is a method on the BaseOperator class. run() takes two dates and, given the DAG’s schedule_interval, computes instances of the task to run between the two given dates. Since we provide the same two dates (the DAG’s starting date), there will be only one single task instance to execute.
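As an illustration of why identical start and end dates yield exactly one task instance, here is a deliberately simplified sketch for a daily interval. The function daily_run_dates is hypothetical and is not Airflow's scheduling logic; it only enumerates dates between two bounds, both inclusive.

```python
import datetime


def daily_run_dates(start_date, end_date):
    """Hypothetical helper: list daily dates from start to end, inclusive."""
    dates = []
    current = start_date
    while current <= end_date:
        dates.append(current)
        current += datetime.timedelta(days=1)
    return dates


start = datetime.datetime(2019, 1, 1)

# Same start and end date: a single run
single_run = daily_run_dates(start, start)

# A three-day range: three runs
three_runs = daily_run_dates(start, datetime.datetime(2019, 1, 3))
```

Passing a wider date range in a test would therefore execute the operator several times, which is rarely what a unit test needs.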

9.2.1 Working with external systems

Assume we’re working with an operator that connects to a database, say a MovielensToPostgresOperator, which reads MovieLens ratings and writes the results to a Postgres database. This is an often-seen use case, when a source only provides data as it is at the time of requesting but cannot provide historical data, and people would like to build up a history of the source. For example, if you queried the MovieLens API today, where John rated The Avengers with four stars yesterday but today changed his rating to five, the API would only return his five-star rating. An Airflow job could, once a day, fetch all data and store the daily export together with the time of writing.

The operator for such an operation could look like this.

Listing 9.27 Example operator connecting with a PostgreSQL database
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
 
from airflowbook.hooks.movielens_hook import MovielensHook
 
 
class MovielensToPostgresOperator(BaseOperator):
   template_fields = ("_start_date", "_end_date", "_insert_query")
 
   def __init__(
       self,
       movielens_conn_id,
       start_date,
       end_date,
       postgres_conn_id,
       insert_query,
       **kwargs,
   ):
       super().__init__(**kwargs)
       self._movielens_conn_id = movielens_conn_id
       self._start_date = start_date
       self._end_date = end_date
       self._postgres_conn_id = postgres_conn_id
       self._insert_query = insert_query
 
   def execute(self, context):
       with MovielensHook(self._movielens_conn_id) as movielens_hook:
           ratings = list(
               movielens_hook.get_ratings(
                   start_date=self._start_date,
                   end_date=self._end_date,
               )
           )

       postgres_hook = PostgresHook(
           postgres_conn_id=self._postgres_conn_id
       )
       # Build one INSERT statement per rating, with values ordered by key name
       insert_queries = [
           self._insert_query.format(
               ",".join([str(_[1]) for _ in sorted(rating.items())])
           )
           for rating in ratings
       ]
       postgres_hook.run(insert_queries)

Let’s break down the execute() method. It connects the MovieLens API and Postgres database by fetching data and transforming the results into queries for Postgres (figure 9.10).

Figure 9.10 Breakdown of converting JSON data to Postgres queries
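The conversion from one rating to one query can be traced by hand with a small sketch. The rating record and the timestamp below are made-up values; the query string stands for the template after Jinja rendering, so only the {0} placeholder is left for str.format to fill.

```python
# Hypothetical rating record (illustrative values only)
rating = {"userId": 1, "movieId": 50, "rating": 4.5, "ratingTimestamp": 1546300800}

# Query as it might look after templating; the timestamp is a made-up value
insert_query = (
    "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
    "VALUES ({0}, '2019-01-01 00:00:00')"
)

# Sort items by key so the values line up with the column list in the query
values = ",".join([str(value) for _, value in sorted(rating.items())])
query = insert_query.format(values)
```

Sorting by key name is what keeps the values aligned with the alphabetically ordered column list; a column list in a different order would silently mix up the values. Note that building SQL through string formatting is only reasonable for trusted input such as this example.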

How do we test this, assuming we cannot access our production Postgres database from our laptops? Luckily, it’s easy to spin up a local Postgres database for testing with Docker. Several Python packages exist that provide convenient functions for controlling Docker containers within the scope of pytest tests. For the following example, we’ll use pytest-docker-tools (https://github.com/Jc2k/pytest-docker-tools). This package provides a set of convenient helper functions with which we can create a Docker container for testing.

We won’t go into all the details of the package but will demonstrate how to create a sample Postgres container for writing MovieLens results. If the operator works correctly, we should have results written to the Postgres database in the container at the end of the test. Testing with Docker containers allows us to use the real methods of hooks, without having to mock calls, with the aim of testing as realistically as possible.

First, install pytest-docker-tools in your environment with pip install pytest_docker_tools. This provides us a few helper functions, such as fetch and container. First, we will fetch the container image.

Listing 9.28 Fetching a Docker image for testing with pytest_docker_tools
from pytest_docker_tools import fetch
 
postgres_image = fetch(repository="postgres:11.1-alpine")

The fetch function triggers docker pull on the machine it’s running on (and therefore requires Docker to be installed) and returns the pulled image. Note the fetch function itself is a pytest fixture, which means we cannot call it directly but must provide it as a parameter to a test.

Listing 9.29 Using a Docker image in a test with pytest_docker_tools fixtures
from pytest_docker_tools import fetch
 
postgres_image = fetch(repository="postgres:11.1-alpine")
 
def test_call_fixture(postgres_image):
   print(postgres_image.id)

Running this test will print

Fetching postgres:11.1-alpine
PASSED                     [100%]
sha256:b43856647ab572f271decd1f8de88b590e157bfd816599362fe162e8f37fb1ec

We can now use this image ID to configure and start a Postgres container.

Listing 9.30 Starting a Docker container for a test with pytest_docker_tools fixtures
from pytest_docker_tools import container
 
postgres_container = container(
   image="{postgres_image.id}",
   ports={"5432/tcp": None},
)
 
def test_call_fixture(postgres_container):
   print(
       f"Running Postgres container named {postgres_container.name} "
       f"on port {postgres_container.ports['5432/tcp'][0]}."
   )

The container function in pytest_docker_tools is also a fixture, so that too can only be called by providing it as an argument to a test. It takes several arguments that configure the container to start, in this case the image ID, which was returned from the fetch() fixture, and the ports to expose. Just like running Docker containers on the command line, we could also configure environment variables, volumes, and more.

The ports configuration requires a bit of explanation. You typically map a container port to the same port on the host system (i.e., docker run -p 5432:5432 postgres). A container for tests is not meant to be a container running until infinity, and we also don’t want to conflict with any other ports in use on the host system.

Providing a dict to the ports keyword argument, where keys are container ports and values map to the host system, and leaving the values to None, will map the container port to a random open port on the host (just like running docker run -P). Providing the fixture to a test will execute the fixture (i.e., run the container), and pytest-docker-tools then internally maps the assigned ports on the host system to a ports attribute on the fixture itself. postgres_container.ports['5432/tcp'][0] gives us the assigned port number on the host, which we can then use in the test to connect to.
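The shape of the ports attribute can be made concrete with a sketch that needs no Docker. The dictionary assigned_ports is a hypothetical stand-in for postgres_container.ports, and host_port and postgres_uri are illustrative helpers, not functions from pytest-docker-tools; the database name is likewise an assumption.

```python
# Hypothetical stand-in for the `ports` attribute of the container fixture:
# container port -> list of assigned host ports
assigned_ports = {"5432/tcp": [54321]}


def host_port(ports, container_port="5432/tcp"):
    """Return the first host port mapped to the given container port."""
    return int(ports[container_port][0])


def postgres_uri(ports, user, password, database, host="localhost"):
    """Build a connection URI pointing at the host port of the container."""
    return f"postgresql://{user}:{password}@{host}:{host_port(ports)}/{database}"


# "testdb" is an assumed database name used only for this illustration
uri = postgres_uri(assigned_ports, "testuser", "testpass", "testdb")
```

In a real test, the host port changes on every run, which is why it is looked up from the fixture rather than hard-coded.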

In order to mimic a real database as much as possible, we’d like to set a username and password and initialize it with a schema and data to query. We can provide both to the container fixture.

Listing 9.31 Initializing a Postgres container for testing against a real database
postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
  image="{postgres_image.id}",
  environment={
      "POSTGRES_USER": "testuser",
      "POSTGRES_PASSWORD": "testpass",
  },
  ports={"5432/tcp": None},
  volumes={
      os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
          "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
      }
  },
)

Database structure and data can be initialized in postgres-init.sql.

Listing 9.32 Initializing a schema for the test database
SET SCHEMA 'public';
CREATE TABLE movielens (
   movieId integer,
   rating float,
   ratingTimestamp integer,
   userId integer,
   scrapeTime timestamp 
);

In the container fixture, we provide a Postgres username and password via environment variables. This is a feature of the Postgres Docker image; it allows us to configure several settings via environment variables. Read the Postgres Docker image documentation for all environment variables. Another feature of the Docker image is the ability to initialize a container with a startup script by placing a file with extension *.sql, *.sql.gz or *.sh in the directory /docker-entrypoint-initdb.d. These are executed while booting the container, before starting the actual Postgres service, and we can use these to initialize our test container with a table to query.

In listing 9.31, we mount a file named postgres-init.sql to the container with the volumes keyword to the container fixture:

volumes={
       os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
           "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
       }
   }

We provide it a dict where the keys show the (absolute) location on the host system. In this case, we saved a file named postgres-init.sql in the same directory as our test script, so os.path.join(os.path.dirname(__file__), "postgres-init.sql") will give us the absolute path to it. The values are also a dict where the key indicates the mount type (bind) and the value the location inside the container, which should be in /docker-entrypoint-initdb.d in order to run the *.sql script at boot-time of the container.

Put all this together in a script and we can finally test against a real Postgres database.

Listing 9.33 Completing the test using a Docker container for testing external systems
import os

import pytest
from airflow.models import Connection
from pytest_docker_tools import fetch, container

from airflowbook.operators.movielens_operator import (
   MovielensHook,
   MovielensToPostgresOperator,
   PostgresHook,
)

postgres_image = fetch(repository="postgres:11.1-alpine")
postgres = container(
   image="{postgres_image.id}",
   environment={
       "POSTGRES_USER": "testuser",
       "POSTGRES_PASSWORD": "testpass",
   },
   ports={"5432/tcp": None},
   volumes={
       os.path.join(os.path.dirname(__file__), "postgres-init.sql"): {
           "bind": "/docker-entrypoint-initdb.d/postgres-init.sql"
       }
   },
)
 
def test_movielens_to_postgres_operator(mocker, test_dag, postgres):
   mocker.patch.object(
       MovielensHook,
       "get_connection",
       return_value=Connection(
          conn_id="test",
          login="airflow",
          password="airflow",
      ),
   )

   mocker.patch.object(
       PostgresHook,
       "get_connection",
       return_value=Connection(
           conn_id="postgres",
           conn_type="postgres",
           host="localhost",
           login="testuser",
           password="testpass",
           port=postgres.ports["5432/tcp"][0],
       ),
   )

   task = MovielensToPostgresOperator(
       task_id="test",
       movielens_conn_id="movielens_id",
       start_date="{{ prev_ds }}",
       end_date="{{ ds }}",
       postgres_conn_id="postgres_id",
       insert_query=(
           "INSERT INTO movielens (movieId,rating,ratingTimestamp,userId,scrapeTime) "
           "VALUES ({0}, '{{ macros.datetime.now() }}')"
       ),
       dag=test_dag,
   )

   pg_hook = PostgresHook()

   row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
   assert row_count == 0
 
   task.run(
       start_date=test_dag.default_args["start_date"],
       end_date=test_dag.default_args["start_date"],
   )
 
   row_count = pg_hook.get_first("SELECT COUNT(*) FROM movielens")[0]
   assert row_count > 0

The full test turns out a bit lengthy because of the container initialization and the connection mocking we have to do. After this, we instantiate a PostgresHook (which uses the same mocked get_connection() as in the MovielensToPostgresOperator and thus connects to the Docker Postgres container). We first assert that the number of rows is zero, run the operator, and finally test whether any data was inserted.

Outside the test logic itself, what happens? During test startup, pytest figures out which tests use a fixture, and will execute it only if the given fixture is used (figure 9.11).

Figure 9.11 Process of running a test with pytest-docker-tools. Running Docker containers during tests enables testing against real systems. The life cycle of the Docker container is managed by pytest-docker-tools, and the user must implement the test.

At the time pytest decides to start the container fixture, it will fetch, run, and initialize the container. This takes a couple of seconds, so there will be a small delay of a few seconds in the test suite. After the tests finish, the fixtures are terminated. Pytest-docker-tools puts a small wrapper around the Python Docker client, providing a couple of convenient constructs and fixtures to use in tests.

9.3 Using tests for development

Tests not only help with verifying the correctness of your code. They are also helpful during development because they allow you to run a small snippet of code without having to use a live system. Let’s see how they can help us while developing workflows. We will show a couple of screenshots of PyCharm, but any modern IDE will allow us to set breakpoints and debug.

Let’s go back to the MovielensPopularityOperator shown in section 9.1.3. In the execute() method, it runs a series of statements, and we would like to know the state halfway through. With PyCharm, we can do this by placing a breakpoint and running a test that hits the line of code the breakpoint is set to (figure 9.12).

Figure 9.12 Setting a breakpoint in an IDE. This screenshot was taken in PyCharm, but any IDE allows you to set breakpoints and debug.

Now run the test_movielenspopularityoperator test and start it in debug mode (figure 9.13).

Figure 9.13 Starting a test in debug mode so that it stops at breakpoints

Once the test reaches the line of code on which you’ve set a breakpoint, you can inspect the current state of variables but also execute code at that moment. Here, we can, for example, inspect the task instance context halfway through the execute() method (figure 9.14).

Figure 9.14 Debugging allows us to inspect the state of the program at the set breakpoint. Here we inspect the values of the context.

Sometimes your code works locally but returns an error on a production machine. How would we debug on a production machine? There is a way to debug remotely, but that’s beyond the scope of this book. It allows you to connect your local PyCharm (or other IDE) debugger to a remote running Python process. (Search for “PyCharm remote debugging” for more information.)

Another alternative, if for whatever reason you cannot use a real debugger, is to resort to a command line debugger (for this, you need access to the command line on the remote machine). Python has a built-in debugger named pdb (Python Debugger). It works by adding this line of code on the location you want to debug.11

11.With Python 3.7 and PEP 553, a new way to set breakpoints was introduced, simply by calling breakpoint().

Listing 9.34 Setting a breakpoint in code
import pdb; pdb.set_trace()

Now you can start your code from the command line, either by running a test with pytest or by starting an Airflow task in a DAG with the CLI, by running

airflow tasks test [dagid] [taskid] [execution date]

Here’s an example:

airflow tasks test movielens_download fetch_data 2019-01-01T12:00:00

airflow tasks test runs the task without registering any records in the metastore. It’s useful for running and testing individual tasks in a production setting. Once the pdb breakpoint is reached, you can execute code and control the debugger with certain keys such as n for executing the statement and going to the next line, and l for displaying the surrounding lines (figure 9.15). (See the full list of commands by searching for “pdb cheat sheet” on the internet.)

Figure 9.15 Debugging on the command line with PDB

9.3.1 Testing complete DAGs

So far, we’ve focused on various aspects of testing individual operators: testing with and without task instance context, operators using the local filesystem, and operators using external systems with the help of Docker. But all these focused on testing a single operator. A large and important aspect of the development of workflows is ensuring all building blocks fit together nicely. While one operator might run correctly from a logical point of view, it could, for example, transform data in an unexpected way, which makes the subsequent operator fail. How do we ensure all operators in a DAG work together as expected?

Unfortunately, this is not an easy question to answer. Mimicking a real environment is not always possible, for various reasons. For example, with a DTAP (development, test, acceptance, production) separated system, we often cannot create a perfect replica of production in the development environment because of privacy regulations or the size of the data. Say the production environment holds a petabyte of data; then it would be impractical (to say the least) to keep the data in sync on all four environments. Therefore, people have been creating production environments that are as real as possible, which we can use for developing and validating the software. With Airflow, this is no different, and we’ve seen several approaches to this problem. We briefly describe two approaches in sections 9.4 and 9.5.

9.4 Emulate production environments with Whirl

One approach to recreating a production environment is a project named Whirl (https://github.com/godatadriven/whirl). Its idea is to simulate all components of your production environment in Docker containers and manage all these with Docker Compose. Whirl comes with a CLI utility to easily control these environments. While Docker is a great tool for development, one downside is that not everything is available as a Docker image. For example, there is no Google Cloud Storage available as a Docker image.

9.5 Create DTAP environments

Simulating your production environment locally with Docker, or working with a tool such as Whirl, is not always possible. One reason for that is security (e.g., it’s sometimes not possible to connect your local Docker setup with an FTP server used in your production DAGs because the FTP server is IP allowlisted).

One approach that is often more negotiable with a security officer is to set up isolated DTAP environments. Four fully-fledged environments are sometimes cumbersome to set up and manage, so in smaller projects with few people, sometimes just two (development and production) are used. Each environment can have specific requirements, such as dummy data in the development and test environments. The implementation of such a DTAP street is often very specific to the project and infrastructure, and is not in the scope of this book.

In the context of an Airflow project, it is wise to create one dedicated branch in your GitHub repository per environment: development environment > development branch, production environment > production/main, and so on. This way you can develop locally in branches. Then, first merge into the development branch and run DAGs on the development environment. Once satisfied with the results, you would then merge your changes into the next branch, say main, and run the workflows in the corresponding environment.

Summary

  • A DAG integrity test filters basic errors in your DAGs.
  • Unit testing verifies the correctness of individual operators.
  • Pytest and plug-ins provide several useful constructs for testing, such as temporary directories and plug-ins for managing Docker containers during tests.
  • Operators that don’t use task instance context can simply run with execute().
  • Operators that do use task instance context must run together with a DAG.
  • For integration testing, you must simulate your production environment as closely as possible.