ProductPromotion
Logo

Go.Lang

made by https://0x3d.site

GitHub - sourcegraph/conc: Better structured concurrency for go
Better structured concurrency for go. Contribute to sourcegraph/conc development by creating an account on GitHub.
Visit Site

GitHub - sourcegraph/conc: Better structured concurrency for go

GitHub - sourcegraph/conc: Better structured concurrency for go

conch

conc: better structured concurrency for go

Go Reference Sourcegraph Go Report Card codecov Discord

conc is your toolbelt for structured concurrency in go, making common tasks easier and safer.

go get github.com/sourcegraph/conc

At a glance

All pools are created with pool.New() or pool.NewWithResults[T](), then configured with methods:

Goals

The main goals of the package are:

  1. Make it harder to leak goroutines
  2. Handle panics gracefully
  3. Make concurrent code easier to read

Goal #1: Make it harder to leak goroutines

A common pain point when working with goroutines is cleaning them up. It's really easy to fire off a go statement and fail to properly wait for it to complete.

conc takes the opinionated stance that all concurrency should be scoped. That is, goroutines should have an owner and that owner should always ensure that its owned goroutines exit properly.

In conc, the owner of a goroutine is always a conc.WaitGroup. Goroutines are spawned in a WaitGroup with (*WaitGroup).Go(), and (*WaitGroup).Wait() should always be called before the WaitGroup goes out of scope.

In some cases, you might want a spawned goroutine to outlast the scope of the caller. In that case, you could pass a WaitGroup into the spawning function.

func main() {
    var wg conc.WaitGroup
    defer wg.Wait()

    startTheThing(&wg)
}

func startTheThing(wg *conc.WaitGroup) {
    wg.Go(func() { ... })
}

For some more discussion on why scoped concurrency is nice, check out this blog post.

Goal #2: Handle panics gracefully

A frequent problem with goroutines in long-running applications is handling panics. A goroutine spawned without a panic handler will crash the whole process on panic. This is usually undesirable.

However, if you do add a panic handler to a goroutine, what do you do with the panic once you catch it? Some options:

  1. Ignore it
  2. Log it
  3. Turn it into an error and return that to the goroutine spawner
  4. Propagate the panic to the goroutine spawner

Ignoring panics is a bad idea since panics usually mean there is actually something wrong and someone should fix it.

Just logging panics isn't great either because then there is no indication to the spawner that something bad happened, and it might just continue on as normal even though your program is in a really bad state.

Both (3) and (4) are reasonable options, but both require the goroutine to have an owner that can actually receive the message that something went wrong. This is generally not true with a goroutine spawned with go, but in the conc package, all goroutines have an owner that must collect the spawned goroutine. In the conc package, any call to Wait() will panic if any of the spawned goroutines panicked. Additionally, it decorates the panic value with a stacktrace from the child goroutine so that you don't lose information about what caused the panic.

Doing this all correctly every time you spawn something with go is not trivial and it requires a lot of boilerplate that makes the important parts of the code more difficult to read, so conc does this for you.

type caughtPanicError struct {
    val   any
    stack []byte
}

func (e *caughtPanicError) Error() string {
    return fmt.Sprintf(
        "panic: %q\n%s",
        e.val,
        string(e.stack)
    )
}

func main() {
    done := make(chan error)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &caughtPanicError{
                    val: v,
                    stack: debug.Stack()
                }
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()
    err := <-done
    if err != nil {
        panic(err)
    }
}
func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // panics with a nice stacktrace
    wg.Wait()
}

Goal #3: Make concurrent code easier to read

Doing concurrency correctly is difficult. Doing it in a way that doesn't obfuscate what the code is actually doing is more difficult. The conc package attempts to make common operations easier by abstracting as much boilerplate complexity as possible.

Want to run a set of concurrent tasks with a bounded set of goroutines? Use pool.New(). Want to process an ordered stream of results concurrently, but still maintain order? Try stream.New(). What about a concurrent map over a slice? Take a peek at iter.Map().

Browse some examples below for some comparisons with doing these by hand.

Examples

Each of these examples forgoes propagating panics for simplicity. To see what kind of complexity that would add, check out the "Goal #2" header above.

Spawn a set of goroutines and waiting for them to finish:

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // crashes on panic!
            doSomething()
        }()
    }
    wg.Wait()
}
func main() {
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Go(doSomething)
    }
    wg.Wait()
}

Process each element of a stream in a static pool of goroutines:

func process(stream chan int) {
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range stream {
                handle(elem)
            }
        }()
    }
    wg.Wait()
}
func process(stream chan int) {
    p := pool.New().WithMaxGoroutines(10)
    for elem := range stream {
        elem := elem
        p.Go(func() {
            handle(elem)
        })
    }
    p.Wait()
}

Process each element of a slice in a static pool of goroutines:

func process(values []int) {
    feeder := make(chan int, 8)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for elem := range feeder {
                handle(elem)
            }
        }()
    }

    for _, value := range values {
        feeder <- value
    }
    close(feeder)
    wg.Wait()
}
func process(values []int) {
    iter.ForEach(values, handle)
}

Concurrently map a slice:

func concMap(
    input []int,
    f func(int) int,
) []int {
    res := make([]int, len(input))
    var idx atomic.Int64

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()

            for {
                i := int(idx.Add(1) - 1)
                if i >= len(input) {
                    return
                }

                res[i] = f(input[i])
            }
        }()
    }
    wg.Wait()
    return res
}
func concMap(
    input []int,
    f func(*int) int,
) []int {
    return iter.Map(input, f)
}

Process an ordered stream concurrently:

func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    tasks := make(chan func())
    taskResults := make(chan chan int)

    // Worker goroutines
    var workerWg sync.WaitGroup
    for i := 0; i < 10; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for task := range tasks {
                task()
            }
        }()
    }

    // Ordered reader goroutines
    var readerWg sync.WaitGroup
    readerWg.Add(1)
    go func() {
        defer readerWg.Done()
        for result := range taskResults {
            item := <-result
            out <- item
        }
    }()

    // Feed the workers with tasks
    for elem := range in {
        resultCh := make(chan int, 1)
        taskResults <- resultCh
        tasks <- func() {
            resultCh <- f(elem)
        }
    }

    // We've exhausted input.
    // Wait for everything to finish
    close(tasks)
    workerWg.Wait()
    close(taskResults)
    readerWg.Wait()
}
func mapStream(
    in chan int,
    out chan int,
    f func(int) int,
) {
    s := stream.New().WithMaxGoroutines(10)
    for elem := range in {
        elem := elem
        s.Go(func() stream.Callback {
            res := f(elem)
            return func() { out <- res }
        })
    }
    s.Wait()
}

Status

This package is currently pre-1.0. There are likely to be minor breaking changes before a 1.0 release as we stabilize the APIs and tweak defaults. Please open an issue if you have questions, concerns, or requests that you'd like addressed before the 1.0 release. Currently, a 1.0 is targeted for March 2023.

Articles
to learn more about the golang concepts.

Resources
which are currently available to browse on.

mail [email protected] to add your project or resources here ๐Ÿ”ฅ.

FAQ's
to know more about the topic.

mail [email protected] to add your project or resources here ๐Ÿ”ฅ.

Queries
or most google FAQ's about GoLang.

mail [email protected] to add more queries here ๐Ÿ”.

More Sites
to check out once you're finished browsing here.

0x3d
https://www.0x3d.site/
0x3d is designed for aggregating information.
NodeJS
https://nodejs.0x3d.site/
NodeJS Online Directory
Cross Platform
https://cross-platform.0x3d.site/
Cross Platform Online Directory
Open Source
https://open-source.0x3d.site/
Open Source Online Directory
Analytics
https://analytics.0x3d.site/
Analytics Online Directory
JavaScript
https://javascript.0x3d.site/
JavaScript Online Directory
GoLang
https://golang.0x3d.site/
GoLang Online Directory
Python
https://python.0x3d.site/
Python Online Directory
Swift
https://swift.0x3d.site/
Swift Online Directory
Rust
https://rust.0x3d.site/
Rust Online Directory
Scala
https://scala.0x3d.site/
Scala Online Directory
Ruby
https://ruby.0x3d.site/
Ruby Online Directory
Clojure
https://clojure.0x3d.site/
Clojure Online Directory
Elixir
https://elixir.0x3d.site/
Elixir Online Directory
Elm
https://elm.0x3d.site/
Elm Online Directory
Lua
https://lua.0x3d.site/
Lua Online Directory
C Programming
https://c-programming.0x3d.site/
C Programming Online Directory
C++ Programming
https://cpp-programming.0x3d.site/
C++ Programming Online Directory
R Programming
https://r-programming.0x3d.site/
R Programming Online Directory
Perl
https://perl.0x3d.site/
Perl Online Directory
Java
https://java.0x3d.site/
Java Online Directory
Kotlin
https://kotlin.0x3d.site/
Kotlin Online Directory
PHP
https://php.0x3d.site/
PHP Online Directory
React JS
https://react.0x3d.site/
React JS Online Directory
Angular
https://angular.0x3d.site/
Angular JS Online Directory