Stormin’ F#

Apache Storm is a scalable ‘stream computing’ platform that is fast gaining popularity. Hadoop and Storm can share the same cluster and the two complement each other well for different computing needs – batch for Hadoop and near-real-time for Storm.

Storm provides a macro architecture for executing ‘big data’ stream processing ‘topologies’. For example, one can easily increase the parallelism of any node in the Storm topology to suit the performance requirements.

For streaming analytics, however, Storm does not offer much help out of the box. Often one has to write the needed analytic logic from scratch. Wouldn’t it be nice if one could use something like Reactive Extensions (Rx) within Storm components?

Luckily Nathan Marz – the original author of Storm – chose to enable Storm with multi-language support. While Storm itself is written in Clojure and Java, it implements a (relatively simple?) language-independent protocol that can be used with basically any mainstream language.
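Storm's multi-language protocol exchanges JSON messages over a component's stdin and stdout, and each message is followed by a line containing only `end`. Below is a minimal F# sketch of that framing, assuming hand-built JSON for brevity. The helper names (`frame`, `emitCmd`, `syncCmd`, `send`, `readMsg`) are illustrative only and are not FsStorm functions.

```fsharp
open System.IO

// Frame one multilang message: the JSON payload, then a line holding only "end".
let frame (json: string) = json + "\nend\n"

// Hand-built JSON for two commands (a real component would use a JSON library).
let emitCmd (values: int list) =
    let body = values |> List.map string |> String.concat ","
    sprintf "{\"command\":\"emit\",\"tuple\":[%s]}" body

let syncCmd = "{\"command\":\"sync\"}"

// Write a framed message to any TextWriter (stdout when running under Storm).
let send (out: TextWriter) (json: string) =
    out.Write(frame json)
    out.Flush()

// Read one framed message: collect lines until the "end" marker (or end of input).
let readMsg (input: TextReader) =
    let rec loop acc =
        match input.ReadLine() with
        | null | "end" -> String.concat "\n" (List.rev acc)
        | line -> loop (line :: acc)
    loop []
```

Under Storm, `send` would be called with `System.Console.Out` and `readMsg` with `System.Console.In`. Taking the reader and writer as parameters keeps the framing testable without a cluster.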

FsStorm is an attempt to allow F# (and .Net Rx) to be used for defining and running Storm topologies.

Storm and Rx are big topics in and of themselves. If you are interested in leveraging FsStorm, it would be best to first understand Storm and Rx using the official and supplemental documentation (blog posts, videos, etc.). It took me a while to get a decent enough grasp of Storm to start putting together FsStorm. Be sure to view one of Nathan’s videos on Storm.

Given sufficient Storm knowledge, take a look at the sample project FsSimpleTest. It includes a topology, a spout and a bolt. The topology is described with StormDSL (which IMHO makes defining topologies much simpler than in Java).

In order to run FsSimpleTest, first set up a Storm cluster under Windows (for now). Download the repo and compile FsSimpleTest. To submit the topology to Storm, use the Submit.fsx script included in the project. Before you run the script, make sure the ‘binDir’ variable is set correctly for your environment. Also, the ‘jar’ command – which comes with the Java JDK – should be on the path (it is used to package the built components into a JAR file, which is uploaded to the Storm cluster).

You should be able to see the running topology in Storm UI – the browser-based console.

While the repo does not include a sample that uses Rx (yet), I have been successfully running a 3-component Storm topology that leverages FSharp.Control.Reactive (the Rx wrapper for F#).

FsStorm does not utilize all of the capabilities offered by Storm. In particular Distributed RPC and Transactional topologies are not supported.

This is a very early but promising start for stream computing with F#. Contributors are welcome!

16 thoughts on “Stormin’ F#”

  1. Hi fwaris, I was actually thinking of implementing something like this, just the other day. I had the idea of creating computation expressions for spouts/bolts. So then, to create a bolt, you would have something like `bolt { let intuple = …; return outtuple }` of type Bolt, and likewise for spouts `spout { let msg = …; return outtuple }` of type Spout. These could then be composed (like directed graphs) into topologies, with full type-safety from F#. I.e. you could compose a Spout with a Bolt whose input type matches the Spout’s output, but trying to compose it with a mismatched Bolt would give you a type error at compile time.
    I haven’t really thought it through all that well. Do you think that would be possible?
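The type-safe composition described in this comment can be sketched without computation expressions, using plain generic types. Everything here (`Spout<'a>`, `Bolt<'a,'b>`, `connect`, `chain`, `pull`) is hypothetical and not part of FsStorm; it only shows how the F# compiler could reject a mismatched connection.

```fsharp
// Hypothetical types: a spout produces values, a bolt transforms them.
type Spout<'a> = Spout of (unit -> 'a)
type Bolt<'a, 'b> = Bolt of ('a -> 'b)

// Connect a spout to a bolt; the spout's output type must equal the bolt's input type.
let connect (spout: Spout<'a>) (bolt: Bolt<'a, 'b>) : Spout<'b> =
    let (Spout next) = spout
    let (Bolt f) = bolt
    Spout (fun () -> f (next ()))

// Chain two bolts into one; again the types must line up.
let chain (first: Bolt<'a, 'b>) (second: Bolt<'b, 'c>) : Bolt<'a, 'c> =
    let (Bolt f) = first
    let (Bolt g) = second
    Bolt (f >> g)

// Pull one value through a composed pipeline.
let pull (Spout next) = next ()

// A spout that counts upwards from 1.
let numbers : Spout<int> =
    let n = ref 0
    Spout (fun () ->
        n.Value <- n.Value + 1
        n.Value)

let twice : Bolt<int, int> = Bolt (fun x -> x * 2)
let show : Bolt<int, string> = Bolt (fun x -> sprintf "value=%d" x)

// Spout<int> -> Bolt<int,int> -> Bolt<int,string> gives a Spout<string>.
let pipeline = connect numbers (chain twice show)

// The following would not compile, because the bolt expects a string:
// let bad = connect numbers (Bolt (fun (s: string) -> s.Length))
```

A real design would also have to carry Storm metadata (grouping fields, parallelism hints) alongside these types, which this sketch leaves out.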

    • Hi Tor,

      I believe that a high-level abstraction for stream computing is actually very desirable.
      What I have now is just the low level scaffolding to get topologies up and running. The individual topology components are not connected in a type-safe way.

      Using Storm in semi-production mode, I have come to realize that many of the compute needs you tend to encounter are addressed quite well by Rx. However Rx is meant to run on a single machine whereas Storm is distributed.

      Here are some ideas for a higher level abstraction:

      a) It should be based on Rx. You should be able to see the entire flow as an Rx computation (more-or-less).

      b) It needs hooks for specifying the nodes (equivalent to Storm spouts and bolts) so you can carve up the computation into distributed blocks. Also need to attach hints for parallelism to each node (and other topology metadata).

      c) The individual nodes can just execute the Rx flows for that node but the overall flow across all nodes needs to be consistent.

      d) Storm cares about grouping fields so these need to be relatively simple data types but the rest of the data can be complex objects (maybe serialized FsPickler binary for higher performance and easier typing).

      I think F# computation expressions will work well here but – at least from my perspective – more experience is needed before I can begin to identify the common patterns for a higher level abstraction.

      I am hoping to gain that experience by running Rx-based Storm topologies, in the next few weeks / months.

      Faisal

      BTW there is an Observable.createObservableAgent implementation included that turns a mailbox processor into an observable. This is mostly how I see using Rx within a node (for now). Incoming tuples are dropped into the mailbox, which then triggers the Rx code flow.
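The mailbox-to-observable idea can be sketched with FSharp.Core alone. This is a hypothetical stand-in written for illustration, not the FsStorm `Observable.createObservableAgent` implementation, and the name `observableAgent` is an assumption.

```fsharp
// Hypothetical helper: returns a mailbox plus an observable of everything posted to it.
let observableAgent<'T> () =
    let evt = Event<'T>()
    let agent =
        MailboxProcessor<'T>.Start(fun inbox ->
            let rec loop () =
                async {
                    // Each message taken from the mailbox is pushed to subscribers.
                    let! msg = inbox.Receive()
                    evt.Trigger msg
                    return! loop ()
                }
            loop ())
    agent, (evt.Publish :> System.IObservable<'T>)

// Incoming tuples (plain ints here) flow through an observable pipeline.
let agent, tuples = observableAgent<int> ()

let evens =
    tuples
    |> Observable.filter (fun x -> x % 2 = 0)
    |> Observable.map (fun x -> x * 10)
```

A bolt would call `agent.Post` for each incoming tuple and subscribe to `evens` (for example with `Observable.add`) to emit results. The mailbox serializes the messages, so subscribers see them in posting order.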

  2. Pingback: F# Weekly #4, 2015 | Sergey Tihon's Blog

      • cool! Let me know and I will add you to the Git repo.

        Also, today I successfully tested a simple topology on a 5-node SLES cluster.

    • Any ideas about running cross-language topologies? I understand this is probably against the idea of being statically-verifiable, but it would be nice not having to re-implement all the ‘contrib’ stuff.

      • Could you elaborate on the Java-connected part? Say I’m trying to instantiate a RabbitMQ spout, which requires me to pass an instance of a Java class. I wouldn’t be able to express that in FsStormDSL, would I?

        FW.
        Eugene, look at the type “Component” in StormDSL. It is a union type and one of the cases is “Java”, which takes a class name and a list of arguments. You will have to make sure that the referenced Java class or JAR is on the Storm classpath. Note that I have not tested this yet.

      • A default implementation of is given – but it has not yet been tested.

        You can try something like

        reliableSpoutRunner cfg createDefaultHousekeeper fCreateReliableEmitter

        fCreateReliableEmitter is a function that you will have to create that returns another function (say f1). The function f1 returned by fCreateReliableEmitter will be provided a reliable emitter that it should use. The reliable emitter requires an Int64 id that your f1 function should provide (or create).

        An example of a reliable spout is:

        //assumption: rnd was defined elsewhere in the original sample
        let rnd = System.Random()

        //spout – produces reliable messages
        let reliableSpout cfg =

            //define the function that will return the emitter function
            //cfg: the configuration passed in by storm
            //reliableEmit : a function that emits a message to storm; it requires an int64 id and json with the tuple
            let fCreateReliableEmitter (cfg:Configuration) reliableEmit =

                let producer() =
                    async{
                        do reliableEmit 1L (jval [Storm.TUPLE,[rnd.Next(0,100)]])
                        do Storm.stormSync()
                    }
                producer

            //run the spout
            Storm.reliableSpoutRunner cfg Storm.createDefaultHousekeeper fCreateReliableEmitter

      • The spout is ultimately responsible for managing reliability. I don’t think you can generalize this part too much. Some queuing systems such as Kestrel provide transactional semantics for pulling off data, which the corresponding spout can take advantage of. I don’t know much about RabbitMQ (beyond that it is based on the AMQP protocol). My suggestion would be to look at the default housekeeper and create something that works with RabbitMQ.
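As a rough illustration of the bookkeeping a reliable spout needs: Storm reports an ack or a fail for each message id the spout emitted, so the spout has to remember in-flight messages until one of the two arrives. The `Pending` type below is hypothetical and is not FsStorm's default housekeeper. It assumes that a failed message should simply be replayed under the same id.

```fsharp
open System.Collections.Generic

// Hypothetical bookkeeping type; FsStorm's default housekeeper may work differently.
// 'emit' stands in for the reliable emitter: it takes an int64 id and the message.
type Pending<'T>(emit: int64 -> 'T -> unit) =
    let inFlight = Dictionary<int64, 'T>()
    let mutable nextId = 0L

    /// Emit a new message under a fresh id and remember it until it is acked.
    member this.Send(msg: 'T) =
        nextId <- nextId + 1L
        inFlight.[nextId] <- msg
        emit nextId msg
        nextId

    /// Storm acked the id, so the message can be forgotten.
    member this.Ack(id: int64) = inFlight.Remove id |> ignore

    /// Storm failed the id: replay the stored message, if it is still in flight.
    member this.Fail(id: int64) =
        match inFlight.TryGetValue id with
        | true, msg -> emit id msg
        | _ -> ()

    /// Number of messages still waiting for an ack.
    member this.Count = inFlight.Count
```

The type is not thread-safe; it assumes acks, fails and emits are all handled on the single loop that reads Storm's messages. With a broker such as RabbitMQ, `Ack` and `Fail` would be the natural places to acknowledge or requeue the underlying delivery.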

      • I’m trying to demonstrate a fully functional sample implemented in FsStorm to my employer and I’ve opened a few issues for discussion around it on GitHub. If you have an idea for a more interactive forum than the blog, please let me know, otherwise my fork might start looking weird and unrecognizable in a few weeks 🙂

        On an unrelated topic, how would you compare FsStorm’s goals to mbrace? I feel mbrace is more of a batch processing engine, like Hadoop, than an event-processing one like Storm.

        FW
        I have commented on Git against each of the issues. I have changed the email notification address on Git so that I will be able to monitor the repo better from now on.

        As for mbrace, I have not really looked at it closely but mean to do so whenever I have some time. We need a push-oriented processing system and I don’t know if mbrace can be used in that way. I have a feeling that it is pull-oriented like Hadoop.

  3. Pingback: Real-time analytics with Apache Storm – now in F# | I think, therefore I spam.
