// Concurrent multireader example.
// Example shows passing ownership of a stream between goroutines.
package main

import (
	"fmt"
	"io"
	"os"
	"sync"
	"time"
)

// tickReader returns an io.Reader that yields one line, "<s> with <tick time>\n",
// for every tick delivered on ticker.
//
// sem is a shared semaphore: the producing goroutine holds it only while it
// writes a line into the pipe, so readers built on the same semaphore hand
// their lines over one at a time.
//
// tickReader takes ownership of ticker: the background goroutine stops it
// when it exits. The goroutine exits once a write to the pipe fails, which
// happens when the read side of the pipe has been closed.
func tickReader(sem chan struct{}, ticker *time.Ticker, s string) io.Reader {
	r, w := io.Pipe()
	go func() {
		defer w.Close()
		defer ticker.Stop()
		// Block on the tick *before* taking the semaphore. Polling the ticker
		// with a non-blocking select while cycling the semaphore would spin a
		// CPU core; waiting for the tick while holding the semaphore would
		// stall every other reader until this ticker fires.
		for t := range ticker.C {
			sem <- struct{}{}
			_, err := fmt.Fprintf(w, "%s with %v\n", s, t)
			<-sem
			if err != nil {
				// The read side is gone; nothing will ever consume more lines.
				return
			}
		}
	}()
	return r
}

// tick describes one ticker: its period, in time.ParseDuration syntax, and
// the tag printed at the start of each line it produces.
type tick struct {
	duration, tag string
}

// tickToReader returns an io.Reader built from the semaphore and the tick pair.
// It returns an error if v.duration cannot be parsed by time.ParseDuration or
// is not positive (time.NewTicker panics on non-positive durations).
func tickToReader(semaphore chan struct{}, v tick) (io.Reader, error) {
	d, err := time.ParseDuration(v.duration)
	if err != nil {
		return nil, fmt.Errorf("parsing duration of %q: %w", v.tag, err)
	}
	if d <= 0 {
		return nil, fmt.Errorf("duration of %q must be positive, got %v", v.tag, d)
	}
	return tickReader(semaphore, time.NewTicker(d), v.tag), nil
}

func main() {
	var (
		wg        sync.WaitGroup
		semaphore = make(chan struct{}, 1)
	)

	tickers := []tick{
		{"24ms", "ticker 2"},
		{"50ms", "ticker 1"},
		{"300ms", "ticker 3"},
	}

	readers := make([]io.Reader, 0, len(tickers))
	for _, v := range tickers {
		r, err := tickToReader(semaphore, v)
		if err != nil {
			panic(err)
		}
		readers = append(readers, r)
	}

	// Drain every reader to stdout concurrently.
	for _, r := range readers {
		wg.Add(1)
		go func(r io.Reader) {
			defer wg.Done()
			if _, err := io.Copy(os.Stdout, r); err != nil {
				panic(err)
			}
		}(r)
	}

	wg.Wait() // keeps the main goroutine alive while the readers are drained
}