CINXE.COM

Kefir.js — fast and light Reactive Programming library for JavaScript inspired by Bacon.js and RxJS

<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><title>Kefir.js — fast and light Reactive Programming library for JavaScript inspired by Bacon.js and RxJS</title><meta name="viewport" content="width=device-width, initial-scale=1"><style type="text/css">/*! normalize.css v3.0.1 | MIT License | git.io/normalize */ html{font-family:sans-serif;-ms-text-size-adjust:100%;-webkit-text-size-adjust:100%} body{margin:0} article,aside,details,figcaption,figure, footer,header,hgroup,main,nav,section,summary{display:block} audio,canvas,progress,video{display:inline-block;vertical-align:baseline} audio:not([controls]){display:none;height:0}[hidden],template{display:none} a{background:0 0} a:active,a:hover{outline:0} abbr[title]{border-bottom:1px dotted} b,strong{font-weight:700} dfn{font-style:italic} h1{font-size:2em;margin:.67em 0} mark{background:#ff0;color:#000} small{font-size:80%} sub,sup{font-size:75%;line-height:0;position:relative;vertical-align:baseline} sup{top:-.5em} sub{bottom:-.25em} img{border:0}svg:not(:root){overflow:hidden} figure{margin:1em 40px} hr{-moz-box-sizing:content-box;box-sizing:content-box;height:0} pre{overflow:auto} code,kbd,pre,samp{font-family:monospace,monospace;font-size:1em} /*button,input,optgroup,select,textarea{color:inherit;font:inherit;margin:0}*/ button{overflow:visible} button,select{text-transform:none} button,html input[type=button],input[type=reset],input[type=submit]{ -webkit-appearance:button;cursor:pointer} button[disabled],html input[disabled]{cursor:default} button::-moz-focus-inner,input::-moz-focus-inner{border:0;padding:0} input{line-height:normal} input[type=checkbox],input[type=radio]{box-sizing:border-box;padding:0} input[type=number]::-webkit-inner-spin-button, input[type=number]::-webkit-outer-spin-button{height:auto} input[type=search]{-webkit-appearance:textfield;-moz-box-sizing:content-box; -webkit-box-sizing:content-box;box-sizing:content-box} input[type=search]::-webkit-search-cancel-button, input[type=search]::-webkit-search-decoration{-webkit-appearance:none} fieldset{border:1px solid silver;margin:0 2px;padding:.35em .625em .75em} legend{border:0;padding:0} textarea{overflow:auto} optgroup{font-weight:700} table{border-collapse:collapse;border-spacing:0} td,th{padding:0} h1,h2,h3,h4,h5,h6,strong,th,b { font-weight: 500; } figure { margin: 1em 0; } figure img { max-width: 100%; } body { font-size: 14px; line-height: 22px; background: #f4f4f4; color: #000; min-width: 700px; } a { color: #476cff; text-decoration: none; } a:hover { text-decoration: underline; } p { margin: 20px 0; } h2 { padding-top: 1em; } h2 { font-size: 20px; } h2 small, h2 sup, h2 sub { font-weight: normal; } h1 small { color: #b1b1b1; line-height: 2; } h1 sup { font-weight: normal; } table, tr, td { margin: 0; padding: 0; } td, th { padding: 2px 14px 2px 0; } th { text-align: left; } table .rule { padding: 5px 0; } table .rule:after { content: ''; display: block; border-bottom: solid 1px #ddd; } hr { height: 0; border: none; border-bottom: solid 1px #ddd; } ul { list-style-type: square; padding: 0 0 0 20px; } li { margin-bottom: 10px; } code, pre, tt { font-family: "Fira Mono", Monaco, Consolas, "Lucida Console", monospace; font-size: 12px; line-height: 18px; font-style: normal; } tt { background: #fbfbfb; } code { margin-left: 20px; } pre { font-size: 12px; padding: 2px 2px 2px 12px; border-left: 3px solid #DFDFDF; margin: 0 0 30px; overflow: auto; position: relative; } @media (max-width: 800px) { pre[title]:not([title=example]) { padding-top: 18px; } } pre[title]:not([title=example]):after { content: attr(title); position: absolute; top: 0; right: 0; padding: 0 .5em; font-size: .8em; color: #8b8b8b; } pre + pre { margin-top: -10px; } .sidebar { background: #fbfbfb; position: fixed; top: 0; left: 0; bottom: 0; width: 210px; overflow-y: auto; overflow-x: hidden; -webkit-overflow-scrolling: touch; padding: 15px 30px 30px; border-right: 1px solid #bbb; font-family: "Fira Sans", "Lucida Grande", "Lucida Sans Unicode", Helvetica, Arial, sans-serif; } @media (max-width: 1200px) { .sidebar { width: 170px; } } .sidebar a { color: inherit; } .toc-title { display: block; font-weight: 500; margin-top: 20px; line-height: 1.2; margin-bottom: 10px; } .toc-section { font-size: 12px; line-height: 17px; margin: 5px 0 0 0; padding-left: 0px; list-style-type: none; } .toc-section li { margin: 0 0 3px 0; } .toc-section a { text-decoration: none; } .toc-section a:hover { text-decoration: underline; } .container { max-width: 680px; margin: 40px 10px 50px 300px; font-family: "Fira Sans", Helvetica, Arial, sans-serif; } @media (max-width: 1200px) { .container { margin-left: 260px; } } .header { font-size: 16px; line-height: 30px; font-weight: 500; color: inherit; } .alias, .shorthand { display: block; margin-left: 1em; margin-bottom: .4em; } .alias:before, .shorthand:before { content: 'Alias:'; font-family: Helvetica, Arial, sans-serif; font-style: italic; font-size: 1.1em; font-weight: 500; margin-right: .5em; } .shorthand:before { content: 'Shorthand for:'; } p[id] { margin-top: 4em; } h2[id]:not(.no-extra-padding) { margin-top: 3em; } h2[id] + p[id]{ margin-top: 3em; } .logo-img { width: 60px; height: 60px; vertical-align: bottom; margin-bottom: -4px; } [data-emoji] { width: 20px; height: 20px; visibility: hidden; } [data-emoji][src] { visibility: visible; } </style><script src="https://cdnjs.cloudflare.com/ajax/libs/jquery/2.1.4/jquery.min.js"></script><script>window.module = {}</script><script src="https://unpkg.com/transducers-js@0.4.174"></script><script>window.transducers = window.module.exports delete window.module</script><script src="dist/kefir.js"></script><link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/8.6/styles/tomorrow.min.css"><link rel="stylesheet" href="https://code.cdn.mozilla.net/fonts/fira.css"><style type="text/css">.hljs { background: transparent; padding: 2px 2px 2px 12px; }</style><script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/8.6/highlight.min.js"></script></head><body><div class="sidebar"><a class="toc-title" href="#">Kefir.js</a><ul class="toc-section"><li><a href="#installation">Installation</a></li></ul><a class="toc-title" href="#examples">Examples</a><a class="toc-title" href="#about-observables">Intro to Streams and Properties</a><a class="toc-title" href="#create-stream">Create a stream</a><ul class="toc-section"><li><a href="#never">never</a></li><li><a href="#later">later</a></li><li><a href="#interval">interval</a></li><li><a href="#sequentially">sequentially</a></li><li><a href="#from-poll">fromPoll</a></li><li><a href="#with-interval">withInterval</a></li><li><a href="#from-callback">fromCallback</a></li><li><a href="#from-node-callback">fromNodeCallback</a></li><li><a href="#from-event">fromEvents</a></li><li><a href="#stream">stream</a></li></ul><a class="toc-title" href="#create-property">Create a property</a><ul class="toc-section"><li><a href="#constant">constant</a></li><li><a href="#constant-error">constantError</a></li><li><a href="#from-promise">fromPromise</a></li></ul><a class="toc-title" href="#convert">Convert observables</a><ul class="toc-section"><li><a href="#to-property">toProperty</a></li><li><a href="#changes">changes</a></li></ul><a class="toc-title" href="#main-methods">Subscribe / add side effects</a><ul class="toc-section"><li><a href="#observe">observe</a></li><li><a href="#on-value">onValue</a></li><li><a href="#off-value">offValue</a></li><li><a href="#on-error">onError</a></li><li><a href="#off-error">offError</a></li><li><a href="#on-end">onEnd</a></li><li><a href="#off-end">offEnd</a></li><li><a href="#on-any">onAny</a></li><li><a href="#off-any">offAny</a></li><li><a href="#log">log</a></li><li><a href="#off-log">offLog</a></li><li><a href="#spy">spy</a></li><li><a href="#off-spy">offSpy</a></li></ul><a class="toc-title" href="#modify">Modify an observable</a><ul class="toc-section"><li><a href="#map">map</a></li><li><a href="#map-errors">mapErrors</a></li><li><a href="#filter">filter</a></li><li><a href="#filter-errors">filterErrors</a></li><li><a href="#take">take</a></li><li><a href="#take-errors">takeErrors</a></li><li><a href="#take-while">takeWhile</a></li><li><a href="#last">last</a></li><li><a href="#skip">skip</a></li><li><a href="#skip-while">skipWhile</a></li><li><a href="#skip-duplicates">skipDuplicates</a></li><li><a href="#diff">diff</a></li><li><a href="#scan">scan</a></li><li><a href="#flatten">flatten</a></li><li><a href="#delay">delay</a></li><li><a href="#throttle">throttle</a></li><li><a href="#debounce">debounce</a></li><li><a href="#ignore-values">ignoreValues</a></li><li><a href="#ignore-errors">ignoreErrors</a></li><li><a href="#ignore-end">ignoreEnd</a></li><li><a href="#before-end">beforeEnd</a></li><li><a href="#sliding-window">slidingWindow</a></li><li><a href="#buffer-while">bufferWhile</a></li><li><a href="#buffer-with-count">bufferWithCount</a></li><li><a href="#buffer-with-time-or-count">bufferWithTimeOrCount</a></li><li><a href="#transduce">transduce</a></li><li><a href="#thru">thru</a></li><li><a href="#with-handler">withHandler</a></li></ul><a class="toc-title" href="#combine-observables">Combine observables</a><ul class="toc-section"><li><a href="#combine">combine</a></li><li><a href="#zip">zip</a></li><li><a href="#merge">merge</a></li><li><a href="#concat">concat</a></li><li><a href="#pool">pool</a></li><li><a href="#repeat">repeat</a></li><li><a href="#flat-map">flatMap</a></li><li><a href="#flat-map-latest">flatMapLatest</a></li><li><a href="#flat-map-first">flatMapFirst</a></li><li><a href="#flat-map-concat">flatMapConcat</a></li><li><a href="#flat-map-with-concurrency-limit">flatMapConcurLimit</a></li><li><a href="#flat-map-errors">flatMapErrors</a></li></ul><a class="toc-title" href="#combine-two">Combine two observables</a><ul class="toc-section"><li><a href="#filter-by">filterBy</a></li><li><a href="#obs-sampled-by">sampledBy</a></li><li><a href="#skip-until-by">skipUntilBy</a></li><li><a href="#take-until-by">takeUntilBy</a></li><li><a href="#buffer-by">bufferBy</a></li><li><a href="#buffer-while-by">bufferWhileBy</a></li></ul><a class="toc-title" href="#interop">Interoperability</a><ul class="toc-section"><li><a href="#from-promise">fromPromise</a></li><li><a href="#to-promise">toPromise</a></li><li><a href="#from-es-observable">fromESObservable</a></li><li><a href="#to-es-observable">toESObservable</a></li><li><a href="#static-land">Static Land</a></li></ul><a class="toc-title" href="#active-state">Activation and deactivation of observables</a><a class="toc-title" href="#emitter-object">Emitter</a><a class="toc-title" href="#about-errors">Errors</a><a class="toc-title" href="#current-in-streams">Current values/errors in streams</a></div><div class="container"><h1><img src="Kefir.svg" class="logo-img"> Kefir.js <sup>3.8.5 (<a href="https://github.com/kefirjs/kefir/blob/master/changelog.md">changelog</a>)</sup></h1><p>Kefir &mdash; is a Reactive Programming library for JavaScript inspired by <a href="https://github.com/baconjs/bacon.js">Bacon.js</a> and <a href="http://reactivex.io/rxjs/">RxJS</a>, with focus on high performance and low memory usage. </p><p>Kefir has a <a href="https://github.com/kefirjs/kefir"> <img data-emoji="octocat"> GitHub repository</a>, where you can <a href="https://github.com/kefirjs/kefir/pulls">send pull requests</a>, <a href="https://github.com/kefirjs/kefir/issues">report bugs</a>, and have fun reading <a href="https://github.com/kefirjs/kefir/tree/master/src">source code</a>. </p><p>See also <a href="https://github.com/kefirjs/kefir/blob/master/deprecated-api-docs.md">Deprecated API docs</a>. </p><h2 id="installation">Installation</h2><p>Kefir is available as an NPM and a Bower package, as well as a simple file download. </p><h3 id="npm">NPM</h3><pre>npm install kefir </pre><h3 id="bower">Bower</h3><pre>bower install kefir </pre><h3 id="downloads">Downloads <sup>(3.8.5)</sup></h3><table><tr><th rowspan="1" valign="top">For Development</th><td><a href="dist/kefir.js">kefir.js</a></td><td><i>~ 100 KB</i></td></tr><tr><th rowspan="1" valign="top">For Production</th><td><a href="dist/kefir.min.js">kefir.min.js</a></td><td><i>~ 10 KB (when gzipped)</i></td></tr><tr><td class="rule" colspan="3"></td></tr><tr><th>All files</th><td><a href="https://github.com/kefirjs/kefir/archive/3.8.5.zip">kefir-3.8.5.zip</a></td><td>... including documentation, tests, source maps, etc.</td></tr></table><p>Kefir also available on <a href="http://www.jsdelivr.com/#!kefir">jsDelivr</a>.</p><h2 id="examples">Examples</h2><p>Let's start from a quick little example to get you a feel of what is it like to program with Kefir. First we create a stream of events that will produce three numbers with interval of 100 milliseconds: </p><pre class="javascript" title="example">var numbers = Kefir.sequentially(100, [1, 2, 3]); </pre><p>Now let's create another stream based on the first one. As you might guess, it will produce 2, 4, and 6. </p><pre class="javascript" title="example">var numbers2 = numbers.map(x =&gt; x * 2); </pre><p>Suppose we don't want number 4 to be in the sequence, no problem, we can filter it out: </p><pre class="javascript" title="example">var numbers3 = numbers2.filter(x =&gt; x !== 4); </pre><p>Ok, I think numbers3 stream is what we want, it's time to subscribe to it and to get the values: </p><pre class="javascript" title="example">numbers3.onValue(x =&gt; { logger.log(x); }); </pre><script>function Logger(domNode, divider) { this.domNode = domNode; this.divider = divider || ' '; this.domNode.innerHTML = ''; } Logger.prototype.log = function(x) { this.domNode.innerHTML += '' + x + this.divider; } function runExample1() { var logger = new Logger(document.querySelector('#example-log-1')); Kefir.sequentially(100, [1, 2, 3]) .map(function(x) { return x * 2; }) .filter(function(x) { return x !== 4; }) .onValue(function(x) { logger.log(x); }); } </script><p><button onClick="runExample1()">Run example</button></p><pre title="logger output" id="example-log-1" style="min-height:18px;"></pre><hr><p>Nice, here is another one. Let's this time begin from a streams based on user actions instead of timer. First we create a stream that will contain button click events: </p><pre class="javascript" title="example">var btnClicks = Kefir.fromEvents(document.querySelector(&#x27;#ex-2-btn&#x27;), &#x27;click&#x27;); </pre><p>Also let's create a stream of a text input value changes: </p><pre class="javascript" title="example">var inputValue = Kefir.fromEvents(document.querySelector(&#x27;#ex-2-input&#x27;), &#x27;keyup&#x27;) .map(event =&gt; event.target.value); </pre><p>That's a good beginning, we have streams representing user actions, now we can transform and combine them to create our desired result stream. First we want to have a <a href="#about-observables">property</a> representing how many times user clicked the button, we will use <a href="#scan">scan</a> method to create it: </p><pre class="javascript" title="example">var clicksCount = btnClicks.scan(sum =&gt; sum + 1, 0); </pre><p>Now we have two numbers, first is clicks count, and second is text field content, which actually a string, but let's fix that: </p><pre class="javascript" title="example">var inputNumber = inputValue.map(text =&gt; parseInt(text, 10)); </pre><p>Done, but now it can produce the <tt>NaN</tt> if user type "banana" or something instead of a number. Let's fix this too using <a href="#about-errors">errors handling</a> that Kefir provides: </p><pre class="javascript" title="example">var fixedInputNumber = inputNumber.flatMap( x =&gt; isNaN(x) ? Kefir.constantError(&#x27;banana?&#x27;) : Kefir.constant(x) ); </pre><p>Almost done, final step is to combine our two dynamic number values. Suppose we want to multiply them: </p><pre class="javascript" title="example">var theResult = Kefir.combine([fixedInputNumber, clicksCount], (a, b) =&gt; a * b); </pre><p>Good, let's display the result: </p><pre class="javascript" title="example">var outputElement = document.querySelector(&#x27;#ex-2-output&#x27;); theResult .onValue(x =&gt; { outputElement.innerHTML = x; }) .onError(error =&gt; { outputElement.innerHTML = &#x27;&lt;span style=&quot;color:red&quot;&gt;&#x27; + error + &#x27;&lt;/span&gt;&#x27;; }); </pre><p><button id="ex-2-btn">click me</button> <span id="ex-2-count-output"></span> * <input id="ex-2-input" placeholder="try &quot;banana&quot;"> = <span id="ex-2-output"></span></p><script>var clicksCount = Kefir .fromEvents(document.querySelector('#ex-2-btn'), 'click') .scan(function(sum, event) { return sum + 1; }, 0); var fixedInputNumber = Kefir .fromEvents(document.querySelector('#ex-2-input'), 'keyup') .map(function(event) { return event.target.value; }) .map(function(text) { return parseInt(text, 10); }) .flatMap(function(x) { return isNaN(x) ? Kefir.constantError('banana?') : Kefir.constant(x) }); var theResult = Kefir.combine([fixedInputNumber, clicksCount], function(a, b) { return a * b; }); var countOutputElement = document.querySelector('#ex-2-count-output'); clicksCount.onValue(function(x) { countOutputElement.innerHTML = x; }); var outputElement = document.querySelector('#ex-2-output'); theResult .onValue(function(x) { outputElement.innerHTML = x; }) .onError(function(error) { outputElement.innerHTML = '<span style="color:red">' + error + '</span>'; }); </script><hr><h3 id="more-examples">More examples</h3><ul><li><a href="http://jsfiddle.net/fxv6dpo2/">Click to alert</a> (a most basic example)</li><li><a href="http://jsfiddle.net/vyppkj0k/3/">Clock</a> + <a href="http://jsfiddle.net/vyppkj0k/4/">Controlled by scrolling</a> + <a href="http://jsfiddle.net/vyppkj0k/5/">Controlled by both time and scrolling</a></li><li><a href="http://jsfiddle.net/tn0z97e1/">Multiple clicks</a></li><li><a href="http://jsfiddle.net/cbg9rdd3/">The “Secret combination” challenge</a> (from <a href="http://www.jayway.com/2014/09/16/comparing-core-async-and-rx-by-example/">here</a>)</li><li><a href="http://jsfiddle.net/4Lx9ktpp/">Drag a div</a></li><li><a href="http://jsfiddle.net/8sd79x3L/">Tree</a> (Bacon.js vs Kefir.js performance comparison)</li><li><a href="http://jsfiddle.net/v1Lesw91/">AJAX search</a> (with errors handling and cancelation)</li></ul><p><img data-emoji="information_desk_person"> Also, almost any code snippet below can be run in the browser console, on this page. So you can play with Kefir right now, just open up the browser console.</p><h2 id="about-observables">Intro to Streams and Properties</h2><p>Kefir supports two types of observables — streams and properties. Streams represent sequences of events made available over time. And properties represent values that change over time. The value of a property changes in response to events, which means that any stream may be easily converted to a property. </p><figure><img src="docs-src/images/stream-and-property.png"></figure><p>In practice, the only difference between the two types of observables is that properties may have a current value. The process of subscribing to both types of observables is the same: you call the <a href="#on-value">onValue</a> method, passing a callback function to it. But when you subscribe to a property which has a current value, the callback is called immediately (synchronously) with the current value of the property.</p><h2 id="create-stream">Create a stream</h2><p id="never"><a class="header" href="#never">never</a><code>Kefir.never()</code><br/>Creates a stream that already ended and will never produce any events. </p><pre class="javascript" title="example">var stream = Kefir.never(); stream.log(); </pre><pre title="console output">&gt; [never] &lt;end:current&gt; </pre><pre title="events in time">stream: X</pre><div></div><p id="later"><a class="header" href="#later">later</a><code>Kefir.later(wait, value)</code><br/>Creates a stream that produces a single <b>value</b> after <b>wait</b> milliseconds, then ends. </p><pre class="javascript" title="example">var stream = Kefir.later(1000, 1); stream.log(); </pre><pre title="console output (after 1 second)">&gt; [later] &lt;value&gt; 1 &gt; [later] &lt;end&gt; </pre><pre title="events in time">stream: ----1X</pre><div></div><p id="interval"><a class="header" href="#interval">interval</a><code>Kefir.interval(interval, value)</code><br/>Creates a stream that produces the same <b>value</b> each <b>interval</b> milliseconds. Never ends. </p><pre class="javascript" title="example">var stream = Kefir.interval(1000, 1); stream.log(); </pre><pre title="console output">&gt; [interval] &lt;value&gt; 1 &gt; [interval] &lt;value&gt; 1 &gt; [interval] &lt;value&gt; 1 ... </pre><pre title="events in time">stream: ----1----1----1----1---</pre><div></div><p id="sequentially"><a class="header" href="#sequentially">sequentially</a><code>Kefir.sequentially(interval, values)</code><br/>Creates a stream containing the given <b>values</b> (array), delivered with the given <b>interval</b> in milliseconds. Ends after all <b>values</b> are delivered. </p><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2, 3]); stream.log(); </pre><pre title="console output">&gt; [sequentially] &lt;value&gt; 1 &gt; [sequentially] &lt;value&gt; 2 &gt; [sequentially] &lt;value&gt; 3 &gt; [sequentially] &lt;end&gt; </pre><pre title="events in time">stream: ----1----2----3X</pre><div></div><p id="from-poll"><a class="header" href="#from-poll">fromPoll</a><code>Kefir.fromPoll(interval, fn)</code><br/>Creates a stream that polls the given <b>fn</b> function, with the given <b>interval</b> in milliseconds, and emits the values returned by <b>fn</b>. Never ends. </p><pre class="javascript" title="example">var start = new Date(); var stream = Kefir.fromPoll(1000, () =&gt; new Date() - start); stream.log(); </pre><pre title="console output">&gt; [fromPoll] &lt;value&gt; 1001 &gt; [fromPoll] &lt;value&gt; 2002 &gt; [fromPoll] &lt;value&gt; 3004 &gt; [fromPoll] &lt;value&gt; 4006 &gt; [fromPoll] &lt;value&gt; 5007 &gt; [fromPoll] &lt;value&gt; 6007 ... </pre><pre title="events in time">stream: ----•----•----•----•--- 1001 2002 3004 4006</pre><div></div><p id="with-interval"><a class="header" href="#with-interval">withInterval</a><code>Kefir.withInterval(interval, handler)</code><br/>General method to create an interval based stream. Creates a stream that calls the given <b>handler</b> function, with the given <b>interval</b> in milliseconds. The <b>handler</b> function is called with one argument — an <a href="#emitter-object">emitter</a>. </p><pre class="javascript" title="example">var start = new Date(); var stream = Kefir.withInterval(1000, emitter =&gt; { var time = new Date() - start; if (time &lt; 4000) { emitter.emit(time); // emit a value } else { emitter.end(); // end the stream } }); stream.log(); </pre><pre title="console output">&gt; [withInterval] &lt;value&gt; 1002 &gt; [withInterval] &lt;value&gt; 2003 &gt; [withInterval] &lt;value&gt; 3005 &gt; [withInterval] &lt;end&gt; </pre><pre title="events in time">stream: ----•----•----•----X 1002 2003 3005</pre><div></div><p>You may call <b>emitter</b> methods several times on each interval tick, or not call them at all. </p><p id="from-callback"><a class="header" href="#from-callback">fromCallback</a><code>Kefir.fromCallback(fn)</code><br/>Convert a function that accepts a <b>callback</b> as the first argument to a stream. Emits at most one value when <b>callback</b> is called, then ends. The <b>fn</b> function will be called at most once, when the first subscriber will be added to the stream. </p><pre class="javascript" title="example">var stream = Kefir.fromCallback(callback =&gt; { // we use setTimeout here just to simulate some asynchronous activity setTimeout(() =&gt; callback(1), 1000); }); stream.log(); </pre><pre title="console output">&gt; [fromCallback] &lt;value&gt; 1 &gt; [fromCallback] &lt;end&gt; </pre><pre title="events in time">stream: ----1X</pre><div></div><p id="from-node-callback"><a class="header" href="#from-node-callback">fromNodeCallback</a><code>Kefir.fromNodeCallback(fn)</code><br/>Similar to <a href="#from-callback">fromCallback</a>, but the <b>callback</b> passed to <b>fn</b> is a Node.JS style callback — <tt>callback(error,&nbsp;result)</tt>. If the <b>error</b> argument of the <b>callback</b> is truthy, an error will be emitted from the result stream, otherwise a value is emitted. The stream will end after the first value or on error. </p><pre class="javascript" title="example">var stream = Kefir.fromNodeCallback(callback =&gt; { // we use setTimeout here just to simulate some asynchronous activity setTimeout(() =&gt; callback(null, 1), 1000); }); stream.log(); </pre><pre title="console output">&gt; [fromNodeCallback] &lt;value&gt; 1 &gt; [fromNodeCallback] &lt;end&gt; </pre><pre title="events in time">stream: ----1X</pre><div></div><p id="from-event"><a class="header" href="#from-event">fromEvents</a><code>Kefir.fromEvents(target, eventName, [transform])</code><br/>Creates a stream from events on a DOM EventTarget or a Node.JS EventEmitter object, or an object that supports event listeners using <tt>on/off</tt> methods (e.g. a jQuery object). </p><p>If a <b>transform</b> function is provided, it will be called on each event with the same arguments and context (<tt>this</tt>) as the event listener callback. And the value returned by <b>transform</b> will be emitted from the stream. If no <b>transform</b> function is provided, the first argument of the callback is emitted by default, i.e. the function <tt>x&nbsp;=&gt;&nbsp;x</tt> is used as <b>transform</b>. </p><pre class="javascript" title="example">var stream = Kefir.fromEvents(document.body, &#x27;click&#x27;); stream.log() </pre><pre title="console output">&gt; [fromEvents] &lt;value&gt; MouseEvent {y: 474, x: 551 ...} &gt; [fromEvents] &lt;value&gt; MouseEvent {y: 361, x: 751 ...} &gt; [fromEvents] &lt;value&gt; MouseEvent {y: 444, x: 1120 ...} </pre><pre title="events in time">stream: ----•-----------•----•--- MouseEvent MouseEvent MouseEvent</pre><div></div><a name="from-binder"></a><p id="stream"><a class="header" href="#stream">stream</a><code>Kefir.stream(subscribe)</code><br/>Creates a general purpose stream. The <b>subscribe</b> callback is called on each <a href="#active-state">activation</a>, and if a function is returned from <b>subscribe</b>, it will be called on the following <b>deactivation</b>. The <b>subscribe</b> function is called with <a href="#emitter-object">emitter</a> as an argument, which can be used to emit events from the result stream. </p><pre class="javascript" title="example">var stream = Kefir.stream(emitter =&gt; { var count = 0; emitter.emit(count); var intervalId = setInterval(() =&gt; { count++; if (count &lt; 4) { emitter.emit(count); } else { emitter.end(); } }, 1000); return () =&gt; { clearInterval(intervalId); } }); stream.log() </pre><pre title="console output">&gt; [stream] &lt;value:current&gt; 0 &gt; [stream] &lt;value&gt; 1 &gt; [stream] &lt;value&gt; 2 &gt; [stream] &lt;value&gt; 3 &gt; [stream] &lt;end&gt; </pre><pre title="events in time">stream: 0----1----2----3----X</pre><div></div><h2 id="create-property">Create a property</h2><p id="constant"><a class="header" href="#constant">constant</a><code>Kefir.constant(value)</code><br/>Creates an ended property, with the specified <b>current value</b>. </p><pre class="javascript" title="example">var property = Kefir.constant(1); property.log(); </pre><pre title="console output">&gt; [constant] &lt;value:current&gt; 1 &gt; [constant] &lt;end:current&gt; </pre><pre title="events in time">property: 1X</pre><div></div><p id="constant-error"><a class="header" href="#constant-error">constantError</a><code>Kefir.constantError(error)</code><br/>Creates an ended property, with the specified <b>current error</b>. </p><pre class="javascript" title="example">var property = Kefir.constantError(1); property.log(); </pre><pre title="console output">&gt; [constantError] &lt;error:current&gt; 1 &gt; [constantError] &lt;end:current&gt; </pre><pre title="events in time">property: eX</pre><div></div><h2 id="convert">Convert observables</h2><p id="to-property"><a class="header" href="#to-property">toProperty</a><code>stream.toProperty([getCurrent])</code><br/>Converts a stream to a property. Accepts an optional <b>getCurrent</b> callback, which will be called on each <a href="#active-state">activation</a> to get the current value at that moment. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.toProperty(() =&gt; 0); result.log(); </pre><pre title="console output">&gt; [sequentially.toProperty] &lt;value:current&gt; 0 &gt; [sequentially.toProperty] &lt;value&gt; 1 &gt; [sequentially.toProperty] &lt;value&gt; 2 &gt; [sequentially.toProperty] &lt;value&gt; 3 &gt; [sequentially.toProperty] &lt;end&gt; </pre><pre title="events in time">source: ----1----2----3X result: 0----1----2----3X</pre><div></div><p id="changes"><a class="header" href="#changes">changes</a><code>property.changes()</code><br/>Converts a property to a stream. If the property has a current value (or error), it will be ignored (subscribers of the stream won't get it). </p><p>If you call <b>changes</b> on a stream, it will return a new stream with <a href="#current-in-streams">current values/errors</a> removed. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var property = source.toProperty(() =&gt; 0); var result = property.changes(); result.log(); </pre><pre title="console output">&gt; [sequentially.toProperty.changes] &lt;value&gt; 1 &gt; [sequentially.toProperty.changes] &lt;value&gt; 2 &gt; [sequentially.toProperty.changes] &lt;value&gt; 3 &gt; [sequentially.toProperty.changes] &lt;end&gt; </pre><pre title="events in time">property: 0----1----2----3X result: ----1----2----3X</pre><div></div><h2 id="main-methods">Subscribe / add side effects</h2><p id="observe"><a class="header" href="#observe">observe</a><code>obs.observe(observer) obs.observe([onValue], [onError], [onEnd])</code><br/>Subscribes the provided <b>observer</b> to <b>obs</b>. Observer is an object with 3 optional methods: </p><ul><li><b>value</b> - called on values from the observable</li><li><b>error</b> - called on errors from the observable</li><li><b>end</b> - called on end of the observable</li></ul><p>Returns a <tt>Subscription</tt> object, which has an <tt>unsubscribe</tt> method and a read-only <tt>closed</tt> property. <tt>closed</tt> indicates whether the <tt>unsubscribe</tt> method has been called or the observable have ended. </p><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2]); var subscription = stream.observe({ value(value) { console.log(&#x27;value:&#x27;, value); }, error(error) { console.log(&#x27;error:&#x27;, error); }, end() { console.log(&#x27;end&#x27;); }, }); ... // later subscription.unsubscribe() </pre><pre title="console output">&gt; value: 1 &gt; value: 2 &gt; end </pre><p>In addition to passing in an Observer, <tt>observe</tt> can takes callbacks individually: </p><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2]); stream.observe( function onValue(value) { console.log(&#x27;value&#x27;, value); }, function onError(error) { console.log(&#x27;error&#x27;, error); }, function onEnd() { console.log(&#x27;end&#x27;); } ); </pre><p><img data-emoji="point_up"> This methods is designed to replace all other methods for subscribing (onValue, offValue, onError, etc). It's recomented to use <b>observe</b> instead of other methods, they will be removed eventually. </p><p id="on-value"><a class="header" href="#on-value">onValue</a><code>obs.onValue(callback)</code><br/>Subscribes <b>callback</b> to values on an observable. </p><p>If called on a property, which has a current value, <b>callback</b> will be called immediately (synchronously) with that value. </p><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2]); stream.onValue(x =&gt; { console.log(&#x27;value:&#x27;, x); }); </pre><pre title="console output">&gt; value: 1 &gt; value: 2 </pre><p id="off-value"><a class="header" href="#off-value">offValue</a><code>obs.offValue(callback)</code><br/>Unsubscribes <b>callback</b> from values on an observable. </p><p id="on-error"><a class="header" href="#on-error">onError</a><code>obs.onError(callback)</code><br/>Subscribes <b>callback</b> to <a href="#about-errors">errors</a> on an observable. </p><p>If called on a property, which has a current error, <b>callback</b> will be called immediately (synchronously) with that error. </p><pre class="javascript" title="example">var property = Kefir.constantError(1); property.onError(x =&gt; { console.log(&#x27;error:&#x27;, x); }); </pre><pre title="console output">&gt; error: 1 </pre><p id="off-error"><a class="header" href="#off-error">offError</a><code>obs.offError(callback)</code><br/>Unsubscribes <b>callback</b> from errors on an observable. </p><p id="on-end"><a class="header" href="#on-end">onEnd</a><code>obs.onEnd(callback)</code><br/>Subscribes <b>callback</b> to ending of an observable. </p><p>If observable is already ended, <b>callback</b> will be called immediately (synchronously). </p><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2]); stream.onEnd(() =&gt; { console.log(&#x27;stream ended&#x27;); }); </pre><pre title="console output">&gt; stream ended </pre><p id="off-end"><a class="header" href="#off-end">offEnd</a><code>obs.offEnd(callback)</code><br/>Unsubscribes <b>callback</b> from ending of an observable. </p><p id="on-any"><a class="header" href="#on-any">onAny</a><code>obs.onAny(callback)</code><br/>Subscribes <b>callback</b> to all three types of events. Callback is called with an <b>event object</b> as argument. Each event object contains three attributes — <b>type</b>, <b>value</b>, and <b>current</b>. </p><ul><li><b>type</b> — a <tt>'value'</tt>, <tt>'error'</tt>, or <tt>'end'</tt> string</li><li><b>value</b> — the emitted <b>value</b> or <b>error</b> (<tt>undefined</tt> if <tt>event.type === 'end'</tt>)</li></ul><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2]); stream.onAny(event =&gt; { console.log(&#x27;event:&#x27;, event); }); </pre><pre title="console output">&gt; event: Object {type: &quot;value&quot;, value: 1} &gt; event: Object {type: &quot;error&quot;, value: 2} &gt; event: Object {type: &quot;end&quot;, value: undefined} </pre><p id="off-any"><a class="header" href="#off-any">offAny</a><code>obs.offAny(callback)</code><br/>Unsubscribes an <b>onAny</b> subscriber. </p><p id="log"><a class="header" href="#log">log</a><code>obs.log([name])</code><br/>Turns on logging of any event to the browser console. Accepts an optional <b>name</b> argument that will be shown in the log if provided. </p><pre class="javascript" title="example">var stream = Kefir.sequentially(1000, [1, 2]); stream.log(&#x27;my stream&#x27;); </pre><pre title="console output">&gt; my stream &lt;value&gt; 1 &gt; my stream &lt;value&gt; 2 &gt; my stream &lt;end&gt; </pre><p id="off-log"><a class="header" href="#off-log">offLog</a><code>obs.offLog([name])</code><br/>Turns off logging. If <b>.log</b> was called with a <b>name</b> argument, <b>offLog</b> must be called with the same <b>name</b> argument. </p><p id="spy"><a class="header" href="#spy">spy</a><code>obs.spy([name])</code><br/>Turns on spying of any event to the browser console. Similar to <b>.log</b>, however <b>.spy</b> will not cause the stream to activate by itself. Accepts an optional <b>name</b> argument that will be shown in the log if provided. </p><pre class="javascript" title="example">var stream = Kefir.sequentially(250, [1, 2, 3]); stream.spy(&#x27;spied&#x27;); // stream is *not* activated here. stream.observe(() =&gt; {}); </pre><pre title="console output">&gt; spied &lt;value&gt; 1 &gt; spied &lt;value&gt; 2 &gt; spied &lt;value&gt; 3 &gt; spied &lt;end&gt; </pre><p id="off-spy"><a class="header" href="#off-spy">offSpy</a><code>obs.offSpy([name])</code><br/>Turns off spying. If <b>.spy</b> was called with a <b>name</b> argument, <b>offSpy</b> must be called with the same <b>name</b> argument.</p><h2 id="modify">Modify an observable</h2><p>Most methods in this section create a new observable of same type* from an original one. The new observable applies some transformation to each event from the original one and emits the result of the transformation. In most cases a transformation is applied only to <b>value</b> events, <b>end</b> and <b>error</b> events just passes through untouched. </p><p><b>*</b> For example if the original observable was a stream, then the new one will also be a stream. Same for properties. </p><p id="map"><a class="header" href="#map">map</a><code>obs.map(fn)</code><br/>Applies the given <b>fn</b> function to each value from the original observable and emits the value returned by <b>fn</b>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.map(x =&gt; x + 1); result.log(); </pre><pre title="console output">&gt; [sequentially.map] &lt;value&gt; 2 &gt; [sequentially.map] &lt;value&gt; 3 &gt; [sequentially.map] &lt;value&gt; 4 &gt; [sequentially.map] &lt;end&gt;</pre><div></div><pre title="events in time">source: ---1---2---3X result: ---2---3---4X</pre><div></div><p id="map-errors"><a class="header" href="#map-errors">mapErrors</a><code>obs.mapErrors(fn)</code><br/>Same as <a href="#map">map</a> but for <a href="#about-errors">errors</a>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]).flatMap(Kefir.constantError); var result = source.mapErrors(x =&gt; x * 2); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap.mapErrors] &lt;error&gt; 2 &gt; [sequentially.flatMap.mapErrors] &lt;error&gt; 4 &gt; [sequentially.flatMap.mapErrors] &lt;error&gt; 6 &gt; [sequentially.flatMap.mapErrors] &lt;end&gt; </pre><pre title="events in time">source: ---e---e---e---eX 0 1 2 3 result: ---e---e---e---eX 0 2 4 6</pre><div></div><p id="filter"><a class="header" href="#filter">filter</a><code>obs.filter([predicate])</code><br/>Filters values from the original observable using the given <b>predicate</b> function. </p><p>If no <b>predicate</b> is provided, the function <tt>x =&gt; x</tt> will be used. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.filter(x =&gt; x &gt; 1); result.log(); </pre><pre title="console output">&gt; [sequentially.filter] &lt;value&gt; 2 &gt; [sequentially.filter] &lt;value&gt; 3 &gt; [sequentially.filter] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3X result: -------2---3X</pre><div></div><p>See also <a href="#filter-by">filterBy</a>. </p><p id="filter-errors"><a class="header" href="#filter-errors">filterErrors</a><code>obs.filterErrors([predicate])</code><br/>Same as <a href="#filter">filter</a> but for <a href="#about-errors">errors</a>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [0, 1, 2, 3]).flatMap(Kefir.constantError); var result = source.filterErrors(x =&gt; (x % 2) === 0); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap.filterErrors] &lt;error&gt; 0 &gt; [sequentially.flatMap.filterErrors] &lt;error&gt; 2 &gt; [sequentially.flatMap.filterErrors] &lt;end&gt; </pre><pre title="events in time">source: ---e---e---e---eX 0 1 2 3 result: ---e-------e----X 0 2</pre><div></div><p id="take"><a class="header" href="#take">take</a><code>obs.take(n)</code><br/>Emits the first <b>n</b> values from the original observable, then ends. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.take(2); result.log(); </pre><pre title="console output">&gt; [sequentially.take] &lt;value&gt; 1 &gt; [sequentially.take] &lt;value&gt; 2 &gt; [sequentially.take] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3X result: ---1---2X</pre><div></div><p id="take-errors"><a class="header" href="#take-errors">takeErrors</a><code>obs.takeErrors(n)</code><br/>Emits the first <b>n</b> <a href="#about-errors">errors</a> from the original observable, then ends. Values just flow through. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]) .flatMap(x =&gt; Kefir.constantError(x)); var result = source.takeErrors(2); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap.takeErrors] &lt;error&gt; 1 &gt; [sequentially.flatMap.takeErrors] &lt;error&gt; 2 &gt; [sequentially.flatMap.takeErrors] &lt;end&gt; </pre><pre title="events in time">source: ---e---e---eX 1 2 3 result: ---e---eX 1 2</pre><div></div><p id="take-while"><a class="header" href="#take-while">takeWhile</a><code>obs.takeWhile([predicate])</code><br/>Emits values from the original observable until the given <b>predicate</b> function applied to a value returns false. Ends when the <b>predicate</b> returns false. </p><p>If no <b>predicate</b> is provided, the function <tt>x =&gt; x</tt> will be used. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.takeWhile(x =&gt; x &lt; 3); result.log(); </pre><pre title="console output">&gt; [sequentially.takeWhile] &lt;value&gt; 1 &gt; [sequentially.takeWhile] &lt;value&gt; 2 &gt; [sequentially.takeWhile] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3X result: ---1---2---X</pre><div></div><p>See also <a href="#take-while-by">takeWhileBy</a>. </p><p id="last"><a class="header" href="#last">last</a><code>obs.last()</code><br/>Emits only the last value from the original observable. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.last(); result.log(); </pre><pre title="console output">&gt; [sequentially.last] &lt;value&gt; 3 &gt; [sequentially.last] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3X result: -----------3X</pre><div></div><p id="skip"><a class="header" href="#skip">skip</a><code>obs.skip(n)</code><br/>Skips the first <b>n</b> values from the original observable, then emits all the rest. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.skip(2); result.log(); </pre><pre title="console output">&gt; [sequentially.skip] &lt;value&gt; 3 &gt; [sequentially.skip] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3X result: -----------3X</pre><div></div><p id="skip-while"><a class="header" href="#skip-while">skipWhile</a><code>obs.skipWhile([predicate])</code><br/>Skips values from the original observable until the given <b>predicate</b> function applied to a value returns false, then stops applying the <b>predicate</b> to values and emits all of them. </p><p>If no <b>predicate</b> is provided, the function <tt>x =&gt; x</tt> will be used. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 3, 2]); var result = source.skipWhile(x =&gt; x &lt; 3); result.log(); </pre><pre title="console output">&gt; [sequentially.skipWhile] &lt;value&gt; 3 &gt; [sequentially.skipWhile] &lt;value&gt; 2 &gt; [sequentially.skipWhile] &lt;end&gt; </pre><pre title="events in time">source: ---1---3---2X result: -------3---2X</pre><div></div><p id="skip-duplicates"><a class="header" href="#skip-duplicates">skipDuplicates</a><code>obs.skipDuplicates([comparator])</code><br/>Skips duplicate values using <tt>===</tt> for comparison. Accepts an optional <b>comparator</b> function which is then used instead of <tt>===</tt>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 2, 3, 1]); var result = source.skipDuplicates(); result.log(); </pre><pre title="console output">&gt; [sequentially.skipDuplicates] &lt;value&gt; 1 &gt; [sequentially.skipDuplicates] &lt;value&gt; 2 &gt; [sequentially.skipDuplicates] &lt;value&gt; 3 &gt; [sequentially.skipDuplicates] &lt;value&gt; 1 &gt; [sequentially.skipDuplicates] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---2---3---1X result: ---1---2-------3---1X</pre><div></div><p>With custom <b>comparator</b> function:</p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 2.1, 3, 1]); var result = source.skipDuplicates( (a, b) =&gt; Math.round(a) === Math.round(b) ); result.log(); </pre><pre title="console output">&gt; [sequentially.skipDuplicates] &lt;value&gt; 1 &gt; [sequentially.skipDuplicates] &lt;value&gt; 2 &gt; [sequentially.skipDuplicates] &lt;value&gt; 3 &gt; [sequentially.skipDuplicates] &lt;value&gt; 1 &gt; [sequentially.skipDuplicates] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---•---3---1X 2.1 result: ---1---2-------3---1X</pre><div></div><p id="diff"><a class="header" href="#diff">diff</a><code>obs.diff([fn], [seed])</code><br/>On each value from the original observable, calls the <b>fn</b> function with the previous and current values as arguments. At first time, calls <b>fn</b> with <b>seed</b> and current value. Emits whatever <b>fn</b> returns. </p><p>If no <b>seed</b> is provided, the first value will be used as a seed, and the result observable won't emit on first value. </p><p>If no <b>fn</b> function is provided, <tt>(a, b) =&gt; [a, b</tt>] will be used. If you want to omit <b>fn</b> but provide <b>seed</b>, pass <tt>null</tt> as <b>fn</b>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 2, 3]); var result = source.diff((prev, next) =&gt; next - prev, 0); result.log(); </pre><pre title="console output">&gt; [sequentially.diff] &lt;value&gt; 1 &gt; [sequentially.diff] &lt;value&gt; 1 &gt; [sequentially.diff] &lt;value&gt; 0 &gt; [sequentially.diff] &lt;value&gt; 1 &gt; [sequentially.diff] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---2---3X result: ---1---1---0---1X</pre><div></div><p id="scan"><a class="header" href="#scan">scan</a><code>obs.scan(fn, [seed])</code><br/>On each value from the original observable, calls the <b>fn</b> function with the previous result returned by <b>fn</b> and the current value emitted by the original observable. At first time, calls <b>fn</b> with <b>seed</b> as previous result. Emits whatever <b>fn</b> returns. Always creates a property. </p><p>If no <b>seed</b> is provided, the first value will be used as a seed. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 2, 3]); var result = source.scan((prev, next) =&gt; next + prev, 0); result.log(); </pre><pre title="console output">&gt; [sequentially.scan] &lt;value:current&gt; 0 &gt; [sequentially.scan] &lt;value&gt; 1 &gt; [sequentially.scan] &lt;value&gt; 3 &gt; [sequentially.scan] &lt;value&gt; 5 &gt; [sequentially.scan] &lt;value&gt; 8 &gt; [sequentially.scan] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---2---3X result: 0---1---3---5---8X</pre><div></div><p id="flatten"><a class="header" href="#flatten">flatten</a><code>obs.flatten([transformer])</code><br/>For this method it's expected that the source observable emits arrays. The result stream will then emit each element of these arrays. </p><p>Always returns a stream. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [[1], [], [2,3]]); var result = source.flatten(); result.log(); </pre><pre title="console output">&gt; [sequentially.flatten] &lt;value&gt; 1 &gt; [sequentially.flatten] &lt;value&gt; 2 &gt; [sequentially.flatten] &lt;value&gt; 3 &gt; [sequentially.flatten] &lt;end&gt; </pre><pre title="events in time">source: --------•--------•-------- •X [1] [] [2,3] result: --------1-----------------23X</pre><div></div><p>You can also provide the <b>transformer</b> function which will be applied to each value from <b>obs</b> observable, and which is supposed to return an array. This makes <b>flatten</b> a pretty powerful transformation method. It allows you to do three kinds of transformations on each value: change value (like map), skip value (like filter), and respond with several values to a single value. If you want to skip a value, return an empty array, to change the value — return an array with a single new value, to emit several values — return them in an array. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4]); var result = source.flatten(x =&gt; { if (x % 2 === 0) { return [x * 10]; } else { return []; } }); result.log(); </pre><pre title="console output">&gt; [sequentially.flatten] &lt;value&gt; 20 &gt; [sequentially.flatten] &lt;value&gt; 40 &gt; [sequentially.flatten] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3---4X result: -------•-------•X 20 40</pre><div></div><p>See also <a href="#flat-map">flatMap</a> </p><p id="delay"><a class="header" href="#delay">delay</a><code>obs.delay(wait)</code><br/>Delays all events by <b>wait</b> milliseconds, with an exception for the current value of a property, or the end of an already ended observable. Doesn't delay <a href="#about-errors">errors</a>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(200, [1, 2, 3]); var result = source.delay(100); result.log(); </pre><pre title="console output">&gt; [sequentially.delay] &lt;value&gt; 1 &gt; [sequentially.delay] &lt;value&gt; 2 &gt; [sequentially.delay] &lt;value&gt; 3 &gt; [sequentially.delay] &lt;end&gt; </pre><pre title="events in time">source: -----1-----2-----3X result: --------1-----2-----3X</pre><div></div><p id="throttle"><a class="header" href="#throttle">throttle</a><code>obs.throttle(wait, [options])</code><br/>Return a new throttled version of the original observable, which will emit values only at most once every <b>wait</b> milliseconds. If used on a property, the current value will always pass without any delay. </p><p>Accepts an optional <b>options</b> object similar to <a href="http://underscorejs.org/#throttle" target="_blank">underscore.throttle</a>. By default, it will emit an event as soon as it comes for the first time, and, if any new event comes during the wait period, it will emit the last of them as soon as that period is over. If you'd like to disable the leading-edge emit, pass <tt>{leading: false}</tt>. And if you'd like to disable the emit on the trailing-edge, pass <tt>{trailing: false}</tt>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(750, [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]); var result = source.throttle(2500); result.log(); </pre><pre title="console output">&gt; [sequentially.throttle] &lt;value&gt; 1 &gt; [sequentially.throttle] &lt;value&gt; 4 &gt; [sequentially.throttle] &lt;value&gt; 7 &gt; [sequentially.throttle] &lt;value&gt; 0 &gt; [sequentially.throttle] &lt;end&gt;</pre><div></div><pre title="events in time">source: --1--2--3--4--5--6--7--8--9--0X result: --1---------4---------7---------0X</pre><div></div><p id="debounce"><a class="header" href="#debounce">debounce</a><code>obs.debounce(wait, [options])</code><br/>Creates a new debounced version of the original observable. Will emit a value only after <b>wait</b> milliseconds period of no events. Pass <tt>{immediate: true}</tt> as an <b>options</b> object to cause observable to emit a value on the leading instead of the trailing edge of the <b>wait</b> interval. If used on a property, the current value will always pass without any delay. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 0, 0, 0, 4, 5, 6]); source = source.filter(x =&gt; x &gt; 0); var result = source.debounce(250); result.log(); </pre><pre title="console output">&gt; [sequentially.filter.debounce] &lt;value&gt; 3 &gt; [sequentially.filter.debounce] &lt;value&gt; 6 &gt; [sequentially.filter.debounce] &lt;end&gt;</pre><div></div><pre title="events in time">source: ---1---2---3---------------4---5---6X result: ----------------------3---------------------6X</pre><div></div><p id="ignore-values"><a class="header" href="#ignore-values">ignoreValues</a><code>obs.ignoreValues()</code><br/>Ignores all values from the original observable, emitting only errors and end. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [0, -1, 2, -3]) .flatMap(x =&gt; x &lt; 0 ? Kefir.constantError(x) : Kefir.constant(x)); var result = source.ignoreValues() result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap.ignoreValues] &lt;error&gt; -1 &gt; [sequentially.flatMap.ignoreValues] &lt;error&gt; -3 &gt; [sequentially.flatMap.ignoreValues] &lt;end&gt; </pre><pre title="events in time">source: ---•---e---•---eX 0 -1 2 -3 result: -------e-------eX -1 -3</pre><div></div><p id="ignore-errors"><a class="header" href="#ignore-errors">ignoreErrors</a><code>obs.ignoreErrors()</code><br/>Ignores all errors from the original observable, emitting only values and end. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [0, -1, 2, -3]) .flatMap(x =&gt; x &lt; 0 ? Kefir.constantError(x) : Kefir.constant(x)); var result = source.ignoreErrors() result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap.ignoreErrors] &lt;value&gt; 0 &gt; [sequentially.flatMap.ignoreErrors] &lt;value&gt; 2 &gt; [sequentially.flatMap.ignoreErrors] &lt;end&gt; </pre><pre title="events in time">source: ---•---e---•---eX 0 -1 2 -3 result: ---•-------•----X 0 2</pre><div></div><p id="ignore-end"><a class="header" href="#ignore-end">ignoreEnd</a><code>obs.ignoreEnd()</code><br/>Ignores end of source observable. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.ignoreEnd(); result.log(); </pre><pre title="console output">&gt; [sequentially.ignoreEnd] &lt;value&gt; 1 &gt; [sequentially.ignoreEnd] &lt;value&gt; 2 &gt; [sequentially.ignoreEnd] &lt;value&gt; 3 </pre><pre title="events in time">source: ---1---2---3X result: ---1---2---3---</pre><div></div><p id="before-end"><a class="header" href="#before-end">beforeEnd</a><code>obs.beforeEnd(fn)</code><br/>Allows you to insert an additional value just before the observable ends. <b>fn</b> will be called on <b>obs</b>' end with no arguments, and whatever it return will be emitted in the result stream before end. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.beforeEnd(() =&gt; 0); result.log(); </pre><pre title="console output">&gt; [sequentially.beforeEnd] &lt;value&gt; 1 &gt; [sequentially.beforeEnd] &lt;value&gt; 2 &gt; [sequentially.beforeEnd] &lt;value&gt; 3 &gt; [sequentially.beforeEnd] &lt;value&gt; 0 &gt; [sequentially.beforeEnd] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3 X result: ---1---3---30X</pre><div></div><p id="sliding-window"><a class="header" href="#sliding-window">slidingWindow</a><code>obs.slidingWindow(max, [min])</code><br/>Will emit arrays containing the last <b>n</b> values from the <b>obs</b> observable, where <b>n</b> is between <b>max</b> and <b>min</b> arguments. By default <b>min</b> equals <tt>0</tt>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]); var result = source.slidingWindow(3, 2) result.log(); </pre><pre title="console output">&gt; [sequentially.slidingWindow] &lt;value&gt; [1, 2] &gt; [sequentially.slidingWindow] &lt;value&gt; [1, 2, 3] &gt; [sequentially.slidingWindow] &lt;value&gt; [2, 3, 4] &gt; [sequentially.slidingWindow] &lt;value&gt; [3, 4, 5] &gt; [sequentially.slidingWindow] &lt;end&gt; </pre><pre title="events in time">source: --------1--------2--------3--------4--------5X result: -----------------•--------•--------•--------•X [1,2] [1,2,3] [2,3,4] [3,4,5]</pre><div></div><p id="buffer-while"><a class="header" href="#buffer-while">bufferWhile</a><code>obs.bufferWhile([predicate], [options])</code><br/>Passes every value from the source observable to the <b>predicate</b> function. If it returns <tt>true</tt>, adds the value to the buffer, otherwise flushes the buffer. Also flushes the buffer before end, but you can disable that by passing <tt>{flushOnEnd: false}</tt> as <b>options</b>. </p><p>The default <b>predicate</b> is <tt>x =&gt; x</tt>. If you want to omit <b>predicate</b> but pass <b>options</b>, pass <tt>null</tt> as <b>predicate</b>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]); var result = source.bufferWhile(x =&gt; x !== 3); result.log(); </pre><pre title="console output">&gt; [sequentially.bufferWhile] &lt;value&gt; [1, 2, 3] &gt; [sequentially.bufferWhile] &lt;value&gt; [4, 5] &gt; [sequentially.bufferWhile] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3---4---5 X result: -----------•--------•X [1,2,3] [4,5]</pre><div></div><p id="buffer-with-count"><a class="header" href="#buffer-with-count">bufferWithCount</a><code>obs.bufferWithCount(count, [options])</code><br/>Buffers all values from <b>obs</b> observable, and flushes the buffer every time <b>count</b> values have been passed through. Also flushes the buffer before end, but you can disable that by passing <tt>{flushOnEnd: false}</tt> as <b>options</b>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]); var result = source.bufferWithCount(2); result.log(); </pre><pre title="console output">&gt; [sequentially.bufferWithCount] &lt;value&gt; [1, 2] &gt; [sequentially.bufferWithCount] &lt;value&gt; [3, 4] &gt; [sequentially.bufferWithCount] &lt;value&gt; [5] &gt; [sequentially.bufferWithCount] &lt;end&gt; </pre><pre title="events in time">source: --------1--------2--------3--------4--------5 X result: -----------------•-----------------•---------•X [1,2] [3,4] [5]X</pre><div></div><p id="buffer-with-time-or-count"><a class="header" href="#buffer-with-time-or-count">bufferWithTimeOrCount</a><code>obs.bufferWithTimeOrCount(interval, count, [options])</code><br/>Continuously buffers values from the source observable, flushing every <b>interval</b> milliseconds, or immediately once <b>count</b> values have been stored. Also flushes the buffer before end, but you can disable that by passing <tt>{flushOnEnd: false}</tt> as <b>options</b>. </p><p>Limited by time: </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]); var result = source.bufferWithTimeOrCount(330, 10); result.log(); </pre><pre title="console output">&gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [1, 2, 3] &gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [4, 5, 6] &gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [7, 8] &gt; [sequentially.bufferWithTimeOrCount] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3---4---5---6---7---8 X result: ------------•------------•------•X [1,2,3] [4,5,6] [7,8] </pre><p>Limited by count: </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]); var result = source.bufferWithTimeOrCount(330, 2); result.log(); </pre><pre title="console output">&gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [1, 2] &gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [3, 4] &gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [5, 6] &gt; [sequentially.bufferWithTimeOrCount] &lt;value&gt; [7, 8] &gt; [sequentially.bufferWithTimeOrCount] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3---4---5---6---7---8X result: -------•-------•-------•-------•X [1,2] [3,4] [5,6] [7,8]</pre><div></div><p id="transduce"><a class="header" href="#transduce">transduce</a><code>obs.transduce(transducer)</code><br/>This method allows you to use transducers in Kefir. It supports any transducers implementation that follows <a href="https://github.com/cognitect-labs/transducers-js#the-transducer-protocol">the transducer protocol</a>, for example <a href="https://github.com/cognitect-labs/transducers-js">cognitect-labs/transducers-js</a> or <a href="https://github.com/jlongster/transducers.js">jlongster/transducers.js</a>. To learn more about transducers please visit these library pages. </p><p>In the example the <a href="https://github.com/cognitect-labs/transducers-js">cognitect-labs/transducers-js</a> library is used. </p><pre class="javascript" title="example">var t = transducers; var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6]); var myTransducer = t.comp( t.map(x =&gt; x + 10), t.filter(x =&gt; x % 2 === 0), t.take(2) ); var result = source.transduce(myTransducer); result.log(); </pre><pre title="console output">&gt; [sequentially.transduce] &lt;value&gt; 12 &gt; [sequentially.transduce] &lt;value&gt; 14 &gt; [sequentially.transduce] &lt;end&gt; </pre><pre title="events in time">source: ---1---2---3---4---5---6X result: -------•-------•X 12 14</pre><div></div><p id="thru"><a class="header" href="#thru">thru</a><code>obs.thru(transformer)</code><br/>Calls <b>transformer</b> with <b>obs</b> as argument and returns whatever <b>transformer</b> returned. Put another way, <tt>o.thru(fn)</tt> is the same as <tt>fn(o)</tt>. This allows you to integrate your helper functions into chains of Kefir method calls without adding them to Observable prototype. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6]); var transformer = obs =&gt; obs.filter(x =&gt; x &gt; 2); var result = source.thru(transformer).map(x =&gt; x - 1); // is the same as transformer(source).map(...) result.log();</pre><div></div><pre title="events in time">source: ---1---2---3---4---5---6X result: -----------2---3---4---5X</pre><div></div><pre title="console output">&gt; [sequentially.filter.map] &lt;value&gt; 2 &gt; [sequentially.filter.map] &lt;value&gt; 3 &gt; [sequentially.filter.map] &lt;value&gt; 4 &gt; [sequentially.filter.map] &lt;value&gt; 5 &gt; [sequentially.filter.map] &lt;end&gt;</pre><div></div><p id="with-handler"><a class="header" href="#with-handler">withHandler</a><code>obs.withHandler(handler)</code><br/>The most general transformation method. All other transformation methods above can be implemented via <b>withHandler</b>. Will call the <b>handler</b> function on each event from <b>obs</b> observable, passing to it two arguments: an <a href="#emitter-object">emitter</a>, and an event object (with same format as in <a href="#on-any">onAny</a> callback). </p><p>By default, it will not emit any values or errors, and it will not end when <b>obs</b> observable ends. Instead you should implement the desired behaviour in the <b>handler</b> function, i.e. analyse <b>event object</b> and call <b>emitter</b> methods if necessary. You can call the <b>emitter</b> methods several times in each <b>handler</b> execution, and you can also call them any time later, for example to implement <a href="#delay">delay</a>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [0, 1, 2, 3]); var result = source.withHandler((emitter, event) =&gt; { if (event.type === &#x27;end&#x27;) { emitter.emit(&#x27;bye&#x27;); emitter.end(); } if (event.type === &#x27;value&#x27;) { for (var i = 0; i &lt; event.value; i++) { emitter.emit(event.value); } } }); result.log(); </pre><pre title="console output">&gt; [sequentially.withHandler] &lt;value&gt; 1 &gt; [sequentially.withHandler] &lt;value&gt; 2 &gt; [sequentially.withHandler] &lt;value&gt; 2 &gt; [sequentially.withHandler] &lt;value&gt; 3 &gt; [sequentially.withHandler] &lt;value&gt; 3 &gt; [sequentially.withHandler] &lt;value&gt; 3 &gt; [sequentially.withHandler] &lt;value&gt; bye &gt; [sequentially.withHandler] &lt;end&gt;</pre><div></div><pre title="events in time">source: ---0---1--- 2--- 3 X result: -------•---••---••••X 1 22 333bye</pre><div></div><h2 id="combine-observables">Combine observables</h2><p id="combine"><a class="header" href="#combine">combine</a><code>Kefir.combine(obss, [passiveObss], [combinator])</code><code class="alias">obs.combine(otherObs, [combinator])</code>Returns a stream. Combines two or more observables together. On each value from any source observable (<b>obss</b> array), emits a combined value, generated by the <b>combinator</b> function from the latest values from each source observable. The <b>combinator</b> function is called with the latest values as arguments. If no <b>combinator</b> is provided, it emits an array containing the latest values. </p><pre class="javascript" title="example">var a = Kefir.sequentially(100, [1, 3]); var b = Kefir.sequentially(100, [2, 4]).delay(40); var result = Kefir.combine([a, b], (a, b) =&gt; a + b); result.log(); </pre><pre title="console output">&gt; [combine] &lt;value&gt; 3 &gt; [combine] &lt;value&gt; 5 &gt; [combine] &lt;value&gt; 7 &gt; [combine] &lt;end&gt; </pre><pre title="events in time">a: ----1----3X b: ------2----4X result: ------3--5-7X</pre><div></div><p>You can also pass part of the source observables as <b>passiveObss</b> in a second array, the result stream won't emit on values from <b>passiveObss</b>, but all the values will be available in the combinator function. </p><pre class="javascript" title="example">var a = Kefir.sequentially(100, [1, 3]); var b = Kefir.sequentially(100, [2, 4]).delay(40); var c = Kefir.sequentially(60, [5, 6, 7]); var result = Kefir.combine([a, b], [c], (a, b, c) =&gt; a + b + c); result.log(); </pre><pre title="console output">&gt; [combine] &lt;value&gt; 9 &gt; [combine] &lt;value&gt; 12 &gt; [combine] &lt;value&gt; 14 &gt; [combine] &lt;end&gt; </pre><pre title="events in time">a: ----1----3X b: ------2----4X c: --5--6--7X result: ------•--•-•X 9 12 14 </pre><div></div><p>Also, <b>combine</b> supports passing objects as both <b>obss</b> <i>and</i> <b>passiveObss</b>. The <b>combinator</b> function will then be called with a single argument, a new object with the latest value from each observable. If no <b>combinator</b> is provided, it emits the object containing latest values. </p><pre class="javascript" title="example">var aStream = Kefir.sequentially(100, [1, 3]); var bStream = Kefir.sequentially(100, [2, 4]).delay(40); var result = Kefir.combine({ a: aStream, b: bStream }); result.log(); </pre><pre title="console output">&gt; [combine] &lt;value&gt; { a: 1, b: 2 } &gt; [combine] &lt;value&gt; { a: 3, b: 2 } &gt; [combine] &lt;value&gt; { a: 3, b: 4 } &gt; [combine] &lt;end&gt; </pre><pre title="events in time">a: ----1----3X b: ------2----4X result: ------•--•-•X </pre><p><img data-emoji="point_up"> If there are duplicate keys in both <b>obss</b> <i>and</i> <b>passiveObss</b>, only the latest values from <b>obss</b> will appear in the combined object for the duplicated keys. </p><p>The result stream emits a value only when it has at least one value from each of source observables. Ends when all the active source observables (<b>obss</b> array) end. </p><p>You can also combine two observables by calling <tt>a.combine(b, combinator)</tt> if you like. </p><p id="zip"><a class="header" href="#zip">zip</a><code>Kefir.zip(sources, [combinator])</code><code class="alias">obs.zip(otherObs, [combinator])</code>Creates a stream with values from <b>sources</b> lined up with each other. For example if you have two sources with values <tt>[1, 2, 3</tt>] and <tt>[4, 5, 6, 7</tt>], the result stream will emit <tt>[1, 4</tt>], <tt>[2, 5</tt>], and <tt>[3, 6</tt>]. The result stream will emit the next value only when it has at least one value from each source. </p><p>You can also provide a <b>combinator</b> function. In this case, instead of emitting an array of values, they will be passed to <b>combinator</b> as arguments, and the returned value will be emitted (same as in <a href="#combine">combine</a>) </p><p>Also in <b>zip</b> you can pass ordinary arrays along with observables in the <b>sources</b>, e.g. <tt>Kefir.zip([obs,&nbsp;[1,&nbsp;2,&nbsp;3</tt>],&nbsp;fn)]. In other words, <b>sources</b> is an array of observables and arrays, or only observables of course. </p><p>The result stream ends when all sources end. </p><pre class="javascript" title="example">var a = Kefir.sequentially(100, [0, 1, 2, 3]); var b = Kefir.sequentially(160, [4, 5, 6]); var c = Kefir.sequentially(100, [8, 9]).delay(260).toProperty(() =&gt; 7); var result = Kefir.zip([a, b, c]); result.log(); </pre><pre title="console output">&gt; [zip] &lt;value&gt; [0, 4, 7] &gt; [zip] &lt;value&gt; [1, 5, 8] &gt; [zip] &lt;value&gt; [2, 6, 9] &gt; [zip] &lt;end&gt; </pre><pre title="events in time">a: ----0----1----2----3X b: -------4-------5-------6X c: 7-----------------8----9X abc: -------•---------•-----•X [0,4,7] [1,5,8] [2,6,9]</pre><div></div><p><img data-emoji="point_up"> This method sometimes is used incorrectly instead of <a href="#combine">combine</a>. Please make sure you understand the difference and are making right choice. </p><p id="merge"><a class="header" href="#merge">merge</a><code>Kefir.merge(obss)</code><code class="alias">obs.merge(otherObs)</code>Merges several <b>obss</b> observables into a single stream i.e., simply repeats values from each source observable. Ends when all <b>obss</b> observables end. </p><p>You can also merge two observables by calling <tt>a.merge(b)</tt>, if you like. </p><pre class="javascript" title="example">var a = Kefir.sequentially(100, [0, 1, 2]); var b = Kefir.sequentially(100, [0, 1, 2]).delay(30); var c = Kefir.sequentially(100, [0, 1, 2]).delay(60); var abc = Kefir.merge([a, b, c]); abc.log(); </pre><pre title="console output">&gt; [merge] &lt;value&gt; 0 &gt; [merge] &lt;value&gt; 0 &gt; [merge] &lt;value&gt; 0 &gt; [merge] &lt;value&gt; 1 &gt; [merge] &lt;value&gt; 1 &gt; [merge] &lt;value&gt; 1 &gt; [merge] &lt;value&gt; 2 &gt; [merge] &lt;value&gt; 2 &gt; [merge] &lt;value&gt; 2 &gt; [merge] &lt;end&gt; </pre><pre title="events in time">a: ----------0---------1---------2X b: ------------0---------1---------2X c: --------------0---------1---------2X abc: ----------0-0-0-----1-1-1-----2-2-2X</pre><div></div><p id="concat"><a class="header" href="#concat">concat</a><code>Kefir.concat(obss)</code><code class="alias">obs.concat(otherObs)</code>Concatenates several <b>obss</b> observables into one stream. Like <a href="#merge">merge</a>, but switches to the next source only after the previous one end. </p><pre class="javascript" title="example">var a = Kefir.sequentially(100, [0, 1, 2]); var b = Kefir.sequentially(100, [3, 4, 5]); var abc = Kefir.concat([a, b]); abc.log(); </pre><pre title="console output">&gt; [concat] &lt;value&gt; 0 &gt; [concat] &lt;value&gt; 1 &gt; [concat] &lt;value&gt; 2 &gt; [concat] &lt;value&gt; 3 &gt; [concat] &lt;value&gt; 4 &gt; [concat] &lt;value&gt; 5 &gt; [concat] &lt;end&gt; </pre><pre title="events in time">a: ---0---1---2X b: ---3---4---5X abc: ---0---1---2---3---4---5X</pre><div></div><p><img data-emoji="point_up"> This method sometimes is used incorrectly instead of <a href="#merge">merge</a>. Please make sure you understand the difference and are making right choice. </p><p id="pool"><a class="header" href="#pool">pool</a><code>Kefir.pool()</code><br/><b>Pool</b> is like <a href="#merge">merge</a> to which you can dynamically add and remove sources. When you create a new <b>pool</b> it has no sources. Then you can add observables to it using the <b>plug</b> method, and remove them using <b>unplug</b>. <b>Pool</b> never ends. </p><pre class="javascript" title="example">var a = Kefir.sequentially(100, [0, 1, 2]); var b = Kefir.sequentially(100, [0, 1, 2]).delay(30); var c = Kefir.sequentially(100, [0, 1, 2]).delay(60); var pool = Kefir.pool(); pool.plug(a); pool.plug(b); pool.plug(c); pool.log(); </pre><pre title="console output">&gt; [pool] &lt;value&gt; 0 &gt; [pool] &lt;value&gt; 0 &gt; [pool] &lt;value&gt; 0 &gt; [pool] &lt;value&gt; 1 &gt; [pool] &lt;value&gt; 1 &gt; [pool] &lt;value&gt; 1 &gt; [pool] &lt;value&gt; 2 &gt; [pool] &lt;value&gt; 2 &gt; [pool] &lt;value&gt; 2 </pre><pre title="events in time">a: ----------0---------1---------2X b: ------------0---------1---------2X c: --------------0---------1---------2X pool: ----------0-0-0-----1-1-1-----2-2-2</pre><div></div><p id="repeat"><a class="header" href="#repeat">repeat</a><code>Kefir.repeat(generator)</code><br/>Calls the <b>generator</b> function which is supposed to return an observable. Emits values and errors from the spawned observable; when it ends, calls <b>generator</b> again to get a new one and so on. </p><p>The <b>generator</b> function is called with one argument — iteration number starting from <tt>0</tt>. If a falsy value is returned from the <b>generator</b>, the stream ends. </p><pre class="javascript" title="example">var result = Kefir.repeat(i =&gt; { if (i &lt; 3) { return Kefir.sequentially(100, [i, i]); } else { return false; } }); result.log(); </pre><pre title="console output">&gt; [repeat] &lt;value&gt; 0 &gt; [repeat] &lt;value&gt; 0 &gt; [repeat] &lt;value&gt; 1 &gt; [repeat] &lt;value&gt; 1 &gt; [repeat] &lt;value&gt; 2 &gt; [repeat] &lt;value&gt; 2 &gt; [repeat] &lt;end&gt; </pre><pre title="events in time">spawned 1: ---0---0X spawned 2: ---1---1X spawned 3: ---2---2X result: ---0---0---1---1---2---2X</pre><div></div><p><img data-emoji="point_up"> Note that with this method it is possible to create an infinite loop. Consider this example: </p><pre class="javascript" title="example">var result = Kefir.repeat(() =&gt; Kefir.constant(1)); // When we subscribe to it (directly or via .log) // we already are in an infinite loop. result.log(); // But if we limit it with .take or something it&#x27;ll work just fine. // So the `result` stream defined like this // may still make sense, depending on how we use it. result.take(10).log(); </pre><p>It is even more dangerous if <b>generator</b> constantly returns an ended observable with no values (e.g. <a href="#never">never</a>). In this case, <tt>.take</tt> won't help, because you'll never get any single value from it, but <b>generator</b> will be called over and over. The only escape path here is to define an escape condition in the <b>generator</b>: </p><pre class="javascript" title="example">var result = Kefir.repeat(i =&gt; { // Defining that a new observable will be spawned at most 10 times if (i &gt;= 10) { return false; } return Kefir.never(); }); </pre><p>So just be careful when using <b>repeat</b>, it's a little dangerous but it is still a great method. </p><p id="flat-map"><a class="header" href="#flat-map">flatMap</a><code>obs.flatMap([transform])</code><br/>Works similar to <a href="#flatten">flatten</a>, but instead of arrays, it handles observables. Like in <b>flatten</b> you can either provide a <b>transform</b> function which will return observables, or you can use the source <b>obs</b> observable that already emits observables. </p><p>Always returns a stream. </p><p><b>flatMap</b> ends when <b>obs</b> and all spawned observables end. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.flatMap(x =&gt; Kefir.interval(40, x).take(4)); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap] &lt;value&gt; 1 &gt; [sequentially.flatMap] &lt;value&gt; 1 &gt; [sequentially.flatMap] &lt;value&gt; 1 &gt; [sequentially.flatMap] &lt;value&gt; 2 &gt; [sequentially.flatMap] &lt;value&gt; 1 &gt; [sequentially.flatMap] &lt;value&gt; 2 &gt; [sequentially.flatMap] &lt;value&gt; 2 &gt; [sequentially.flatMap] &lt;value&gt; 3 &gt; [sequentially.flatMap] &lt;value&gt; 2 &gt; [sequentially.flatMap] &lt;value&gt; 3 &gt; [sequentially.flatMap] &lt;value&gt; 3 &gt; [sequentially.flatMap] &lt;value&gt; 3 &gt; [sequentially.flatMap] &lt;end&gt; </pre><pre title="events in time">source: ----------1---------2---------3X spawned 1: ---1---1---1---1X spawned 2: ---2---2---2---2X spawned 3: ---3---3---3---3X result: -------------1---1---1-2-1-2---2-3-2-3---3---3X</pre><div></div><p id="flat-map-latest"><a class="header" href="#flat-map-latest">flatMapLatest</a><code>obs.flatMapLatest([fn])</code><br/>Like <b>flatMap</b>, but repeats events only from the latest added observable i.e., switching from one observable to another. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.flatMapLatest(x =&gt; Kefir.interval(40, x).take(4)); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMapLatest] &lt;value&gt; 1 &gt; [sequentially.flatMapLatest] &lt;value&gt; 1 &gt; [sequentially.flatMapLatest] &lt;value&gt; 2 &gt; [sequentially.flatMapLatest] &lt;value&gt; 2 &gt; [sequentially.flatMapLatest] &lt;value&gt; 3 &gt; [sequentially.flatMapLatest] &lt;value&gt; 3 &gt; [sequentially.flatMapLatest] &lt;value&gt; 3 &gt; [sequentially.flatMapLatest] &lt;value&gt; 3 &gt; [sequentially.flatMapLatest] &lt;end&gt; </pre><pre title="events in time">source: ----------1---------2---------3X spawned 1: ---1---1---1---1X spawned 2: ---2---2---2---2X spawned 3: ---3---3---3---3X result: -------------1---1-----2---2-----3---3---3---3X</pre><div></div><p id="flat-map-first"><a class="header" href="#flat-map-first">flatMapFirst</a><code>obs.flatMapFirst([fn])</code><br/>Like <b>flatMap</b>, but adds a new observable only if the previous one ended. Otherwise, it just ignores the new observable. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.flatMapFirst(x =&gt; Kefir.interval(40, x).take(4)); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMapFirst] &lt;value&gt; 1 &gt; [sequentially.flatMapFirst] &lt;value&gt; 1 &gt; [sequentially.flatMapFirst] &lt;value&gt; 1 &gt; [sequentially.flatMapFirst] &lt;value&gt; 1 &gt; [sequentially.flatMapFirst] &lt;value&gt; 3 &gt; [sequentially.flatMapFirst] &lt;value&gt; 3 &gt; [sequentially.flatMapFirst] &lt;value&gt; 3 &gt; [sequentially.flatMapFirst] &lt;value&gt; 3 &gt; [sequentially.flatMapFirst] &lt;end&gt; </pre><pre title="events in time">source: ----------1---------2---------3X spawned 1: ---1---1---1---1X spawned 2: ---2---2---2---2X spawned 3: ---3---3---3---3X result: -------------1---1---1---1-------3---3---3---3X</pre><div></div><p id="flat-map-concat"><a class="header" href="#flat-map-concat">flatMapConcat</a><code>obs.flatMapConcat([fn])</code><br/>Like <a href="#flat-map-first">flatMapFirst</a>, but instead of ignoring new observables (if the previous one is still alive), it adds them to the queue. Then, when the current source ends, it takes the oldest observable from the queue, and switches to it. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.flatMapConcat(x =&gt; Kefir.interval(40, x).take(4)); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMapConcat] &lt;value&gt; 1 &gt; [sequentially.flatMapConcat] &lt;value&gt; 1 &gt; [sequentially.flatMapConcat] &lt;value&gt; 1 &gt; [sequentially.flatMapConcat] &lt;value&gt; 1 &gt; [sequentially.flatMapConcat] &lt;value&gt; 2 &gt; [sequentially.flatMapConcat] &lt;value&gt; 2 &gt; [sequentially.flatMapConcat] &lt;value&gt; 2 &gt; [sequentially.flatMapConcat] &lt;value&gt; 2 &gt; [sequentially.flatMapConcat] &lt;value&gt; 3 &gt; [sequentially.flatMapConcat] &lt;value&gt; 3 &gt; [sequentially.flatMapConcat] &lt;value&gt; 3 &gt; [sequentially.flatMapConcat] &lt;value&gt; 3 &gt; [sequentially.flatMapConcat] &lt;end&gt; </pre><pre title="events in time">source: ----------1---------2---------3X spawned 1: ---1---1---1---1X spawned 2: ---2---2---2---2X spawned 3: ---3---3---3---3X result: -------------1---1---1---1---2---2---2---2---3---3---3---3X</pre><div></div><p id="flat-map-with-concurrency-limit"><a class="header" href="#flat-map-with-concurrency-limit">flatMapConcurLimit</a><code>obs.flatMapConcurLimit([fn], limit)</code><br/>Like <a href="#flat-map-concat">flatMapConcat</a>, but with a configurable number of concurent sources. In other words <b>flatMapConcat</b> is <tt>flatMapConcurLimit(fn, 1)</tt>. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2, 3]); var result = source.flatMapConcurLimit(x =&gt; Kefir.interval(40, x).take(6), 2); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 1 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 1 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 1 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 2 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 1 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 2 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 1 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 2 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 1 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 2 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 3 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 2 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 3 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 2 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 3 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 3 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 3 &gt; [sequentially.flatMapConcurLimit] &lt;value&gt; 3 &gt; [sequentially.flatMapConcurLimit] &lt;end&gt; </pre><pre title="events in time">source: ----------1---------2---------3X spawned 1: ---1---1---1---1---1---1X spawned 2: ---2---2---2---2---2---2X spawned 3: ---3---3---3---3---3---3X result: -------------1---1---1-2-1-2-1-2-1-2-3-2-3-2-3---3---3---3X</pre><div></div><p id="flat-map-errors"><a class="header" href="#flat-map-errors">flatMapErrors</a><code>obs.flatMapErrors([transform])</code><br/>Same as <a href="#flat-map">flatMap</a>, but operates on <a href="#about-errors">errors</a> while <b>values</b> just flow through. </p><pre class="javascript" title="example">var source = Kefir.sequentially(100, [1, 2]).flatMap(Kefir.constantError); var result = source.flatMapErrors(x =&gt; Kefir.interval(40, x).take(2)); result.log(); </pre><pre title="console output">&gt; [sequentially.flatMap.flatMapErrors] &lt;value&gt; 1 &gt; [sequentially.flatMap.flatMapErrors] &lt;value&gt; 1 &gt; [sequentially.flatMap.flatMapErrors] &lt;value&gt; 2 &gt; [sequentially.flatMap.flatMapErrors] &lt;value&gt; 2 &gt; [sequentially.flatMap.flatMapErrors] &lt;end&gt; </pre><pre title="events in time">source: ----------e---------eX 1 2 spawned 1: ---1---1X spawned 2: ---2---2X result: -------------1---1-----2---2X</pre><div></div><h2 id="combine-two">Combine two observables</h2><p>Just like in the <a href="#modify">"Modify an observable"</a> section, most of the methods in this section will return an observable of same type as the original observable (on which the method was called). </p><p id="filter-by"><a class="header" href="#filter-by">filterBy</a><code>obs.filterBy(otherObs)</code><br/>Works like <a href="#filter">filter</a>, but instead of calling a predicate on each value from <b>obs</b> observable, it checks the last value from <b>otherObs</b>. </p><pre class="javascript" title="example">var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]); var bar = Kefir.sequentially(200, [false, true, false]).delay(40).toProperty(() =&gt; true); var result = foo.filterBy(bar); result.log(); </pre><pre title="console output">&gt; [sequentially.filterBy] &lt;value&gt; 1 &gt; [sequentially.filterBy] &lt;value&gt; 2 &gt; [sequentially.filterBy] &lt;value&gt; 5 &gt; [sequentially.filterBy] &lt;value&gt; 6 &gt; [sequentially.filterBy] &lt;end&gt; </pre><pre title="events in time">foo: ----1----2----3----4----5----6----7----8X bar: t-----------f---------t---------fX result: ----1----2--------------5----6----------X</pre><div></div><p id="obs-sampled-by"><a class="header" href="#obs-sampled-by">sampledBy</a><code>obs.sampledBy(otherObs, [combinator])</code><br/>Returns a stream that emits the latest value from <b>obs</b> observable on each value from <b>otherObs</b>. Ends when <b>otherObs</b> ends. </p><p>You can also provide a <b>combinator</b> function which will be used to from the value emitted by the result stream. It is called with the latest values from <b>obs</b> and <b>otherObs</b> as arguments. The default <b>combinator</b> function is <tt>(a,&nbsp;b)&nbsp;=&gt;&nbsp;a</tt>. </p><pre class="javascript" title="example">var a = Kefir.sequentially(200, [2, 3]).toProperty(() =&gt; 1); var b = Kefir.interval(100, 0).delay(40).take(5); var result = a.sampledBy(b); result.log(); </pre><pre title="console output">&gt; [sequentially.toProperty.sampledBy] &lt;value&gt; 1 &gt; [sequentially.toProperty.sampledBy] &lt;value&gt; 2 &gt; [sequentially.toProperty.sampledBy] &lt;value&gt; 2 &gt; [sequentially.toProperty.sampledBy] &lt;value&gt; 3 &gt; [sequentially.toProperty.sampledBy] &lt;value&gt; 3 &gt; [sequentially.toProperty.sampledBy] &lt;end&gt; </pre><pre title="events in time">a: 1---------2---------3X b: ------0----0----0----0----0X result: ------1----2----2----3----3X</pre><div></div><p id="skip-until-by"><a class="header" href="#skip-until-by">skipUntilBy</a><code>obs.skipUntilBy(otherObs)</code><br/>Skips values from <b>obs</b> until the first value from <b>otherObs</b>. </p><pre class="javascript" title="example">var foo = Kefir.sequentially(100, [1, 2, 3, 4]); var bar = Kefir.later(250, 0); var result = foo.skipUntilBy(bar); result.log(); </pre><pre title="console output">&gt; [sequentially.skipUntilBy] &lt;value&gt; 3 &gt; [sequentially.skipUntilBy] &lt;value&gt; 4 &gt; [sequentially.skipUntilBy] &lt;end&gt; </pre><pre title="events in time">foo: ----1----2----3----4X bar: -----------0X result: --------------3----4X</pre><div></div><p id="take-until-by"><a class="header" href="#take-until-by">takeUntilBy</a><code>obs.takeUntilBy(otherObs)</code><br/>Takes values from <b>obs</b> until the first value from <b>otherObs</b> i.e., ends on the first value from <b>otherObs</b>. </p><pre class="javascript" title="example">var foo = Kefir.sequentially(100, [1, 2, 3, 4]); var bar = Kefir.later(250, 0); var result = foo.takeUntilBy(bar); result.log(); </pre><pre title="console output">&gt; [sequentially.takeUntilBy] &lt;value&gt; 1 &gt; [sequentially.takeUntilBy] &lt;value&gt; 2 &gt; [sequentially.takeUntilBy] &lt;end&gt; </pre><pre title="events in time">foo: ----1----2----3----4X bar: -----------0X result: ----1----2-X</pre><div></div><p id="buffer-by"><a class="header" href="#buffer-by">bufferBy</a><code>obs.bufferBy(otherObs, [options])</code><br/>Buffers all values from <b>obs</b> observable, and flushes the buffer on each value from <b>otherObs</b>. Also flushes the buffer before end. </p><p>If <tt>options.flushOnEnd</tt> is <tt>false</tt>, the buffer won't be flushed when the main observable ends. </p><p>The result observable will emit <tt>[</tt>] in cases, when the buffer supposed to be flushed, but it's empty. </p><pre class="javascript" title="example">var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]).delay(40); var bar = Kefir.sequentially(300, [1, 2]) var result = foo.bufferBy(bar); result.log(); </pre><pre title="console output">&gt; [sequentially.delay.bufferBy] &lt;value&gt; [1, 2] &gt; [sequentially.delay.bufferBy] &lt;value&gt; [3, 4, 5] &gt; [sequentially.delay.bufferBy] &lt;value&gt; [6, 7, 8] &gt; [sequentially.delay.bufferBy] &lt;end&gt; </pre><pre title="events in time">foo: ------1----2----3----4----5----6----7----8 X bar: --------------1--------------2X result: --------------•--------------•------------•X [1, 2] [3, 4, 5] [6, 7, 8]</pre><div></div><p id="buffer-while-by"><a class="header" href="#buffer-while-by">bufferWhileBy</a><code>obs.bufferWhileBy(otherObs, [options])</code><br/>Similar to <a href="#buffer-while">bufferWhile</a>, but instead of using a predicate function it uses another observable. On each value from <b>obs</b> observable: if the last value from <b>otherObs</b> was truthy, adds the new value to the buffer, otherwise flushes the buffer (with the new value included). </p><p>If <tt>options.flushOnEnd</tt> is <tt>false</tt>, the buffer won't be flushed when the main observable ends. </p><p>If <tt>options.flushOnChange</tt> is <tt>true</tt>, the buffer will be also flushed each time the controlling observable emits <tt>false</tt>. </p><p>The result observable will emit <tt>[</tt>] in cases, when the buffer supposed to be flushed, but it's empty. </p><p>The default options are <tt>{flushOnEnd: true, flushOnChange: false}</tt>. </p><pre class="javascript" title="example">var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]); var bar = Kefir.sequentially(200, [false, true, false]).delay(40); var result = foo.bufferWhileBy(bar); result.log(); </pre><pre title="console output">&gt; [sequentially.bufferWhileBy] &lt;value&gt; [1, 2, 3] &gt; [sequentially.bufferWhileBy] &lt;value&gt; [4] &gt; [sequentially.bufferWhileBy] &lt;value&gt; [5, 6, 7] &gt; [sequentially.bufferWhileBy] &lt;value&gt; [8] &gt; [sequentially.bufferWhileBy] &lt;end&gt; </pre><pre title="events in time">foo: ----1----2----3----4----5----6----7----8X bar: -----------f---------t---------fX result: --------------•----•--------------•----•X [1, 2, 3] [4] [5, 6, 7] [8]</pre><div></div><h2 id="interop">Interoperation with other async abstractions</h2><p id="from-promise"><a class="header" href="#from-promise">fromPromise</a><code>Kefir.fromPromise(promise)</code><br/>Converts a Promise to a Kefir Property. Uses <tt>promise.then(onFulfilled, onRejected)</tt> interface to subscribe to the promise. Also calls <tt>promise.done()</tt> (if there is such methods) to prevent libraries like <a href="https://github.com/kriskowal/q">Q</a> or <a href="https://github.com/cujojs/when">when</a> from swallowing exceptions. </p><pre class="javascript" title="example">var result = Kefir.fromPromise(new Promise(fulfill =&gt; fulfill(1))); result.log(); </pre><pre title="console output">&gt; [fromPromise] &lt;value&gt; 1 &gt; [fromPromise] &lt;end&gt; </pre><pre title="events in time">result: ----1X</pre><div></div><p id="to-promise"><a class="header" href="#to-promise">toPromise</a><code>obs.toPromise([PromiseConstructor])</code><br/>Converts an Kefir Observable to a Promise. If called without arguments the default <tt>gloabal.Promise</tt> constructor is used, alternatively you can pass a promise constructor that supports following interface: <tt>new Promise((resolve, reject) =&gt; { ... })</tt>. The promise will be fulfilled or rejected at the moment source observable ends, with the latest value or error. If observable ends without any value or error the promise will never be fulfilled/rejected. </p><pre class="javascript" title="example">var promise = Kefir.sequentially(1000, [1, 2]).toPromise(); promise.then(x =&gt; { console.log(&#x27;fulfilled with:&#x27;, x); }); </pre><pre title="console output">&gt; fulfilled with: 2</pre><div></div><p id="from-es-observable"><a class="header" href="#from-es-observable">fromESObservable</a><code>Kefir.fromESObservable(observable)</code><br/>Converts an <a href="https://github.com/zenparsing/es-observable">ECMAScript Observable</a> to a Kefir Stream. </p><pre class="javascript" title="example">var result = Kefir.fromESObservable(new Observable(observer =&gt; { observer.next(1); observer.next(2); observer.complete(); })); result.log(); </pre><pre title="console output">&gt; [fromESObservable] &lt;value&gt; 1 &gt; [fromESObservable] &lt;value&gt; 2 &gt; [fromESObservable] &lt;end&gt; </pre><pre title="events in time">result: 12X</pre><div></div><p id="to-es-observable"><a class="header" href="#to-es-observable">toESObservable</a><code>obs.toESObservable()</code><code class="alias">obs[Symbol.observable]()</code>Converts an Kefir Observable to an <a href="https://github.com/zenparsing/es-observable">ECMAScript Observable</a>. </p><p>Also available as <tt>obs[Symbol.observable</tt>], so you can use ES Observable's <tt>from</tt> method with Kefir Observables e.g., <tt>Observable.from(Kefir.sequentially(1000, [1, 2</tt>))]. </p><pre class="javascript" title="example">var observable = Kefir.sequentially(1000, [1, 2]).toESObservable(); observable.subscribe({ next(x) { console.log(&#x27;value:&#x27;, x); }, complete() { console.log(&#x27;completed&#x27;); } }); </pre><pre title="console output">&gt; value: 1 &gt; value: 2 &gt; completed</pre><div></div><p id="static-land"><a class="header" href="#static-land">Static Land</a><code>Kefir.staticLand.Observable</code><br/>Provides <a href="https://github.com/rpominov/static-land">Static Land</a> compatibility. The <tt>Observable</tt> type object supports following algebras: Semigroup, Monoid, Functor, Bifunctor, Apply, Applicative, Chain, Monad. </p><pre class="javascript" title="example">var Observable = Kefir.staticLand.Observable; var obs = Observable.map(x =&gt; x * 3, Observable.of(2)); obs.log(); </pre><pre title="console output">&gt; [constant.map] &lt;value:current&gt; 6 &gt; [constant.map] &lt;end:current&gt; </pre><p><a href="https://github.com/rpominov/static-land"><img src="https://raw.githubusercontent.com/rpominov/static-land/master/logo/logo.png" width="80" height="50"></a></p><h2 id="active-state">Activation and deactivation of observables</h2><p>At the moment one create an observable it's not yet subscribed to its source. Observables subscribe to their sources only when they themselves get a first subscriber. In this docs this process is called <b>activation</b> of an observable. Also when the last subscriber is removed from an observable, the observable <b>deactivates</b> and unsubscribes from its source. Later it can be <b>activated</b> again, and so on. </p><p>The <i>source</i> to which observable subscribe on <b>activation</b> may be an another observable (for example in <tt>.map</tt>), several other observables (<tt>.combine</tt>), or some external source (<tt>.fromEvents</tt>). </p><p>For example <tt>stream = Kefir.fromEvents(el, 'click')</tt> won't immediately subscribe to the <tt>'click'</tt> event on <tt>el</tt>, it will subscribe only when the first listener will be added to the <tt>stream</tt>. And it will automatically unsubscribe when the last listener will be removed from the <tt>stream</tt>. </p><pre class="javascript">var stream = Kefir.fromEvents(el, &#x27;click&#x27;); // at this moment event listener to _el_ not added stream.onValue(someFn); // now &#x27;click&#x27; listener is added to _el_ stream.offValue(someFn); // and now it is removed again </pre><p>As you might already guess <b>activation</b> and <b>deactivation</b> propagates up the observables chain. For instance if one create a long chain like <tt>Kefir.fromEvents(...).map(...).filter(...).take(...)</tt>, the whole chain will be <b>inactive</b> until first subscriber is added, and then it will <b>activate</b> up to <tt>.fromEvents</tt>. Same for <b>deactivation</b>.</p><h2 id="emitter-object">Emitter</h2><p>Emitter is an object that has four methods for emitting events. It is used in several places in Kefir as a proxy to emit events from some observable. </p><ul><li><tt>emitter.value(value)</tt> emits a value in the connected observable</li><li><tt>emitter.error(error)</tt> emits a error in the connected observable</li><li><tt>emitter.end()</tt> ends the connected observable</li><li><tt>emitter.event(event)</tt> emits an event (object with same format as in <a href="#on-any">onAny</a> method) in the connected observable</li></ul><pre class="javascript" title="example">emitter.value(123); emitter.error(&#x27;Oh, snap!&#x27;); emitter.end();</pre><div></div><p>All <b>emitter</b> methods are bound to their context, and can be passed as callbacks safely without binding: </p><pre class="javascript">// instead of this el.addEventListener(&#x27;click&#x27;, emitter.value.bind(emitter)); // you can do just this el.addEventListener(&#x27;click&#x27;, emitter.value);</pre><div></div><p>There also exist legacy aliases to <b>emitter</b> methods: </p><ul><li><tt>emitter.emit === emitter.value</tt></li><li><tt>emitter.emitEvent === emitter.event</tt></li></ul><h2 id="about-errors">Errors</h2><p>Kefir supports an additional channel to pass data through observables — errors. Unlike values, errors normally just flow through the observable chain without any transformation. Consider this example: </p><pre class="javascript" title="example">var foo = Kefir.stream(emitter =&gt; { emitter.emit(0); emitter.emit(2); emitter.error(-1); emitter.emit(3); emitter.end(); }); var bar = foo.map(x =&gt; x + 2).filter(x =&gt; x &gt; 3); bar.log(); </pre><pre title="console output">&gt; [stream.map.filter] &lt;value&gt; 4 &gt; [stream.map.filter] &lt;error&gt; -1 &gt; [stream.map.filter] &lt;value&gt; 5 &gt; [stream.map.filter] &lt;end&gt; </pre><pre title="events in time">foo: ---0---2---e---3---X -1 bar: -------4---e---5---X -1</pre><div></div><p>As you can see values are being mapped and filtered, but errors just flow unchanged. Also notice that observable doesn't end on an error by default, but you can use the <a href="#take-errors">takeErrors</a> method to make it happen. Consider a slight change to the above example: </p><pre class="javascript" title="example">var foo = Kefir.stream(emitter =&gt; { emitter.emit(0); emitter.emit(2); emitter.error(-1); emitter.emit(3); emitter.end(); }); var bar = foo.map(x =&gt; x + 2).filter(x =&gt; x &gt; 3); bar.takeErrors(1).log(); </pre><pre title="console output">&gt; [stream.map.filter.takeErrors] &lt;value:current&gt; 4 &gt; [stream.map.filter.takeErrors] &lt;error:current&gt; -1 &gt; [stream.map.filter.takeErrors] &lt;end:current&gt; </pre><pre title="events in time">foo: ---0---2---e---3---X -1 bar: -------4---eX -1</pre><div></div><h2 id="current-in-streams">Current values/errors in streams</h2><p>Normally in Kefir only Properties have current values, but depending on how we define "current value" we might say that Streams also may have them. </p><p>Let's see how we get current values from Properties first. There is no direct access to the current value of a property, we can't do something like <tt>prop.getCurrent()</tt>. Instead we subscribe to the property using <tt>onValue(callback)</tt> for example, and our callback gets called immediately with the current value. </p><p>If we define "current value" through that technical detail of getting it: current value is the value that we get in the callback immediately after subscribing. Then we can say that sometimes Streams also may have current values. </p><p>Let see some examples. </p><pre class="javascript" title="example">// This is the most straightforward way for creating such a stream var s1 = Kefir.stream(emitter =&gt; { emitter.emit(1); }); // But it can be created accidentally or intentionally in some other cases, such as var s2 = Kefir.merge([Kefir.constant(1), Kefir.never()]); var s3 = Kefir.combine([Kefir.constant(1), Kefir.constant(1)]); </pre><p>This feature of Streams has its cons and pros. </p><h3>Cons</h3><p>Only first subscriber gets that value. Even if it was an <tt>onEnd</tt> or <tt>onError</tt> subscriber, it'll still "consume" the "current value". Let's see it in an example: </p><pre class="javascript" title="example">var stream = Kefir.stream(emitter =&gt; { emitter.emit(1); }); stream.onValue(x =&gt; console.log(&#x27;first&#x27;, x)); // logs &quot;first 1&quot; stream.onValue(x =&gt; console.log(&#x27;second&#x27;, x)); // won&#x27;t log // Even with onError it will be consumed var stream2 = ... stream.onError(fn); stream.onValue(x =&gt; console.log(&#x27;second&#x27;, x)); // won&#x27;t log </pre><p>This is fixed in Properties as they remember the last value, and call any new subscriber with it. </p><p>Another issue is that it's not very good from semantics point of view. The moment when an event happens in a stream depends on the moment when the stream gets the first subscriber. That makes subscribing not a pure operation, and whole system gets less declarative and functional. </p><h3>Pros</h3><p>This feature allows you to define current value in a stream. Well it sounds just like the definition of the feature, but let's just look at an example, hopefully it'll help you understand what I mean: </p><pre class="javascript" title="example">var scrollTopStream = Kefir.stream(emitter =&gt; { emitter.emit(window.scrollY); // here we are emitting the current value! window.addEventListener(&#x27;scroll&#x27;, () =&gt; { emitter.emit(window.scrollY); }); }); // Let&#x27;s now convert it to a property like good citizens var scrollTopProperty = scrollTopStream.toProperty(); </pre><p>Another benefit is that it makes possible to not lose current values when converting properties to streams and then back to properties. For example, <a href="#combine">combine</a> always returns a stream (<a href="https://github.com/kefirjs/kefir/issues/44#issuecomment-72875317">why?</a>) but it'll still emit the current value. So one can do <tt>Kefir.combine([p1, p2</tt>, fn).toProperty()], and get a property combined from two other properties with the correct current value. </p><h3>P.S.</h3><p>Note that all this applies to <a href="#about-errors">errors</a> as well. </p><p>Also it's a good practice to convert all streams that might emit current values to properties by using the <b>toProperty</b> method. That should make your code more reliable as all subscribers will get current values. And it's just better semantically as current values should live in properties.</p></div><script>$.getJSON('https://api.github.com/emojis', function(emojis){ $('[data-emoji]').each(function(){ var name = $(this).data('emoji'); $(this).attr({ src: emojis[name], title: ':' + name + ':', alt: ':' + name + ':' }); }); }); var $window = $(window); var $document = $(document); function getScrollLeft() { return $window.scrollLeft(); } function getWinWidth() { return $window.width(); } function getDocWidth() { return $document.width(); } var scrolls = Kefir.fromEvents($window, 'scroll'); var resizes = Kefir.fromEvents($window, 'resize'); var scrollLeft = scrolls.map(getScrollLeft).toProperty(getScrollLeft).skipDuplicates(); var winWidth = resizes.map(getWinWidth).toProperty(getWinWidth).skipDuplicates(); var docWidth = resizes.map(getDocWidth).toProperty(getDocWidth).skipDuplicates(); Kefir.combine([scrollLeft, winWidth, docWidth], function(scrollLeft, winWidth, docWidth) { return -Math.min(docWidth - winWidth, Math.max(0, scrollLeft)); }).skipDuplicates().onValue(function(x) {$('.sidebar').css('left', x)}); $('pre.javascript').each(function(_, block) { hljs.highlightBlock(block); });</script></body></html>

Pages: 1 2 3 4 5 6 7 8 9 10