Turning a continuous Twitter stream into a sequence using Seq.unfold

Twitter now provides a streaming API that delivers a continuous stream of tweets on any set of topics, locations, etc., in real time. See Twitter's streaming API documentation for the full details.

It would be nice to convert this stream into an F# sequence so that it can be treated just like any other sequence in F#. This provides composability: the generation of a sequence is separated from its consumption. For example, you could write:

tweets "soccer"
|> Seq.filter (fun t -> t.Text.Contains("Madrid"))
|> Seq.iter (fun t -> printfn "%A" t)

This will subscribe to Twitter for all tweets on the subject ‘soccer’ and then only print the ones that contain ‘Madrid’ in the text.
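Because tweets returns an ordinary seq<Tweet>, the consuming pipeline does not care where the sequence comes from. A minimal sketch, assuming a hypothetical in-memory sampleTweets list and a cut-down Tweet record with only a Text field (both for illustration only, not the post's types):

```fsharp
// Hypothetical cut-down record: only the field the pipeline needs.
type Tweet = { Text: string }

// Hypothetical stand-in for the live Twitter stream.
let sampleTweets : seq<Tweet> =
    seq [ { Text = "Real Madrid win again" }
          { Text = "Barcelona draw at home" }
          { Text = "Madrid derby on Sunday" } ]

// The same filter as above, applied to any source of tweets.
let madridTweets (source: seq<Tweet>) =
    source
    |> Seq.filter (fun t -> t.Text.Contains("Madrid"))
    |> Seq.map (fun t -> t.Text)
    |> List.ofSeq

madridTweets sampleTweets |> List.iter (printfn "%s")
```

Swapping the live stream for a test sequence like this is exactly what separating generation from consumption buys you.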

Updated: Once you have a sequence, it only takes a few lines of code to calculate a metric such as “Tweets / Minute” using the Seq.windowed function:

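let tweetsPerMinute tracking =
    tweets tracking
    |> Seq.map (fun t -> toDateTime t.DateStr)   // toDateTime: helper from the full snippet (not shown here)
    |> Seq.windowed 3
    // a window of 3 timestamps spans only 2 gaps between arrivals
    |> Seq.map (fun ts -> (Seq.max ts) - (Seq.min ts), ts.Length - 1)
    // skip windows whose tweets all share the same timestamp (zero interval)
    |> Seq.filter (fun (interval, _) -> interval.TotalMinutes > 0.0)
    |> Seq.map (fun (interval, gaps) -> float gaps / interval.TotalMinutes)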
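To see what the windowed calculation produces, here is a self-contained sketch on fixed timestamps instead of live tweets. The helper name ratePerMinute and the sample data are hypothetical. One detail worth noting: a window of n timestamps spans only n - 1 gaps between arrivals, so this sketch divides the gap count (not the window length) by the elapsed minutes, and it skips windows whose timestamps are identical to avoid dividing by zero.

```fsharp
open System

// Hypothetical helper: events per minute over a sliding window of `size` timestamps.
let ratePerMinute size (times: seq<DateTime>) =
    times
    |> Seq.windowed size
    // elapsed time across the window, and the number of gaps it contains
    |> Seq.map (fun ts -> (Seq.max ts) - (Seq.min ts), ts.Length - 1)
    // a zero interval would give an infinite rate, so drop it
    |> Seq.filter (fun (interval, _) -> interval > TimeSpan.Zero)
    |> Seq.map (fun (interval, gaps) -> float gaps / interval.TotalMinutes)

// One event every 30 seconds, i.e. a steady 2 events per minute.
let start = DateTime(2012, 1, 1, 12, 0, 0)
let times = [ for i in 0 .. 4 -> start.AddSeconds(30.0 * float i) ]

ratePerMinute 3 times |> Seq.iter (printfn "%.1f")
```

Each of the three windows covers 60 seconds and two gaps, so every reported rate is 2.0, matching the actual arrival rate.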

You can download the full code from the F# Snippets site: http://fssnip.net/8m

Below are some of the code highlights. This is a good example of using Seq.unfold to create a sequence from the underlying HTTP Twitter stream.
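Seq.unfold takes a generator function and an initial state; the generator returns Some(element, nextState) to yield an element and continue, or None to end the sequence. Threading a reader through as the state gives a lazy, line-by-line sequence. A minimal sketch, with a StringReader standing in for the HTTP response stream (the name linesOf is hypothetical):

```fsharp
open System.IO

// Lazily yields each line of a TextReader; the reader itself is the unfold state.
let linesOf (reader: TextReader) =
    reader
    |> Seq.unfold (fun r ->
        match r.ReadLine() with
        | null -> None          // end of input: stop the sequence
        | line -> Some(line, r))

let reader = new StringReader("first\nsecond\nthird")
linesOf reader |> Seq.iter (printfn "%s")
```

Nothing is read until the sequence is enumerated, which is what lets an endless network stream be exposed the same way.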

[...]
[<DataContract>]
type TweetUser = {
    [<field:DataMember(Name="followers_count")>] Followers:int
    [<field:DataMember(Name="screen_name")>] Name:string
    [<field:DataMember(Name="id_str")>] Id:string   // id_str is a string in Twitter's JSON
    [<field:DataMember(Name="location")>] Location:string}

[<DataContract>]
type Tweet = {
    [<field:DataMember(Name="text")>] Text:string
    [<field:DataMember(Name="retweeted")>] IsRetweeted:bool
    [<field:DataMember(Name="created_at")>] DateStr:string
    [<field:DataMember(Name="user")>] User:TweetUser
    [<field:DataMember(Name="geo")>] Geo:string}

[...]
let tweets(tracking) =
    let template = sprintf "https://stream.twitter.com/1/statuses/filter.json?track=%s"
    let http = WebRequest.Create(template tracking) :?> HttpWebRequest
    http.Credentials <- NetworkCredential(<twitter id>, <twitter pwd>)
    [...]
    let readStream = new StreamReader(str, Encoding.UTF8)
    // The reader is the unfold state: each step reads one line and returns
    // Some(element, reader) to continue, or None to end the sequence.
    readStream |> Seq.unfold (fun rs ->
            try
                match rs.ReadLine() with
                | null -> None                      // end of the HTTP stream
                | line when line.StartsWith("{\"text") ->
                    let tweet =
                        try
                            line
                            |> Encoding.UTF8.GetBytes
                            |> toStream
                            |> dser.ReadObject
                            |> unbox<Tweet>
                            |> Some
                        with
                        | ex ->
                            // skip a tweet that fails to deserialize
                            printfn "Error %s" ex.Message
                            None
                    Some(tweet, rs)
                | _ -> Some(None, rs)               // not a tweet (e.g. a blank keep-alive line)
            with
            | ex ->
                // a read error ends the sequence
                printfn "%s" ex.Message
                None)
    // drop the None entries produced for skipped lines
    |> Seq.choose id
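The pattern used in tweets (yield Some item for a line that parses, None for a line to skip, then drop the Nones with Seq.choose) can be tried without a network connection. A sketch assuming an in-memory reader in place of the HTTP stream and a hypothetical tryParse that accepts only integer lines, standing in for the JSON deserialization:

```fsharp
open System
open System.IO

// Hypothetical stand-in for JSON deserialization: Some value, or None on failure.
let tryParse (line: string) =
    match Int32.TryParse line with
    | true, n -> Some n
    | _ -> None

// Each unfold step yields an option; a None element means "skip", not "stop".
let parsedLines (reader: TextReader) =
    reader
    |> Seq.unfold (fun r ->
        match r.ReadLine() with
        | null -> None                      // end of stream
        | "" -> Some(None, r)               // blank line: skip it
        | line -> Some(tryParse line, r))   // Some n, or None if unparsable
    |> Seq.choose id

let input = new StringReader("1\n\n2\noops\n3")
parsedLines input |> Seq.iter (printfn "%d")
```

Keeping "skip this line" (an inner None) separate from "end of stream" (an outer None) is what stops one bad line from terminating the whole sequence.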

The tweets arrive as JSON, so we use the WCF JSON serializer (DataContractJsonSerializer) and the DataContract attributes to deserialize each tweet into an F# record.
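The field: target on each DataMember attribute is presumably there because an immutable record's properties have no setters, so the serializer has to write the compiler-generated backing fields directly. A self-contained sketch of the deserialization step, assuming a cut-down hypothetical User record and a hand-written JSON string:

```fsharp
open System.IO
open System.Runtime.Serialization
open System.Runtime.Serialization.Json
open System.Text

// Hypothetical cut-down version of TweetUser for illustration.
[<DataContract>]
type User = {
    [<field: DataMember(Name = "screen_name")>] Name: string
    [<field: DataMember(Name = "id_str")>] Id: string
    [<field: DataMember(Name = "followers_count")>] Followers: int }

let serializer = DataContractJsonSerializer(typeof<User>)

// Deserialize one JSON object (encoded as UTF-8 bytes) into a User record.
let parseUser (json: string) =
    use stream = new MemoryStream(Encoding.UTF8.GetBytes json)
    serializer.ReadObject(stream) :?> User

// Members not in the contract, such as "lang", are ignored.
let user = parseUser """{"screen_name":"fsharp","id_str":"123456789012","followers_count":42,"lang":"en"}"""
printfn "%s (%s) has %d followers" user.Name user.Id user.Followers
```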

The tweets function calls Twitter's streaming API. Its tracking parameter is a string naming the topic(s) of interest for the subscription (see the streaming API documentation). The function returns an endless sequence, which can be terminated with the stopAll() function.
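Because the sequence is lazy, a consumer can also bound an endless stream itself: Seq.truncate or Seq.takeWhile simply stop pulling, and the generator runs no further. A sketch with a hypothetical endless counter in place of the tweet stream, counting how many elements are actually generated:

```fsharp
// Counts how many elements the generator actually produces.
let generated = ref 0

// Hypothetical endless source: 0, 1, 2, ... (the generator never returns None).
let endless =
    0 |> Seq.unfold (fun n ->
        generated.Value <- generated.Value + 1
        Some(n, n + 1))

// Keep only the first five even numbers; the consumer decides when to stop.
let firstEvens =
    endless
    |> Seq.filter (fun n -> n % 2 = 0)
    |> Seq.truncate 5
    |> List.ofSeq

// Generation stops once the fifth even number (8) has been pulled: 0..8, i.e. 9 elements.
printfn "%A (generated %d elements)" firstEvens generated.Value
```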
