multireader/multireader.go
2025-07-09 00:45:56 +04:00

81 lines
1.6 KiB
Go

// Concurrent multireader example.
// The example shows how ownership of a stream is passed between goroutines.
package main
import (
"fmt"
"io"
"os"
"sync"
"time"
)
// tickReader returns the read end of an io.Pipe that a background goroutine
// feeds with one "<s> with <tick time>" line for every tick it receives from
// ticker.
//
// sem acts as a ticket shared between producers: the goroutine sends on sem
// before polling ticker and receives from sem afterwards. A write to the pipe
// blocks until the line has been read from the returned reader, and the ticket
// stays taken for that whole time.
//
// The goroutine stops after the first failed write, which happens once the
// read end of the pipe has been closed (the returned value also implements
// io.Closer). ticker is never stopped here; stopping it is left to the caller.
func tickReader(sem chan struct{}, ticker *time.Ticker, s string) io.Reader {
	r, w := io.Pipe()
	go func() {
		for {
			sem <- struct{}{} // acquire a ticket, possibly waiting for it

			var err error
			select {
			case t := <-ticker.C:
				_, err = fmt.Fprintf(w, "%s with %v\n", s, t)
			default:
				// Non-blocking poll: with no tick pending the ticket is handed
				// back right away. Try to comment this case out and then run :)
				// The goroutine would then wait for its tick while still
				// holding the ticket.
			}

			<-sem // release it

			if err != nil {
				// Every later write would fail the same way, so stop producing
				// instead of spinning forever on a dead pipe.
				_ = w.CloseWithError(err)
				return
			}
		}
	}()
	return r
}
// tick describes one ticker to create: how often it fires and the tag that
// prefixes every line it produces.
type tick struct {
	// duration is the tick interval in time.ParseDuration syntax, e.g. "24ms".
	duration string
	// tag is the label written at the start of each output line.
	tag string
}
// tickToReader builds the reader for a single tick entry: it parses
// v.duration, creates a ticker with that interval and hands it, together with
// semaphore and v.tag, to tickReader.
//
// It returns an error when time.ParseDuration rejects v.duration, or when the
// parsed duration is zero or negative.
func tickToReader(semaphore chan struct{}, v tick) (io.Reader, error) {
	d, err := time.ParseDuration(v.duration)
	if err != nil {
		return nil, err
	}
	// time.NewTicker panics on a non-positive interval; report that as an
	// ordinary error so the caller decides what to do with bad input.
	if d <= 0 {
		return nil, fmt.Errorf("tick %q: non-positive duration %q", v.tag, v.duration)
	}
	return tickReader(semaphore, time.NewTicker(d), v.tag), nil
}
// main creates one reader per configured ticker and copies each of them to
// standard output from its own goroutine.
func main() {
	// A single slot: only one goroutine can do its thing at a time.
	semaphore := make(chan struct{}, 1)

	tickers := []tick{
		{"24ms", "ticker 2"},
		{"50ms", "ticker 1"},
		{"300ms", "ticker 3"},
	}

	// Build every reader first; a bad duration aborts the program before any
	// copying starts.
	var readers []io.Reader
	for i := range tickers {
		reader, err := tickToReader(semaphore, tickers[i])
		if err != nil {
			panic(err)
		}
		readers = append(readers, reader)
	}

	var wg sync.WaitGroup
	wg.Add(len(readers))
	for _, reader := range readers {
		go func(src io.Reader) {
			defer wg.Done()
			if _, err := io.Copy(os.Stdout, src); err != nil {
				panic(err)
			}
		}(reader)
	}

	// Block until every copying goroutine has returned, so that the main
	// goroutine does not exit while they are still running.
	wg.Wait()
}