No frills data pipeline with Go

Nowadays there is a plethora of data processing frameworks, maintained both by open-source communities and by corporations. Because of this very abundance, engineers sometimes get carried away and pick up a big data processing framework to process little data. I myself am guilty of having done that in the past (in my defence, that use case had big money riding on delivering a top-k service, so we went ahead and used Google’s Dataflow framework, a.k.a. Apache Beam), but I recently had a chance for redemption and took it with both hands and a bunch of Go channels.

Go diverges from most other common programming languages by advocating, and providing language-level semantics for, building multi-threaded applications that share data by communicating over channels (which in Go are literally called channels) instead of communicating by sharing memory. The big advantage of this approach is that it helps prevent common pitfalls of multi-threaded applications such as race conditions, since a value sent over a channel is owned by a single goroutine at a time, and the channels themselves can be used to coordinate control flow. But a much more visible advantage is the procedural semantics of channels, which improves code readability (a rough analogy is how async functions in the JavaScript world turn callback hells and promise chains into simple procedural code, although the concern there is only asynchronous operations, not concurrent ones).
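As a tiny illustration of this style (the event names here are made up for the example), a producer goroutine hands values to a consumer over a channel, so each value is only ever touched by one goroutine at a time and no locks are needed:

```go
package main

import "fmt"

// collect drains a channel fed by a producer goroutine and returns
// the values in arrival order. Ownership of each event is transferred
// over the channel, so producer and consumer never share mutable state.
func collect() []string {
	events := make(chan string, 4)

	// Producer: the only writer to the channel.
	go func() {
		for _, e := range []string{"login", "click", "logout"} {
			events <- e
		}
		close(events) // signal that no more values will arrive
	}()

	// Consumer: ranges over the channel until it is closed.
	var out []string
	for e := range events {
		out = append(out, e)
	}
	return out
}

func main() {
	for _, e := range collect() {
		fmt.Println("handling", e)
	}
}
```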

Coming back to the day of redemption, I decided to give Go’s channels a shot as a data pipeline solution (with concurrency playing second fiddle). The use case at hand was to build an event compaction service, i.e. a service that receives a large number of events, with many duplicates, in a short amount of time; it needs to reduce them to distinct events within configurable time buckets and enrich those distinct events before pushing them out.

So, I decided on a simple architecture consisting of the following interfaces:

Orchestrator: an interface that manages the pipeline (i.e. starts, stops & restarts the pipeline) and coordinates its stages.

type Orchestrator interface {
    StartPipeline() error
    StopPipeline() error
    RestartPipeline() error
}

Pipeline: an interface that groups all the stages of a pipeline.

type Pipeline interface {
    Source
    Reducer
    Enricher
    Sink
}

Source: an interface that provides a readable data stream.

type Source interface {
    Stream() error
}

Reducer: an interface that takes a data stream, removes duplicates and outputs batches of data.

type Reducer interface {
    Reduce(interface{}) error
}

Enricher: an interface that takes batches of data, enriches them with data (possibly from external sources) and outputs batches of data.

type Enricher interface {
    Enrich([]interface{}) error
}

Sink: an interface that takes batches of data and sends them off to external endpoints.

type Sink interface {
    Publish([]interface{}) error
}

With the contracts in place, I set out to implement the pipeline interface, which was at the heart of the operation. Each stage of the pipeline would accept data and write its output to a channel, with these channels acting as queues to maintain the order of events.
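To make that concrete, here is a minimal sketch of what a reducer stage could look like. The type and field names are illustrative, not the actual service’s code, and it uses a concrete Event type rather than interface{} for clarity: duplicates within a time bucket collapse by ID, and the distinct batch is flushed to the stage’s output channel.

```go
package main

import "fmt"

// Event is a placeholder type; the real service's event shape differs.
type Event struct {
	ID      string
	Payload string
}

// reducer collects the current time bucket of distinct events and
// flushes them to its output channel (playing the role of ReducerChannel).
type reducer struct {
	bucket map[string]Event
	out    chan []Event
}

// Reduce records an event, overwriting any earlier duplicate with the same ID.
func (r *reducer) Reduce(e Event) error {
	r.bucket[e.ID] = e
	return nil
}

// flush emits the distinct events collected so far and resets the bucket;
// in a real pipeline this would be driven by a per-bucket time.Ticker.
func (r *reducer) flush() {
	if len(r.bucket) == 0 {
		return
	}
	batch := make([]Event, 0, len(r.bucket))
	for _, e := range r.bucket {
		batch = append(batch, e)
	}
	r.bucket = make(map[string]Event)
	r.out <- batch
}

func main() {
	r := &reducer{bucket: make(map[string]Event), out: make(chan []Event, 1)}
	r.Reduce(Event{ID: "a", Payload: "first"})
	r.Reduce(Event{ID: "a", Payload: "second"}) // duplicate ID, replaces "first"
	r.Reduce(Event{ID: "b", Payload: "other"})
	r.flush()
	fmt.Println(len(<-r.out), "distinct events in bucket")
}
```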

Next, I implemented the brains of the pipeline, i.e. the orchestrator interface. All stages were glued together by creating a waterfall of channels, where the output of each channel was fed into the next stage, thereby establishing coordination between the stages:

go func() {
    for {
        select {
        case originalEvent := <-orc.pl.SourceChannel:
            go func() {
                if err := orc.pl.Reduce(originalEvent); err != nil {
                    orc.l.WithError(err).Error("event reduction failed")
                    orc.ErrorChan <- err
                }
            }()
            orc.l.Debug("started reduction of events")

        case reducedEvents := <-orc.pl.ReducerChannel:
            go func() {
                if err := orc.pl.Enrich(reducedEvents); err != nil {
                    orc.l.WithError(err).Error("events enrichment failed")
                    orc.ErrorChan <- err
                }
            }()
            orc.l.Debug("started enrichment of events batches")

        case enrichedEvents := <-orc.pl.EnricherChannel:
            go func() {
                if err := orc.pl.Publish(enrichedEvents); err != nil {
                    orc.l.WithError(err).Error("events publishing failed")
                    orc.ErrorChan <- err
                }
            }()
            orc.l.Debug("started publishing of events batches")

        case <-orc.pl.ExitSourceSignal:
            orc.l.Info("pipeline halted")
            return
        }
    }
}()

I deployed the first iteration of this service and was happy to see it work flawlessly, until errors suddenly popped up, originating from the external APIs that the enricher was using. The pipeline was able to recover from these errors automatically, since the orchestrator ensured the pipeline’s availability by restarting it on failures, but in-flight events were lost, as the pipeline did not have a persistent storage layer (a conscious decision, since historical events were useless in our context).

An easy fix would have been to add a persistent storage layer, perhaps only for the current batch of events in the reducer, but that would have defeated the design ideology of this service: a self-contained data pipeline with no additional dependencies. And creating a custom storage layer for this pipeline was pointless, because then I would just be reinventing the wheel and birthing yet another data processing framework.

So, considering our constraints, i.e. any events older than x duration become useless for downstream systems, I added a retry mechanism to the pipeline: the pipeline retries any failed stage with an exponential backoff until the events become older than x duration, at which point it simply gives up on those events.

go func() {
    for {
        select {
        case originalEvent := <-orc.pl.SourceChannel:
            go func() {
                if err := orc.retry(func() error { return orc.pl.Reduce(originalEvent) }); err != nil {
                    orc.l.WithError(err).Error("event reduction failed")
                    orc.ErrorChan <- err
                }
            }()
            orc.l.Debug("started reduction of events")

        case reducedEvents := <-orc.pl.ReducerChannel:
            go func() {
                if err := orc.retry(func() error { return orc.pl.Enrich(reducedEvents) }); err != nil {
                    orc.l.WithError(err).Error("events enrichment failed")
                    orc.ErrorChan <- err
                }
            }()
            orc.l.Debug("started enrichment of events batches")

        case enrichedEvents := <-orc.pl.EnricherChannel:
            go func() {
                if err := orc.retry(func() error { return orc.pl.Publish(enrichedEvents) }); err != nil {
                    orc.l.WithError(err).Error("events publishing failed")
                    orc.ErrorChan <- err
                }
            }()
            orc.l.Debug("started publishing of events batches")

        case <-orc.pl.ExitSourceSignal:
            orc.l.Info("pipeline halted")
            return
        }
    }
}()

I cautiously deployed the revised version of the service, fully prepared for further complications, but was met with none, thereby reinforcing my initial assumption that sometimes a naive solution to a rather complex problem can actually perform better than a feature-rich one.
