
Concurrent and Efficient Pipelines using Golang Channels

October 1, 2020 Nishith Chitaliya, Staff Software Engineer

Druva manages a colossal amount of data under its data protection umbrella. That data flows through a multi-stage structure made up of different systems, each with its own architecture.

One such system has a pipeline architecture that runs in parallel and handles large volumes of data. However, simply running pipelines in parallel is not always efficient. Pipelines can become inefficient for several reasons: limits on scaling imposed by resource constraints, wait time introduced when components take different amounts of time to execute, or communication through memory-intensive shared objects. To tackle this problem, we started building concurrent programs rather than merely parallel ones.

What is a Pipeline?

A pipeline is a series of independent components, or stages, joined by connectors, that together complete a specific task so that computations are performed in a stream-like fashion. These connectors can be anything from pipes to message queues to channels or even shared memory. Each component obtains data from its inward connector, performs some set of operations, and emits the data on its outward connector, where it is then operated on by the next component in line.

blog-image1

Usually, a single pipeline execution is handled in a single thread, and we can spawn multiple pipelines in multiple threads simultaneously to handle a heavier load. However, this is not an efficient pipeline if the stages differ in resource needs and execution time. Consider one of those threads: while its stage 4 is running, its stage 1, stage 2, and stage 3 sit idle. So at any given point in time only one of the four stages of a thread is active and the others are idle, bringing the overall efficiency down.

blog-image2

What if we could have multiple instances of each stage running independently and simultaneously interacting with each other?

blog-image3

One advantage to this approach is the ability to individually scale, configure, and execute stages. For example, if stage 3 is more compute-intensive, then we can have more instances of stage 3 as compared to other stages and can configure the complete pipeline according to our needs.

As Go provides native support for concurrency, we have implemented the above-mentioned architecture using Go channels and goroutines. Before we proceed, we need to understand the difference between concurrency and parallelism, as well as channels and goroutines in Go.

Concurrency vs Parallelism: Concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out of order or in partial order without affecting the final outcome. Parallelism refers to techniques that make programs faster by performing several computations at the same time, which requires hardware with multiple processing units. Put another way, concurrency means an application is dealing with multiple tasks at once, whereas parallelism means the application is executing multiple tasks at the same instant. In a multi-CPU environment we may not observe much difference between concurrent and parallel execution, but the difference is striking on a single CPU. With a single core and limited resources, a concurrent design still makes progress on every task by interleaving them, which gives the impression that the tasks are running in parallel. Concurrency is not parallelism, but it enables parallelism. If you have only one processor, your program can still be concurrent but it cannot be parallel.

blog-image4

blog-image5

On a single CPU, attempting to run tasks in parallel may cause starvation: task 2 might never be scheduled, or might not be scheduled for a long time. "Concurrency is not parallelism" is an excellent talk on this topic by Rob Pike, one of the designers of Go.
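To make the single-processor case concrete, here is a minimal sketch that limits the Go runtime to one processor with `runtime.GOMAXPROCS(1)` and still runs several tasks concurrently. The helper name `runTasks` and the task count are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// runTasks (a hypothetical helper) starts n goroutines, waits for all of
// them, and returns how many reported completion.
func runTasks(n int) int {
	var wg sync.WaitGroup
	results := make(chan int, n) // buffered so no task blocks on reporting
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			results <- id
		}(i)
	}
	wg.Wait()
	close(results)

	count := 0
	for range results {
		count++
	}
	return count
}

func main() {
	// Allow only one OS thread to execute Go code at a time. The tasks
	// cannot run in parallel, but they are still concurrent.
	prev := runtime.GOMAXPROCS(1)
	defer runtime.GOMAXPROCS(prev)

	fmt.Println("tasks completed:", runTasks(4)) // prints "tasks completed: 4"
}
```

All four tasks finish even though at most one of them can be executing at any instant; the scheduler interleaves them.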

Channel: A channel is a communication pipe between concurrently executing functions, or goroutines. Just as water flows from one end of a pipe to the other, data can be sent from one goroutine and received by another goroutine using a channel.

Goroutines: Goroutines are functions that run concurrently with other functions in the same address space. They can be thought of as lightweight threads, but they have some distinct advantages over threads. Goroutines are extremely cheap in terms of memory consumption compared to threads, because a goroutine's stack can grow and shrink dynamically as needed. Goroutines are multiplexed onto a smaller number of OS threads; there might be only one OS thread in a program with thousands of goroutines. If a goroutine on a thread blocks, say while waiting for I/O, another OS thread is created and the remaining goroutines are moved to it. This gives us a clean API to work with concurrency while hiding these details from us.
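As a small illustration of the pipe analogy, the sketch below sends a few integers from one goroutine and receives them in another. The function names `produce` and `collect` are hypothetical and not taken from any Druva code.

```go
package main

import "fmt"

// produce sends each value on out, then closes the channel so the
// receiver knows no more data is coming.
func produce(values []int, out chan<- int) {
	for _, v := range values {
		out <- v
	}
	close(out)
}

// collect receives from in until the channel is closed.
func collect(in <-chan int) []int {
	var got []int
	for v := range in {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int)
	go produce([]int{1, 2, 3}, ch) // sender runs in its own goroutine
	fmt.Println(collect(ch))       // prints [1 2 3]
}
```

The direction-restricted types `chan<- int` and `<-chan int` let the compiler check that each side only sends or only receives.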
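To give a feel for how cheap goroutines are, the following sketch starts one goroutine per input and waits for all of them with `sync.WaitGroup`. The function `sumSquares` and the count of 1000 are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// sumSquares launches one goroutine per index in [0, n). Each goroutine
// writes only to its own slot of the slice, so no locking is needed.
func sumSquares(n int) int {
	var wg sync.WaitGroup
	squares := make([]int, n)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			squares[i] = i * i
		}(i)
	}
	wg.Wait() // block until every goroutine has finished

	total := 0
	for _, s := range squares {
		total += s
	}
	return total
}

func main() {
	// A thousand goroutines is routine in Go.
	fmt.Println(sumSquares(1000)) // prints 332833500
}
```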

“Do not communicate by sharing memory; instead, share memory by communicating.” – Effective Go

Traditional threading models require the programmer to communicate between threads using shared memory, which is protected by locks. Threads must wait until they get access to the shared memory, and if it is not protected properly, race conditions can occur. Go, on the other hand, encourages a different approach in which shared values are passed around on channels and, in fact, never actively shared by separate threads of execution. Only one goroutine has access to the value at any given time. When this discipline is followed, data races cannot occur, by design.

We build our pipelines using goroutines and channels. Each stage is executed as multiple goroutines, with channels acting as the communication medium for data between stages.
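One way to picture "share memory by communicating" is a counter that is owned by a single goroutine, with every other goroutine sending it increments instead of touching the value directly. This is a minimal sketch; the names `counter` and `countWith` are hypothetical.

```go
package main

import "fmt"

// counter owns total. No other goroutine reads or writes it; they send
// increments on incs, and the final value is reported on done.
func counter(incs <-chan int, done chan<- int) {
	total := 0
	for n := range incs {
		total += n
	}
	done <- total
}

// countWith starts the owner goroutine plus the given number of workers,
// each of which sends perWorker increments of 1.
func countWith(workers, perWorker int) int {
	incs := make(chan int)
	done := make(chan int)
	go counter(incs, done)

	finished := make(chan struct{})
	for w := 0; w < workers; w++ {
		go func() {
			for i := 0; i < perWorker; i++ {
				incs <- 1
			}
			finished <- struct{}{}
		}()
	}
	for w := 0; w < workers; w++ {
		<-finished // wait for every worker before closing incs
	}
	close(incs)
	return <-done
}

func main() {
	fmt.Println(countWith(10, 100)) // prints 1000
}
```

No mutex appears anywhere, yet the result is always 1000 because only the `counter` goroutine ever modifies `total`.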

blog-image6

But this is still not efficient because:

  1. We need to return some information from a later stage to an earlier stage, which this design does not allow.
  2. If no capacity is specified, a channel has a capacity of 0 (it is unbuffered). In this case, send and receive are blocking calls: when data is sent to the channel, the sending goroutine blocks until some other goroutine reads from the channel. Similarly, a read blocks until some goroutine writes data to the channel. This blocking limits the concurrency between stages, which can prevent scaling and can lead to deadlocks.
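The blocking behavior described in point 2 can be observed without hanging the program by using a non-blocking send (`select` with a `default` case). This is a small sketch; the helper `trySend` is a hypothetical name.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// When it returns false, a plain `ch <- v` would have blocked.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)  // capacity 0
	buffered := make(chan int, 2) // capacity 2

	// No goroutine is receiving, so a send on the unbuffered channel
	// cannot proceed.
	fmt.Println(cap(unbuffered), trySend(unbuffered, 1)) // prints 0 false

	// A channel with capacity accepts sends until it is full.
	fmt.Println(trySend(buffered, 1), trySend(buffered, 2)) // prints true true
	fmt.Println(trySend(buffered, 3))                       // prints false
	fmt.Println(len(buffered), cap(buffered))               // prints 2 2
}
```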

We could solve this problem by using bidirectional channels, but to avoid complexity we used two unidirectional buffered channels: one for sending requests to the next stage and another for accepting responses from it. With buffered channels, a send blocks only when the buffer is completely full, and a receive blocks only when it is empty.
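The two-channel arrangement might look roughly like the sketch below. The `Request` and `Response` types, the doubling operation, and the buffer sizes are assumptions made for illustration, not the actual types used in the pipeline.

```go
package main

import "fmt"

// Request and Response are hypothetical message types.
type Request struct{ ID, Value int }
type Response struct{ ID, Result int }

// nextStage reads requests, does its work (here, doubling the value),
// and reports back to the former stage on the responses channel.
func nextStage(reqs <-chan Request, resps chan<- Response) {
	for r := range reqs {
		resps <- Response{ID: r.ID, Result: r.Value * 2}
	}
	close(resps)
}

// sendAll plays the role of the former stage: it sends every request on
// one buffered channel and gathers the answers from the other.
func sendAll(values []int) map[int]int {
	// Buffers are sized to the input so that neither side blocks in
	// this small example; a real pipeline would pick fixed capacities.
	reqs := make(chan Request, len(values))
	resps := make(chan Response, len(values))
	go nextStage(reqs, resps)

	for i, v := range values {
		reqs <- Request{ID: i, Value: v}
	}
	close(reqs)

	results := make(map[int]int)
	for r := range resps {
		results[r.ID] = r.Result
	}
	return results
}

func main() {
	fmt.Println(sendAll([]int{1, 2, 3})) // prints map[0:2 1:4 2:6]
}
```

Each channel carries data in one direction only, which keeps the ownership rules simple: the former stage closes the request channel and the next stage closes the response channel.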

blog-image7

Sample Pipeline

For this example, we will create a very simple pipeline based on the above concept, whose objective is to calculate the value of 3x² for every x. We will have three stages. Stage1 accepts numbers from the main goroutine, converts them to Data, and sends them to Stage2 via a channel, while also accepting results from Stage2. Stage2 accepts the Data, calculates the square of the number, sends it to Stage3, and also accepts the result from Stage3. Stage3 accepts the squared number from Stage2, multiplies it by 3, and sends the result downstream.

  • Stage1 – x
  • Stage2 – x²
  • Stage3 – 3x²

We run each stage as a goroutine, along with one more goroutine that generates the numbers to operate on.
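As a rough illustration of the forward path of such a pipeline, here is a minimal sketch with one goroutine per stage. The fields of `Data`, the function names, and the buffer size of 4 are assumptions made for illustration, and the sketch leaves out the response channels that carry results back to earlier stages.

```go
package main

import "fmt"

// Data is a hypothetical payload: the original input and the value
// computed so far.
type Data struct {
	Input  int
	Result int
}

// generate emits the input numbers from its own goroutine.
func generate(nums []int) <-chan int {
	out := make(chan int, 4)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// stage1 converts each number to Data (x).
func stage1(in <-chan int) <-chan Data {
	out := make(chan Data, 4)
	go func() {
		defer close(out)
		for x := range in {
			out <- Data{Input: x, Result: x}
		}
	}()
	return out
}

// stage2 squares the value (x²).
func stage2(in <-chan Data) <-chan Data {
	out := make(chan Data, 4)
	go func() {
		defer close(out)
		for d := range in {
			d.Result = d.Result * d.Result
			out <- d
		}
	}()
	return out
}

// stage3 multiplies by 3 (3x²).
func stage3(in <-chan Data) <-chan Data {
	out := make(chan Data, 4)
	go func() {
		defer close(out)
		for d := range in {
			d.Result *= 3
			out <- d
		}
	}()
	return out
}

// run wires the stages together and collects the final results.
func run(nums []int) []Data {
	var results []Data
	for d := range stage3(stage2(stage1(generate(nums)))) {
		results = append(results, d)
	}
	return results
}

func main() {
	for _, d := range run([]int{1, 2, 3, 4}) {
		fmt.Printf("x=%d 3x^2=%d\n", d.Input, d.Result)
	}
}
```

With a single goroutine per stage the results arrive in input order, so this sketch prints 3, 12, 27, and 48 for the inputs 1 through 4.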

blog-code1

blog-code2

blog-code3

blog-code4

Output:

blog-code5

blog-code6

blog-code7

We created a single goroutine for each stage, and these execute concurrently, but we can scale up by creating more goroutines for each stage.
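Scaling a stage can be sketched as several goroutines reading from the same input channel and writing to a shared output channel. The helper `startStage` and the worker counts (3 for squaring, 2 for multiplying) are assumptions chosen for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// startStage runs the given number of worker goroutines. All workers read
// from in, apply fn, and write to one shared output channel, which is
// closed once every worker has returned.
func startStage(workers int, in <-chan int, fn func(int) int) <-chan int {
	out := make(chan int, workers)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range in {
				out <- fn(v)
			}
		}()
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// run computes 3x² for every input, using more workers for squaring
// than for multiplying.
func run(nums []int) []int {
	src := make(chan int, len(nums))
	for _, n := range nums {
		src <- n
	}
	close(src)

	squared := startStage(3, src, func(x int) int { return x * x })
	tripled := startStage(2, squared, func(x int) int { return 3 * x })

	var results []int
	for r := range tripled {
		results = append(results, r)
	}
	// With several workers per stage, arrival order is not guaranteed,
	// so sort before printing.
	sort.Ints(results)
	return results
}

func main() {
	fmt.Println(run([]int{1, 2, 3, 4, 5})) // prints [3 12 27 48 75]
}
```

The worker count is just a parameter of each stage, so a compute-heavy stage can be given more goroutines without touching the others.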

Conclusion

Go’s concurrency patterns make it easy to construct streaming data pipelines that make efficient use of I/O and multiple CPUs. Because goroutines are lightweight compared to threads and have dynamically sized stacks, running many concurrent stages is inexpensive. Channels provide an efficient way of communicating between goroutines, but it is important to be aware of their limitations: if not designed properly, they can cause deadlocks. Channels also pass values by copy, so sending large objects by value should be avoided. Collectively, channels, goroutines, and concurrent design give us the ability to efficiently configure, scale, and run our pipelines.

If you would like to learn more about how Druva uses the latest tech in its development efforts, read more here.
