observer pattern
Padrão do observador

RxJS

De acordo com o site oficial,

RxJS é a implementação JavaScript de ReactiveX, uma biblioteca para compor programas assíncronos e baseados em eventos usando sequências observáveis.

Em termos simples, o RxJS é uma implementação do padrão Observer. Ele também estende o padrão Observer, fornecendo operadores que nos permitem compor Observables e Subject de maneira declarativa.

Observadores, Observáveis, Operadores e Sujeitos são os elementos básicos do RxJS. Então, vamos olhar para cada um com mais detalhes agora.

Observadores

Observadores são objetos que podem se inscrever em Observáveis ​​e Sujeitos. Após a inscrição, eles podem receber notificações de três tipos – próximo, erro e completo.

Qualquer objeto com a seguinte estrutura pode ser usado como um observador.

interface Observer {    closed?: boolean;    next: (value: T) => void;    error: (err: any) => void;    complete: () => void;}

Quando o Observable envia notificações seguintes, de erro e completas, o .next, .errore .complete métodos são chamados.

Observáveis

Observáveis ​​são objetos que podem emitir dados por um período de tempo. Pode ser representado usando o “diagrama de mármore”.

observable 1
Observável concluído com sucesso

Onde a linha horizontal representa o tempo, os nós circulares representam os dados emitidos pelo Observável e a linha vertical indica que o Observável foi concluído com êxito.

observable with error
Observável com um erro

Observáveis ​​podem encontrar um erro. A cruz representa o erro emitido pelo Observável.

Os estados “concluído” e “erro” são finais. Isso significa que o Observables não pode emitir nenhum dado após concluir com êxito ou encontrar um erro.

Criando um Observável

Observáveis ​​são criados usando o new Observable construtor que usa um argumento – a função de inscrição. Observáveis ​​também podem ser criados usando alguns operadores, mas falaremos sobre isso mais tarde quando falarmos sobre Operadores.

import { Observable } from 'rxjs';const observable = new Observable(subscriber => {   // Subscribe function });

Assinando um Observável

Observáveis ​​podem ser assinados usando seus .subscribe método e passando um Observer.

observable.subscribe({    next: (x) => console.log(x),    error: (x) => console.log(x),    complete: () => console.log('completed');});

Execução de um Observável

A função de inscrição que passamos para o new Observable O construtor é executado toda vez que o Observable é inscrito.

A função de assinatura aceita um argumento – o Assinante. O Assinante se parece com a estrutura de um Observador e possui os mesmos três métodos: .next, .errore .complete.

Observáveis ​​podem enviar dados para o Observador usando o .next método. Se o Observável for concluído com êxito, ele poderá notificar o Observador usando o .complete método. Se o Observable encontrou um erro, ele pode enviar o erro ao Observador usando o comando .error método.

// Create an Observableconst observable = new Observable(subscriber => {   subscriber.next('first data');   subscriber.next('second data');   setTimeout(() => {       subscriber.next('after 1 second - last data');       subscriber.complete();       subscriber.next('data after completion'); // <-- ignored   }, 1000);   subscriber.next('third data');});// Subscribe to the Observableobservable.subscribe({    next: (x) => console.log(x),    error: (x) => console.log(x),    complete: () => console.log('completed')});// Outputs://// first data// second data// third data// after 1 second - last data// completed

Observáveis ​​são Unicast

Observáveis ​​são unicast, o que significa que os Observables podem ter no máximo um assinante. Quando um Observer assina um Observable, ele obtém uma cópia do Observable que possui seu próprio caminho de execução, tornando o Observables unicast.

É como assistir a um vídeo do YouTube. Todos os espectadores assistem ao mesmo conteúdo de vídeo, mas podem assistir a diferentes segmentos do vídeo.

Exemplo: vamos criar um Observable que emita de 1 a 10 em 10 segundos. Em seguida, assine o Observable uma vez imediatamente e novamente após 5 segundos.

// Create an Observable that emits data every second for 10 secondsconst observable = new Observable(subscriber => {	let count = 1;    const interval = setInterval(() => {		subscriber.next(count++);                if (count > 10) {        	clearInterval(interval);           }    }, 1000);});// Subscribe to the Observableobservable.subscribe({	next: value => {        console.log(`Observer 1: ${value}`);    }});// After 5 seconds subscribe againsetTimeout(() => {    observable.subscribe({        next: value => {            console.log(`Observer 2: ${value}`);        }    });}, 5000);/* OutputObserver 1: 1Observer 1: 2Observer 1: 3Observer 1: 4Observer 1: 5Observer 2: 1Observer 1: 6Observer 2: 2Observer 1: 7Observer 2: 3Observer 1: 8Observer 2: 4Observer 1: 9Observer 2: 5Observer 1: 10Observer 2: 6Observer 2: 7Observer 2: 8Observer 2: 9Observer 2: 10*/

Na saída, você pode observar que o segundo Observer começou a imprimir a partir de 1, mesmo após a assinatura após 5 segundos. Isso acontece porque o segundo Observer recebeu uma cópia do Observable cuja função de assinatura foi chamada novamente. Isso ilustra o comportamento unicast de Observables.

assuntos

Um Assunto é um tipo especial de Observável.

Criando um Assunto

Um Assunto é criado usando o new Subject construtor.

import { Subject } from 'rxjs';// Create a subjectconst subject = new Subject();

Assinando um Assunto

A inscrição em um Assunto é semelhante à inscrição em um Observable: você usa o .subscribe método e passar um observador.

subject.subscribe({    next: (x) => console.log(x),    error: (x) => console.log(x),    complete: () => console.log("done")});

Execução de um Assunto

Ao contrário de Observables, um Assunto chama de seu .next, .errore .complete métodos para enviar dados aos Observadores.

// Push data to all observerssubject.next('first data');// Push error to all observerssubject.error('oops something went wrong');// Completesubject.complete('done');

Os assuntos são Multicast

Os assuntos são multicast: vários Observadores compartilham o mesmo Assunto e seu caminho de execução. Isso significa que todas as notificações são transmitidas a todos os observadores. É como assistir a um programa ao vivo. Todos os espectadores estão assistindo o mesmo segmento do mesmo conteúdo ao mesmo tempo.

Exemplo: vamos criar um Assunto que emita de 1 a 10 em 10 segundos. Em seguida, assine o Observable uma vez imediatamente e novamente após 5 segundos.

// Create a subjectconst subject = new Subject();let count = 1;const interval = setInterval(() => {    subscriber.next(count++);    if (count > 10) {        clearInterval(interval);    }}, 1000);// Subscribe to the subjectssubject.subscribe(data => {    console.log(`Observer 1: ${data}`);});// After 5 seconds subscribe againsetTimeout(() => {    subject.subscribe(data => {    	console.log(`Observer 2: ${data}`);	});}, 5000);/* OUTPUTObserver 1: 1Observer 1: 2Observer 1: 3Observer 1: 4Observer 1: 5Observer 2: 5Observer 1: 6Observer 2: 6Observer 1: 7Observer 2: 7Observer 1: 8Observer 2: 8Observer 1: 9Observer 2: 9Observer 1: 10Observer 2: 10*/

Na saída, você pode notar que o segundo Observer começou a imprimir a partir de 5 em vez de começar a 1. Isso acontece porque o segundo Observer está compartilhando o mesmo Assunto. Desde que se inscreveu após 5 segundos, o Assunto já terminou de emitir 1 a 4. Isso ilustra o comportamento multicast de um Assunto.

Os sujeitos são observáveis ​​e observadores

Os sujeitos têm o .next, .error e .complete métodos. Isso significa que eles seguem a estrutura dos Observadores. Portanto, um Assunto também pode ser usado como Observador e passado para o .subscribe função de observáveis ​​ou outros assuntos.

Exemplo: vamos criar um Observável e um Assunto. Em seguida, assine o Observável usando o Assunto como Observador. Por fim, assine o Assunto. Todos os valores emitidos pelo Observável serão enviados ao Assunto, e o Assunto transmitirá os valores recebidos a todos os seus Observadores.

// Create an Observable that emits data every secondconst observable = new Observable(subscriber => {   let count = 1;   const interval = setInterval(() => {       subscriber.next(count++);              if (count > 5) {        	clearInterval(interval);          }   }, 1000);});// Create a subjectconst subject = new Subject();// Use the Subject as Observer and subscribe to the Observableobservable.subscribe(subject);// Subscribe to the subjectsubject.subscribe({    next: value => console.log(value)});/* Output12345*/

Operadores

Operadores são o que torna o RxJS útil. Operadores são funções puras que retornam um novo Observable. Eles podem ser categorizados em 2 categorias principais:

  1. Operadores de criação
  2. Operadores Pipeable

Operadores de criação

Operadores de criação são funções que podem criar um novo Observável.

Exemplo: podemos criar um Observable que emite cada elemento de uma matriz usando o from operador.

const observable = from([2, 30, 5, 22, 60, 1]);observable.subscribe({    next: (value) => console.log("Received", value),    error: (err) => console.log(err),    complete: () => console.log("done")});/* OUTPUTSReceived 2Received 30Received 5Received 22Received 60Received 1done*/

O mesmo pode ser um Observável usando o diagrama de mármore.

from operator

Operadores Pipeable

Operadores Pipeable são funções que recebem um Observable como entrada e retornam um novo Observable com comportamento modificado.

Exemplo: vamos pegar o Observable que criamos usando o from operador. Agora, usando este Observable, podemos criar um novo Observable que emita apenas números maiores que 10 usando o filter operador.

const greaterThanTen = observable.pipe(filter(x => x > 10));greaterThanTen.subscribe(console.log, console.log, () => console.log("completed"));// OUTPUT// 11// 12// 13// 14// 15

O mesmo pode ser representado usando o diagrama de mármore.

filter operator

Existem muitos operadores mais úteis por aí. Você pode ver a lista completa de operadores, além de exemplos na documentação oficial do RxJS aqui.

É crucial entender todos os operadores comumente usados. Aqui estão alguns operadores que eu uso com frequência:

  1. mergeMap
  2. switchMap
  3. exhaustMap
  4. map
  5. catchError
  6. startWith
  7. delay
  8. debounce
  9. throttle
  10. interval
  11. from
  12. of

Redux Observables

De acordo com o site oficial,

RxJSmiddleware baseado em Restaurado. Componha e cancele ações assíncronas para criar efeitos colaterais e muito mais.

No Redux, sempre que uma ação é despachada, ela executa todas as funções do redutor e um novo estado é retornado.

Redux-observable pega todas essas ações despachadas e novos estados e cria dois observáveis ​​a partir dele – Ações observáveis action$e Estados observáveis state$.

As ações observáveis ​​emitirão todas as ações despachadas usando o store.dispatch(). Os estados observáveis ​​emitirão todos os novos objetos de estado retornados pelo redutor de raiz.

Epics

De acordo com o site oficial,

É uma função que executa um fluxo de ações e retorna um fluxo de ações. Ações dentro, ações fora.

Epopéias são funções que podem ser usadas para assinar Ações e Observáveis ​​dos Estados. Uma vez inscritos, os épicos receberão o fluxo de ações e estados como entrada e devem retornar um fluxo de ações como saída. Actions In – Actions Out.

const someEpic = (action$, state$) => {     return action$.pipe( // subscribe to actions observable        map(action => { // Receive every action, Actions In            return someOtherAction(); // return an action, Actions Out        })    )}

É importante entender que todas as ações recebidas na Epopeia já foram terminou de correr através dos redutores.

Dentro de uma epopeia, podemos usar qualquer padrão observável do RxJS, e é isso que torna o redux-observável útil.

Exemplo: nós podemos usar o .filter operador para criar um novo intermediário observável. Da mesma forma, podemos criar qualquer número de observáveis ​​intermediários, mas a saída final do observável final deve ser uma ação; caso contrário, uma exceção será levantada pelo redux-observável.

const sampleEpic = (action$, state$) => {    return action$.pipe(    	filter(action => action.payload.age >= 18), // can create intermediate observables and streams        map(value => above18(value)) // where above18 is an action creator    );}

Toda ação emitida pela Epics é despachada imediatamente usando o store.dispatch().

Configuração

Primeiro, vamos instalar as dependências.

npm install --save rxjs redux-observable

Crie uma pasta separada chamada epics para manter todos os épicos. Crie um novo arquivo index.js dentro de epics pasta e combine todos os épicos usando o combineEpics para criar a epopeia raiz. Em seguida, exporte a epopeia raiz.

import { combineEpics } from 'redux-observable';import { epic1 } from './epic1';import { epic2 } from './epic2';const epic1 = (action$, state$) => { ...   } const epic2 = (action$, state$) => { ...   } export default combineEpics(epic1, epic2);

Crie um middleware épico usando o createEpicMiddleware função e passá-lo para o createStore Função Redux.

import { createEpicMiddleware } from 'redux-observable';import { createStore, applyMiddleware } from 'redux';import rootEpic from './rootEpics';const epicMiddleware = createEpicMiddlware();const store = createStore(    rootReducer,    applyMiddleware(epicMiddlware));

Por fim, passe o épico raiz para o middleware épico .run método.

epicMiddleware.run(rootEpic);

Alguns casos práticos

O RxJS tem uma grande curva de aprendizado e a configuração observável ao redux piora o já doloroso processo de configuração do Redux. Tudo isso faz com que o Redux observável pareça um exagero. Mas aqui estão alguns casos de uso práticos que podem mudar de idéia.

Nesta seção, compararei redux-observables com redux-thunk para mostrar como o redux-observables pode ser útil em casos de uso complexos. Eu não odeio redux-thunk, adoro e uso todos os dias!

1. Faça chamadas de API

Caso: Faça uma chamada de API para buscar comentários de uma postagem. Mostrar carregadores quando a chamada da API estiver em andamento e também manipular erros de API.

Uma implementação redux-thunk terá esta aparência,

function getComments(postId){    return (dispatch) => {        dispatch(getCommentsInProgress());        axios.get(`/v1/api/posts/${postId}/comments`).then(response => {            dispatch(getCommentsSuccess(response.data.comments));        }).catch(() => {            dispatch(getCommentsFailed());        });    }}

e isso é absolutamente correto. Mas o criador da ação está inchado.

Podemos escrever uma Epopeia para implementar o mesmo usando redux-observables.

const getCommentsEpic = (action$, state$) => action$.pipe(    ofType('GET_COMMENTS'),    mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe(        map(response => getCommentsSuccess(response.data.comments)),        catchError(() => getCommentsFailed()),        startWith(getCommentsInProgress())    ));

Agora nos permite ter um criador de ações limpo e simples como este,

function getComments(postId) {    return {        type: 'GET_COMMENTS',        payload: {            postId        }    }}

2. Solicitação de devolução

Caso: Forneça o preenchimento automático para um campo de texto chamando uma API sempre que o valor do campo de texto for alterado. A chamada à API deve ser feita 1 segundo após o usuário parar de digitar.

Uma implementação redux-thunk terá esta aparência,

let timeout;function valueChanged(value) {    return dispatch => {        dispatch(loadSuggestionsInProgress());        dispatch({            type: 'VALUE_CHANGED',            payload: {                value            }        });        // If changed again within 1 second, cancel the timeout        timeout && clearTimeout(timeout);        // Make API Call after 1 second        timeout = setTimeout(() => {        	axios.get(`/suggestions?q=${value}`)                .then(response =>                      dispatch(loadSuggestionsSuccess(response.data.suggestions)))                .catch(() => dispatch(loadSuggestionsFailed()))        }, 1000, value);    }}

Requer uma variável global timeout. Quando começamos a usar variáveis ​​globais, nossos criadores de ações não são mais funções puras. Também fica difícil testar os criadores de ações que usam uma variável global.

Podemos implementar o mesmo com redux observável usando o .debounce operador.

const loadSuggestionsEpic = (action$, state$) => action$.pipe(    ofType('VALUE_CHANGED'),    debounce(1000),    mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(    	map(response => loadSuggestionsSuccess(response.data.suggestions)),        catchError(() => loadSuggestionsFailed())    )),    startWith(loadSuggestionsInProgress()));

Agora, nossos criadores de ações podem ser limpos e, mais importante, podem ser funções puras novamente.

function valueChanged(value) {    return {        type: 'VALUE_CHANGED',        payload: {            value        }    }}

3. Solicitar cancelamento

Caso: Continuando o caso de uso anterior, suponha que o usuário não digitou nada por 1 segundo e fizemos nossa primeira chamada de API para buscar as sugestões.

Digamos que a própria API leva em média 2-3 segundos para retornar o resultado. Agora, se o usuário digitar algo enquanto a 1ª chamada da API estiver em andamento, após 1 segundo, faremos a nossa 2ª API. Podemos acabar tendo duas chamadas de API ao mesmo tempo, e isso pode criar uma condição de corrida.

Para evitar isso, precisamos cancelar a 1ª chamada da API antes de fazer a 2ª chamada da API.

Uma implementação redux-thunk terá esta aparência,

let timeout;var cancelToken = axios.cancelToken;let apiCall;function valueChanged(value) {        return dispatch => {        dispatch(loadSuggestionsInProgress());        dispatch({            type: 'VALUE_CHANGED',            payload: {                value            }        });        // If changed again within 1 second, cancel the timeout        timeout && clearTimeout(timeout);        // Make API Call after 1 second        timeout = setTimeout(() => {            // Cancel the existing API            apiCall && apiCall.cancel('Operation cancelled');                        // Generate a new token            apiCall = cancelToken.source();                                    axios.get(`/suggestions?q=${value}`, {                cancelToken: apiCall.token            })                .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions)))                .catch(() => dispatch(loadSuggestionsFailed()))             }, 1000, value);    }}

Agora, ele requer outra variável global para armazenar o token de cancelamento do Axios. Mais variáveis ​​globais = funções mais impuras!

Para implementar o mesmo usando redux-observable, tudo o que precisamos fazer é substituir .mergeMap com .switchMap.

const loadSuggestionsEpic = (action$, state$) => action$.pipe(    ofType('VALUE_CHANGED'),    throttle(1000),    switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe(    	map(response => loadSuggestionsSuccess(response.data.suggestions)),        catchError(() => loadSuggestionsFailed())    )),    startWith(loadSuggestionsInProgress()));

Como não requer alterações nos criadores de nossas ações, eles podem continuar sendo funções puras.

Da mesma forma, existem muitos casos de uso em que o Redux-Observables realmente brilha! Por exemplo, pesquisando uma API, mostrando lanchonetes, gerenciando conexões WebSocketetc.

Concluir

Se você estiver desenvolvendo um aplicativo Redux que envolva casos de uso tão complexos, é altamente recomendável usar o Redux-Observables. Afinal, os benefícios de usá-lo são diretamente proporcionais à complexidade do seu aplicativo e são evidentes nos casos de uso práticos mencionados acima.

Eu acredito firmemente que usar o conjunto certo de bibliotecas nos ajudará a desenvolver aplicações muito mais limpas e sustentáveise, a longo prazo, os benefícios de usá-los serão superiores aos inconvenientes.