Utilizatorii se așteaptă să vadă informații în timp ce se intamplă. Vor ca atunci când comandă un produs pe un site de e-commerce să vadă imediat confirmarea comenzii. Atunci, când caută informații într-o aplicație, textbox-ul să le ofere sugestii de căutare în timp real. In acest articol, vom discuta despre o procesarea fluxului de evenimente generat de interacțiunea utilizatorului sau de alte servicii externe pentru ca să expunem informații în timp real utilizatorilor folosind ReactiveExtensions (mai multe detalii: [1][2] si [3]).

Aplicabilitate

ReactiveExtensions pentru .NET (prescurtat Rx) este o librarie pentru a procesa fluxul de evenimente ce apar într -un sistem ”event-based”, cum ar fi:

  • Evenimente UI generate de user: mouse-moved, key-pressed, etc.
  • Evenimente care apar datorită programării asincrone: cum ar fi cererile și răspunsurile venite în urma unor apelări unor servicii web
  • Evenimente venite dintr-un Enterprise Service Bus: JMS, Windows Azure Service Bus, NServiceBus, etc. sau prin integrarea cu un motor de CEP cum ar fi StreamInsight sau StreamBase

Arhitectura Rx

Rx pentru .NET se bazează pe conceptul de “Observable Collections” (colectii observabile), definit prin interfețele:

RxInterfaces

Aceste interfețe au un rol complementar:  IObservable<T> reprezintă sursa evenimetelor generate (e.g. mouse-moved) care pot fi observate de catre niște obiecte reprezentate de IObserver<T>. IObservable<T> atașează prin metoda Subscribe un obiect ce tratează evenimentele generate și returnează un obiect IDisposable ce poate fi folosit pentru a detașa observatorul pentru ca evenimentele să nu mai fie tratate de acesta.

Aceste interfețe sunt duale pentru IEnumerable<T> and IEnumerator<T> prin faptul că “Observable Collections” utilizeaza modelul “Push” pentru a “împinge” datele către observatori, pe când sursele de date “Enumerable” sunt interogate de catre enumeratori pentru a primi “datele”. Modelul “Push” de procesare a datelor este denumit ca și model reactiv, iar modul prin care este implementată această procesare este prin definirea unor set de extensii (disponibile in C#) pentru IObservable<T> și de aici numele de ReactiveExtensions.

PullVsPushModel

Pentru a folosi Rx, trebuie adăugate librariile System.Reactive (disponibile aici [4] sau prin NuGet – ReactiveExtensions). Vom folosi ca prim exemplu clasa Observable pentru a genera date ce vor fi “împinse” către un observator pentru a fi procesate. In exemplu de mai jos, vom genera evenimente la anumite interval de timp și vom atașa un observator ce se va ocupa de datele primite:

ObservableSource

 

 

Clasa Observable generează numerele 1,4,9,16 la interval de timp 1,2,3,4 secunde și parametric pasați metodei Subscribe procesează datele primite la evenimentele de OnNext (când date noi sunt disponibile pentru procesare), OnError (când sursa observată aruncă o excepție) și OnCompleted (când sursa notifică observatorul că a terminat de transmis datele).

Procesarea evenimentelor in Rx

In cazul procesării evenimentelor de UI, apar probleme de genul filtrării unor evenimente sau a dezabonării de la anumite evenimente.  Aceste probleme sunt ușor rezolvate prin importarea de evenimente generate de către elemente de UI in Rx prin clasa Observable, în exemplul următor vom intercepta toate evenimentele de tipul MouseEventArgs și EventArgs pe care fereastra construită le generează prin miscarea mouse-ului, respectiv tastarea într-un text-box și combină fluxul de evenimente într-un “Observable Collection” care reprezintă practic o bază de date de evenimente de tipul MouseMove, respectiv TextChanged, generate de cursor sau de text-box.

Fluxul de evenimente de tip TextChanged pot fi ușor filtrate, în scenariul în care nu este nevoie de procesarea tuturor evenimentelor, ci alegem doar datele distincte la un interval de o secundă (în scenariul in care utilizatorul scrie foarte repede și fiecare input necesită o procesare complexă dorim să nu facem cereri către backend ca să proceseze fiecare cerere). Abonările pot fi combinate, in cazul nostru vom combina evenimentele MouseMove, respectiv TextChanged și vom invoca Garbage Collector-ul pentru ambele subscripții.

 

UIEventObservation

TextChangedSourceObserver

Pipeline-ul procesării evenimentelor de UI în cazul evenimentului “TextChanged” pentru un textbox

 

Monitoring

Rx și procesarea asincrona a apelarii serviciilor web

Apelarea serviciilor web se face in mod tradițional prin doua metode: o metodă care inițiaza apelul spre serviciul web și altă metodă care primește rezultatul. Această tehnică duce, în cazul unor sisteme ce folosesc intens comunicare cu servicii web, la un cod greu de întreținut – “spaghetti code”. Procesarea apelării serviciilor web (cererea catre serviciul și raspunsul returnat) poate fi mult simplificat, prin transformarea acestor perechi de metode de catre Rx.De exemplu, în cazul unui serviciu web, invocat prin SOAP, ce returnează o lista de cuvinte ce au ca și prefix parametrul transmis, proxy-ul genereaza perechea de metode ce se poate folosi în felul urmator:

WSCall

Fragmentul de cod prezentat mai sus se poate rescrie folosind metoda FromAsyncPattern din clasa Observable, unde se specifică ca și parametri generic, semnatura metodei ce inițiază apelul si care se folosește la obținerea rezultatului in urma invocarii serviciului web, și produce un “Observable Collection” din rezultatele obținute de la serviciul web, la care putem abona un observator care să proceseze rezultatele. In fragmentul de cod de mai jos, este prezentat modul in care transformăm apelurile catre serviciul web, pentru a returna ca rezultat cuvintele care încep cu “react” într-o sursa observabilă la care abonăm un observator ce afișează rezultatele primite:

WSReactCall

 

Implementari disponibile

Pe langă Rx pentru .NET, modelul reactiv – “Push model” – prezentat, a fost de asemenea implementat in Javascript (pentru browser cât și server-side Node.js), C++, Ruby și Python. De asemenea, o implementare pentru JVM folosind Clojure a fost dezvoltată recent de Netflix [5]. O prezentare a acestor implementări poate fi gasită in [4], [5] și [6].

Concluzii

Mecanismul reactiv de tratare a evenimentelor, în sisteme unde o varietate de evenimente sunt generate in timp real, de catre utilizatori sau prin comunicare cu anumite servicii externe, se dovedește a fi un instrument deosebit de folositor ce ușureaza munca programatorului și conferă claritate și concizie codului.

Rămane la analiza cititorului un număr de aspecte ce nu au fost tratate in acest articol, cum ar fi threading-ul, implementările disponibile și pentru alte platforme, testabilitatea codului scris cu ajutorul Rx și un numar foarte mare de metode utilitare ca și extensii pentru colecțiile observabile, detalii ce pot fi gasite la referințe!

Referinte

[1]. “Rx Design Guidelines.pdf”, disponibil la http://msdn.microsoft.com/en-US/data/gg577612

[2]. “Rx HOL .NET.pdf”, disponibil la http://msdn.microsoft.com/en-US/data/gg577612

[3]. http://www.introtorx.com/

[4]. http://rx.codeplex.com/

[5]. http://techblog.netflix.com/2013/02/rxjava-netflix-api.html

[6]. https://github.com/Netflix/RxJava