From bc90eca19f5d27c16c19bb732a10a95e15419858 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 14 Feb 2018 18:18:12 -0500 Subject: [PATCH 1/8] add the remote-exec retry function to communicator Every provisioner that uses communicator implements its own retryFunc. Take the remote-exec implementation (since it's the most complete) and put it in the communicator package for each provisioner to use. Add a public interface `communicator.Fatal`, which can wrap an error to indicate a fatal error that should not be retried. --- communicator/communicator.go | 93 +++++++++++++++++++++++++++++++ communicator/communicator_test.go | 68 ++++++++++++++++++++++ 2 files changed, 161 insertions(+) diff --git a/communicator/communicator.go b/communicator/communicator.go index 5fa2631a4..3749a9f98 100644 --- a/communicator/communicator.go +++ b/communicator/communicator.go @@ -1,8 +1,11 @@ package communicator import ( + "context" "fmt" "io" + "log" + "sync/atomic" "time" "github.com/hashicorp/terraform/communicator/remote" @@ -51,3 +54,93 @@ func New(s *terraform.InstanceState) (Communicator, error) { return nil, fmt.Errorf("connection type '%s' not supported", connType) } } + +// maxBackoffDealy is the maximum delay between retry attempts +var maxBackoffDelay = 10 * time.Second +var initialBackoffDelay = time.Second + +type Fatal interface { + FatalError() error +} + +func Retry(ctx context.Context, f func() error) error { + // container for atomic error value + type errWrap struct { + E error + } + + // Try the function in a goroutine + var errVal atomic.Value + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + + delay := time.Duration(0) + for { + // If our context ended, we want to exit right away. + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } + + // Try the function call + err := f() + + // return if we have no error, or a FatalError + done := false + switch e := err.(type) { + case nil: + done = true + case Fatal: + err = e.FatalError() + done = true + } + + errVal.Store(&errWrap{err}) + + if done { + return + } + + log.Printf("[WARN] retryable error: %v", err) + + delay *= 2 + + if delay == 0 { + delay = initialBackoffDelay + } + + if delay > maxBackoffDelay { + delay = maxBackoffDelay + } + + log.Printf("[INFO] sleeping for %s", delay) + } + }() + + // Wait for completion + select { + case <-ctx.Done(): + case <-doneCh: + } + + var lastErr error + // Check if we got an error executing + if ev, ok := errVal.Load().(errWrap); ok { + lastErr = ev.E + } + + // Check if we have a context error to check if we're interrupted or timeout + switch ctx.Err() { + case context.Canceled: + return fmt.Errorf("interrupted - last error: %v", lastErr) + case context.DeadlineExceeded: + return fmt.Errorf("timeout - last error: %v", lastErr) + } + + if lastErr != nil { + return lastErr + } + return nil +} diff --git a/communicator/communicator_test.go b/communicator/communicator_test.go index 33a91cd6f..659222421 100644 --- a/communicator/communicator_test.go +++ b/communicator/communicator_test.go @@ -1,7 +1,12 @@ package communicator import ( + "context" + "errors" + "io" + "net" "testing" + "time" "github.com/hashicorp/terraform/terraform" ) @@ -28,3 +33,66 @@ func TestCommunicator_new(t *testing.T) { t.Fatalf("err: %v", err) } } +func TestRetryFunc(t *testing.T) { + origMax := maxBackoffDelay + maxBackoffDelay = time.Second + origStart := initialBackoffDelay + initialBackoffDelay = 10 * time.Millisecond + + defer func() { + maxBackoffDelay = origMax + initialBackoffDelay = origStart + }() + + // succeed on the third try + errs := []error{io.EOF, &net.OpError{Err: errors.New("ERROR")}, nil} + count := 0 + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + err := Retry(ctx, func() error { + if count >= len(errs) { + return errors.New("failed to stop after nil error") + } + + err := errs[count] + count++ + + return err + }) + + if count != 3 { + t.Fatal("retry func should have been called 3 times") + } + + if err != nil { + t.Fatal(err) + } +} + +func TestRetryFuncBackoff(t *testing.T) { + origMax := maxBackoffDelay + maxBackoffDelay = time.Second + origStart := initialBackoffDelay + initialBackoffDelay = 100 * time.Millisecond + + defer func() { + maxBackoffDelay = origMax + initialBackoffDelay = origStart + }() + + count := 0 + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + Retry(ctx, func() error { + count++ + return io.EOF + }) + + if count > 4 { + t.Fatalf("retry func failed to backoff. called %d times", count) + } +} From f5b8091e2ced4d0d77d00f7106ebf2e2ffa947ce Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 14 Feb 2018 18:21:26 -0500 Subject: [PATCH 2/8] remove retryFunc It's now in the communicator package --- .../remote-exec/resource_provisioner.go | 80 +------------------ .../remote-exec/resource_provisioner_test.go | 62 -------------- 2 files changed, 4 insertions(+), 138 deletions(-) diff --git a/builtin/provisioners/remote-exec/resource_provisioner.go b/builtin/provisioners/remote-exec/resource_provisioner.go index ba811dafe..8c8c09298 100644 --- a/builtin/provisioners/remote-exec/resource_provisioner.go +++ b/builtin/provisioners/remote-exec/resource_provisioner.go @@ -9,7 +9,6 @@ import ( "log" "os" "strings" - "sync/atomic" "time" "github.com/hashicorp/terraform/communicator" @@ -159,7 +158,7 @@ func runScripts( scripts []io.ReadCloser) error { // Wrap out context in a cancelation function that we use to // kill the connection. - ctx, cancelFunc := context.WithCancel(ctx) + ctx, cancelFunc := context.WithTimeout(ctx, comm.Timeout()) defer cancelFunc() // Wait for the context to end and then disconnect @@ -169,7 +168,7 @@ func runScripts( }() // Wait and retry until we establish the connection - err := retryFunc(ctx, comm.Timeout(), func() error { + err := communicator.Retry(ctx, func() error { err := comm.Connect(o) return err }) @@ -187,7 +186,8 @@ func runScripts( go copyOutput(o, errR, errDoneCh) remotePath := comm.ScriptPath() - err = retryFunc(ctx, comm.Timeout(), func() error { + + err = communicator.Retry(ctx, func() error { if err := comm.UploadScript(remotePath, script); err != nil { return fmt.Errorf("Failed to upload script: %v", err) } @@ -248,75 +248,3 @@ func copyOutput( o.Output(line) } } - -// retryFunc is used to retry a function for a given duration -func retryFunc(ctx context.Context, timeout time.Duration, f func() error) error { - // Build a new context with the timeout - ctx, done := context.WithTimeout(ctx, timeout) - defer done() - - // container for atomic error value - type errWrap struct { - E error - } - - // Try the function in a goroutine - var errVal atomic.Value - doneCh := make(chan struct{}) - go func() { - defer close(doneCh) - - delay := time.Duration(0) - for { - // If our context ended, we want to exit right away. - select { - case <-ctx.Done(): - return - case <-time.After(delay): - } - - // Try the function call - err := f() - errVal.Store(&errWrap{err}) - - if err == nil { - return - } - - log.Printf("[WARN] retryable error: %v", err) - - delay *= 2 - - if delay == 0 { - delay = initialBackoffDelay - } - - if delay > maxBackoffDelay { - delay = maxBackoffDelay - } - - log.Printf("[INFO] sleeping for %s", delay) - } - }() - - // Wait for completion - select { - case <-ctx.Done(): - case <-doneCh: - } - - // Check if we have a context error to check if we're interrupted or timeout - switch ctx.Err() { - case context.Canceled: - return fmt.Errorf("interrupted") - case context.DeadlineExceeded: - return fmt.Errorf("timeout") - } - - // Check if we got an error executing - if ev, ok := errVal.Load().(errWrap); ok { - return ev.E - } - - return nil -} diff --git a/builtin/provisioners/remote-exec/resource_provisioner_test.go b/builtin/provisioners/remote-exec/resource_provisioner_test.go index 8c447788d..a6e024fe5 100644 --- a/builtin/provisioners/remote-exec/resource_provisioner_test.go +++ b/builtin/provisioners/remote-exec/resource_provisioner_test.go @@ -2,12 +2,8 @@ package remoteexec import ( "bytes" - "context" - "errors" "io" - "net" "testing" - "time" "strings" @@ -210,64 +206,6 @@ func TestResourceProvider_CollectScripts_scriptsEmpty(t *testing.T) { } } -func TestRetryFunc(t *testing.T) { - origMax := maxBackoffDelay - maxBackoffDelay = time.Second - origStart := initialBackoffDelay - initialBackoffDelay = 10 * time.Millisecond - - defer func() { - maxBackoffDelay = origMax - initialBackoffDelay = origStart - }() - - // succeed on the third try - errs := []error{io.EOF, &net.OpError{Err: errors.New("ERROR")}, nil} - count := 0 - - err := retryFunc(context.Background(), time.Second, func() error { - if count >= len(errs) { - return errors.New("failed to stop after nil error") - } - - err := errs[count] - count++ - - return err - }) - - if count != 3 { - t.Fatal("retry func should have been called 3 times") - } - - if err != nil { - t.Fatal(err) - } -} - -func TestRetryFuncBackoff(t *testing.T) { - origMax := maxBackoffDelay - maxBackoffDelay = time.Second - origStart := initialBackoffDelay - initialBackoffDelay = 100 * time.Millisecond - - defer func() { - maxBackoffDelay = origMax - initialBackoffDelay = origStart - }() - - count := 0 - - retryFunc(context.Background(), time.Second, func() error { - count++ - return io.EOF - }) - - if count > 4 { - t.Fatalf("retry func failed to backoff. called %d times", count) - } -} - func testConfig(t *testing.T, c map[string]interface{}) *terraform.ResourceConfig { r, err := config.NewRawConfig(c) if err != nil { From 89a0ac6e8939fc8764bff1174f1ea68930cfc8e6 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 14 Feb 2018 18:25:05 -0500 Subject: [PATCH 3/8] remove retryFunc It's now in the communicator package --- .../provisioners/chef/resource_provisioner.go | 24 ++++--------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/builtin/provisioners/chef/resource_provisioner.go b/builtin/provisioners/chef/resource_provisioner.go index 8fc45e59e..6edf973f3 100644 --- a/builtin/provisioners/chef/resource_provisioner.go +++ b/builtin/provisioners/chef/resource_provisioner.go @@ -15,7 +15,6 @@ import ( "strings" "sync" "text/template" - "time" "github.com/hashicorp/terraform/communicator" "github.com/hashicorp/terraform/communicator/remote" @@ -307,8 +306,11 @@ func applyFn(ctx context.Context) error { return err } + ctx, cancel := context.WithTimeout(ctx, comm.Timeout()) + defer cancel() + // Wait and retry until we establish the connection - err = retryFunc(comm.Timeout(), func() error { + err = communicator.Retry(ctx, func() error { return comm.Connect(o) }) if err != nil { @@ -717,24 +719,6 @@ func (p *provisioner) copyOutput(o terraform.UIOutput, r io.Reader, doneCh chan< } } -// retryFunc is used to retry a function for a given duration -func retryFunc(timeout time.Duration, f func() error) error { - finish := time.After(timeout) - for { - err := f() - if err == nil { - return nil - } - log.Printf("Retryable error: %v", err) - - select { - case <-finish: - return err - case <-time.After(3 * time.Second): - } - } -} - func decodeConfig(d *schema.ResourceData) (*provisioner, error) { p := &provisioner{ ClientOptions: getStringList(d.Get("client_options")), From d02250c2b94ba20dfbbeb3913e893a2c32cf503c Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 14 Feb 2018 18:30:20 -0500 Subject: [PATCH 4/8] remove retryFunc it's now in the communicator package --- .../provisioners/file/resource_provisioner.go | 41 ++++--------------- 1 file changed, 7 insertions(+), 34 deletions(-) diff --git a/builtin/provisioners/file/resource_provisioner.go b/builtin/provisioners/file/resource_provisioner.go index 9b9e8a97b..30ed5e359 100644 --- a/builtin/provisioners/file/resource_provisioner.go +++ b/builtin/provisioners/file/resource_provisioner.go @@ -4,9 +4,7 @@ import ( "context" "fmt" "io/ioutil" - "log" "os" - "time" "github.com/hashicorp/terraform/communicator" "github.com/hashicorp/terraform/helper/schema" @@ -50,6 +48,9 @@ func applyFn(ctx context.Context) error { return err } + ctx, cancel := context.WithTimeout(ctx, comm.Timeout()) + defer cancel() + // Get the source src, deleteSource, err := getSrc(data) if err != nil { @@ -61,21 +62,11 @@ func applyFn(ctx context.Context) error { // Begin the file copy dst := data.Get("destination").(string) - resultCh := make(chan error, 1) - go func() { - resultCh <- copyFiles(comm, src, dst) - }() - // Allow the file copy to complete unless there is an interrupt. - // If there is an interrupt we make no attempt to cleanly close - // the connection currently. We just abruptly exit. Because Terraform - // taints the resource, this is fine. - select { - case err := <-resultCh: + if err := copyFiles(ctx, comm, src, dst); err != nil { return err - case <-ctx.Done(): - return fmt.Errorf("file transfer interrupted") } + return nil } func validateFn(c *terraform.ResourceConfig) (ws []string, es []error) { @@ -107,9 +98,9 @@ func getSrc(data *schema.ResourceData) (string, bool, error) { } // copyFiles is used to copy the files from a source to a destination -func copyFiles(comm communicator.Communicator, src, dst string) error { +func copyFiles(ctx context.Context, comm communicator.Communicator, src, dst string) error { // Wait and retry until we establish the connection - err := retryFunc(comm.Timeout(), func() error { + err := communicator.Retry(ctx, func() error { err := comm.Connect(nil) return err }) @@ -144,21 +135,3 @@ func copyFiles(comm communicator.Communicator, src, dst string) error { } return err } - -// retryFunc is used to retry a function for a given duration -func retryFunc(timeout time.Duration, f func() error) error { - finish := time.After(timeout) - for { - err := f() - if err == nil { - return nil - } - log.Printf("Retryable error: %v", err) - - select { - case <-finish: - return err - case <-time.After(3 * time.Second): - } - } -} From e331ae9842eebc09cfedf7ba828fd288c7d5f3ee Mon Sep 17 00:00:00 2001 From: James Bardin Date: Wed, 14 Feb 2018 18:32:29 -0500 Subject: [PATCH 5/8] remove retryFunc it's now in the communicator package --- .../habitat/resource_provisioner.go | 29 ++++--------------- 1 file changed, 6 insertions(+), 23 deletions(-) diff --git a/builtin/provisioners/habitat/resource_provisioner.go b/builtin/provisioners/habitat/resource_provisioner.go index f9d47ff07..aa404dae1 100644 --- a/builtin/provisioners/habitat/resource_provisioner.go +++ b/builtin/provisioners/habitat/resource_provisioner.go @@ -6,12 +6,10 @@ import ( "errors" "fmt" "io" - "log" "net/url" "path" "strings" "text/template" - "time" "github.com/hashicorp/terraform/communicator" "github.com/hashicorp/terraform/communicator/remote" @@ -233,10 +231,13 @@ func applyFn(ctx context.Context) error { return err } - err = retryFunc(comm.Timeout(), func() error { - err = comm.Connect(o) - return err + ctx, cancel := context.WithTimeout(ctx, comm.Timeout()) + defer cancel() + + err = communicator.Retry(ctx, func() error { + return comm.Connect(o) }) + if err != nil { return err } @@ -728,24 +729,6 @@ func (p *provisioner) uploadUserTOML(o terraform.UIOutput, comm communicator.Com } -func retryFunc(timeout time.Duration, f func() error) error { - finish := time.After(timeout) - - for { - err := f() - if err == nil { - return nil - } - log.Printf("Retryable error: %v", err) - - select { - case <-finish: - return err - case <-time.After(3 * time.Second): - } - } -} - func (p *provisioner) copyOutput(o terraform.UIOutput, r io.Reader, doneCh chan<- struct{}) { defer close(doneCh) lr := linereader.New(r) From e06f76b90f2b47db63f732d36f523450a3b4d19f Mon Sep 17 00:00:00 2001 From: James Bardin Date: Thu, 15 Feb 2018 14:04:17 -0500 Subject: [PATCH 6/8] Fix type assertion when loading stored error Fix a bug where the last error was not retrieved from errVal.Load due to an incorrect type assertion. --- communicator/communicator.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/communicator/communicator.go b/communicator/communicator.go index 3749a9f98..440c1c727 100644 --- a/communicator/communicator.go +++ b/communicator/communicator.go @@ -55,14 +55,17 @@ func New(s *terraform.InstanceState) (Communicator, error) { } } -// maxBackoffDealy is the maximum delay between retry attempts -var maxBackoffDelay = 10 * time.Second +// maxBackoffDelay is the maximum delay between retry attempts +var maxBackoffDelay = 20 * time.Second var initialBackoffDelay = time.Second +// Fatal is an interface that error values can return to halt Retry type Fatal interface { FatalError() error } +// Retry retries the function f until it returns a nil error, a Fatal error, or +// the context expires. func Retry(ctx context.Context, f func() error) error { // container for atomic error value type errWrap struct { @@ -97,7 +100,7 @@ func Retry(ctx context.Context, f func() error) error { done = true } - errVal.Store(&errWrap{err}) + errVal.Store(errWrap{err}) if done { return From c1b35ad69b75bb637780603fe258f67f5a0e0ea1 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Thu, 15 Feb 2018 14:16:37 -0500 Subject: [PATCH 7/8] have the ssh communicator return fatal errors This will let the retry loop abort when there are errors which aren't going to ever be corrected. --- .../remote-exec/resource_provisioner.go | 4 ++-- communicator/ssh/communicator.go | 14 +++++++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/builtin/provisioners/remote-exec/resource_provisioner.go b/builtin/provisioners/remote-exec/resource_provisioner.go index 8c8c09298..082675ce1 100644 --- a/builtin/provisioners/remote-exec/resource_provisioner.go +++ b/builtin/provisioners/remote-exec/resource_provisioner.go @@ -169,9 +169,9 @@ func runScripts( // Wait and retry until we establish the connection err := communicator.Retry(ctx, func() error { - err := comm.Connect(o) - return err + return comm.Connect(o) }) + if err != nil { return err } diff --git a/communicator/ssh/communicator.go b/communicator/ssh/communicator.go index 4ad67aefc..85dabb6b5 100644 --- a/communicator/ssh/communicator.go +++ b/communicator/ssh/communicator.go @@ -63,6 +63,14 @@ type sshConfig struct { sshAgent *sshAgent } +type fatalError struct { + error +} + +func (e fatalError) FatalError() error { + return e.error +} + // New creates a new communicator implementation over SSH. func New(s *terraform.InstanceState) (*Communicator, error) { connInfo, err := parseConnectionInfo(s) @@ -159,8 +167,8 @@ func (c *Communicator) Connect(o terraform.UIOutput) (err error) { host := fmt.Sprintf("%s:%d", c.connInfo.Host, c.connInfo.Port) sshConn, sshChan, req, err := ssh.NewClientConn(c.conn, host, c.config.config) if err != nil { - log.Printf("handshake error: %s", err) - return err + log.Printf("fatal handshake error: %s", err) + return fatalError{err} } c.client = ssh.NewClient(sshConn, sshChan, req) @@ -168,7 +176,7 @@ func (c *Communicator) Connect(o terraform.UIOutput) (err error) { if c.config.sshAgent != nil { log.Printf("[DEBUG] Telling SSH config to forward to agent") if err := c.config.sshAgent.ForwardToAgent(c.client); err != nil { - return err + return fatalError{err} } log.Printf("[DEBUG] Setting up a session to request agent forwarding") From 0345d960b2068cdddc6546df2b06a9b22411b8b4 Mon Sep 17 00:00:00 2001 From: James Bardin Date: Thu, 15 Feb 2018 15:01:55 -0500 Subject: [PATCH 8/8] simplify remote-exec runScripts There no reason to retry around the execution of remote scripts. We've already established a connection, so the only that could happen here is to continually retry uploading or executing a script that can't succeed. This also simplifies the streaming output from the command, which doesn't need such explicit synchronization. Closing the output pipes is sufficient to stop the copyOutput functions, and they don't close around any values that are accessed again after the command executes. --- .../provisioners/file/resource_provisioner.go | 11 +++- .../remote-exec/resource_provisioner.go | 63 ++++++------------- 2 files changed, 28 insertions(+), 46 deletions(-) diff --git a/builtin/provisioners/file/resource_provisioner.go b/builtin/provisioners/file/resource_provisioner.go index 30ed5e359..5514250d7 100644 --- a/builtin/provisioners/file/resource_provisioner.go +++ b/builtin/provisioners/file/resource_provisioner.go @@ -101,13 +101,18 @@ func getSrc(data *schema.ResourceData) (string, bool, error) { func copyFiles(ctx context.Context, comm communicator.Communicator, src, dst string) error { // Wait and retry until we establish the connection err := communicator.Retry(ctx, func() error { - err := comm.Connect(nil) - return err + return comm.Connect(nil) }) if err != nil { return err } - defer comm.Disconnect() + + // disconnect when the context is canceled, which will close this after + // Apply as well. + go func() { + <-ctx.Done() + comm.Disconnect() + }() info, err := os.Stat(src) if err != nil { diff --git a/builtin/provisioners/remote-exec/resource_provisioner.go b/builtin/provisioners/remote-exec/resource_provisioner.go index 082675ce1..378a282ed 100644 --- a/builtin/provisioners/remote-exec/resource_provisioner.go +++ b/builtin/provisioners/remote-exec/resource_provisioner.go @@ -171,57 +171,40 @@ func runScripts( err := communicator.Retry(ctx, func() error { return comm.Connect(o) }) - if err != nil { return err } for _, script := range scripts { var cmd *remote.Cmd + outR, outW := io.Pipe() errR, errW := io.Pipe() - outDoneCh := make(chan struct{}) - errDoneCh := make(chan struct{}) - go copyOutput(o, outR, outDoneCh) - go copyOutput(o, errR, errDoneCh) + defer outW.Close() + defer errW.Close() + + go copyOutput(o, outR) + go copyOutput(o, errR) remotePath := comm.ScriptPath() - err = communicator.Retry(ctx, func() error { - if err := comm.UploadScript(remotePath, script); err != nil { - return fmt.Errorf("Failed to upload script: %v", err) - } - - cmd = &remote.Cmd{ - Command: remotePath, - Stdout: outW, - Stderr: errW, - } - if err := comm.Start(cmd); err != nil { - return fmt.Errorf("Error starting script: %v", err) - } - - return nil - }) - if err == nil { - cmd.Wait() - if cmd.ExitStatus != 0 { - err = fmt.Errorf("Script exited with non-zero exit status: %d", cmd.ExitStatus) - } + if err := comm.UploadScript(remotePath, script); err != nil { + return fmt.Errorf("Failed to upload script: %v", err) } - // If we have an error, end our context so the disconnect happens. - // This has to happen before the output cleanup below since during - // an interrupt this will cause the outputs to end. - if err != nil { - cancelFunc() + cmd = &remote.Cmd{ + Command: remotePath, + Stdout: outW, + Stderr: errW, + } + if err := comm.Start(cmd); err != nil { + return fmt.Errorf("Error starting script: %v", err) } - // Wait for output to clean up - outW.Close() - errW.Close() - <-outDoneCh - <-errDoneCh + cmd.Wait() + if cmd.ExitStatus != 0 { + err = fmt.Errorf("Script exited with non-zero exit status: %d", cmd.ExitStatus) + } // Upload a blank follow up file in the same path to prevent residual // script contents from remaining on remote machine @@ -230,19 +213,13 @@ func runScripts( // This feature is best-effort. log.Printf("[WARN] Failed to upload empty follow up script: %v", err) } - - // If we have an error, return it out now that we've cleaned up - if err != nil { - return err - } } return nil } func copyOutput( - o terraform.UIOutput, r io.Reader, doneCh chan<- struct{}) { - defer close(doneCh) + o terraform.UIOutput, r io.Reader) { lr := linereader.New(r) for line := range lr.Ch { o.Output(line)