119 lines
2.4 KiB
Go
119 lines
2.4 KiB
Go
package linereader
|
|
|
|
import (
|
|
"io"
|
|
"bufio"
|
|
"sync/atomic"
|
|
"time"
|
|
)
|
|
|
|
// Reader takes an io.Reader and pushes the lines out onto the channel.
|
|
type Reader struct {
|
|
Reader io.Reader
|
|
Timeout time.Duration
|
|
|
|
// Ch is the output channel. This will be closed when there are no
|
|
// more lines (io.EOF).
|
|
Ch chan string
|
|
|
|
started uint32
|
|
}
|
|
|
|
// New creates a new Reader that reads lines from the io.Reader.
|
|
//
|
|
// The Reader is already started when returned, so it is unsafe to modify
|
|
// any struct fields.
|
|
func New(r io.Reader) *Reader {
|
|
result := &Reader{
|
|
Reader: r,
|
|
Timeout: 100 * time.Millisecond,
|
|
Ch: make(chan string),
|
|
}
|
|
|
|
go result.Run()
|
|
return result
|
|
}
|
|
|
|
// Run reads from the Reader and dispatches lines on the Ch channel.
|
|
//
|
|
// This blocks and is usually called with `go` prefixed to dispatch onto
|
|
// a goroutine. It is safe to call this function multiple times; subsequent
|
|
// calls to Run will exit without running.
|
|
func (r *Reader) Run() {
|
|
if !atomic.CompareAndSwapUint32(&r.started, 0, 1) {
|
|
return
|
|
}
|
|
|
|
// When we're done, close the channel
|
|
defer close(r.Ch)
|
|
|
|
// Listen for bytes in a goroutine. We do this so that if we're blocking
|
|
// we can flush the bytes we have after some configured time. There is
|
|
// probably a way to make this a lot faster but this works for now.
|
|
//
|
|
// NOTE: This isn't particularly performant. I'm sure there is a better
|
|
// way to do this instead of sending single bytes on a channel, but it
|
|
// works fine.
|
|
buf := bufio.NewReader(r.Reader)
|
|
byteCh := make(chan byte)
|
|
doneCh := make(chan error)
|
|
go func() {
|
|
defer close(doneCh)
|
|
for {
|
|
b, err := buf.ReadByte()
|
|
if err != nil {
|
|
doneCh <- err
|
|
return
|
|
}
|
|
|
|
byteCh <- b
|
|
}
|
|
}()
|
|
|
|
lineBuf := make([]byte, 0, 80)
|
|
for {
|
|
var err error
|
|
line := lineBuf[0:0]
|
|
for {
|
|
brk := false
|
|
|
|
select {
|
|
case b := <-byteCh:
|
|
brk = b == '\n'
|
|
if !brk {
|
|
line = append(line, b)
|
|
}
|
|
case err = <-doneCh:
|
|
brk = true
|
|
case <-time.After(r.Timeout):
|
|
if len(line) > 0 {
|
|
brk = true
|
|
}
|
|
}
|
|
|
|
if brk {
|
|
break
|
|
}
|
|
}
|
|
|
|
// If an error occurred and its not an EOF, then report that
|
|
// error to all pipes and exit.
|
|
if err != nil && err != io.EOF {
|
|
break
|
|
}
|
|
|
|
// If we're at the end and the line is empty, then return.
|
|
if err == io.EOF && len(line) == 0 {
|
|
break
|
|
}
|
|
|
|
// Write out the line
|
|
r.Ch <- string(line)
|
|
|
|
// If we hit the end, we're done
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
}
|
|
}
|