GitHub - robinjoseph08/redisqueue: redisqueue provides a producer and consumer of a queue that uses Redis streams

redisqueue

redisqueue provides a producer and consumer of a queue that uses Redis streams.

Features

  • A Producer struct to make enqueuing messages easy.
  • A Consumer struct to make processing messages concurrently easy.
  • Claiming messages and acknowledging them only if there's no error, so that if a consumer dies while processing, the message it was working on isn't lost. This guarantees at-least-once delivery.
  • A "visibility timeout" so that if a message isn't processed in a designated time frame, it will be processed by another consumer.
  • A max length on the stream so that it doesn't store the messages indefinitely and run out of memory.
  • Graceful handling of Unix signals (SIGINT and SIGTERM) to let in-flight messages complete.
  • A channel that will surface any errors so you can handle them centrally.
  • Graceful handling of panics to avoid crashing the whole process.
  • A concurrency setting to control how many goroutines are spawned to process messages.
  • A batch size setting to limit the total messages in flight.
  • Support for multiple streams.
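
Several of the features above (a concurrency setting, a central error channel, and panic recovery) resemble a common Go worker-pool pattern. The sketch below illustrates that general pattern with the standard library only. It is not redisqueue's implementation, and the names handler, safeCall, and runPool are hypothetical, introduced purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a hypothetical stand-in for a message-processing callback.
type handler func(msg string) error

// safeCall runs h and converts a panic into an ordinary error, so one bad
// message cannot crash the whole process.
func safeCall(h handler, msg string) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered from panic on %q: %v", msg, r)
		}
	}()
	return h(msg)
}

// runPool processes msgs with a fixed number of goroutines and returns every
// error that was reported, whether returned by h or recovered from a panic.
func runPool(concurrency int, msgs []string, h handler) []error {
	queue := make(chan string)
	// Buffered to len(msgs) so workers never block while reporting an error.
	errs := make(chan error, len(msgs))

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range queue {
				if err := safeCall(h, m); err != nil {
					errs <- err
				}
			}
		}()
	}

	for _, m := range msgs {
		queue <- m
	}
	close(queue) // no more work: lets the workers drain and exit
	wg.Wait()
	close(errs)

	var out []error
	for err := range errs {
		out = append(out, err)
	}
	return out
}

func main() {
	errs := runPool(3, []string{"a", "boom", "c"}, func(m string) error {
		if m == "boom" {
			panic("handler blew up")
		}
		return nil
	})
	fmt.Printf("%d error(s): %v\n", len(errs), errs)
}
```

In this sketch the concurrency argument plays the role of the concurrency setting, and the errs channel plays the role of the central error channel. How redisqueue wires these together internally may differ.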

Installation

redisqueue requires a Go version with Modules support and uses import versioning. So please make sure to initialize a Go module before installing redisqueue:

go mod init github.com/my/repo
go get github.com/robinjoseph08/redisqueue/v2

Import:

import "github.com/robinjoseph08/redisqueue/v2"

Example

Here's an example of a producer that inserts 1000 messages into a queue:

package main

import (
	"fmt"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
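	p, err := redisqueue.NewProducerWithOptions(&redisqueue.ProducerOptions{
		// Cap the stream so it doesn't grow without bound.
		StreamMaxLength: 10000,
		// Let Redis trim approximately rather than exactly, which is cheaper.
		ApproximateMaxLength: true,
	})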
	if err != nil {
		panic(err)
	}

	for i := 0; i < 1000; i++ {
		err := p.Enqueue(&redisqueue.Message{
			Stream: "redisqueue:test",
			Values: map[string]interface{}{
				"index": i,
			},
		})
		if err != nil {
			panic(err)
		}

		if i%100 == 0 {
			fmt.Printf("enqueued %d\n", i)
		}
	}
}

And here's an example of a consumer that reads the messages off of that queue:

package main

import (
	"fmt"
	"time"

	"github.com/robinjoseph08/redisqueue/v2"
)

func main() {
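	c, err := redisqueue.NewConsumerWithOptions(&redisqueue.ConsumerOptions{
		// How long a message may stay unprocessed before another consumer can claim it.
		VisibilityTimeout: 60 * time.Second,
		// How long a read blocks while waiting for new messages.
		BlockingTimeout: 5 * time.Second,
		// How often to look for timed-out messages to reclaim.
		ReclaimInterval: 1 * time.Second,
		// Limit on the number of messages in flight.
		BufferSize: 100,
		// Number of goroutines processing messages.
		Concurrency: 10,
	})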
	if err != nil {
		panic(err)
	}

	c.Register("redisqueue:test", process)

	go func() {
		for err := range c.Errors {
			// handle errors accordingly
			fmt.Printf("err: %+v\n", err)
		}
	}()

	fmt.Println("starting")
	c.Run()
	fmt.Println("stopped")
}

func process(msg *redisqueue.Message) error {
	fmt.Printf("processing message: %v\n", msg.Values["index"])
	return nil
}
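
The process function above always returns nil. Since messages are acknowledged only if there's no error, a handler can return an error to signal that a message was not handled successfully. The sketch below shows one way such a handler might validate its input. To stay runnable without Redis, it uses a local Message type that mirrors only the Stream and Values fields used in the examples above; it is a stand-in, not the library's type. The indexOf helper is hypothetical. It also assumes that a value enqueued as an int may come back from Redis as a string, so it accepts both.

```go
package main

import (
	"fmt"
	"strconv"
)

// Message is a local stand-in mirroring the two fields used in the examples
// above. It is NOT redisqueue.Message.
type Message struct {
	Stream string
	Values map[string]interface{}
}

// indexOf (hypothetical helper) extracts the "index" field as an int,
// accepting either an int or a decimal string.
func indexOf(msg *Message) (int, error) {
	raw, ok := msg.Values["index"]
	if !ok {
		return 0, fmt.Errorf("message on %q has no index field", msg.Stream)
	}
	switch v := raw.(type) {
	case int:
		return v, nil
	case string:
		n, err := strconv.Atoi(v)
		if err != nil {
			return 0, fmt.Errorf("index %q is not an integer: %w", v, err)
		}
		return n, nil
	default:
		return 0, fmt.Errorf("unexpected index type %T", raw)
	}
}

// process returns a non-nil error for malformed messages instead of
// silently treating them as handled.
func process(msg *Message) error {
	i, err := indexOf(msg)
	if err != nil {
		return err
	}
	fmt.Printf("processing message: %d\n", i)
	return nil
}

func main() {
	good := &Message{Stream: "redisqueue:test", Values: map[string]interface{}{"index": "7"}}
	bad := &Message{Stream: "redisqueue:test", Values: map[string]interface{}{"index": "seven"}}

	if err := process(good); err != nil {
		fmt.Println("unexpected error:", err)
	}
	if err := process(bad); err != nil {
		fmt.Println("rejected:", err)
	}
}
```

With the real library, the same body would take a *redisqueue.Message, and any returned error would be expected to surface on the consumer's Errors channel shown earlier.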
