Arquivo da tag: RxJS

RxJS – Creating An Observable with RxJS

<!DOCTYPE html>
<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>

</body>
</html>

Javascript

//see output in the console!

console.clear();

var source = Rx.Observable.create(function(observer) {
  var id = setTimeout(function() {
    try {
      // throw 'my bad error';
      observer.onNext(42);
      observer.onCompleted();
    } catch (error) {
      observer.onError(error);
    }
  }, 1000);
  
  console.log('started');
  
  return function() {
    console.log('disposal called');
    clearTimeout(id);
  };
});

var sub = source.subscribe(function(x) {
  console.log('next ' + x);
}, function(err) {
  console.error(err);
}, function() {
  console.info('done');
});

RxJS – Observables vs Promises

<!DOCTYPE html>
<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>

</body>
</html>

Javascript

console.clear();

// var promise = new Promise((resolve) => {
//   setTimeout(() => {
//     console.log('promise timeout hit');
//     resolve(42);
//   }, 1000);
//   console.log('promise started');
// });

// promise.then(x => console.log(x));

var source = Rx.Observable.create((observer) => {
  var id = setTimeout(() => {
    console.log('observable timeout hit');
    observer.onNext(42);
  }, 1000);
  console.log('observable started');
  
  return () => {
    console.log('dispose called');
    clearTimeout(id);
  };
});

var disposable = source.forEach(x => console.log(x));

setTimeout(() => {
  disposable.dispose();
}, 500);

RxJS – Throttled Buffering in RxJS

<!DOCTYPE html>
<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>
  <button id="clickMe">Click Me</button>
  
  <h4>values sent</h4>
  <div id="results"></div>
</body>
</html>

Javascript

var btn = document.querySelector('#clickMe');

var clicks = Rx.Observable.fromEvent(btn, 'click');

clicks.scan(0, (s) => s + 1)
  .buffer(clicks.throttle(1000))
  .forEach(x => sendValues(x));

function sendValues(arr) {
  var pre = document.createElement('pre');
  pre.innerHTML = JSON.stringify(arr);
  document.querySelector('#results')
    .appendChild(pre);
}

RxJS – Stream Processing With RxJS vs Array Higher-Order Functions

<!DOCTYPE html>
<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>

</body>
</html>

Javascript

//see console for output!

console.clear();

var source = Rx.Observable.fromArray([0,1,2,3,4,5]);

source.filter((x) => {
  console.log('filtering ' + x);
  return x % 2 === 0;
})
.map((x) => {
  console.log('mapping ' + x);
  return x + '!';
})
.reduce((r, x) => {
  console.log('reducing ' + x);
  return r + x;
}, '')
.subscribe(result => {
  console.log(result);
});

RxJS – Toggle A Stream On And Off With RxJS

<!DOCTYPE html>
<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>
  <input type="checkbox" id="toggle">
  <div id="display"></div>
</body>
</html>

Javascript

var source = Rx.Observable.interval(100)
  .map(() => '.');

var display = document.querySelector('#display');

var toggle = document.querySelector('#toggle');

var checked = Rx.Observable.fromEvent(toggle, 'change')
  .map(e => e.target.checked);

checked.filter(x => x === true)
  .flatMapLatest(() => source.takeUntil(checked))
  .subscribe(x => display.innerText += x);

RxJS – Demystifying Cold and Hot Observables in RxJS

<!DOCTYPE html>
<html>
<head>
  <script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.3.22/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>
</body>
</html>

Javascript

//SEE CONSOLE FOR OUTPUT!

var clock = Rx.Observable.interval(1000).take(10).map(x => x+1)
  .publish().refCount();

clock.subscribe(i => console.log('a: ' + i));

setTimeout(function () {
  clock.subscribe(i => console.log('   b: ' + i));
}, 4500);

RxJS – Aggregating Streams With Reduce And Scan using RxJS

<!DOCTYPE html>
<html>
<head>
<script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.5.2/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>

</body>
</html>

Javascript

console.clear();

var source = Rx.Observable.interval(100);

source.scan(0, function(r, x) {
  return r + x;
}).subscribe(function(result) {
  console.log(result);
});

RxJS – Introduction to the ConnectableObservable and using publish().refCount()

<!DOCTYPE html>
<html>
<head>
  <script src="//cdnjs.cloudflare.com/ajax/libs/rxjs/2.3.22/rx.all.js"></script>
  <meta charset="utf-8">
  <title>JS Bin</title>
</head>
<body>
</body>
</html>

Javascript

//SEE CONSOLE FOR OUTPUT!

var clock = Rx.Observable.interval(1000)
  .take(10).map(x => x+1).startWith(0)
  .publish();

console.log('a subscribed');
clock.subscribe(i => console.log('a: ' + i));

setTimeout(function () {
  console.log('b subscribed');
  clock.subscribe(i => console.log('   b: ' + i));
}, 4500);

setTimeout(function () {
  console.log('connected');
  clock.connect();
}, 5000);