RxJS
De acordo com o site oficial,
RxJS é a implementação JavaScript de ReactiveX, uma biblioteca para compor programas assíncronos e baseados em eventos usando sequências observáveis.
Em termos simples, o RxJS é uma implementação do padrão Observer. Ele também estende o padrão Observer, fornecendo operadores que nos permitem compor Observables e Subject de maneira declarativa.
Observadores, Observáveis, Operadores e Sujeitos são os elementos básicos do RxJS. Então, vamos olhar para cada um com mais detalhes agora.
Observadores
Observadores são objetos que podem se inscrever em Observáveis e Sujeitos. Após a inscrição, eles podem receber notificações de três tipos – próximo, erro e completo.
Qualquer objeto com a seguinte estrutura pode ser usado como um observador.
interface Observer { closed?: boolean; next: (value: T) => void; error: (err: any) => void; complete: () => void;}
Quando o Observable envia notificações seguintes, de erro e completas, o .next
, .error
e .complete
métodos são chamados.
Observáveis
Observáveis são objetos que podem emitir dados por um período de tempo. Pode ser representado usando o “diagrama de mármore”.
Onde a linha horizontal representa o tempo, os nós circulares representam os dados emitidos pelo Observável e a linha vertical indica que o Observável foi concluído com êxito.
Observáveis podem encontrar um erro. A cruz representa o erro emitido pelo Observável.
Os estados “concluído” e “erro” são finais. Isso significa que o Observables não pode emitir nenhum dado após concluir com êxito ou encontrar um erro.
Criando um Observável
Observáveis são criados usando o new Observable
construtor que usa um argumento – a função de inscrição. Observáveis também podem ser criados usando alguns operadores, mas falaremos sobre isso mais tarde quando falarmos sobre Operadores.
import { Observable } from 'rxjs';const observable = new Observable(subscriber => { // Subscribe function });
Assinando um Observável
Observáveis podem ser assinados usando seus .subscribe
método e passando um Observer.
observable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed');});
Execução de um Observável
A função de inscrição que passamos para o new Observable
O construtor é executado toda vez que o Observable é inscrito.
A função de assinatura aceita um argumento – o Assinante. O Assinante se parece com a estrutura de um Observador e possui os mesmos três métodos: .next
, .error
e .complete
.
Observáveis podem enviar dados para o Observador usando o .next
método. Se o Observável for concluído com êxito, ele poderá notificar o Observador usando o .complete
método. Se o Observable encontrou um erro, ele pode enviar o erro ao Observador usando o comando .error
método.
// Create an Observableconst observable = new Observable(subscriber => { subscriber.next('first data'); subscriber.next('second data'); setTimeout(() => { subscriber.next('after 1 second - last data'); subscriber.complete(); subscriber.next('data after completion'); // <-- ignored }, 1000); subscriber.next('third data');});// Subscribe to the Observableobservable.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log('completed')});// Outputs://// first data// second data// third data// after 1 second - last data// completed
Observáveis são Unicast
Observáveis são unicast, o que significa que os Observables podem ter no máximo um assinante. Quando um Observer assina um Observable, ele obtém uma cópia do Observable que possui seu próprio caminho de execução, tornando o Observables unicast.
É como assistir a um vídeo do YouTube. Todos os espectadores assistem ao mesmo conteúdo de vídeo, mas podem assistir a diferentes segmentos do vídeo.
Exemplo: vamos criar um Observable que emita de 1 a 10 em 10 segundos. Em seguida, assine o Observable uma vez imediatamente e novamente após 5 segundos.
// Create an Observable that emits data every second for 10 secondsconst observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); } }, 1000);});// Subscribe to the Observableobservable.subscribe({ next: value => { console.log(`Observer 1: ${value}`); }});// After 5 seconds subscribe againsetTimeout(() => { observable.subscribe({ next: value => { console.log(`Observer 2: ${value}`); } });}, 5000);/* OutputObserver 1: 1Observer 1: 2Observer 1: 3Observer 1: 4Observer 1: 5Observer 2: 1Observer 1: 6Observer 2: 2Observer 1: 7Observer 2: 3Observer 1: 8Observer 2: 4Observer 1: 9Observer 2: 5Observer 1: 10Observer 2: 6Observer 2: 7Observer 2: 8Observer 2: 9Observer 2: 10*/
Na saída, você pode observar que o segundo Observer começou a imprimir a partir de 1, mesmo após a assinatura após 5 segundos. Isso acontece porque o segundo Observer recebeu uma cópia do Observable cuja função de assinatura foi chamada novamente. Isso ilustra o comportamento unicast de Observables.
assuntos
Um Assunto é um tipo especial de Observável.
Criando um Assunto
Um Assunto é criado usando o new Subject
construtor.
import { Subject } from 'rxjs';// Create a subjectconst subject = new Subject();
Assinando um Assunto
A inscrição em um Assunto é semelhante à inscrição em um Observable: você usa o .subscribe
método e passar um observador.
subject.subscribe({ next: (x) => console.log(x), error: (x) => console.log(x), complete: () => console.log("done")});
Execução de um Assunto
Ao contrário de Observables, um Assunto chama de seu .next
, .error
e .complete
métodos para enviar dados aos Observadores.
// Push data to all observerssubject.next('first data');// Push error to all observerssubject.error('oops something went wrong');// Completesubject.complete('done');
Os assuntos são Multicast
Os assuntos são multicast: vários Observadores compartilham o mesmo Assunto e seu caminho de execução. Isso significa que todas as notificações são transmitidas a todos os observadores. É como assistir a um programa ao vivo. Todos os espectadores estão assistindo o mesmo segmento do mesmo conteúdo ao mesmo tempo.
Exemplo: vamos criar um Assunto que emita de 1 a 10 em 10 segundos. Em seguida, assine o Observable uma vez imediatamente e novamente após 5 segundos.
// Create a subjectconst subject = new Subject();let count = 1;const interval = setInterval(() => { subscriber.next(count++); if (count > 10) { clearInterval(interval); }}, 1000);// Subscribe to the subjectssubject.subscribe(data => { console.log(`Observer 1: ${data}`);});// After 5 seconds subscribe againsetTimeout(() => { subject.subscribe(data => { console.log(`Observer 2: ${data}`); });}, 5000);/* OUTPUTObserver 1: 1Observer 1: 2Observer 1: 3Observer 1: 4Observer 1: 5Observer 2: 5Observer 1: 6Observer 2: 6Observer 1: 7Observer 2: 7Observer 1: 8Observer 2: 8Observer 1: 9Observer 2: 9Observer 1: 10Observer 2: 10*/
Na saída, você pode notar que o segundo Observer começou a imprimir a partir de 5 em vez de começar a 1. Isso acontece porque o segundo Observer está compartilhando o mesmo Assunto. Desde que se inscreveu após 5 segundos, o Assunto já terminou de emitir 1 a 4. Isso ilustra o comportamento multicast de um Assunto.
Os sujeitos são observáveis e observadores
Os sujeitos têm o .next
, .error
e .complete
métodos. Isso significa que eles seguem a estrutura dos Observadores. Portanto, um Assunto também pode ser usado como Observador e passado para o .subscribe
função de observáveis ou outros assuntos.
Exemplo: vamos criar um Observável e um Assunto. Em seguida, assine o Observável usando o Assunto como Observador. Por fim, assine o Assunto. Todos os valores emitidos pelo Observável serão enviados ao Assunto, e o Assunto transmitirá os valores recebidos a todos os seus Observadores.
// Create an Observable that emits data every secondconst observable = new Observable(subscriber => { let count = 1; const interval = setInterval(() => { subscriber.next(count++); if (count > 5) { clearInterval(interval); } }, 1000);});// Create a subjectconst subject = new Subject();// Use the Subject as Observer and subscribe to the Observableobservable.subscribe(subject);// Subscribe to the subjectsubject.subscribe({ next: value => console.log(value)});/* Output12345*/
Operadores
Operadores são o que torna o RxJS útil. Operadores são funções puras que retornam um novo Observable. Eles podem ser categorizados em 2 categorias principais:
- Operadores de criação
- Operadores Pipeable
Operadores de criação
Operadores de criação são funções que podem criar um novo Observável.
Exemplo: podemos criar um Observable que emite cada elemento de uma matriz usando o from
operador.
const observable = from([2, 30, 5, 22, 60, 1]);observable.subscribe({ next: (value) => console.log("Received", value), error: (err) => console.log(err), complete: () => console.log("done")});/* OUTPUTSReceived 2Received 30Received 5Received 22Received 60Received 1done*/
O mesmo pode ser um Observável usando o diagrama de mármore.
Operadores Pipeable
Operadores Pipeable são funções que recebem um Observable como entrada e retornam um novo Observable com comportamento modificado.
Exemplo: vamos pegar o Observable que criamos usando o from
operador. Agora, usando este Observable, podemos criar um novo Observable que emita apenas números maiores que 10 usando o filter
operador.
const greaterThanTen = observable.pipe(filter(x => x > 10));greaterThanTen.subscribe(console.log, console.log, () => console.log("completed"));// OUTPUT// 11// 12// 13// 14// 15
O mesmo pode ser representado usando o diagrama de mármore.
Existem muitos operadores mais úteis por aí. Você pode ver a lista completa de operadores, além de exemplos na documentação oficial do RxJS aqui.
É crucial entender todos os operadores comumente usados. Aqui estão alguns operadores que eu uso com frequência:
mergeMap
switchMap
exhaustMap
map
catchError
startWith
delay
debounce
throttle
interval
from
of
Redux Observables
De acordo com o site oficial,
RxJSmiddleware baseado em Restaurado. Componha e cancele ações assíncronas para criar efeitos colaterais e muito mais.
No Redux, sempre que uma ação é despachada, ela executa todas as funções do redutor e um novo estado é retornado.
Redux-observable pega todas essas ações despachadas e novos estados e cria dois observáveis a partir dele – Ações observáveis action$
e Estados observáveis state$
.
As ações observáveis emitirão todas as ações despachadas usando o store.dispatch()
. Os estados observáveis emitirão todos os novos objetos de estado retornados pelo redutor de raiz.
Epics
De acordo com o site oficial,
É uma função que executa um fluxo de ações e retorna um fluxo de ações. Ações dentro, ações fora.
Epopéias são funções que podem ser usadas para assinar Ações e Observáveis dos Estados. Uma vez inscritos, os épicos receberão o fluxo de ações e estados como entrada e devem retornar um fluxo de ações como saída. Actions In – Actions Out.
const someEpic = (action$, state$) => { return action$.pipe( // subscribe to actions observable map(action => { // Receive every action, Actions In return someOtherAction(); // return an action, Actions Out }) )}
É importante entender que todas as ações recebidas na Epopeia já foram terminou de correr através dos redutores.
Dentro de uma epopeia, podemos usar qualquer padrão observável do RxJS, e é isso que torna o redux-observável útil.
Exemplo: nós podemos usar o .filter
operador para criar um novo intermediário observável. Da mesma forma, podemos criar qualquer número de observáveis intermediários, mas a saída final do observável final deve ser uma ação; caso contrário, uma exceção será levantada pelo redux-observável.
const sampleEpic = (action$, state$) => { return action$.pipe( filter(action => action.payload.age >= 18), // can create intermediate observables and streams map(value => above18(value)) // where above18 is an action creator );}
Toda ação emitida pela Epics é despachada imediatamente usando o store.dispatch()
.
Configuração
Primeiro, vamos instalar as dependências.
npm install --save rxjs redux-observable
Crie uma pasta separada chamada epics
para manter todos os épicos. Crie um novo arquivo index.js
dentro de epics
pasta e combine todos os épicos usando o combineEpics
para criar a epopeia raiz. Em seguida, exporte a epopeia raiz.
import { combineEpics } from 'redux-observable';import { epic1 } from './epic1';import { epic2 } from './epic2';const epic1 = (action$, state$) => { ... } const epic2 = (action$, state$) => { ... } export default combineEpics(epic1, epic2);
Crie um middleware épico usando o createEpicMiddleware
função e passá-lo para o createStore
Função Redux.
import { createEpicMiddleware } from 'redux-observable';import { createStore, applyMiddleware } from 'redux';import rootEpic from './rootEpics';const epicMiddleware = createEpicMiddlware();const store = createStore( rootReducer, applyMiddleware(epicMiddlware));
Por fim, passe o épico raiz para o middleware épico .run
método.
epicMiddleware.run(rootEpic);
Alguns casos práticos
O RxJS tem uma grande curva de aprendizado e a configuração observável ao redux piora o já doloroso processo de configuração do Redux. Tudo isso faz com que o Redux observável pareça um exagero. Mas aqui estão alguns casos de uso práticos que podem mudar de idéia.
Nesta seção, compararei redux-observables com redux-thunk para mostrar como o redux-observables pode ser útil em casos de uso complexos. Eu não odeio redux-thunk, adoro e uso todos os dias!
1. Faça chamadas de API
Caso: Faça uma chamada de API para buscar comentários de uma postagem. Mostrar carregadores quando a chamada da API estiver em andamento e também manipular erros de API.
Uma implementação redux-thunk terá esta aparência,
function getComments(postId){ return (dispatch) => { dispatch(getCommentsInProgress()); axios.get(`/v1/api/posts/${postId}/comments`).then(response => { dispatch(getCommentsSuccess(response.data.comments)); }).catch(() => { dispatch(getCommentsFailed()); }); }}
e isso é absolutamente correto. Mas o criador da ação está inchado.
Podemos escrever uma Epopeia para implementar o mesmo usando redux-observables.
const getCommentsEpic = (action$, state$) => action$.pipe( ofType('GET_COMMENTS'), mergeMap((action) => from(axios.get(`/v1/api/posts/${action.payload.postId}/comments`).pipe( map(response => getCommentsSuccess(response.data.comments)), catchError(() => getCommentsFailed()), startWith(getCommentsInProgress()) ));
Agora nos permite ter um criador de ações limpo e simples como este,
function getComments(postId) { return { type: 'GET_COMMENTS', payload: { postId } }}
2. Solicitação de devolução
Caso: Forneça o preenchimento automático para um campo de texto chamando uma API sempre que o valor do campo de texto for alterado. A chamada à API deve ser feita 1 segundo após o usuário parar de digitar.
Uma implementação redux-thunk terá esta aparência,
let timeout;function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { axios.get(`/suggestions?q=${value}`) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); }}
Requer uma variável global timeout
. Quando começamos a usar variáveis globais, nossos criadores de ações não são mais funções puras. Também fica difícil testar os criadores de ações que usam uma variável global.
Podemos implementar o mesmo com redux observável usando o .debounce
operador.
const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), debounce(1000), mergeMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()));
Agora, nossos criadores de ações podem ser limpos e, mais importante, podem ser funções puras novamente.
function valueChanged(value) { return { type: 'VALUE_CHANGED', payload: { value } }}
3. Solicitar cancelamento
Caso: Continuando o caso de uso anterior, suponha que o usuário não digitou nada por 1 segundo e fizemos nossa primeira chamada de API para buscar as sugestões.
Digamos que a própria API leva em média 2-3 segundos para retornar o resultado. Agora, se o usuário digitar algo enquanto a 1ª chamada da API estiver em andamento, após 1 segundo, faremos a nossa 2ª API. Podemos acabar tendo duas chamadas de API ao mesmo tempo, e isso pode criar uma condição de corrida.
Para evitar isso, precisamos cancelar a 1ª chamada da API antes de fazer a 2ª chamada da API.
Uma implementação redux-thunk terá esta aparência,
let timeout;var cancelToken = axios.cancelToken;let apiCall;function valueChanged(value) { return dispatch => { dispatch(loadSuggestionsInProgress()); dispatch({ type: 'VALUE_CHANGED', payload: { value } }); // If changed again within 1 second, cancel the timeout timeout && clearTimeout(timeout); // Make API Call after 1 second timeout = setTimeout(() => { // Cancel the existing API apiCall && apiCall.cancel('Operation cancelled'); // Generate a new token apiCall = cancelToken.source(); axios.get(`/suggestions?q=${value}`, { cancelToken: apiCall.token }) .then(response => dispatch(loadSuggestionsSuccess(response.data.suggestions))) .catch(() => dispatch(loadSuggestionsFailed())) }, 1000, value); }}
Agora, ele requer outra variável global para armazenar o token de cancelamento do Axios. Mais variáveis globais = funções mais impuras!
Para implementar o mesmo usando redux-observable, tudo o que precisamos fazer é substituir .mergeMap
com .switchMap
.
const loadSuggestionsEpic = (action$, state$) => action$.pipe( ofType('VALUE_CHANGED'), throttle(1000), switchMap(action => from(axios.get(`/suggestions?q=${action.payload.value}`)).pipe( map(response => loadSuggestionsSuccess(response.data.suggestions)), catchError(() => loadSuggestionsFailed()) )), startWith(loadSuggestionsInProgress()));
Como não requer alterações nos criadores de nossas ações, eles podem continuar sendo funções puras.
Da mesma forma, existem muitos casos de uso em que o Redux-Observables realmente brilha! Por exemplo, pesquisando uma API, mostrando lanchonetes, gerenciando conexões WebSocketetc.
Concluir
Se você estiver desenvolvendo um aplicativo Redux que envolva casos de uso tão complexos, é altamente recomendável usar o Redux-Observables. Afinal, os benefícios de usá-lo são diretamente proporcionais à complexidade do seu aplicativo e são evidentes nos casos de uso práticos mencionados acima.
Eu acredito firmemente que usar o conjunto certo de bibliotecas nos ajudará a desenvolver aplicações muito mais limpas e sustentáveise, a longo prazo, os benefícios de usá-los serão superiores aos inconvenientes.