<!DOCTYPE html> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <html lang="en"> <head> <meta charset="utf-8"> <title>Samza - API Overview</title> <link href='/css/ropa-sans.css' rel='stylesheet' type='text/css'/> <link href="/css/bootstrap.min.css" rel="stylesheet"/> <link href="/css/font-awesome.min.css" rel="stylesheet"/> <link href="/css/main.css" rel="stylesheet"/> <link href="/css/syntax.css" rel="stylesheet"/> <link rel="icon" type="image/png" href="/img/samza-icon.png"> <script src="/js/jquery-1.11.1.min.js"></script> </head> <body> <div class="wrapper"> <div class="wrapper-content"> <div class="masthead"> <div class="container"> <div class="masthead-logo"> <a href="/" class="logo">samza</a> </div> <div class="masthead-icons"> <div class="pull-right"> <a href="/startup/download"><i class="fa fa-arrow-circle-o-down masthead-icon"></i></a> <a href="https://git-wip-us.apache.org/repos/asf?p=samza.git;a=tree" target="_blank"><i class="fa fa-code masthead-icon" style="font-weight: bold;"></i></a> <a href="https://twitter.com/samzastream" target="_blank"><i class="fa fa-twitter masthead-icon"></i></a> <!-- this icon only shows in versioned pages --> <a href="http://samza.apache.org/learn/documentation/0.14/api/overview"><i id="switch-version-button"></i></a> <!-- links 
for the navigation bar --> </div> </div> </div><!-- /.container --> </div> <div class="container"> <div class="menu"> <h1><i class="fa fa-rocket"></i> Getting Started</h1> <ul> <li><a href="/startup/hello-samza/latest">Hello Samza</a></li> <li><a href="/startup/download">Download</a></li> <li><a href="/startup/preview">Feature Preview</a></li> </ul> <h1><i class="fa fa-book"></i> Learn</h1> <ul> <li><a href="/learn/documentation/latest">Documentation</a></li> <li><a href="/learn/documentation/latest/jobs/configuration-table.html">Configuration</a></li> <li><a href="/learn/documentation/latest/container/metrics-table.html">Metrics</a></li> <li><a href="/learn/documentation/latest/api/javadocs/">Javadocs</a></li> <li><a href="/learn/tutorials/latest">Tutorials</a></li> <li><a href="https://cwiki.apache.org/confluence/display/SAMZA/FAQ">FAQ</a></li> <li><a href="https://cwiki.apache.org/confluence/display/SAMZA/Apache+Samza">Wiki</a></li> <li><a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51812876">Papers & Talks</a></li> <li><a href="http://blogs.apache.org/samza">Blog</a></li> </ul> <h1><i class="fa fa-comments"></i> Community</h1> <ul> <li><a href="/community/mailing-lists.html">Mailing Lists</a></li> <li><a href="/community/irc.html">IRC</a></li> <li><a href="https://issues.apache.org/jira/browse/SAMZA">Bugs</a></li> <li><a href="https://cwiki.apache.org/confluence/display/SAMZA/Powered+By">Powered by</a></li> <li><a href="https://cwiki.apache.org/confluence/display/SAMZA/Ecosystem">Ecosystem</a></li> <li><a href="/community/committers.html">Committers</a></li> </ul> <h1><i class="fa fa-code"></i> Contribute</h1> <ul> <li><a href="/contribute/contributors-corner.html">Contributor's Corner</a></li> <li><a href="/contribute/coding-guide.html">Coding Guide</a></li> <li><a href="/contribute/design-documents.html">Design Documents</a></li> <li><a href="/contribute/code.html">Code</a></li> <li><a href="/contribute/tests.html">Tests</a></li> 
</ul> <h1><i class="fa fa-history"></i> Archive</h1> <ul> <li><a href="/archive/index.html#latest">latest</a></li> <li><a href="/archive/index.html#14">0.14</a></li> <li><a href="/archive/index.html#13">0.13</a></li> <li><a href="/archive/index.html#12">0.12</a></li> <li><a href="/archive/index.html#11">0.11</a></li> <li><a href="/archive/index.html#10">0.10</a></li> <li><a href="/archive/index.html#09">0.9</a></li> <li><a href="/archive/index.html#08">0.8</a></li> <li><a href="/archive/index.html#07">0.7</a></li> </ul> </div> <div class="content"> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. --> <h2>API Overview</h2> <!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the specific language governing permissions and limitations under the License. -->
<p>When writing a stream processor for Samza, you must implement either the <a href="javadocs/org/apache/samza/task/StreamTask.html">StreamTask</a> or the <a href="javadocs/org/apache/samza/task/AsyncStreamTask.html">AsyncStreamTask</a> interface. Implement StreamTask for synchronous processing, where the message processing is complete after the <em>process</em> method returns. An example of StreamTask is a computation that does not involve remote calls:</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">package</span> <span class="nn">com.example.samza</span><span class="o">;</span>

<span class="kn">import</span> <span class="nn">org.apache.samza.system.IncomingMessageEnvelope</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.MessageCollector</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.StreamTask</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.TaskCoordinator</span><span class="o">;</span>

<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyTaskClass</span> <span class="kd">implements</span> <span class="n">StreamTask</span> <span class="o">{</span>
  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
                      <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
                      <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// process message</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>The AsyncStreamTask interface, on the other hand, supports asynchronous processing, where the message processing may not be complete after the <em>processAsync</em> method returns. Concurrency libraries such as Java NIO, ParSeq and Akka can be used here to make asynchronous calls, and completion is signaled by invoking the <a href="javadocs/org/apache/samza/task/TaskCallback.html">TaskCallback</a>.
Samza then continues with the next message or shuts down the container, depending on the callback status. An example of AsyncStreamTask is a computation that makes remote calls but doesn’t block on their completion:</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">package</span> <span class="nn">com.example.samza</span><span class="o">;</span>

<span class="kn">import</span> <span class="nn">org.apache.samza.system.IncomingMessageEnvelope</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.AsyncStreamTask</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.MessageCollector</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.TaskCallback</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.TaskCoordinator</span><span class="o">;</span>

<span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyAsyncTaskClass</span> <span class="kd">implements</span> <span class="n">AsyncStreamTask</span> <span class="o">{</span>
  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">processAsync</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
                           <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
                           <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">,</span>
                           <span class="n">TaskCallback</span> <span class="n">callback</span><span class="o">)</span> <span class="o">{</span>
    <span class="c1">// process message with asynchronous calls</span>
    <span class="c1">// fire callback upon completion, e.g. invoking callback from asynchronous call completion thread</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>When you run your job, Samza will create several instances of your class (potentially on multiple machines). These task instances process the messages in the input streams.</p>
<p>In your job’s configuration you can tell Samza which streams you want to consume.
An incomplete example could look like this (see the <a href="../jobs/configuration.html">configuration documentation</a> for more detail):</p>
<figure class="highlight"><pre><code class="language-jproperties" data-lang="jproperties"><span></span><span class="c"># This is the class above, which Samza will instantiate when the job is run</span>
<span class="na">task.class</span><span class="o">=</span><span class="s">com.example.samza.MyTaskClass</span>

<span class="c"># Define a system called "kafka" (you can give it any name, and you can define</span>
<span class="c"># multiple systems if you want to process messages from different sources)</span>
<span class="na">systems.kafka.samza.factory</span><span class="o">=</span><span class="s">org.apache.samza.system.kafka.KafkaSystemFactory</span>

<span class="c"># The job consumes a topic called "PageViewEvent" from the "kafka" system</span>
<span class="na">task.inputs</span><span class="o">=</span><span class="s">kafka.PageViewEvent</span>

<span class="c"># Define a serializer/deserializer called "json" which parses JSON messages</span>
<span class="na">serializers.registry.json.class</span><span class="o">=</span><span class="s">org.apache.samza.serializers.JsonSerdeFactory</span>

<span class="c"># Use the "json" serializer for messages in the "PageViewEvent" topic</span>
<span class="na">systems.kafka.streams.PageViewEvent.samza.msg.serde</span><span class="o">=</span><span class="s">json</span></code></pre></figure>
<p>For each message that Samza receives from the task’s input streams, the <em>process</em> method is called.
The <a href="javadocs/org/apache/samza/system/IncomingMessageEnvelope.html">envelope</a> contains three things of importance: the message, the key, and the stream that the message came from.</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="cm">/** Every message that is delivered to a StreamTask is wrapped</span>
<span class="cm"> * in an IncomingMessageEnvelope, which contains metadata about</span>
<span class="cm"> * the origin of the message. */</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">IncomingMessageEnvelope</span> <span class="o">{</span>
  <span class="cm">/** A deserialized message. */</span>
  <span class="n">Object</span> <span class="nf">getMessage</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>

  <span class="cm">/** A deserialized key. */</span>
  <span class="n">Object</span> <span class="nf">getKey</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>

  <span class="cm">/** The stream and partition that this message came from. */</span>
  <span class="n">SystemStreamPartition</span> <span class="nf">getSystemStreamPartition</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>The key and message are declared as Object and need to be cast to the correct type. If you don’t configure a <a href="../container/serialization.html">serializer/deserializer</a>, they are typically Java byte arrays.
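</p>
<p>As a minimal sketch of the cast described above (it is not part of the Samza API), the hypothetical helper <code>asText</code> below takes the <code>Object</code> returned by <code>getMessage()</code> and returns it as a String. It assumes that an undecoded payload is UTF-8 text, which may not hold for your streams; the class and method names are illustrative only.</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java">import java.nio.charset.StandardCharsets;

public class MessageDecoding {

  /** Hypothetical helper (not part of Samza): returns the payload as text. */
  static String asText(Object payload) {
    if (payload instanceof byte[]) {
      // No serde configured: decode the raw bytes, assuming UTF-8 text.
      return new String((byte[]) payload, StandardCharsets.UTF_8);
    }
    if (payload instanceof String) {
      // A string serde has already decoded the payload.
      return (String) payload;
    }
    throw new IllegalArgumentException("Unexpected payload type: " + payload);
  }

  public static void main(String[] args) {
    byte[] raw = "page view".getBytes(StandardCharsets.UTF_8);
    System.out.println(asText(raw));
    System.out.println(asText("already decoded"));
  }
}</code></pre></figure>
<p>Inside a task, such a helper would be called as <code>asText(envelope.getMessage())</code>. Checking the type before casting avoids a ClassCastException if the serde configuration changes.</p>
<p>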
A deserializer can convert these bytes into any other type; for example, the JSON deserializer mentioned above parses the byte array into java.util.Map, java.util.List and String objects.</p>
<p>The <code>getSystemStreamPartition()</code> method returns a <a href="javadocs/org/apache/samza/system/SystemStreamPartition.html">SystemStreamPartition</a> object, which tells you where the message came from. It consists of three parts:</p>
<ol>
<li>The <em>system</em>: the name of the system from which the message came, as defined in your job configuration. You can have multiple systems for input and/or output, each with a different name.</li>
<li>The <em>stream name</em>: the name of the stream (topic, queue) within the source system. This is also defined in the job configuration.</li>
<li>The <a href="javadocs/org/apache/samza/Partition.html"><em>partition</em></a>: a stream is normally split into several partitions, and each partition is assigned to one StreamTask instance by Samza.</li>
</ol>
<p>The API looks like this:</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="cm">/** A triple of system name, stream name and partition. */</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">SystemStreamPartition</span> <span class="kd">extends</span> <span class="n">SystemStream</span> <span class="o">{</span>

  <span class="cm">/** The name of the system which provides this stream. It is</span>
<span class="cm">      defined in the Samza job's configuration. */</span>
  <span class="kd">public</span> <span class="n">String</span> <span class="nf">getSystem</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>

  <span class="cm">/** The name of the stream/topic/queue within the system.
 */</span>
  <span class="kd">public</span> <span class="n">String</span> <span class="nf">getStream</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>

  <span class="cm">/** The partition within the stream. */</span>
  <span class="kd">public</span> <span class="n">Partition</span> <span class="nf">getPartition</span><span class="o">()</span> <span class="o">{</span> <span class="o">...</span> <span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>In the example job configuration above, the system name is “kafka” and the stream name is “PageViewEvent”. (The name “kafka” isn’t special; you can give your system any name you want.) If you have several input streams feeding into your StreamTask, you can use the SystemStreamPartition to determine what kind of message you’ve received.</p>
<p>What about sending messages? If you take a look at the process() method in StreamTask, you’ll see that you get a <a href="javadocs/org/apache/samza/task/MessageCollector.html">MessageCollector</a>.</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="cm">/** When a task wishes to send a message, it uses this interface. */</span>
<span class="kd">public</span> <span class="kd">interface</span> <span class="nc">MessageCollector</span> <span class="o">{</span>
  <span class="kt">void</span> <span class="nf">send</span><span class="o">(</span><span class="n">OutgoingMessageEnvelope</span> <span class="n">envelope</span><span class="o">);</span>
<span class="o">}</span></code></pre></figure>
<p>To send a message, you create an <a href="javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html">OutgoingMessageEnvelope</a> object and pass it to the message collector. At a minimum, the envelope specifies the message you want to send, and the system and stream name to send it to. Optionally, you can specify the partitioning key and other parameters.
See the <a href="javadocs/org/apache/samza/system/OutgoingMessageEnvelope.html">javadoc</a> for details.</p>
<p><strong>NOTE:</strong> Use the MessageCollector object only within the <code>process()</code> method. If you hold on to a MessageCollector instance and use it again later, your messages may not be sent correctly.</p>
<p>For example, here’s a simple task that splits each input message into words, and emits each word as a separate message:</p>
<figure class="highlight"><pre><code class="language-java" data-lang="java"><span></span><span class="kn">import</span> <span class="nn">org.apache.samza.system.IncomingMessageEnvelope</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.system.OutgoingMessageEnvelope</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.system.SystemStream</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.MessageCollector</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.StreamTask</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.samza.task.TaskCoordinator</span><span class="o">;</span>

<span class="kd">public</span> <span class="kd">class</span> <span class="nc">SplitStringIntoWords</span> <span class="kd">implements</span> <span class="n">StreamTask</span> <span class="o">{</span>

  <span class="c1">// Send outgoing messages to a stream called "words"</span>
  <span class="c1">// in the "kafka" system.</span>
  <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">SystemStream</span> <span class="n">OUTPUT_STREAM</span> <span class="o">=</span>
    <span class="k">new</span> <span class="n">SystemStream</span><span class="o">(</span><span class="s">"kafka"</span><span class="o">,</span> <span class="s">"words"</span><span class="o">);</span>

  <span class="kd">public</span> <span class="kt">void</span> <span class="nf">process</span><span class="o">(</span><span class="n">IncomingMessageEnvelope</span> <span class="n">envelope</span><span class="o">,</span>
                      <span class="n">MessageCollector</span> <span class="n">collector</span><span class="o">,</span>
                      <span class="n">TaskCoordinator</span> <span class="n">coordinator</span><span class="o">)</span> <span class="o">{</span>
    <span class="n">String</span> <span class="n">message</span> <span class="o">=</span> <span class="o">(</span><span class="n">String</span><span class="o">)</span> <span class="n">envelope</span><span class="o">.</span><span class="na">getMessage</span><span class="o">();</span>

    <span class="k">for</span> <span 
class="o">(</span><span class="n">String</span> <span class="n">word</span> <span class="o">:</span> <span class="n">message</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">" "</span><span class="o">))</span> <span class="o">{</span>
      <span class="c1">// Use the word as the key, and 1 as the value.</span>
      <span class="c1">// A second task can add the 1's to get the word count.</span>
      <span class="n">collector</span><span class="o">.</span><span class="na">send</span><span class="o">(</span><span class="k">new</span> <span class="n">OutgoingMessageEnvelope</span><span class="o">(</span><span class="n">OUTPUT_STREAM</span><span class="o">,</span> <span class="n">word</span><span class="o">,</span> <span class="mi">1</span><span class="o">));</span>
    <span class="o">}</span>
  <span class="o">}</span>
<span class="o">}</span></code></pre></figure>
<p>For an AsyncStreamTask example, follow the tutorial in the <a href="../../../tutorials/latest/samza-async-user-guide.html">Samza Async API and Multithreading User Guide</a>. For more details on the APIs, refer to the <a href="../jobs/configuration-table.html">Configuration</a> and <a href="javadocs">Javadocs</a> pages.</p>
<h2 id="samzacontainer"><a href="../container/samza-container.html">SamzaContainer »</a></h2>
</div> </div> </div><!-- /.wrapper-content --> </div><!-- /.wrapper --> <div class="footer"> <div class="container"> <!-- nothing for now. 
--> </div> </div> <script> $( document ).ready(function() { if ( $.fn.urlExists( "/learn/documentation/0.14/api/overview" ) ) { $("#switch-version-button").addClass("fa fa-history masthead-icon"); } }); /* a function to test whether the url exists or not */ (function( $ ) { $.fn.urlExists = function(url) { var http = new XMLHttpRequest(); http.open('HEAD', url, false); http.send(); return http.status != 404; }; }( jQuery )); </script> <!-- Google Analytics --> <script> (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); ga('create', 'UA-43122768-1', 'apache.org'); ga('send', 'pageview'); </script> </body> </html>