<!DOCTYPE html> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <html lang="en"> <head> <meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta name="viewport" content="width=device-width, initial-scale=1, shrink-to-fit=no"> <meta charset="utf-8"> <title>Samza - Feature Preview</title> <link rel="apple-touch-icon-precomposed" sizes="57x57" href="/img/favicon/apple-touch-icon-57x57.png" /> <link rel="apple-touch-icon-precomposed" sizes="114x114" href="/img/favicon/apple-touch-icon-114x114.png" /> <link rel="apple-touch-icon-precomposed" sizes="72x72" href="/img/favicon/apple-touch-icon-72x72.png" /> <link rel="apple-touch-icon-precomposed" sizes="144x144" href="/img/favicon/apple-touch-icon-144x144.png" /> <link rel="apple-touch-icon-precomposed" sizes="60x60" href="/img/favicon/apple-touch-icon-60x60.png" /> <link rel="apple-touch-icon-precomposed" sizes="120x120" href="/img/favicon/apple-touch-icon-120x120.png" /> <link rel="apple-touch-icon-precomposed" sizes="76x76" href="/img/favicon/apple-touch-icon-76x76.png" /> <link rel="apple-touch-icon-precomposed" sizes="152x152" href="/img/favicon/apple-touch-icon-152x152.png" /> <link rel="icon" type="image/png" href="/img/favicon/favicon-196x196.png" sizes="196x196" /> <link rel="icon" type="image/png" 
href="/img/favicon/favicon-96x96.png" sizes="96x96" /> <link rel="icon" type="image/png" href="/img/favicon/favicon-32x32.png" sizes="32x32" /> <link rel="icon" type="image/png" href="/img/favicon/favicon-16x16.png" sizes="16x16" /> <link rel="icon" type="image/png" href="/img/favicon/favicon-128.png" sizes="128x128" /> <meta name="application-name" content="https://samza.apache.org" /> <meta name="msapplication-TileColor" content="#FFFFFF" /> <meta name="msapplication-TileImage" content="/img/favicon/mstile-144x144.png" /> <meta name="msapplication-square70x70logo" content="/img/favicon/mstile-70x70.png" /> <meta name="msapplication-square150x150logo" content="/img/favicon/mstile-150x150.png" /> <meta name="msapplication-wide310x150logo" content="/img/favicon/mstile-310x150.png" /> <meta name="msapplication-square310x310logo" content="/img/favicon/mstile-310x310.png" /> <link href="/css/ionicons.min.css" rel="stylesheet"> <link href="/css/google-fonts.css" rel="stylesheet"> <link href="/css/syntax.css" rel="stylesheet"/> <link rel="stylesheet" href="/css/main.new.css" /> </head> <body class="page"> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
--> <div class="main-navigation" data-plugin="menu"> <div class="main-navigation__toggle" data-menu-closed> <i class="icon ion-md-menu"></i> </div> <div class="main-navigation__toggle main-navigation__toggle--opened" data-menu-opened> <i class="icon ion-md-close"></i> </div> <div class="main-navigation__inner"> <div class="main-navigation__logo"> <a href="/"> <img class="main-navigation__logo-img" src="/img/samza-logo.png" srcset="/img/samza-logo.png 1x, /img/samza-logo@2x.png 2x" alt="Samza Logo" /> </a> </div> <div class="main-navigation__items" data-menu-opened> <a class="main-navigation__item" href="/">Home</a> <a class="main-navigation__item" href="/learn/documentation/latest/core-concepts/core-concepts.html">Docs</a> <a class="main-navigation__item" href="/powered-by/">Powered By</a> <a class="main-navigation__item" href="/startup/download/">Downloads</a> <a class="main-navigation__item" href="/blog/">Blog</a> <div class="main-navigation__item main-navigation__item--group"> <div class="main-navigation__item-group-title"> Community <i class="icon ion-md-arrow-dropdown"></i> </div> <div class="main-navigation__item-group-list"> <a class="main-navigation__item" href="/community/contact-us.html">Contact Us</a> <a class="main-navigation__item" href="/contribute/contributors-corner.html">Contributor's Corner</a> <a class="main-navigation__item" href="/community/committers.html">PMC Members and committers</a> <a class="main-navigation__item" href="/meetups/">Talks and Meetups</a> </div> </div> </div> </div> </div> <div class="container"> <div class="container__toggle"> <i class="icon ion-md-arrow-dropleft-circle container__toggle-icon"></i> <i class="icon ion-md-arrow-dropright-circle container__toggle-icon container__toggle-icon--opened"></i> </div> <!-- There is only one menu, but made it as a no-output collection to grab data only --> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <div class="side-navigation"> <!-- Start Group --> <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> <!-- Make menu_title, and start items group if needed --> <div class="side-navigation__group-title"> <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> Getting Started </div> <div class="side-navigation__group-items " data-sub-menu > <!-- Handle sub navigation items from data --> <a class="side-navigation__group-item" data-match-active="" href="/startup/quick-start/latest/">QuickStart</a> <a class="side-navigation__group-item" data-match-active="" href="/startup/code-examples/latest/">Code Examples</a> <!-- Handle sub nagivation from site collections --> <!-- Close sub nav group --> </div> <!-- Close menu group --> </div> <!-- Start Group --> <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> <!-- Make menu_title, and start items group if needed --> <div class="side-navigation__group-title"> <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> Documentation </div> <div class="side-navigation__group-items side-navigation__group-has-submenus" 
data-sub-menu data-documentation="/learn/documentation/latest/"> <!-- Handle sub navigation items from data --> <!-- Handle sub nagivation from site collections --> <!-- Close sub nav group --> </div> <!-- Close menu group --> </div> <!-- Start Group --> <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> <!-- Make menu_title, and start items group if needed --> <div class="side-navigation__group-title"> <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> Releases </div> <div class="side-navigation__group-items " data-sub-menu > <!-- Handle sub navigation items from data --> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.8.0">1.8.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.7.0">1.7.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.6.0">1.6.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.5.1">1.5.1</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.5.0">1.5.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.4.0">1.4.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.3.1">1.3.1</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.3.0">1.3.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.2.0">1.2.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.1.0">1.1.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/1.0.0">1.0.0</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/0.14">0.14</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/0.13">0.13</a> <a class="side-navigation__group-item" data-match-active="" 
href="/releases/0.12">0.12</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/0.11">0.11</a> <a class="side-navigation__group-item" data-match-active="" href="/releases/0.10">0.10</a> <!-- Handle sub nagivation from site collections --> <!-- Close sub nav group --> </div> <!-- Close menu group --> </div> <!-- Start Group --> <div class="side-navigation__group"> <!-- Make menu_title, and start items group if needed --> <a class="side-navigation__group-title" data-plugin="top-menu" data-match-active="" href="/blog/"> Blog </a> <!-- Handle sub navigation items from data --> <!-- Handle sub nagivation from site collections --> <!-- Close sub nav group --> <!-- Close menu group --> </div> <!-- Start Group --> <div class="side-navigation__group side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> <!-- Make menu_title, and start items group if needed --> <div class="side-navigation__group-title"> <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> Community </div> <div class="side-navigation__group-items " data-sub-menu > <!-- Handle sub navigation items from data --> <a class="side-navigation__group-item" data-match-active="" href="/community/contact-us.html">Contact Us</a> <a class="side-navigation__group-item" data-match-active="" href="/contribute/contributors-corner.html">Contributor's Corner</a> <a class="side-navigation__group-item" data-match-active="" href="/contribute/enhancement-proposal.html">Enhancement Proposal</a> <a class="side-navigation__group-item" data-match-active="" href="/community/committers.html">PMC members & Committers</a> <a class="side-navigation__group-item" data-match-active="" href="/meetups/">Talks and Meetups</a> <!-- Handle sub nagivation from site collections --> <!-- Close sub nav group --> </div> <!-- Close menu group --> </div> <!-- Start Group --> <div class="side-navigation__group 
side-navigation__group--has-nested" data-plugin="sub-menu" data-sub-menu-show-class="side-navigation__group--has-nested-visible"> <!-- Make menu_title, and start items group if needed --> <div class="side-navigation__group-title"> <i class="side-navigation__group-title-icon icon ion-md-arrow-dropdown"></i> Case Studies </div> <div class="side-navigation__group-items " data-sub-menu > <!-- Handle sub navigation items from data --> <a class="side-navigation__group-item" data-match-active="exact" href="/case-studies/">View All</a> <hr> <!-- Handle sub nagivation from site collections --> <a class="side-navigation__group-item" href="/case-studies/ebay" data-match-active="">eBay</a> <a class="side-navigation__group-item" href="/case-studies/tripadvisor" data-match-active="">TripAdvisor</a> <a class="side-navigation__group-item" href="/case-studies/slack" data-match-active="">Slack</a> <a class="side-navigation__group-item" href="/case-studies/optimizely" data-match-active="">Optimizely</a> <a class="side-navigation__group-item" href="/case-studies/redfin" data-match-active="">Redfin</a> <a class="side-navigation__group-item" href="/case-studies/linkedin" data-match-active="">LinkedIn</a> <!-- Close sub nav group --> </div> <!-- Close menu group --> </div> </div> <div class="section"> <div class="content"> <h2>Feature Preview</h2> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. --> <ol> <li><a href="#overview">Overview</a></li> <li><a href="#try-it-out">Try It Out</a></li> <li><a href="#architecture">Architecture</a></li> <li><a href="#high-level-api">High Level API</a></li> <li><a href="#flexible-deployment-model">Flexible Deployment Model</a></li> </ol> <hr /> <h2 id="overview">Overview</h2> <p>Samza 0.13.0 introduces a new programming model and a new deployment model. They’re being released as a preview because they represent major enhancements to how developers work with Samza, so it is beneficial for both early adopters and the Samza development community to experiment with the release and provide feedback. The following sections introduce the new features and link to tutorials that demonstrate how to use them. Please try them and send feedback to the <a href="mailto:dev@samza.apache.org">dev mailing list</a>.</p> <hr /> <h3 id="try-it-out">Try It Out</h3> <p>Want to skip all the details and get some hands-on experience? There are three tutorials to help you get acquainted with running Samza applications in both YARN and embedded modes and programming with the high level API:</p> <ul> <li><a href="/learn/tutorials/latest/hello-samza-high-level-yarn.html">YARN Deployment</a> - run a pre-existing Wikipedia application on YARN and observe the output.</li> <li><a href="/learn/tutorials/latest/hello-samza-high-level-code.html">High Level API Code Walkthrough</a> - walk through building the Wikipedia application, step by step.</li> <li><a href="/learn/tutorials/latest/hello-samza-high-level-zk.html">ZooKeeper Deployment</a> - run a pre-existing Wikipedia application with ZooKeeper coordination and observe the output.</li> </ul> <hr /> <h2 id="architecture">Architecture</h2> <h3 id="introduction">Introduction</h3> <p>The Samza high level API provides a unified way to handle both streaming and batch data. 
You can describe the end-to-end application logic in a single program with operators like map, filter, window, and join to accomplish what previously required multiple jobs. The API is designed to be portable. The same application code can be deployed in batch or streaming mode, in embedded or cluster-managed environments, and can switch between Kafka, Kinesis, HDFS or other systems with a simple configuration change. This portability is enabled by a new architecture, which is described in the sections below.</p> <h3 id="concepts">Concepts</h3> <p>The Samza architecture has been overhauled with distinct layers to handle each stage of application development. The following diagram shows an overview of the Apache Samza architecture with the high level API.</p> <p><img src="/img/latest/learn/documentation/introduction/layered-arch.png" alt="Architecture diagram" style="max-width: 100%; height: auto;" onclick="window.open(this.src)" /></p> <p>There are four layers in the architecture. The following sections describe each of the layers.</p> <h4 id="i-high-level-api">I. High Level API</h4> <p>The high level API provides the libraries to define your application logic. The <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a> is the central abstraction which your application must implement. You start by declaring your inputs as instances of <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html">MessageStream</a>. Then you can apply operators like map, filter, window, and join on each MessageStream to define the whole end-to-end data processing in a single program.</p> <p>For a deeper dive into the high level API, see the <a href="#high-level-api">high level API section</a> below.</p> <h4 id="ii-applicationrunner">II. 
ApplicationRunner</h4> <p>Samza uses an <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html">ApplicationRunner</a> to run a stream application. The ApplicationRunner generates the configs (such as input/output streams), creates intermediate streams, and starts the execution. There are two types of ApplicationRunner:</p> <p><strong>RemoteApplicationRunner</strong> - submits the application to a remote cluster. This runner is invoked via the <em>run-app.sh</em> script. To use RemoteApplicationRunner, set the following configurations:</p> <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"># The StreamApplication class to run
app.class=com.company.job.YourStreamApplication
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory</code></pre></figure> <p>Then use <em>run-app.sh</em> to run the application in the remote cluster. The script will invoke the RemoteApplicationRunner, which will launch one or more jobs using the factory specified with <em>job.factory.class</em>. Follow the <a href="/learn/tutorials/latest/hello-samza-high-level-yarn.html">YARN deployment tutorial</a> to try it out.</p> <p><strong>LocalApplicationRunner</strong> - runs the application in the JVM process of the runner. For example, to launch your application on multiple machines using ZooKeeper for coordination, you can run multiple instances of LocalApplicationRunner on various machines. After the applications load, they start coordinating their actions through ZooKeeper. 
Here is an example of running the StreamApplication in your program using the LocalApplicationRunner:</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="nc">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="kd">throws</span> <span class="nc">Exception</span> <span class="o">{</span>
  <span class="nc">CommandLine</span> <span class="n">cmdLine</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CommandLine</span><span class="o">();</span>
  <span class="nc">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="n">cmdLine</span><span class="o">.</span><span class="na">loadConfig</span><span class="o">(</span><span class="n">cmdLine</span><span class="o">.</span><span class="na">parser</span><span class="o">().</span><span class="na">parse</span><span class="o">(</span><span class="n">args</span><span class="o">));</span>
  <span class="nc">LocalApplicationRunner</span> <span class="n">localRunner</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">LocalApplicationRunner</span><span class="o">(</span><span class="n">config</span><span class="o">);</span>
  <span class="nc">StreamApplication</span> <span class="n">app</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">YourStreamApplication</span><span class="o">();</span>
  <span class="n">localRunner</span><span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">app</span><span class="o">);</span>

  <span class="c1">// Wait for the application to finish</span>
  <span class="n">localRunner</span><span class="o">.</span><span class="na">waitForFinish</span><span class="o">();</span>
  <span class="nc">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">"Application completed with status "</span> <span class="o">+</span> <span class="n">localRunner</span><span class="o">.</span><span class="na">status</span><span class="o">(</span><span class="n">app</span><span class="o">));</span>
<span class="o">}</span></code></pre></figure> <p>Follow the <a href="/learn/tutorials/latest/hello-samza-high-level-zk.html">ZooKeeper deployment tutorial</a> to try it out.</p> <h5 id="execution-plan">Execution Plan</h5> <p>The ApplicationRunner generates a physical execution plan for your processing logic before it starts executing it. The plan represents the runtime structure of the application. In particular, it provides visibility into the generated intermediate streams. Once the job is deployed, the plan can be viewed as follows:</p> <ul> <li>For applications launched using <em>run-app.sh</em>, Samza will create a <em>plan</em> directory under your application deployment directory and write the <em>plan.json</em> file there.</li> <li>For applications launched using your own script (e.g. for LocalApplicationRunner), create a <em>plan</em> directory at the same level as <em>bin</em>, and point the <code class="language-plaintext highlighter-rouge">EXECUTION_PLAN_DIR</code> environment variable to its location.</li> </ul> <p>To view the plan, open the <em>bin/plan.html</em> file in a browser. Here’s a sample plan visualization:</p> <p><img src="/img/latest/learn/documentation/introduction/execution-plan.png" alt="Execution plan" style="max-width: 100%; height: auto;" onclick="window.open(this.src)" /></p> <h4 id="iii-execution-models">III. Execution Models</h4> <p>Samza supports two types of execution models: cluster based execution and embedded execution.</p> <p>In cluster based execution, Samza will run and manage your application on a multi-tenant cluster. 
Samza ships with support for <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">YARN</a>. You can implement your own StreamJob and a corresponding ResourceManagerFactory to add support for another cluster manager.</p> <p>In the embedded execution model, you can use Samza as a lightweight library within your application. You can spin up multiple instances of your application which will distribute and coordinate processing among themselves. This mode provides flexibility for running your applications in arbitrary hosting environments. It also supports pluggable coordination logic with out-of-the-box support for two types of coordination:</p> <ul> <li><strong>ZooKeeper based coordination</strong> - Samza can be configured to use ZooKeeper to manage group membership and partition assignment among instances of your application. This allows you to dynamically scale your application up by spinning up more instances, or down by shutting some of them down.</li> <li><strong>External coordination</strong> - Samza can run your application in a single JVM locally without coordination, or in multiple JVMs with a static partition assignment. This is helpful when running in containerized environments like Kubernetes or Amazon ECS.</li> </ul> <p>For more details on running Samza in embedded mode, take a look at the <a href="#flexible-deployment-model">flexible deployment model</a> section below.</p> <h4 id="iv-processor">IV. Processor</h4> <p>The lowest execution unit of a Samza application is the processor. It reads the configs generated from the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ApplicationRunner.html">ApplicationRunner</a> and processes the input stream partitions assigned by the JobCoordinator. It can access local state using a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/storage/kv/KeyValueStore.html">KeyValueStore</a> implementation (e.g. RocksDB or in-memory) and remote state (e.g. 
REST service) using multithreading.</p> <hr /> <h2 id="high-level-api">High Level API</h2> <p>Since the 0.13.0 release, Samza provides a new high level API that simplifies your applications. This API supports operations like re-partitioning, windowing, and joining on streams. You can now express your application logic concisely in a few lines of code and accomplish what previously required multiple jobs.</p> <h2 id="code-examples">Code Examples</h2> <p>Check out some examples to see the high-level API in action.</p> <ol> <li><a href="https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewAdClickJoiner.java">Pageview AdClick Joiner</a> demonstrates joining a stream of PageViews with a stream of AdClicks, e.g. to analyze which pages get the most ad clicks.</li> <li><a href="https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewFilterApp.java">Pageview Repartitioner</a> illustrates re-partitioning the incoming stream of PageViews.</li> <li><a href="https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/PageViewSessionizerApp.java">Pageview Sessionizer</a> groups the incoming stream of events into sessions based on user activity.</li> <li><a href="https://github.com/apache/samza-hello-samza/blob/e5943a000eef87e077c422e09dc20f09d4e876ca/src/main/java/samza/examples/cookbook/TumblingPageViewCounterApp.java">Pageview by Region</a> counts the number of views per region over tumbling time intervals.</li> </ol> <h2 id="key-concepts">Key Concepts</h2> <h3 id="streamapplication">StreamApplication</h3> <p>When writing your stream processing application using the Samza high-level API, you should implement a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/application/StreamApplication.html">StreamApplication</a> and define your 
processing logic in the init method.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">StreamGraph</span> <span class="n">graph</span><span class="o">,</span> <span class="nc">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span> <span class="err">…</span> <span class="o">}</span></code></pre></figure> <p>For example, here is a StreamApplication that validates page views and decorates them with the viewer’s profile information.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">BadPageViewFilter</span> <span class="kd">implements</span> <span class="nc">StreamApplication</span> <span class="o">{</span>
  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">init</span><span class="o">(</span><span class="nc">StreamGraph</span> <span class="n">graph</span><span class="o">,</span> <span class="nc">Config</span> <span class="n">config</span><span class="o">)</span> <span class="o">{</span>
    <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>

    <span class="n">pageViews</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">isValidPageView</span><span class="o">)</span>
        <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">)</span>
        <span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></figure> <h3 id="messagestream">MessageStream</h3> <p>A MessageStream, as the name implies, represents a stream of messages. A StreamApplication is described as a series of transformations on MessageStreams. 
You can get a MessageStream in two ways:</p> <ol> <li> <p>Using StreamGraph.getInputStream to get the MessageStream for a given input stream (e.g., a Kafka topic).</p> </li> <li> <p>By transforming an existing MessageStream using operations like map, filter, window, join, etc.</p> </li> </ol> <h2 id="anatomy-of-a-typical-samza-streamapplication">Anatomy of a typical Samza StreamApplication</h2> <p>There are three simple steps to write your stream processing application using the Samza high-level API.</p> <h3 id="step-1-obtain-the-input-streams">Step 1: Obtain the input streams:</h3> <p>You can obtain the MessageStream for your input stream ID (“page-views”) using StreamGraph.getInputStream.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViewInput</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></figure> <p>The first parameter <code class="language-plaintext highlighter-rouge">page-views</code> is the logical stream ID. Each stream ID is associated with a <em>physical name</em> and a <em>system</em>. By default, Samza uses the stream ID as the physical stream name and accesses the stream on the default system, which is specified with the property “job.default.system”. However, the <em>physical name</em> and <em>system</em> properties can be overridden in configuration. 
For example, the following configuration defines the stream ID “page-views” as an alias for the PageViewEvent topic in a local Kafka cluster.</p> <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">streams.page-views.samza.system=kafka
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.consumer.zookeeper.connect=localhost:2181
systems.kafka.producer.bootstrap.servers=localhost:9092
streams.page-views.samza.physical.name=PageViewEvent</code></pre></figure> <p>The second parameter is a serde to deserialize the incoming message.</p> <h3 id="step-2-define-your-transformation-logic">Step 2: Define your transformation logic:</h3> <p>You are now ready to define your StreamApplication logic as a series of transformations on MessageStreams.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">DecoratedPageView</span><span class="o">&gt;</span> <span class="n">decoratedPageViews</span> <span class="o">=</span> <span class="n">pageViewInput</span>
    <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">isValidPageView</span><span class="o">)</span>
    <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="k">this</span><span class="o">::</span><span class="n">addProfileInformation</span><span class="o">);</span></code></pre></figure> <h3 id="step-3-write-the-output-to-an-output-stream">Step 3: Write the output to an output stream:</h3> <p>Finally, you can create an OutputStream using StreamGraph.getOutputStream and send the transformed messages through it.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Send messages with userId as the key to “decorated-page-views”.</span>
<span class="n">decoratedPageViews</span><span class="o">.</span><span class="na">sendTo</span><span class="o">(</span>
    <span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="s">"decorated-page-views"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">DecoratedPageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span></code></pre></figure> <p>The first parameter <code class="language-plaintext highlighter-rouge">decorated-page-views</code> is a logical stream ID. The properties for this stream ID can be overridden just like the stream IDs for input streams. For example:</p> <figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">streams.decorated-page-views.samza.system=kafka
streams.decorated-page-views.samza.physical.name=DecoratedPageViewEvent</code></pre></figure> <p>The second parameter is a serde to serialize the outgoing message.</p> <h2 id="operators">Operators</h2> <p>The high-level API supports common operators like map, flatmap, filter, merge, joins, and windowing on streams. Most of these operators accept corresponding Functions, and these functions are <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/InitableFunction.html">Initable</a>.</p> <h3 id="map">Map</h3> <p>Applies the provided 1:1 <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/MapFunction.html">MapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. 
The MapFunction takes in a single message and returns a single message (potentially of a different type).</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">numbers</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">tripled</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="n">m</span> <span class="o">*</span> <span class="mi">3</span><span class="o">);</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">stringified</span> <span class="o">=</span> <span class="n">numbers</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="nc">String</span><span class="o">.</span><span class="na">valueOf</span><span class="o">(</span><span class="n">m</span><span class="o">));</span></code></pre></figure> <h3 id="flatmap">Flatmap</h3> <p>Applies the provided 1:n <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FlatMapFunction.html">FlatMapFunction</a> to each element in the MessageStream and returns the transformed MessageStream. 
The FlatMapFunction takes in a single message and returns zero or more messages.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">sentence</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// Parse the sentence into its individual words splitting by space</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentence</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="n">s</span> <span class="o">-&gt;</span> <span class="nc">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">s</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">)));</span></code></pre></figure> <h3 id="filter">Filter</h3> <p>Applies the provided <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FilterFunction.html">FilterFunction</a> to the MessageStream and returns the filtered MessageStream. The FilterFunction is a predicate that specifies whether a message should be retained in the filtered stream. 
Messages for which the FilterFunction returns false are filtered out.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// Extract only the long words</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">longWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">&gt;</span> <span class="mi">15</span><span class="o">);</span>
<span class="c1">// Extract only the short words</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">&gt;</span> <span class="n">shortWords</span> <span class="o">=</span> <span class="n">words</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">word</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">.</span><span class="na">length</span><span class="o">()</span> <span class="o">&lt;</span> <span class="mi">3</span><span class="o">);</span></code></pre></figure> <h3 id="partitionby">PartitionBy</h3> <p>Re-partitions this MessageStream using the key returned by the provided keyExtractor and returns the transformed MessageStream. 
Messages are sent through an intermediate stream during repartitioning.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// Repartition pageView by userId.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;&gt;</span> <span class="n">partitionedPageViews</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span>
    <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key extractor</span>
    <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">,</span> <span class="c1">// value extractor</span>
    <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)),</span> <span class="c1">// serdes</span>
    <span class="s">"partitioned-page-views"</span><span class="o">);</span> <span class="c1">// operator ID</span></code></pre></figure> <p>The operator ID should be unique for an operator within the application and is used to identify the streams and stores created by the operator.</p> <h3 id="merge">Merge</h3> <p>Merges the MessageStream with all the provided MessageStreams and returns the merged stream.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">ServiceCall</span><span class="o">&gt;</span> <span class="n">serviceCall1</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">ServiceCall</span><span class="o">&gt;</span> <span class="n">serviceCall2</span> <span class="o">=</span> <span class="o">...</span>
<span class="c1">// Merge individual “ServiceCall” streams and create a new merged MessageStream.</span>
<span class="c1">// merge accepts a collection of the other streams to merge with.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">ServiceCall</span><span class="o">&gt;</span> <span class="n">serviceCallMerged</span> <span class="o">=</span> <span class="n">serviceCall1</span><span class="o">.</span><span class="na">merge</span><span class="o">(</span><span class="nc">Collections</span><span class="o">.</span><span class="na">singletonList</span><span class="o">(</span><span class="n">serviceCall2</span><span class="o">));</span></code></pre></figure> <p>The merge transform preserves the order of each MessageStream, so if message <code class="language-plaintext highlighter-rouge">m1</code> appears before <code class="language-plaintext highlighter-rouge">m2</code> in any provided stream, then <code class="language-plaintext highlighter-rouge">m1</code> also appears before <code 
class="language-plaintext highlighter-rouge">m2</code> in the merged stream.</p> <p>As an alternative to the <code class="language-plaintext highlighter-rouge">merge</code> instance method, you can also use the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/MessageStream.html#mergeAll-java.util.Collection-">MessageStream#mergeAll</a> static method to merge MessageStreams without operating on an initial stream.</p> <h3 id="sendto-stream">SendTo (stream)</h3> <p>Sends all messages from this MessageStream to the provided OutputStream. You can specify the key and the value to be used for the outgoing message.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Output a new message with userId as the key and region as the value to the “user-region” stream.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">keyedPageViews</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> <span class="no">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getRegion</span><span class="o">()));</span>
<span class="nc">OutputStream</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">String</span><span class="o">&gt;&gt;</span> <span class="n">userRegions</span> <span class="o">=</span> <span class="n">graph</span><span class="o">.</span><span class="na">getOutputStream</span><span class="o">(</span><span class="s">"user-region"</span><span class="o">,</span> <span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">StringSerde</span><span class="o">()));</span>
<span class="n">keyedPageViews</span><span class="o">.</span><span class="na">sendTo</span><span class="o">(</span><span class="n">userRegions</span><span class="o">);</span></code></pre></figure> <h3 id="sendto-table">SendTo (table)</h3> <p>Sends all messages from this MessageStream to the provided table; the expected message type is KV.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Write a new message with memberId as the key and profile as the value to a table.</span>
<span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"Profile"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;</span><span class="nc">Profile</span><span class="o">&gt;())</span>
    <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">m</span> <span class="o">-&gt;</span> <span class="no">KV</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">m</span><span class="o">.</span><span class="na">getMemberId</span><span class="o">(),</span> <span class="n">m</span><span class="o">))</span>
    <span class="o">.</span><span class="na">sendTo</span><span 
class="o">(</span><span class="n">table</span><span class="o">);</span></code></pre></figure> <h3 id="sink">Sink</h3> <p>Allows sending messages from this MessageStream to an output system using the provided <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/SinkFunction.html">SinkFunction</a>.</p> <p>This offers more control than sendTo since the SinkFunction has access to the <code class="language-plaintext highlighter-rouge">MessageCollector</code> and the <code class="language-plaintext highlighter-rouge">TaskCoordinator</code>. For instance, you can choose to manually commit offsets, or shut down the job using the TaskCoordinator APIs. This operator can also be used to send messages to non-Samza systems (e.g. remote databases, REST services, etc.).</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Send each pageView to an output system using a custom SinkFunction.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span>
<span class="n">pageViews</span><span class="o">.</span><span class="na">sink</span><span class="o">((</span><span class="n">msg</span><span class="o">,</span> <span class="n">collector</span><span class="o">,</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
  <span class="c1">// Construct a new outgoing message, and send it to a kafka topic named TransformedPageViewEvent.</span>
  <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="nc">OutgoingMessageEnvelope</span><span class="o">(</span><span class="k">new</span> <span class="nc">SystemStream</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">,</span> <span class="s">"TransformedPageViewEvent"</span><span class="o">),</span> <span class="n">msg</span><span class="o">));</span>
<span class="o">});</span></code></pre></figure> <h3 id="join-stream-stream">Join (stream-stream)</h3> <p>The stream-stream Join operator joins messages from two MessageStreams using the provided pairwise <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/JoinFunction.html">JoinFunction</a>. Messages are joined when the keys extracted from messages in the first stream match the keys extracted from messages in the second stream. Messages in each stream are retained for the provided TTL duration, and join results are emitted as matches are found.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Joins a stream of OrderRecord with a stream of ShipmentRecord by orderId with a TTL of 20 minutes.</span>
<span class="c1">// Results are produced to a new stream of FulfilledOrderRecord.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">OrderRecord</span><span class="o">&gt;</span> <span class="n">orders</span> <span class="o">=</span> <span class="o">...</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">ShipmentRecord</span><span class="o">&gt;</span> <span class="n">shipments</span> <span class="o">=</span> <span class="o">...</span>

<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">FulfilledOrderRecord</span><span class="o">&gt;</span> <span class="n">shippedOrders</span> <span class="o">=</span> <span class="n">orders</span><span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">shipments</span><span class="o">,</span> <span class="k">new</span> <span class="nc">OrderShipmentJoiner</span><span class="o">(),</span>
    <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="c1">// serde for the join key</span>
    <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">OrderRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">ShipmentRecord</span><span class="o">.</span><span class="na">class</span><span class="o">),</span> <span class="c1">// serde for both streams</span>
    <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">20</span><span class="o">),</span> <span class="c1">// join TTL</span>
    <span class="s">"shipped-order-stream"</span><span class="o">);</span> <span class="c1">// operator ID</span>

<span class="c1">// Constructs a new FulfilledOrderRecord by extracting the order timestamp from the OrderRecord and the shipment timestamp from the ShipmentRecord.</span>
<span class="kd">class</span> <span class="nc">OrderShipmentJoiner</span> <span class="kd">implements</span> <span class="nc">JoinFunction</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">OrderRecord</span><span class="o">,</span> <span class="nc">ShipmentRecord</span><span class="o">,</span> <span class="nc">FulfilledOrderRecord</span><span class="o">&gt;</span> <span class="o">{</span>
  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="nc">FulfilledOrderRecord</span> <span class="nf">apply</span><span class="o">(</span><span class="nc">OrderRecord</span> <span class="n">message</span><span class="o">,</span> <span class="nc">ShipmentRecord</span> <span class="n">otherMessage</span><span class="o">)</span> <span class="o">{</span>
    <span class="k">return</span> <span class="k">new</span> <span class="nf">FulfilledOrderRecord</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">,</span> <span class="n">message</span><span class="o">.</span><span class="na">orderTimestamp</span><span class="o">,</span> <span class="n">otherMessage</span><span class="o">.</span><span class="na">shipTimestamp</span><span class="o">);</span>
  <span class="o">}</span>

  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getFirstKey</span><span class="o">(</span><span class="nc">OrderRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
    <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">;</span>
  <span class="o">}</span>

  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="nc">String</span> <span class="nf">getSecondKey</span><span class="o">(</span><span class="nc">ShipmentRecord</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
    <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">orderId</span><span class="o">;</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></figure> <h3 id="join-stream-table">Join (stream-table)</h3> <p>The stream-table Join operator joins messages from a MessageStream with records in a table using the provided <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/StreamTableJoinFunction.html">StreamTableJoinFunction</a>. Messages from the input stream are joined with records in the table using the key extracted from the input messages. The join function is invoked with both the message and the record. If a record is not found in the table, a null value is provided; the join function can choose to return null (inner join) or an output message (left outer join). 
For the join to function properly, ensure that the input stream and the table are partitioned using the same key, as this affects the physical placement of data.</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">streamGraph</span><span class="o">.</span><span class="na">getInputStream</span><span class="o">(</span><span class="s">"PageView"</span><span class="o">,</span> <span class="k">new</span> <span class="nc">NoOpSerde</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;())</span>
    <span class="o">.</span><span class="na">partitionBy</span><span class="o">(</span><span class="nc">PageView</span><span class="o">::</span><span class="n">getMemberId</span><span class="o">,</span> <span class="n">v</span> <span class="o">-&gt;</span> <span class="n">v</span><span class="o">,</span> <span class="s">"p1"</span><span class="o">)</span>
    <span class="o">.</span><span class="na">join</span><span class="o">(</span><span class="n">table</span><span class="o">,</span> <span class="k">new</span> <span class="nc">PageViewToProfileJoinFunction</span><span class="o">())</span>
    <span class="o">...</span></code></pre></figure> <figure class="highlight"><pre><code class="language-java" data-lang="java"><table class="rouge-table"><tbody><tr><td class="code"><pre><span class="kd">public</span> <span class="kd">class</span> <span class="nc">PageViewToProfileJoinFunction</span> <span class="kd">implements</span> <span class="nc">StreamTableJoinFunction</span>
    <span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;,</span> <span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;,</span> <span class="nc">EnrichedPageView</span><span class="o">&gt;</span> <span class="o">{</span>
  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="nc">EnrichedPageView</span> <span class="nf">apply</span><span class="o">(</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">m</span><span class="o">,</span> <span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;</span> <span class="n">r</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// Returning null when no record is found makes this an inner join.</span>
    <span class="k">return</span> <span class="n">r</span> <span class="o">!=</span> <span class="kc">null</span> <span class="o">?</span> <span class="k">new</span> <span class="nf">EnrichedPageView</span><span class="o">(...)</span> <span class="o">:</span> <span class="kc">null</span><span class="o">;</span>
  <span class="o">}</span>

  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">getMessageKey</span><span class="o">(</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">message</span><span class="o">)</span> <span class="o">{</span>
    <span class="k">return</span> <span class="n">message</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
  <span class="o">}</span>

  <span class="nd">@Override</span>
  <span class="kd">public</span> <span class="nc">Integer</span> <span class="nf">getRecordKey</span><span class="o">(</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;</span> <span class="n">record</span><span class="o">)</span> <span class="o">{</span>
    <span class="k">return</span> <span class="n">record</span><span class="o">.</span><span class="na">getKey</span><span class="o">();</span>
  <span class="o">}</span>
<span class="o">}</span></pre></td></tr></tbody></table></code></pre></figure> <h3 id="window">Window</h3> <h4 id="windowing-concepts">Windowing Concepts</h4> <p><strong>Windows, Triggers, and WindowPanes</strong>: The window operator groups incoming messages in the MessageStream into finite windows. Each emitted result contains one or more messages in the window and is called a WindowPane.</p> <p>A window can have one or more associated triggers, which determine when results from the window are emitted. Triggers can be either <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/windows/Window.html#setEarlyTrigger-org.apache.samza.operators.triggers.Trigger-">early triggers</a> that allow emitting results speculatively before all data for the window has arrived, or late triggers that allow handling late messages for the window.</p> <p><strong>Aggregator Function</strong>: By default, the emitted WindowPane will contain all the messages for the window. Instead of retaining all messages, you typically define a more compact data structure for the WindowPane and update it incrementally as new messages arrive, e.g. to keep a count of messages in the window. To do this, you can provide an aggregating <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/operators/functions/FoldLeftFunction.html">FoldLeftFunction</a>, which is invoked for each incoming message added to the window and defines how to update the WindowPane for that message.</p> <p><strong>Accumulation Mode</strong>: A window’s accumulation mode determines how results emitted from a window relate to previously emitted results for the same window. 
This is particularly useful when the window is configured with early or late triggers. The accumulation mode can either be discarding or accumulating.</p> <p>A <em>discarding window</em> clears all state for the window at every emission. Each emission will only correspond to new messages that arrived since the previous emission for the window.</p> <p>An <em>accumulating window</em> retains window results from previous emissions. Each emission will contain all messages that arrived since the beginning of the window.</p> <h4 id="window-types">Window Types:</h4> <p>The Samza high-level API currently supports tumbling and session windows.</p> <p><strong>Tumbling Window</strong>: A tumbling window defines a series of contiguous, fixed size time intervals in the stream.</p> <p>Examples:</p> <figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Group the pageView stream into 3 second tumbling windows keyed by the userId.</span> <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="o">...</span> <span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Collection</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;&gt;&gt;</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span> <span class="nc">Windows</span><span class="o">.</span><span class="na">keyedTumblingWindow</span><span class="o">(</span><span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="c1">// key extractor</span> <span 
class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="c1">// window duration</span>
    <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">JsonSerdeV2</span><span class="o">&lt;&gt;(</span><span class="nc">PageView</span><span class="o">.</span><span class="na">class</span><span class="o">)));</span>

<span class="c1">// Compute the maximum value over tumbling windows of 30 seconds.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">integers</span> <span class="o">=</span> <span class="err">…</span>
<span class="nc">Supplier</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="nc">Integer</span><span class="o">.</span><span class="na">MIN_VALUE</span><span class="o">;</span>
<span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">aggregateFunction</span> <span class="o">=</span> <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">);</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">,</span> <span class="nc">Integer</span><span
class="o">&gt;&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">30</span><span class="o">),</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">aggregateFunction</span><span class="o">,</span> <span class="k">new</span> <span class="nc">IntegerSerde</span><span class="o">()));</span></code></pre></figure>
<p><strong>Session Window</strong>: A session window groups a MessageStream into sessions. A session captures a period of activity over a MessageStream and is defined by a gap. A session is closed and its results are emitted when no new messages arrive for the window within the gap duration.</p>
<p>Examples:</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// Sessionize a stream of page views, and count the number of page-views in a session for every user.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">&gt;</span> <span class="n">pageViews</span> <span class="o">=</span> <span class="err">…</span>
<span class="nc">Supplier</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="mi">0</span><span class="o">;</span>
<span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span class="nc">PageView</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">countAggregator</span> <span class="o">=</span> <span class="o">(</span><span
class="n">pageView</span><span class="o">,</span> <span class="n">oldCount</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">oldCount</span> <span class="o">+</span> <span class="mi">1</span><span class="o">;</span>
<span class="nc">Duration</span> <span class="n">sessionGap</span> <span class="o">=</span> <span class="nc">Duration</span><span class="o">.</span><span class="na">ofMinutes</span><span class="o">(</span><span class="mi">3</span><span class="o">);</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">String</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;&gt;</span> <span class="n">sessionCounts</span> <span class="o">=</span> <span class="n">pageViews</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span class="na">keyedSessionWindow</span><span class="o">(</span>
    <span class="n">pageView</span> <span class="o">-&gt;</span> <span class="n">pageView</span><span class="o">.</span><span class="na">getUserId</span><span class="o">(),</span> <span class="n">sessionGap</span><span class="o">,</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">countAggregator</span><span class="o">,</span> <span class="k">new</span> <span class="nf">StringSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">IntegerSerde</span><span class="o">()));</span>

<span class="c1">// Compute the maximum value over tumbling windows of 3 seconds.</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">integers</span> <span class="o">=</span> <span class="err">…</span>
<span class="nc">Supplier</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">&gt;</span> <span
class="n">initialValue</span> <span class="o">=</span> <span class="o">()</span> <span class="o">-&gt;</span> <span class="nc">Integer</span><span class="o">.</span><span class="na">MIN_VALUE</span><span class="o">;</span>
<span class="nc">FoldLeftFunction</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;</span> <span class="n">aggregateFunction</span> <span class="o">=</span> <span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="nc">Math</span><span class="o">.</span><span class="na">max</span><span class="o">(</span><span class="n">msg</span><span class="o">,</span> <span class="n">oldValue</span><span class="o">);</span>
<span class="nc">MessageStream</span><span class="o">&lt;</span><span class="nc">WindowPane</span><span class="o">&lt;</span><span class="nc">Void</span><span class="o">,</span> <span class="nc">Integer</span><span class="o">&gt;&gt;</span> <span class="n">windowedStream</span> <span class="o">=</span> <span class="n">integers</span><span class="o">.</span><span class="na">window</span><span class="o">(</span><span class="nc">Windows</span><span class="o">.</span><span class="na">tumblingWindow</span><span class="o">(</span><span class="nc">Duration</span><span class="o">.</span><span class="na">ofSeconds</span><span class="o">(</span><span class="mi">3</span><span class="o">),</span> <span class="n">initialValue</span><span class="o">,</span> <span class="n">aggregateFunction</span><span class="o">,</span> <span class="k">new</span> <span class="nf">IntegerSerde</span><span class="o">()));</span></code></pre></figure>
<h3 id="table">Table</h3>
<p>A Table represents a dataset that can be accessed by keys, and is one of the building blocks of the Samza high level API; the main motivation behind it is to support stream-table joins.
Local tables are backed by the existing K/V store. More variations such as direct access and composite tables will be supported in the future. The usage of a table typically follows three steps:</p>
<ol> <li>Create a table</li> <li>Populate the table using the sendTo() operator</li> <li>Join a stream with the table using the join() operator</li> </ol>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><table class="rouge-table"><tbody><tr><td class="gutter gl"><pre class="lineno">1 2 3 4 5 </pre></td><td class="code"><pre><span class="kd">final</span> <span class="nc">StreamApplication</span> <span class="n">app</span> <span class="o">=</span> <span class="o">(</span><span class="n">streamGraph</span><span class="o">,</span> <span class="n">cfg</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span> <span class="nc">Table</span><span class="o">&lt;</span><span class="no">KV</span><span class="o">&lt;</span><span class="nc">Integer</span><span class="o">,</span> <span class="nc">Profile</span><span class="o">&gt;&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="n">streamGraph</span><span class="o">.</span><span class="na">getTable</span><span class="o">(</span><span class="k">new</span> <span class="nc">InMemoryTableDescriptor</span><span class="o">(</span><span class="s">"t1"</span><span class="o">)</span> <span class="o">.</span><span class="na">withSerde</span><span class="o">(</span><span class="nc">KVSerde</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="k">new</span> <span class="nc">IntegerSerde</span><span class="o">(),</span> <span class="k">new</span> <span class="nc">ProfileJsonSerde</span><span class="o">())));</span> <span class="o">...</span> <span class="o">};</span> </pre></td></tr></tbody></table></code></pre></figure>
<p>The example above creates a TableDescriptor object, which contains all
information about a table. The currently supported table types are <a href="https://github.com/apache/samza/blob/master/samza-kv-inmemory/src/main/java/org/apache/samza/storage/kv/inmemory/InMemoryTableDescriptor.java">InMemoryTableDescriptor</a> and <a href="https://github.com/apache/samza/blob/master/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/rocksdb/RocksDbTableDescriptor.java">RocksDbTableDescriptor</a>. Notice that the type of records in a table is KV, and <a href="https://samza.apache.org/learn/documentation/latest/container/serialization.html">Serdes</a> for both the key and the value of records need to be defined (line 4). Additional parameters can be added based on individual table types.</p>
<p>More details about steps 2 and 3 can be found in the operators section.</p>
<hr />
<h2 id="flexible-deployment-model">Flexible Deployment Model</h2>
<h3 id="introduction-1">Introduction</h3>
<p>Prior to Samza 0.13.0, Samza only supported cluster-managed deployment with <a href="http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html">YARN</a>.</p>
<p>With Samza 0.13.0, the deployment model has been simplified and decoupled from YARN. If you prefer cluster management, you can still use YARN or you can implement your own extension to deploy Samza on other cluster management systems. But what if you want to avoid cluster management systems altogether?</p>
<p>Samza now ships with the ability to deploy applications as a simple embedded library with pluggable coordination. With embedded mode, you can leverage Samza processors directly in your application and deploy it in whatever way you prefer.
Samza has a pluggable job coordinator layer to perform leader election and assign work to the processors.</p> <p>This section will focus on the new embedded deployment capability.</p> <h3 id="concepts-1">Concepts</h3> <p>Let’s take a closer look at how embedded deployment works.</p> <p>The <a href="#architecture">architecture</a> section above provided an overview of the layers that enable the flexible deployment model. The new embedded mode comes into the picture at the <em>deployment</em> layer. The deployment layer includes assignment of input partitions to the available processors.</p> <p>There are two types of partition assignment models which are controlled with the <em>job.coordinator.factory</em> in configuration:</p> <h4 id="external-partition-management">External Partition Management</h4> <p>With external partition management, Samza doesn’t manage the partitioning by itself. Instead it uses a <code class="language-plaintext highlighter-rouge">PassthroughJobCoordinator</code> which honors whatever partition mapping is provided by the <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/container/grouper/stream/SystemStreamPartitionGrouper.html">SystemStreamPartitionGrouper</a>. There are two common patterns for external partition management:</p> <ul> <li><strong>Using high level Kafka consumer</strong> - partition assignment is done by the high level Kafka consumer itself. To use this model, you need to implement and configure a SystemFactory which provides the Kafka high level consumer. Then you need to configure <em>job.systemstreampartition.grouper.factory</em> to <em>org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouper</em> so Kafka’s partition assignments all go to one task.</li> <li><strong>Customized partitioning</strong> - partition assignment is done with a custom grouper. The grouper logic is completely up to you. 
A practical example of this model is to implement a custom grouper which reads static partition assignments from the configuration.</li> </ul>
<p>Samza ships with a <code class="language-plaintext highlighter-rouge">PassthroughJobCoordinatorFactory</code> which facilitates this type of partition management.</p>
<h4 id="dynamic-partition-management">Dynamic Partition Management</h4>
<p>With dynamic partitioning, partitions are distributed between the available processors at runtime. If the number of available processors changes (for example, if some are shut down or added), the mapping will be regenerated and re-distributed to all the processors. Information about the current mapping is contained in a special structure called the JobModel. There is one leader processor which generates the JobModel and distributes it to the other processors. The leader is determined by a “leader election” process.</p>
<p>Let’s take a closer look at how dynamic coordination works.</p>
<h4 id="coordination-service">Coordination service</h4>
<p>Dynamic coordination of the processors assumes the presence of a coordination service. The main responsibilities of the service are:</p>
<ul> <li><strong>Leader Election</strong> - electing a single processor, which will be responsible for calculating and distributing the JobModel or for creating intermediate streams.</li> <li><strong>Central barrier and latch</strong> - coordination primitives used by the processors.</li> <li><strong>JobModel notifications</strong> - notifying the processors about the availability of a new JobModel.</li> <li><strong>JobModel storage</strong> - the coordination service dictates where the JobModel is persisted.</li> </ul>
<p>The coordination service is currently derived from the job coordinator factory.
Samza ships with a <code class="language-plaintext highlighter-rouge">ZkJobCoordinatorFactory</code> implementation which has a corresponding <code class="language-plaintext highlighter-rouge">ZkCoordinationServiceFactory</code>.</p>
<p>Let’s walk through the coordination sequence for a ZooKeeper-based embedded application:</p>
<ul> <li>Each processor (participant) will register with the pluggable coordination service. During the registration it will provide its own participant ID.</li> <li>One of the participants will be elected as the leader.</li> <li>The leader monitors the list of all the active participants.</li> <li>Whenever the list of the participants changes, the leader will generate a new JobModel for the current participants.</li> <li>The new JobModel will be pushed to common storage. The default implementation uses ZooKeeper for this purpose.</li> <li>Participants are notified that the new JobModel is available. Notification is done through the coordination service, e.g. ZooKeeper.</li> <li>The participants will stop processing, apply the new JobModel, and then resume processing.</li> </ul>
<p>The following diagram shows the relationships of the coordinators in the ZooKeeper coordination service implementation.</p>
<p><img src="/img/latest/learn/documentation/introduction/coordination-service.png" alt="Coordination service diagram" style="max-width: 100%; height: auto;" onclick="window.open(this.src)" /></p>
<p>Here are a few important details about the coordination service:</p>
<ul> <li>To ensure that no partition is processed by two different processors at the same time, processing is paused and the processors synchronize on a barrier. Once all the processors are paused, the new JobModel is applied and the processing resumes. The barrier is implemented using the coordination service.</li> <li>During startup and shutdown the processors will be joining/leaving one after another.
To avoid redundant JobModel re-calculation, there is a debounce timer which waits for a short period of time (2 seconds by default, configurable in a future release) for more processors to join or leave. Each time a processor joins or leaves, the timer is reset. When the timer expires, the JobModel is finally recalculated.</li> <li>If the processors require a local store for adjacent or temporary data, we want to keep its mapping across restarts. For this we use some extra information about each processor, which uniquely identifies it and its location. If the same processor is restarted in the same location, we will try to assign it the same partitions. This locality information should survive restarts, so it is stored in common storage (currently ZooKeeper).</li> </ul>
<h3 id="user-guide">User guide</h3>
<p>Embedded deployment is designed to help users who want more control over the deployment of their application. So it is the user’s responsibility to configure and deploy the processors. In the case of ZooKeeper coordination, you also need to configure the URL for an instance of ZooKeeper.</p>
<p>Additionally, each processor requires a unique ID to be used with the coordination service. If location affinity is important, this ID should be unique for each processor on a specific hostname (assuming local storage services). To address this requirement, Samza uses a <a href="/learn/documentation/latest/api/javadocs/org/apache/samza/runtime/ProcessorIdGenerator.html">ProcessorIdGenerator</a> to provide the ID for each processor. If no generator is explicitly configured, the default one will create a UUID for each processor.</p>
<h4 id="configuration">Configuration</h4>
<p>To run an embedded Samza processor, you need to configure the coordinator service using the <em>job.coordinator.factory</em> property.
Also, there is currently only one taskname grouper that supports embedded mode, so you must configure that explicitly.</p>
<p>Let’s take a look at how to configure the two coordination service implementations that ship with Samza.</p>
<p>To use ZooKeeper-based coordination, the following configs are required:</p>
<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties">job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=yourzkconnection
task.name.grouper.factory=org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</code></pre></figure>
<p>To use external coordination, the following configs are needed:</p>
<figure class="highlight"><pre><code class="language-properties" data-lang="properties"><span class="py">job.coordinator.factory</span><span class="p">=</span><span class="s">org.apache.samza.standalone.PassthroughJobCoordinatorFactory</span>
<span class="py">task.name.grouper.factory</span><span class="p">=</span><span class="s">org.apache.samza.container.grouper.task.GroupByContainerIdsFactory</span></code></pre></figure>
<h4 id="api">API</h4>
<p>As mentioned in the <a href="#architecture">architecture</a> section above, you use the LocalApplicationRunner to launch your processors from your application code, like this:</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">WikipediaZkLocalApplication</span> <span class="o">{</span>
  <span class="kd">public</span> <span class="kd">static</span> <span class="kt">void</span> <span class="nf">main</span><span class="o">(</span><span class="nc">String</span><span class="o">[]</span> <span class="n">args</span><span class="o">)</span> <span class="o">{</span>
    <span class="nc">CommandLine</span> <span class="n">cmdLine</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">CommandLine</span><span
class="o">();</span>
    <span class="nc">OptionSet</span> <span class="n">options</span> <span class="o">=</span> <span class="n">cmdLine</span><span class="o">.</span><span class="na">parser</span><span class="o">().</span><span class="na">parse</span><span class="o">(</span><span class="n">args</span><span class="o">);</span>
    <span class="nc">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="n">cmdLine</span><span class="o">.</span><span class="na">loadConfig</span><span class="o">(</span><span class="n">options</span><span class="o">);</span>
    <span class="nc">LocalApplicationRunner</span> <span class="n">runner</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">LocalApplicationRunner</span><span class="o">(</span><span class="n">config</span><span class="o">);</span>
    <span class="nc">WikipediaApplication</span> <span class="n">app</span> <span class="o">=</span> <span class="k">new</span> <span class="nc">WikipediaApplication</span><span class="o">();</span>
    <span class="n">runner</span><span class="o">.</span><span class="na">run</span><span class="o">(</span><span class="n">app</span><span class="o">);</span>
    <span class="n">runner</span><span class="o">.</span><span class="na">waitForFinish</span><span class="o">();</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>In the code above, <code class="language-plaintext highlighter-rouge">WikipediaApplication</code> is an application written with the <a href="#high-level-api">high level API</a>.</p>
<p>Check out the <a href="/learn/tutorials/latest/hello-samza-high-level-zk.html">tutorial</a> to run this application with ZooKeeper coordination on your machine now.</p>
<h4 id="deployment-and-scaling">Deployment and Scaling</h4>
<p>You can deploy the application instances in any way you prefer.
If using the coordination service, you can add or remove instances at any time and the leader’s job coordinator (elected via the CoordinationService) will automatically recalculate the JobModel after the debounce time and apply it to the available processors. So, to scale up your application, you simply start more processors.</p>
<h3 id="known-issues">Known issues</h3>
<p>Take note of the following issues with the embedded deployment feature for the 0.13.0 release. They will be fixed in a subsequent release.</p>
<ul> <li>The GroupByContainerCount default taskname grouper isn’t supported.</li> <li>Host affinity is not enabled.</li> <li>ZkJobCoordinator metrics are not provided yet. Metrics will soon be added for: <ul> <li>Number of JobModel recalculations</li> <li>Number of reads/writes</li> <li>Leader reelections</li> <li>and more</li> </ul> </li> <li>The LocalApplicationRunner does not yet support the low level API. This means you cannot use StreamTask with LocalApplicationRunner.</li> <li>Currently, the ‘app.id’ config must be unique across all applications using the same ZooKeeper cluster.</li> </ul>
</div> </div> </div> <!-- footer starts here --> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
--> <footer> <div class="footer-inner"> <div class="side-by-side"> <div> <div class="footer__heading">Learn More</div> <div class="footer__items"> <a class="footer__item" href="/meetups/">Meetups</a> <a class="footer__item" href="/blog/">Blog</a> <a class="footer__item" href="/learn/documentation/latest/introduction/background.html">About</a> </div> </div> <div> <div class="footer__heading">Community</div> <div class="footer__items"> <a class="footer__item" href="/community/contact-us.html">Contact Us</a> <a class="footer__item" href="/contribute/contributors-corner.html">Contributors' Corner</a> <a class="footer__item" href="/community/committers.html">PMC members and committers</a> <a class="footer__item" href="/powered-by/">Powered By</a> </div> </div> <div> <div class="quick-links"> <a class="quick-link" href="/startup/download" target="_blank"> <i class="icon ion-md-download"></i> </a> <a class="quick-link" href="https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tree" target="_blank"> <i class="icon ion-md-code"></i> </a> <a class="quick-link" href="https://twitter.com/samzastream" target="_blank"> <i class="icon ion-logo-twitter"></i> </a> </div> <p> <script>document.write(new Date().getFullYear());</script> &copy; samza.apache.org</p> </div> </div> </div> </footer> <!-- Google Analytics --> <script> (function (i, s, o, g, r, a, m) { i['GoogleAnalyticsObject'] = r; i[r] = i[r] || function () { (i[r].q = i[r].q || []).push(arguments) }, i[r].l = 1 * new Date(); a = s.createElement(o), m = s.getElementsByTagName(o)[0]; a.async = 1; a.src = g; m.parentNode.insertBefore(a, m) })(window, document, 'script', '//www.google-analytics.com/analytics.js', 'ga'); ga('create', 'UA-43122768-1', 'apache.org'); ga('send', 'pageview'); </script> <script src="/js/main.new.js"></script> </body> </html>
