基于Kafka构建事件溯源模式的微服务

概要

本文中我们将讨论如何借助Kafka实现分布式消息管理,使用事件溯源(Event Sourcing)模式实现原子化数据处理,使用CQRS模式(Command-Query Responsibility Segregation )实现查询职责分离,使用消费者群组解决单点故障问题,理解分布式协调框架Zookeeper的运行机制。整个应用的代码实现使用Go语言描述。

  • 第一部分 引子、环境准备、整体设计及实现
  • 第二部分 消息消费者及其集群化
  • 第三部分 测试驱动开发、Docker部署和持续集成

第一部分 引子、环境准备、整体设计及实现

为什么需要微服务

微服务本身并不算什么新概念,它要解决的问题在软件工程历史中早已经有人提出:解耦、扩展性、灵活性,解决“烂架构”膨胀后带来的复杂度问题。

Conway's law(康威定律)

Any organization that designs a system (defined broadly) will produce a design whose structure is a copy of the organization's communication structure.(任何组织在设计一套系统(广义概念上的系统)时,所交付的设计方案在结构上都与该组织的通信结构保持一致)
-- Melvyn Conway, 1967

《人月神话》:Adding manpower to a late software project makes it later --Fred Brooks, (1975)

为了赶进度加程序员就像用水去灭油锅里的火一样,原因在于:沟通成本 = n(n-1)/2,沟通成本随着项目或者组织的人员增加呈指数级增长。很多项目在经过一段时间的发展之后,都会有不少恐龙级代码,无人敢挑战。比如一个类的规模就多达数千行,核心方法近千行,大量重复代码,每次调整都以失败告终。庞大的系统规模导致团队新成员接手困难,项目组人员增加导致的代码冲突问题,系统复杂度的增加导致的不确定上线风险、引入新技术困难等。

微服务 (Microservices)是解决这些困难的众多方案之一。它本质上是一种软件架构风格,它是以专注于单一责任与功能的小型功能区块 (Small Building Blocks) 为基础,利用模组化的方式组合出复杂的大型应用程序,各功能区块使用与语言无关 (Language-Independent/Language agnostic) 的 API 集相互通讯。

Event Sourcing(事件溯源)

真正构建一个微服务是非常具有挑战性的。其中一个最重要的挑战就是原子化————如何处理分布式数据,如何设计服务的粒度。例如,常见的客户、工单场景,如果拆分成两个服务,查询都变成了一个难题:

select * from order o, customer c
  where o.customer_id = c.id
  and o.gross_amount > 50000
  and o.status = 'PAID'
  and c.country = 'INDONESIA';

在DDD领域(Domain-Driven Design,领域驱动设计)有一种架构风格被广泛应用,即CQRS (Command Query Responsibility Seperation,命令查询职责分离)。CQRS最核心的概念是Command、Event,“将数据(Data)看做是事实(Fact)。每个事实都是过去的痕迹,虽然这种过去可以遗忘,但却无法改变。” 这一思想直接发展了Event Source,即将这些事件的发生过程记录下来,使得我们可以追溯业务流程。CQRS对设计者的影响,是将领域逻辑,尤其是业务流程,皆看做是一种领域对象状态迁移的过程。这一点与REST将HTTP应用协议看做是应用状态迁移的引擎,有着异曲同工之妙。

实现方案

Kafka in a Nutshell

Apache Kafka是由Apache软件基金会开发的一个开源消息中间件项目,由Scala写成。Kafka最初是由LinkedIn开发,并于2011年初开源。2012年10月从Apache Incubator毕业。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。Kafka使用Zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。同时借助Zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。

  • Kafka Core Words
    Broker:Kafka集群包含一个或多个服务器,这种服务器被称为broker
    Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。Topic相当于数据库中的Table,行数据以log的形式存储,非常类似Git中commit log。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处。
    Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition.
    Producer:消息生产者,负责发布消息到Kafka broker
    Consumer:消息消费者,向Kafka broker读取消息的客户端。
    Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定则属于默认的group)。

整体设计

案例:假设一个银行账户系统。经过一段时间的经营发展,该行客户数量和交易规模都有了巨大的增长,系统内部变得异常复杂,每一个部分都变得沉重不堪。我们尝试对他的业务单元进行解耦,例如将余额计算逻辑从原有的核心系统拆分出来。根据银行账户业务特点,我们设计一个生产者——负责根据业务事件触发生成一个事件,所有事件基于Kafka存储,再设计一个消费者——负责从Kafka抓去未处理事件,通过调用业务逻辑处理单元完成后续持久化操作。这样一个账户的所有业务操作都可以有完整的快照历史,符合金融业务Audit(审计)的需要。而且通过使用事件,我们可以很方便地重建数据。

业务事件列表:

  • CreateEvent:开户
  • DepositEvent:存款
  • WithdrawEvent:取款
  • TransferEvent:转账

领域模型:账户(Account)
holder's name:持有人名称
balance:余额
registration date:开户日期
......

领域模型:事件(Event)
name:事件名称
ID:序号
......

环境准备

$ wget http://mirror.bit.edu.cn/apache/kafka/0.10.1.0/kafka_2.10-0.10.1.0.tgz
$ tar -xvf kafka_2.10-0.10.1.0.tgz
$ cd kafka_2.10-0.10.1.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ netstat -an | grep 2181
tcp46      0      0  *.2181                 *.*                    LISTEN     
  • 第二步,启动Kafka
$ bin/kafka-server-start.sh config/server.properties   
[2017-06-13 14:03:08,168] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2017-06-13 14:03:08,172] INFO Kafka version : 0.10.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-13 14:03:08,172] INFO Kafka commitId : 3402a74efb23d1d4 (org.apache.kafka.common.utils.AppInfoParser)
[2017-06-13 14:03:08,173] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
$ lsof -nP -iTCP -sTCP:LISTEN | sort -n
$ netstat -an | grep 9092
  tcp46      0      0  *.9092                 *.*                    LISTEN
  • 第三步,创建topic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic x-microservice-transactions-t1

Created topic "x-microservice-transactions-t1".
  • 另外,运行多个Kafka 实例
    Kafka多实例非常简单,只需要复制文件 server.properties,稍作修改即可。
config/server-1.properties:
    broker.id=1
    listeners=PLAINTEXT://:9093
    log.dir=/tmp/kafka-logs-1
config/server-2.properties:
    broker.id=2
    listeners=PLAINTEXT://:9094
    log.dir=/tmp/kafka-logs-2
// 启动多个broker,须指定不同的属性文件
$ bin/kafka-server-start.sh config/server-1.properties
$ bin/kafka-server-start.sh config/server-2.properties

domain model

package main

// domain model: bank_account.go

type BankAccount struct {
    Id      string
    Name    string
    Balance int
}

//定义下列函数:

//1. FetchAccount(id) 从Redis读取账户实例信息
//2. updateAccount(id, data) 更新指定账户信息
//3. ToAccount(map) 将从Redis读到的账户信息转换为模型数据,return *BankAccount object.

Kafka & Redis library

// main.go
import (
    "github.com/go-redis/redis" // Redis通讯库:go-redis
)

var (
    Redis = initRedis()
)

func initRedis() *redis.Client {
    redisUrl := os.Getenv("REDIS_URL")

    if redisUrl == "" {
        redisUrl = "127.0.0.1:6379"
    }

    return redis.NewClient(&redis.Options{
        Addr:     redisUrl,
        Password: "",
        DB:       0,
    })
}
package main
//kafka.go
import (
    "encoding/json"
    "fmt"
    "github.com/Shopify/sarama" //Kafka通讯库:Sarama
    "os"
)

var (
    brokers = []string{"127.0.0.1:9092"}
    topic   = "go-microservice-transactions"
    topics  = []string{topic}
)

func newKafkaConfiguration() *sarama.Config {
    conf := sarama.NewConfig()
    conf.Producer.RequiredAcks = sarama.WaitForAll
    conf.Producer.Return.Successes = true
    conf.ChannelBufferSize = 1
    conf.Version = sarama.V0_10_1_0
    return conf
}

func newKafkaSyncProducer() sarama.SyncProducer {
    kafka, err := sarama.NewSyncProducer(brokers, newKafkaConfiguration())

    if err != nil {
        fmt.Printf("Kafka error: %s\n", err)
        os.Exit(-1)
    }
    return kafka
}

func newKafkaConsumer() sarama.Consumer {
    consumer, err := sarama.NewConsumer(brokers, newKafkaConfiguration())

    if err != nil {
        fmt.Printf("Kafka error: %s\n", err)
        os.Exit(-1)
    }

    return consumer
}

消息生产者Producer

package main
//消息生产者 producer.go
import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
)

func mainProducer() {
    var err error
    reader := bufio.NewReader(os.Stdin)
    kafka := newKafkaSyncProducer()

    for {
        fmt.Print("-> ")
        text, _ := reader.ReadString('\n')
        text = strings.Replace(text, "\n", "", -1)
        args := strings.Split(text, "###")
        cmd := args[0]

        switch cmd {
        case "create":
            if len(args) == 2 {
                accName := args[1]
                event := NewCreateAccountEvent(accName)
                sendMsg(kafka, event)
            } else {
                fmt.Println("Only specify create###Account Name")
            }
        default:
            fmt.Printf("Unknown command %s, only: create, deposit, withdraw, transfer\n", cmd)
        }

        if err != nil {
            fmt.Printf("Error: %s\n", err)
            err = nil
        }
    }
}
// kafka.go
// 增加发送消息的方法

func sendMsg(kafka sarama.SyncProducer, event interface{}) error {
    json, err := json.Marshal(event)

    if err != nil {
        return err
    }

    msgLog := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(string(json)),
    }

    partition, offset, err := kafka.SendMessage(msgLog)
    if err != nil {
        fmt.Printf("Kafka error: %s\n", err)
    }

    fmt.Printf("Message: %+v\n", event)
    fmt.Printf("Message is stored in partition %d, offset %d\n",
        partition, offset)

    return nil
}
package main
//启动入口,main.go

func main() {
    mainProducer()
}
$  go build
$ ./go-microservice
-> create
Only specify create###Account Name
-> create###Yanrui
Message: {Event:{AccId:49a23d27-4ffe-4c86-ab9a-fbc308ecff1c Type:CreateEvent} AccName:Yanrui}
Message is stored in partition 0, offset 0
->

第二部分 消息消费者Consumer及其集群化

Consumer负责从Kafka加载消息队列。另外,我们需要为每一个事件创建process()函数。

package main
//processor.go
import (
  "errors"
)
func (e CreateEvent) Process() (*BankAccount, error) {
    return updateAccount(e.AccId, map[string]interface{}{
        "Id":      e.AccId,
        "Name":    e.AccName,
        "Balance": "0",
    })
}

func (e InvalidEvent) Process() error {
  return nil
}

func (e AcceptEvent) Process() error {
  return nil
}
// other Process() codes ...

package main

//consumer.go

func mainConsumer(partition int32) {
    kafka := newKafkaConsumer()
    defer kafka.Close()
    //注:开发环境中我们使用sarama.OffsetOldest,Kafka将从创建以来第一条消息开始发送。
    //在生产环境中切换为sarama.OffsetNewest,只会将最新生成的消息发送给我们。
    consumer, err := kafka.ConsumePartition(topic, partition, sarama.OffsetOldest)
    if err != nil {
        fmt.Printf("Kafka error: %s\n", err)
        os.Exit(-1)
    }

    go consumeEvents(consumer)

    fmt.Println("Press [enter] to exit consumer\n")
    bufio.NewReader(os.Stdin).ReadString('\n')
    fmt.Println("Terminating...")
}

Go语言通过goroutine提供了对于并发编程的直接支持,goroutine是Go语言运行库的功能,作为一个函数入口,在堆上为其分配的一个堆栈。所以它非常廉价,我们可以很轻松的创建上万个goroutine,但它们并不是被操作系统所调度执行。除了被系统调用阻塞的线程外,Go运行库最多会启动$GOMAXPROCS个线程来运行goroutine。

  • goroutines: A goroutine is a lightweight thread of execution.
  • channels: Channels are the pipes that connect concurrent goroutines. (<- operator)
  • for: for is Go’s only looping construct. Here are three basic types of for loops.
  • select: Go’s select lets you wait on multiple channel operations.
  • Non-Blocking Channel Operations
func consumeEvents(consumer sarama.PartitionConsumer) {
  var msgVal []byte
  var log interface{}
  var logMap map[string]interface{}
  var bankAccount *BankAccount
  var err error

  for {
    //goruntine exec
      select {
          // blocking <- channel operator
          case err := <-consumer.Errors():
              fmt.Printf("Kafka error: %s\n", err)
          case msg := <-consumer.Messages():
              msgVal = msg.Value
          //
          if err = json.Unmarshal(msgVal, &log); err != nil {
                fmt.Printf("Failed parsing: %s", err)
          } else {
                logMap = log.(map[string]interface{})
                logType := logMap["Type"]
                fmt.Printf("Processing %s:\n%s\n", logMap["Type"], string(msgVal))

                switch logType {
                case "CreateEvent":
                  event := new(CreateEvent)
                  if err = json.Unmarshal(msgVal, &event); err == nil {
                    bankAccount, err = event.Process()
                  }
                default:
                  fmt.Println("Unknown command: ", logType)
                }

                if err != nil {
                  fmt.Printf("Error processing: %s\n", err)
                  } else {
                    fmt.Printf("%+v\n\n", *bankAccount)
                  }
            }

        }
    }
}  

重构main

package main

//main.go
//支持producer和consumer启动模式

import (
    "flag"
    ...
)

func main() {
    act := flag.String("act", "producer", "Either: producer or consumer")
    partition := flag.String("partition", "0",
        "Partition which the consumer program will be subscribing")

    flag.Parse()

    fmt.Printf("Welcome to go-microservice : %s\n\n", *act)

    switch *act {
    case "producer":
        mainProducer()
    case "consumer":
        if part32int, err := strconv.ParseInt(*partition, 10, 32); err == nil {
            mainConsumer(int32(part32int))
        }
    }
}

通过--act参数,可以启动一个消费者进程。当进程运行时,他将从Kafka一个一个拿出消息进行处理,按照我们之前在每个事件定义的Process() 方法。

$ go build
$ ./go-microservice --act=consumer
Welcome to go-microservice : consumer

Press [enter] to exit consumer

Processing CreateEvent:
{"AccId":"49a23d27-4ffe-4c86-ab9a-fbc308ecff1c","Type":"CreateEvent","AccName":"Yanrui"}
{Id:49a23d27-4ffe-4c86-ab9a-fbc308ecff1c Name:Yanrui Balance:0}
Terminating...

集群化消息消费者

问题:如果一个Consumer宕机了怎么办?(例如:程序崩溃、网络异常等原因)
解决方案:将多个Consumer编组为集群实现高可用。具体来说就是打标签,当有一个新的Log发送时,Kafka将其发送给其中一个实例。当该实例无法接收Log时,Kafka将Log传递给另一个包含相同标签的Consumer。
注意:Kafka 版本 0.9 +,另外还需要使用sarama-cluster库

#使用govendor获取
govendor fetch github.com/bsm/sarama-cluster
//修改mainConsumer方法使用sarama-cluster library连接Kafka
config := cluster.NewConfig()
config.Consumer.Offsets.Initial = sarama.OffsetNewest
consumer, err := cluster.NewConsumer(brokers, "go-microservice-consumer", topics, config)

//topics定义
var (
    topics  = []string{topic}
)

//调整consumeEvents()
case err, more := <-consumer.Errors():
    if more {
        fmt.Printf("Kafka error: %s\n", err)
    }

//consumer.Messages() : MarkOffset
//consumer.go
//func mainConsumer(partition int32)

consumer.MarkOffset(msg, "") //增加的行

msgVal = msg.Value

即使程序崩溃,MarkOffset也会将消息标记为 processed ,标签包括元数据以及这个时间点的状态。元数据可以被另外一个Consumer恢复数据状态,也就能被重新消费。即即使同样的消息被处理两次,结果也是一样的,这个过程理论上是 幂等 的(idempotent)。

Kafka Consumers
//运行多个consumer实例
$ ./go-microservice --act=consumer
$ ./go-microservice --act=consumer
$ ./go-microservice --act=consumer

第三部分:测试驱动开发、Docker部署和持续集成

使用vendor管理Golang项目依赖

用govendor fetch <url1> <url2>新增的第三方包直接被get到根目录的vendor文件夹下,不会与其它的项目混用第三方包,完美避免多个项目同用同一个第三方包的不同版本问题。只需要对vendor/vendor.json进行版本控制,即可对第三包依赖关系进行控制。

$ //
$ go get -u github.com/kardianos/govendor
$ cd $PROJECT_PATH
$ govendor init
$ govendor add +external
$

单元测试:ginkgo Test Suite

$ go get github.com/onsi/ginkgo/ginkgo
$ go get github.com/onsi/gomega
$ ginkgo bootstrap
Generating ginkgo test suite bootstrap for main in:
    go_microservice_suite_test.go
package main_test
//go_microservice_suite_test.go,单元测试类
import (
    "github.com/onsi/ginkgo"
    "github.com/onsi/gomega"
)

var _ = Describe("Event", func() {
    Describe("NewCreateAccountEvent", func() {
        It("can create a create account event", func() {
            name := "John Smith"

            event := NewCreateAccountEvent(name)

            Expect(event.AccName).To(Equal(name))
            Expect(event.AccId).NotTo(BeNil())
            Expect(event.Type).To(Equal("CreateEvent"))
        })
    })
})
$ ginkgo
Running Suite: go-microservice Suite
==========================
Random Seed: 1490709758
Will run 1 of 1 specs
Ran 1 of 1 Specs in 0.000 seconds
SUCCESS! -- 1 Passed | 0 Failed | 0 Pending | 0 Skipped PASS
Ginkgo ran 1 suite in 905.68195ms
Test Suite Passed

单元测试的四个阶段

  1. Setup 启动
  2. Execution 执行
  3. Verification 验证
  4. Teardown 拆卸

Docker部署

Docker 容器中需要包含下列组件:

  1. Golang
  2. Redis、Kafka
  3. 微服务依赖的其它组件

在根目录创建一个Dockerfile

FROM golang:1.8.0
MAINTAINER Yanrui
//install our dependencies
RUN go get -u github.com/kardianos/govendor
RUN go get github.com/onsi/ginkgo/ginkgo
RUN go get github.com/onsi/gomega

//将整个目录拷贝到容器
ADD . /go/src/go-microservice

//检查工作目录
WORKDIR /go/src/go-microservice

//安装依赖项
RUN govendor sync

//测试
$ docker build -t go-microservice .
$ docker run -i -t go-microservice /bin/bash
$ ginkgo
.......................
.......Failed..........

由于容器本地并没有一个Redis实例运行在上面,这时运行ginkgo测试就会报错。我们为什么不在这个Dockerfile中包含一个Redis呢?这就违背了Docker分层解耦的初衷,我们可以通过docker-compose将两个服务连接起来一起工作。

创建一个docker-compose.yml文件(与Dockerfile目录一致):

version: "2.0"

services:
  app:
    environment:
      REDIS_URL: redis:6379
    build: .
    working_dir: /go/src/go-microservice
    links:
      - redis
  redis:
    image: redis:alpine

本地构建完成之后,再次运行 docker-compose run app ginkgo 测试通过。

Infrastructure as Code(基础设施即代码)

The enabling idea of infrastructure as code is that the systems and devices which are used to run software can be treated as if they, themselves, are software. — Kief Morris

云带来的好的一方面是它让公司中的任何人都可以轻松部署、配置和管理他们需要的基础设施。虽然很多基础设施团队采用了云和自动化技术,却没有采用相应的自动化测试和发布流程。它们把这些当作一门过于复杂的脚本语言来使用。他们会为每一次具体的改动编写手册、配置文件和执行脚本,再针对一部分指定的服务器手工运行它们,也就是说每一次改动都还需要花费专业知识、时间和精力。这种工作方式意味着基础设施团队没有把他们自己从日常的重复性劳动中解放出来。目前已经有很多商业云平台提供了Docker服务,只需要将自己的 git repository 链接到平台,即可以自动帮你完成部署,在云上完成集成测试。

    docker-compose build
    docker-compose run app ginkgo
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容

  • Kafka入门经典教程-Kafka-about云开发 http://www.aboutyun.com/threa...
    葡萄喃喃呓语阅读 10,825评论 4 54
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 本文转载自http://dataunion.org/?p=9307 背景介绍Kafka简介Kafka是一种分布式的...
    Bottle丶Fish阅读 5,467评论 0 34
  • 一、入门1、简介Kafka is a distributed,partitioned,replicated com...
    HxLiang阅读 3,346评论 0 9
  • 背景介绍 Kafka简介 Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下: 以时间复杂度为O...
    高广超阅读 12,830评论 8 167