I recently needed a specific kind of publish-subscribe primitive, where the publisher must not block, and that should not leak memory. It is published as the stm-firehose package.
It exposes an STM
interface (built upon TBMQueue
), but also higher level helpers. My use case is to be able to
tap on a message stream at will, and observe some of the messages as they flow through the infrastructure. The clients
will join and leave at any time, possibly from a low bandwidth connection. This means that they might not be able to
cope with the traffic, but this should not adversely affect the publisher or other clients.
To cover common use cases, two helpers are exposed in the Data.Conduit.Network.Firehose module :
firehoseApp
: creates a WAI application that lets firehose clients connect through a web interface.firehoseConduit
: spawns a WARP server waiting for clients to connect, and returns aConduit
that will make all messages traversing it available to clients.
Finally, a specialized version of firehoseConduit
, called firehose
, has been included in the
Data.Conduit.Firehose module of
the hslogstash
package, in the name of fireHose
. This version is specialized for querying logstash messages, and
yes, the naming of the function and module could have been better.
It might have been a better idea to just create a TCP server for the helper functions, instead of a more complex HTTP server, but this would have required writing a custom protocol for handling the client “preferences” in the kind of messages they would like to see. As it stands, the user of the helper functions can pass a filtering function that can adjust its behavior based on the client HTTP request. This is exploited in the logstash-specific helper function by letting the client specify the list of message types he would like to see in the URL.
As a side note, there have been quite a few improvement and bug fixes in language-puppet
since last time I blogged
about it. There still are a few issues to close before releasing a new version.