Go & 管道

管道

管道是不同进程间通信的一种手段,是UNIX系统IPC的最古老形式,管道有如下特点:

  • 历史上,他们是半双工的(虽然有的系统提供了全双工)
  • 只能在具有公共祖先的两个进程间使用

虽然有上面的局限,半双工管道仍是最常用的IPC方式,shell执行命令时,会为每一条命令单独创建一个进程,然后用管道将前一条命令进程的标准输出与后一条命令的标准输入相连接。

这里我们先使用C语言测试一下管道:


# include "stdio.h"
# include "stdlib.h"
# include "unistd.h"

int main(void) {
    int a[2];
    char buf[10];
    if (pipe(a) == -1) {
        perror("pipe");
        exit(-1);
    }
    write(a[1], "CODE", 10);
    printf("\n");
    read(a[0], buf, 10);
    printf("%s", buf);
}

这里使用了pipe系统调用

/* Create a one-way communication channel (pipe).
   If successful, two file descriptors are stored in PIPEDES;
   bytes written on PIPEDES[1] can be read from PIPEDES[0].
   Returns 0 if successful, -1 if not.  */
extern int pipe (int __pipedes[2]) __THROW __wur;


/* Write N bytes of BUF to FD.  Return the number written, or -1.

   This function is a cancellation point and therefore not marked with
   __THROW.  */
extern ssize_t write (int __fd, const void *__buf, size_t __n) __wur;

FIFO

FIFO有时称为命名管道。未命名的管道只能在两个相关进程之间调用,而且这两个相关进程还有一个共同的创建了他们的祖先进程。但是,通过FIFO,不相关的进程也能交换数据。
创建FIFO类似于创建文件,FIFO的路径名存在于文件系统中。
对应的系统调用为:

/* Create a new FIFO named PATH, with permission bits MODE.  */
extern int mkfifo (const char *__path, __mode_t __mode)
     __THROW __nonnull ((1));

命名管道允许任何进程通过它来交换数据,下面在命令行测试下FIFO的使用:

mkfifo -m 666 myfifo
ls -ll
touch test.txt
tee test.txt < myfifo
cat ./main.go > myfifo
cat ./test.txt

首先创建了一个命名管道叫myfifo,执行完之后就会在当前目录下生成一个文件叫myfifo,ls -ll查看其类型为p,也就是管道的意思,然后使用这个管道实现了简单的交换数据。

使用os/exec 调用外部命令

接下来我们使用go的os/exec测试管道,这个包提供了调用外部命令的能力

func main() {
    cmd := exec.Command("echo", "-n", "hello world")
    // ReadCloser
    out, err := cmd.StdoutPipe()
    if err != nil {
        log.Printf("out pipe fail:%v", err)
    }
    err = cmd.Start()
    if err != nil {
        log.Printf("start fail:%v\n", err)
    }
    // 1. 直接用字节切片接收
    //temp := make([]byte, 20)
    //out.Read(temp)
    //log.Println(string(temp))
    // 2. 使用bytes.Buffer
    //buf := bytes.Buffer{}
    //for {
    //  temp := make([]byte, 3)
    //  _, err := out.Read(temp)
    //  if err == io.EOF {
    //      break
    //  }
    //  buf.Write(temp)
    //}
    //log.Println(buf.String())
    // 3. 使用bufio.Reader,默认4096字节的缓冲区
    reader := bufio.NewReader(out)
    //output, _, err := reader.ReadLine()
    str, err := reader.ReadString('\n')
    if err != nil {
        log.Printf("reader read fail:%v", err)
    }
    log.Println(str)
    //log.Println(string(output))
    err = cmd.Wait()
    if err != nil {
        log.Printf("wait failed:%v", err)
    }
}

简单介绍下Go实现执行外部命令的细节,exec.Command函数会调用exec的LookPath函数,会去OS的PATH路径下查找,是否有对应的可执行程序文件,比如这里是echo,会依次访问path的路径,看有没有echo这个文件,在我的Mac上,最终找到的路径为/bin/echo。在exec.Start函数内部会调用exec.StartProcess来创建一个新的进程,在运行时执行fork系统调用

pid, h, e := syscall.StartProcess(name, argv, sysattr)

创建进程后,可使用pstree命令查看进程的关系,可以看到echo进程相关的祖先进程。

$ pstree -g 2 -s echo 
─┬◆ 00001 root /sbin/launchd
 └─┬◆ 03320 hongyi /Applications/GoLand.app/Contents/MacOS/goland
   └─┬─ 18046 hongyi /Applications/GoLand.app/Contents/plugins/go/lib/dlv/mac/dlv --listen=localhost:49511 --h
     └─┬◆ 18047 hongyi /Library/Developer/CommandLineTools/Library/PrivateFrameworks/LLDB.framework/Versions/A
       └─┬─ 18048 hongyi /private/var/folders/v4/9s11xdk514n5x6ntjptb93680000gn/T/___go_build_main_go__1_
         └─── 18052 hongyi (echo)

上面的go程序使用了cmd.StdoutPipe()方法,在StdoutPipe内部使用了下文要说的os.Pipe()函数,而在这个函数内部调用了syscall.Pipe(),这个函数最终调用了pipe系统调用。

用管道连接命令

我们通过管道来连接两个命令,这里会新起两个进程,一个ls,一个grep。

ls ll | grep main
  func main() {
    cmd1 := exec.Command("ps", "-axv")
    cmd2 := exec.Command("grep", "Code.app")
    pipeTemp := bytes.Buffer{}
    // 模拟管道
    cmd1.Stdout = &pipeTemp
    if err := cmd1.Start(); err != nil {
        log.Fatalf("cmd1 start fail:%v", err)
    }
    if err := cmd1.Wait(); err != nil {
        log.Fatalf("cmd1 wait fail:%v", err)
    }

    cmd2.Stdin = &pipeTemp
    var outputBuf2 bytes.Buffer
    cmd2.Stdout = &outputBuf2
    if err := cmd2.Run(); err != nil {
        fmt.Printf("cmd2 run failed: %s\n", err)
        return
    }
    fmt.Println(outputBuf2.String())
}

cmd.Start函数如下:

    // 省略......
    type F func(*Cmd) (*os.File, error)
    for _, setupFd := range []F{(*Cmd).stdin, (*Cmd).stdout, (*Cmd).stderr} {
        fd, err := setupFd(c)
        if err != nil {
            c.closeDescriptors(c.closeAfterStart)
            c.closeDescriptors(c.closeAfterWait)
            return err
        }
        c.childFiles = append(c.childFiles, fd)
    }
    c.childFiles = append(c.childFiles, c.ExtraFiles...)

    var err error
    //  起一个新进程
    c.Process, err = os.StartProcess(c.Path, c.argv(), &os.ProcAttr{
        Dir:   c.Dir,
        Files: c.childFiles,
        Env:   dedupEnv(c.envv()),
        Sys:   c.SysProcAttr,
    })
    if err != nil {
        c.closeDescriptors(c.closeAfterStart)
        c.closeDescriptors(c.closeAfterWait)
        return err
    }

    c.closeDescriptors(c.closeAfterStart)

    c.errch = make(chan error, len(c.goroutine))
    for _, fn := range c.goroutine {
        // 起goroutine执行
        go func(fn func() error) {
            c.errch <- fn()
        }(fn)
    }

对于cmd1,没有设置其标准输入,其标准输入为null device,设置了其标准输出为bytes.Buffer类型的pipeTemp,调用Start函数后,在方法writerDescriptor内部会创建一个管道,以并发的形式将管道的读端拷贝到Stdout,在Start函数中会执行这个c.goroutine,writerDescriptor返回管道的写端,加入到c. childFiles作为新进程的标准输出,最终childFiles包含的标准输入,输出,错误将传给新起的进程(ps进程),因此新进程向标准输出写,即是向管道里写入,而管道的读端又被copy到了pipeTemp,因此这个新进程的结果将被写入到pipeTemp中。

对于cmd2,设置了标准输入为pipeTemp,也创建了一个管道,将读端返回,将cmd2的标准输入也就是pipeTemp,在goroutine中拷贝到管道写端。当goroutine执行时,pipeTemp的内容将被写入管道,而返回的管道的读端将会作为新进程的标准输入,也就是将pipeTemp的内容传给了新进程作为标准输入。


func (c *Cmd) stdout() (f *os.File, err error) {
    return c.writerDescriptor(c.Stdout)
}
func (c *Cmd) writerDescriptor(w io.Writer) (f *os.File, err error) {
    //省略.....
    pr, pw, err := os.Pipe()
    if err != nil {
        return
    }

    c.closeAfterStart = append(c.closeAfterStart, pw)
    c.closeAfterWait = append(c.closeAfterWait, pr)
    c.goroutine = append(c.goroutine, func() error {
      // 将读端拷贝到stdout
        _, err := io.Copy(w, pr)
        pr.Close() // in case io.Copy stopped due to write error
        return err
    })
    return pw, nil
}

func (c *Cmd) stdin() (f *os.File, err error) {
    // 省略....
    pr, pw, err := os.Pipe()
    if err != nil {
        return
    }

    c.closeAfterStart = append(c.closeAfterStart, pr)
    c.closeAfterWait = append(c.closeAfterWait, pw)
    c.goroutine = append(c.goroutine, func() error {
        _, err := io.Copy(pw, c.Stdin)
        if skip := skipStdinCopyError; skip != nil && skip(err) {
            err = nil
        }
        if err1 := pw.Close(); err == nil {
            err = err1
        }
        return err
    })
    return pr, nil
}

os.Pipe

下面直接使用Go的os.Pipe()API,os.Pipe()函数调用了syscall包的Pipe()函数。

func basedFilePipe() {
    reader, writer, err := os.Pipe()
    if err != nil {
        log.Printf("get pipe failed:%v", err)
    }
    go func() {
        out := make([]byte, 26)
        n, err := reader.Read(out)
        if err != nil {
            log.Printf("read failed:%v", err)
        }
        log.Printf("read %d bytes", n)
        log.Println(string(out))
    }()

    temp := make([]byte, 26)
    for i := 65; i <= 90; i++ {
        temp[i-65] = byte(i)
    }
    n, err := writer.Write(temp)
    if err != nil {
        log.Printf("write failed:%v", err)
    }
    log.Printf("write %d bytes", n)
    time.Sleep(1 * time.Second)
}

上面介绍的管道是没有提供原子操作支持的,Go的io包提供了Pipe函数,这个函数有如下特点:

  • 基于内存的同步管道
  • 如果没有读端消费,写端一直阻塞(同步channel)
  • 没有内部缓存,直接从写端拷贝到读端
  • 并发读写安全(sync包)

下面是一个内存管道的示例

func ioPipe()  {
   reader, writer := io.Pipe()
   var wg sync.WaitGroup
   const goroutineCount = 2
   wg.Add(goroutineCount)
   //reader
   for i := 0; i<goroutineCount; i++ {
       go func() {
           defer wg.Done()
           output := make([]byte, 100)
           n, err := reader.Read(output)
           if err != nil {
               fmt.Printf("Error: Couldn't read data from the named pipe: %s\n", err)
           }
           fmt.Printf("Read %d byte(s). [in-memory pipe]\n", n)
           log.Println(string(output))
       }()
   }
   // writer1
   go func() {
       input := make([]byte, 26)
       for i := 65; i <= 90; i++ {
           input[i-65] = byte(i)
       }
       n, err := writer.Write(input)
       if err != nil {
           fmt.Printf("Error: Couldn't write data to the named pipe: %s\n", err)
       }
       fmt.Printf("Written %d byte(s). [in-memory pipe]\n", n)
   }()
   // writer2
   go func() {
       input := make([]byte, 26)
       for i := 97; i <= 122; i++ {
           input[i-97] = byte(i)
       }
       n, err := writer.Write(input)
       if err != nil {
           fmt.Printf("Error: Couldn't write data to the named pipe: %s\n", err)
       }
       fmt.Printf("Written %d byte(s). [in-memory pipe]\n", n)
   }()
   wg.Wait()
}

os.Pipe() vs io.Pipe()

  • os.Pipe()基于syscall.Pipe()实现,比较底层,实现了基于操作系统级别的管道,而io.Pipe()基于channel和互斥锁实现数据通信
  • os.Pipe()不保证原子操作,io.Pipe是并发安全的
  • io.Pipe()因为使用了无缓冲channel, 读写是阻塞的

syscall

该包包含了一些操作系统底层的接口,实现细节依赖于底层的系统,默认下,godoc只会展示当前系统的syscall文档。如果你想展示其他系统的syscall文档,需设置$GOOS和$GOARCH到目标系统。syscall主要的使用是提供当前系统的接口给其他代码包如os,time,net。Go推荐使用那些包而不是这个syscall包。

func Pipe(p []int) (err error)
func Pipe2(p []int, flags int) (err error)
func Mkfifo(path string, mode uint32) (err error)

下面是pipe_bsd.go对os.Pipe()的实现

func Pipe() (r *File, w *File, err error) {
    var p [2]int
    // See ../syscall/exec.go for description of lock.
    syscall.ForkLock.RLock()
    e := syscall.Pipe(p[0:])
    if e != nil {
        syscall.ForkLock.RUnlock()
        return nil, nil, NewSyscallError("pipe", e)
    }
    syscall.CloseOnExec(p[0])
    syscall.CloseOnExec(p[1])
    syscall.ForkLock.RUnlock()
    return newFile(uintptr(p[0]), "|0", kindPipe), newFile(uintptr(p[1]), "|1", kindPipe), nil
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,744评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,505评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,105评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,242评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,269评论 6 389
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,215评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,096评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,939评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,354评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,573评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,745评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,448评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,048评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,683评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,838评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,776评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,652评论 2 354

推荐阅读更多精彩内容

  • 01 做彼此最好的队友 有人说,婚姻关系里两个人最好的状态是保持朋友关系。 因为做朋友,可以谈心,可以无拘无束分享...
    诗水年华阅读 683评论 0 1
  • 话说,啊狠在学前班课堂上凭着精湛的客家话演讲,在2班的地位是越来越重要了。明显感觉到自己的形象难以撑起自己的才华…...
    Chosing_春幸阅读 131评论 0 0
  • 当你尚能安然无损地捧着这段文字细细品读时,你便已经有了值得幸福的理由。时光的长河里,每日都得淘尽多少人的青容和生命...
    六六小记阅读 636评论 0 6
  • 我又一次来到我们当初分别的地方,什么也没做,只是站着,站着。就好像你还没有离开的时候,你将手塞入我的衣兜,而我望着...
    噬忆阅读 106评论 0 0