使用Go语言编写一个简易的分布式系统

使用Go语言编写一个简易的分布式系统

想法

在一开始我要去学习用go语言编写一个分布式系统的时候。我会在想什么是分布式系统,分布式系统又跟以往的系统有什么很大的差异,或者说区别嘛。带着这个好奇,我去搜索一下什么才是真正的分布式系统,我以为是非常高深,又难以明白的一门学科,但是我仔细了解分布式系统的原理后。我发现我好像学过?!

以下资料是来源于我在网上搜索得出的信息:

一.概念

​ 集 群: 同一个业务,部署在多个服务器上

​ 分布式: 同一个业务,拆分成多个子业务,部署在不同的服务器上

​ 微服务: 同一个业务,按照功能模块拆分,每一个服务只对应一个功能模块

二.区别

集群是多台服务器一起处理同一个业务,可以使用负载均衡使得每一个服务器的负载相对平衡,集群中的一台服务器出现问题,该服务器所负责的业务可以由其他的服务器代为处理.集群是一种物理形态.

分布式是把一个业务拆分成多个子业务,给不同的服务器去处理,这里的服务器可以是单个的服务器,也可以是多个服务器集群,一旦处理该业务的服务器出现问题,那么该业务就无法实现了.分布式是一种工作方式.

微服务是把一个业务中的各种功能模块进行拆分,给不同的服务去处理,每个服务只处理一个功能模块,该服务可以是单个服务器也可以是多个服务器集群,每个服务之间都是低耦合的.微服务是一种架构风格.

为什么说分布式不一定是微服务:

​ 假设有一个很大应用,拆分成几个小应用,但还是很庞大,即便使用了分布式,但其依旧不算是微服务,因为微服务的核心要素是微小,简单来说就是这个应用还不够小(嗯..没错就是这样!)

​ 所以我们可以理解为:微服务是分布式的一个子集

三.应用场景

假设有一个业务,该业务有5个功能,每个功能单独处理需要1个小时.

此时,如果只部署一台服务器,则需要5个小时才能处理完该业务,若采用集群或者分布式来处理,结果如下:

​ 1.采用集群处理:提供5台服务器一起处理该业务,则处理每个功能只需12分钟,即处理整个业务只需1个小时

​ 2.采用分布式处理:提供5台服务器,每个服务器处理不同的功能,则一共也只需要一个小时.

​ 该情况下,微服务和分布式的工作原理和最终结果是一样的.

四.总结

​ 分布式中的每一个节点,都可以做集群.而集群并不一定就是分布式的.

​ 微服务肯定是分布式的,但分布式不一定是微服务的.

作者:晔歌歌
链接:https://www.jianshu.com/p/5f157ac8efcf
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

所以说分布式系统其实是一个非常广泛的概念,很多的应用都可以是一个分布式系统,所以我想以我曾经学过的知识微服务,这一方面去了解,或者说是使用:如何用go编写一个微服务,也就是分布式系统。

注:我觉得写go还用前后端耦合,并且还用模板,是非常愚蠢的行为。而微服务是天生前后端分离的(战术后仰)。

总体分为三个部分:服务注册,服务发现,状态监控。

服务注册

创建自定义的日志服务

实现基本逻辑

目的在于接受请求,并把请求写入到log里面,是很多应用必备的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package log

//因为标准库也有一个log,所以可以起一个别名
import (
"io/ioutil"
stlog "log"
"net/http"
"os"
)

var log *stlog.Logger

//目的在于把日志写入文件系统
type fileLog string

func (fl fileLog) Write(data []byte) (int, error) {
//首先要打开文件,才能写入
//fl文件路径,os...表示没有则创造,只写,只附加,
f, err := os.OpenFile(string(fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600)
if err != nil {
return 0, err
}
//defer表示程序最后一定会执行的,这句的意思是必须把文件关闭
defer f.Close()
return f.Write(data)
}

//最后把log指向某个文件地址
func Run(destination string) {
log = stlog.New(fileLog(destination), "go", stlog.LstdFlags)
}

//注册一个Handler
func RegisterHandler() {
http.HandleFunc("/log", func(writer http.ResponseWriter, request *http.Request) {
switch request.Method {
case http.MethodPost: //如果请求是post
msg, err := ioutil.ReadAll(request.Body) //先读取内容
if err != nil || len(msg) == 0 { //如果有错误
writer.WriteHeader(http.StatusBadRequest)
}
//没有错误则写入日志
write(string(msg))
default: //对于其他情况,方法就不进行,直接返回
writer.WriteHeader(http.StatusMethodNotAllowed)
return
}
})
}

func write(message string) {
log.Printf("%v\n", message)
}

先写好一个基本的日志服务的逻辑,逻辑较为简单。但还需要完善,接下来就要实现能够运行的日志服务。也就是说,还需要把web服务集中化管理,使其能够正常的运行。

接着创立一个service,去完善服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package service

import (
"context"
"fmt"
"log"
"net/http"
)

//公共的函数,用于启动服务
func Start(ctx context.Context, serviceName, host, port string,
registerHandlersFunc func()) (context.Context, error) {
//将传入的函数运行
registerHandlersFunc()

//对服务进行基本的定义,完善服务,并将信息返回给主函数
ctx = startService(ctx, serviceName, host, port)

return ctx, nil
}

func startService(ctx context.Context, name string, host string, port string) context.Context {
//使得ctx具有取消的功能
ctx, cancel := context.WithCancel(ctx)

//定义服务地址
var server http.Server
server.Addr = ":" + port

//
go func() {
//一旦发生可错误,就取消上下文
log.Println(server.ListenAndServe())
cancel()
}()

go func() {
fmt.Printf("%v 服务开始。按任意键停止. \n", name)
var s string
fmt.Scanln(&s)
server.Shutdown(ctx)
cancel()
}()

return ctx
}

这里完善了服务启动的逻辑,接着还需要去使这个服务能够正常的运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package main

import (
"context"
"fmt"
"go_distributed_system_study/log"
"go_distributed_system_study/service"
stlog "log"
)

func main() {
//定义日志地址
log.Run("./distribute.log")
//定义具体参数,其实通常应该由配置文件中定义
host, port := "localhost", "4000"
ctx, err := service.Start(
context.Background(),
"Log service",
host,
port,
log.RegisterHandler,
)
//如果有错误,就先执行标准库的log打印出结果
if err != nil {
stlog.Fatalln(err)
}
//接需要等待ctx的信号
//如果在启动服务器的时候出现了错误
//或者在按下任意键停止后,就会发送信号
<-ctx.Done()
//接受到信号后,就会继续
fmt.Println("停止服务")
}

测试

接着启动服务,并使用postman进行测试:

1
http://localhost:4000/log

输入任意文字,就会看到在根目录下,有一个日志文件生成了。

服务注册的基本逻辑

注册中心

首先需要去尝试编写一下,一个可以将服务都注册进去的注册中心。

先写一个数据结构,注册中心:

1
2
3
4
5
6
7
8
9
10
11
12
13
package registry

//注册中心
type Registration struct {
ServiceName ServiceName
ServiceURL string
}

type ServiceName string

const (
LogService = ServiceName("LogService")
)

注册中心包含了各个服务的名字的地址,紧接着,编写服务注册进去之后的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package registry

import (
"encoding/json"
"log"
"net/http"
"sync"
)

const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"

type registry struct {
//这个slice,是动态变化的,而且多个线程可能会并发的进行访问,
//为了保证线程安全,需要加锁。
registrations []Registration
mutex *sync.Mutex
}

//在增加服务的时候是需要加锁的
func (registry *registry) add(reg Registration) error {
registry.mutex.Lock()
registry.registrations = append(registry.registrations, reg)
registry.mutex.Unlock()
return nil
}

var reg = registry{
registrations: make([]Registration, 0),
mutex: new(sync.Mutex),
}

//创建一个web服务
type RegistryService struct{}

func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("接受请求:")
switch r.Method {
case http.MethodPost:
//解码
dec := json.NewDecoder(r.Body)
var r Registration
err := dec.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

log.Printf("增加服务:%v ,该服务的地址是:%s \n",
r.ServiceName, r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

在之后,需要让服务独立运行

独立服务

接着就需要将之前的服务,注册到服务中心中。

这就需要创建一个服务中心主要运行逻辑了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"context"
"fmt"
"go_distributed_system_study/registry"
"log"
"net/http"
)

func main() {
//将之前的处理逻辑注册进去
http.Handle("/services", registry.RegistryService{})

//接下来的逻辑一样,需要有取消功能,当然其实你在ide中能直接打断,但在大型服务中,每秒都要运行。还是需要自定义取消功能的
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

//一样是定义服务的地址
var srv http.Server
srv.Addr = registry.ServerPort

go func() {
log.Println(srv.ListenAndServe())
cancel()
}()

go func() {
fmt.Printf("注册中心 的服务开始。按任意键停止. \n")
var s string
fmt.Scanln(&s)
srv.Shutdown(ctx)
cancel()
}()

<-ctx.Done()
fmt.Println("结束服务注册")
}

其实和日志服务的注册类似,没什么特别的,接着测试:

测试

1
http://localhost:3000/services

接着输入json

1
2
3
4
5
{   
"serviceName": "study service",
"serviceURL" : " http://localhost/5000/study"

}

接着显示服务注册成功:

1
2
3
注册中心 的服务开始。按任意键停止.
接受请求:
增加服务:study service ,该服务的//localhost/5000/study

注册服务

微服务思想

首先微服务的基本含义是:注册中心是一个服务,然后其他的服务注册到注册中心,然后由主要控制台相互控制和调用。

那么现在会需要一个客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package registry

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)

//这个函数的目的是给web service发送一个post请求
func RegisterService(r Registration) error {
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err := enc.Encode(r)
if err != nil {
return err
}

res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf("服务注册失败 "+"状态码为: %v", res.StatusCode)
}
return nil
}

接着需要去改一下日志服务的逻辑,使得日志服务会主动去注册自己:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package service

import (
"context"
"fmt"
"go_distributed_system_study/registry"
"log"
"net/http"
)

//公共的函数,用于启动服务
func Start(ctx context.Context, host, port string, reg registry.Registration,
registerHandlersFunc func()) (context.Context, error) {
//将传入的函数运行
registerHandlersFunc()

//对服务进行基本的定义,完善服务,并将信息返回给主函数
ctx = startService(ctx, reg.ServiceName, host, port)

//启动web服务之后注册:
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}

return ctx, nil
}

func startService(ctx context.Context, name registry.ServiceName, host string, port string) context.Context {
//使得ctx具有取消的功能
ctx, cancel := context.WithCancel(ctx)

//定义服务地址
var server http.Server
server.Addr = ":" + port

//
go func() {
//一旦发生可错误,就取消上下文
log.Println(server.ListenAndServe())
cancel()
}()

go func() {
fmt.Printf("%v 服务开始。按任意键停止. \n", name)
var s string
fmt.Scanln(&s)
server.Shutdown(ctx)
cancel()
}()

return ctx
}

这的registry.RegisterService(reg)实际上会去调用client的func RegisterService(r Registration),这样会向注册中心发送一个post请求,去注册自己。

接着,要去改动日志服务的main函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"context"
"fmt"
"go_distributed_system_study/log"
"go_distributed_system_study/registry"
"go_distributed_system_study/service"
stlog "log"
)

func main() {
//定义日志地址
log.Run("./distribute.log")
//定义具体参数,其实通常应该由配置文件中定义
host, port := "localhost", "4000"

serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
r := registry.Registration{
ServiceName: "log service",
ServiceURL: serviceAddress,
}

ctx, err := service.Start(
context.Background(),
host,
port,
r,
log.RegisterHandler,
)
//如果有错误,就先执行标准库的log打印出结果
if err != nil {
stlog.Fatalln(err)
}
//接需要等待ctx的信号
//如果在启动服务器的时候出现了错误
//或者在按下任意键停止后,就会发送信号
<-ctx.Done()
//接受到信号后,就会继续
fmt.Println("停止服务")
}

主要是增加了serviceAddress ,也就是说所有服务都会使用这同一个逻辑。

测试

紧接着两个服务连续启动,先启动注册中心,后启动日志逻辑,结果如下:

1
2
3
4
5
6
7
注册中心 的服务开始。按任意键停止.

log service 服务开始。按任意键停止


接受请求:
增加服务:log service ,该服务的地址是:http://localhost:4000

很简单对吧,和Spring cloud的微服务简直一模一样。

取消注册

那么我们把微服务注册进去了,自然能够调用,但是怎么主动去取消微服务呢?它肯定不是说我自己把自己的微服务关了就行了,同时也需要通知注册中心。

修改注册中心

直接在注册中心加上一个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//那么肯定要有个方法取消服务
func (registry *registry) remove(url string) error {
//看看服务中台有没有这服务
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
registry.mutex.Lock()
//把后面的接上前面的,自然的去除了,方法其实不唯一
reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
registry.mutex.Unlock()
return nil
}
}
return fmt.Errorf("服务地址未发现: %s ", url)
}

之后直接在Switch里面增加一个情况Delete:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("移除服务: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

代码很清晰,就是一模一样的移除服务,总体代码改动如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package registry

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
)

const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"

type registry struct {
//这个slice,是动态变化的,而且多个线程可能会并发的进行访问,
//为了保证线程安全,需要加锁。
registrations []Registration
mutex *sync.Mutex
}

//在增加服务的时候是需要加锁的
//add方法表示增加服务
func (registry *registry) add(reg Registration) error {
registry.mutex.Lock()
registry.registrations = append(registry.registrations, reg)
registry.mutex.Unlock()
return nil
}

//那么肯定要有个方法取消服务
func (registry *registry) remove(url string) error {
//看看服务中台有没有这服务
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
registry.mutex.Lock()
//把后面的接上前面的,自然的去除了,方法其实不唯一
reg.registrations = append(reg.registrations[:i], reg.registrations[:i+1]...)
registry.mutex.Unlock()
return nil
}
}
return fmt.Errorf("服务地址未发现: %s ", url)
}

var reg = registry{
registrations: make([]Registration, 0),
mutex: new(sync.Mutex),
}

//创建一个web服务
type RegistryService struct{}

func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("接受请求:")
switch r.Method {
case http.MethodPost:
//解码
dec := json.NewDecoder(r.Body)
var r Registration
err := dec.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

log.Printf("增加服务:%v ,该服务的地址是:%s \n",
r.ServiceName, r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("移除服务: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

注册中心取消服务的方法定义好了,那么也就需要在其他可注册服务的函数体中定义方法。为了进一步的解除耦合度,取消服务的方法和建立服务的方法一样,需要在client里面编写。

修改客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package registry

import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)

//这个函数的目的是给web service发送一个post请求
func RegisterService(r Registration) error {
buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err := enc.Encode(r)
if err != nil {
return err
}

res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf("服务注册失败 "+"状态码为: %v", res.StatusCode)
}
return nil
}

//结束服务
func ShutdownService(url string) error {
req, err := http.NewRequest(
http.MethodDelete, ServicesURL,
bytes.NewBuffer([]byte(url))) //把string转化为slice
if err != nil {
return err
}

req.Header.Add("Content-Type", "text/plain")
//紧接着发送请求
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("服务取消失败,状态码为:%v",
res.StatusCode)
}
return nil
}

增加了ShutdownService去结束这个服务。

紧接着,去开始服务注册的函数里边,进行取消注册的修改:

修改服务的注册功能

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package service

import (
"context"
"fmt"
"go_distributed_system_study/registry"
"log"
"net/http"
)

//公共的函数,用于启动服务
func Start(ctx context.Context, host, port string, reg registry.Registration,
registerHandlersFunc func()) (context.Context, error) {
//将传入的函数运行
registerHandlersFunc()

//对服务进行基本的定义,完善服务,并将信息返回给主函数
ctx = startService(ctx, reg.ServiceName, host, port)

//启动web服务之后注册:
err := registry.RegisterService(reg)
if err != nil {
return ctx, err
}

return ctx, nil
}

func startService(ctx context.Context, name registry.ServiceName, host string, port string) context.Context {
//使得ctx具有取消的功能
ctx, cancel := context.WithCancel(ctx)

//定义服务地址
var server http.Server
server.Addr = ":" + port

//
go func() {
//启动服务,一旦发生可错误,就取消
log.Println(server.ListenAndServe())
//调用取消服务的服务
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}
//
cancel()
}()

go func() {
fmt.Printf("%v 服务开始。按任意键停止. \n", name)
var s string
fmt.Scanln(&s)

//调用取消服务的服务
err := registry.ShutdownService(fmt.Sprintf("http://%s:%s", host, port))
if err != nil {
log.Println(err)
}

server.Shutdown(ctx)
cancel()
}()

return ctx
}

主要是两个goroutine的修改,使其具有取消服务的功能。

测试:

那么注册中心和服务的逻辑都修改好了,然后和上面的步骤一样,先启动注册中心,后启动日志逻辑,结果为:

1
2
3
4
5
6
7
8
9
10
11
12
注册中心 的服务开始。按任意键停止.
等待接受请求:
增加服务:log service ,该服务的地址是:http://localhost:4000

log service 服务开始。按任意键停止.
q
http: Server closed
停止服务


移除服务: http://localhost:4000
http: Server closed

服务发现

前面的服务注册都是一对一的,还体现不了分布式的特点。接下来进行多服务注册,使得一个学生成绩的服务既要使用日志服务,也要注册到注册中心

业务服务

基本的数据结构与方法

首先要编写一个学生的基础信息的数据结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package grades

import (
"fmt"
"sync"
)

//学生信息
type Student struct {
ID int
FirstName string
LastName string
Grades []Grade
}

//分数
type Grade struct {
Title string
Type GradeType
Score float32
}
type GradeType string

const ( //考试类型
GradeQuiz = GradeType("Quiz")
GradeTest = GradeType("Test")
GradeExam = GradeType("Exam")
)

//学生的平均成绩
func (s Student) Average() float32 {
var result float32
for _, grade := range s.Grades {
result += grade.Score
}

return result / float32(len(s.Grades))
}

//寻找学生 by ID
type Students []Student

func (ss Students) GetByID(id int) (*Student, error) {
for i := range ss {
if ss[i].ID == id {
return &ss[i], nil
}
}

return nil, fmt.Errorf("学生的ID: %d 未找到", id)
}

//用于外部的访问
var (
students Students
studentsMutex sync.Mutex
)

接着肯定得有一些学生的数据,来做测试,这些数据一开始就会被加载进数据结构中,这暂时是用来代替数据库的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package grades

func init() {
students = []Student{
{
ID: 1,
FirstName: "Nick",
LastName: "Carter",
Grades: []Grade{
{
Title: "Quiz 1",
Type: GradeQuiz,
Score: 85,
},
{
Title: "Final Exam",
Type: GradeExam,
Score: 94,
},
{
Title: "Quiz 2",
Type: GradeQuiz,
Score: 82,
},
},
},
{
ID: 2,
FirstName: "Roberto",
LastName: "Baggio",
Grades: []Grade{
{
Title: "Quiz 1",
Type: GradeQuiz,
Score: 100,
},
{
Title: "Final Exam",
Type: GradeExam,
Score: 100,
},
{
Title: "Quiz 2",
Type: GradeQuiz,
Score: 81,
},
},
},
{
ID: 3,
FirstName: "Emma",
LastName: "Stone",
Grades: []Grade{
{
Title: "Quiz 1",
Type: GradeQuiz,
Score: 67,
},
{
Title: "Final Exam",
Type: GradeExam,
Score: 0,
},
{
Title: "Quiz 2",
Type: GradeQuiz,
Score: 75,
},
},
},
{
ID: 4,
FirstName: "Rachel",
LastName: "McAdams",
Grades: []Grade{
{
Title: "Quiz 1",
Type: GradeQuiz,
Score: 98,
},
{
Title: "Final Exam",
Type: GradeExam,
Score: 99,
},
{
Title: "Quiz 2",
Type: GradeQuiz,
Score: 94,
},
},
},
{
ID: 5,
FirstName: "Kelly",
LastName: "Clarkson",
Grades: []Grade{
{
Title: "Quiz 1",
Type: GradeQuiz,
Score: 95,
},
{
Title: "Final Exam",
Type: GradeExam,
Score: 100,
},
{
Title: "Quiz 2",
Type: GradeQuiz,
Score: 97,
},
},
},
}
}

那么,也肯定要有server,才能正常的启动服务,需要去编写基本的逻辑,比如获取全部学生信息,根据ID进行信息搜索,增加学生信息等功能:

服务的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package grades

import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"strings"
)

func RegisterHandlers() {
handler := new(studentsHandler)
//这个两个地址是不同的,一个是单个页面
http.Handle("/students", handler)
//另一个是必须传入参数的页面
http.Handle("/students/", handler)
}

type studentsHandler struct{}

//所以一个链接需要处理很多情况
// /students 分割后长度是2
// /students/{id} 分割后长度是3
// /students/{id}/grades 分割后长度是4
func (sh studentsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//简单对字符进行处理:
pathSegments := strings.Split(r.URL.Path, "/")
switch len(pathSegments) {
case 2: //获取全部信息
sh.getAll(w, r)
case 3: //查询
//提取id
id, err := strconv.Atoi(pathSegments[2])
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
sh.getOne(w, r, id)
case 4: //新增
id, err := strconv.Atoi(pathSegments[2])
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
sh.addGrade(w, r, id)
default:
w.WriteHeader(http.StatusNotFound)
}
}

//获取全部学生信息
func (sh studentsHandler) getAll(w http.ResponseWriter, r *http.Request) {
studentsMutex.Lock()
defer studentsMutex.Unlock()

//将学生信息全部转为JSON,返回给data,最后写入
data, err := sh.toJSON(students)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Println(err)
return
}
w.Header().Add("Content-Type", "application/json")
w.Write(data)
}

//根据ID,搜索学生信息
func (sh studentsHandler) getOne(w http.ResponseWriter, r *http.Request, id int) {
studentsMutex.Lock()
defer studentsMutex.Unlock()

student, err := students.GetByID(id)
if err != nil {
w.WriteHeader(http.StatusNotFound)
log.Println(err)
return
}

//逻辑与获取全部信息几乎一致
data, err := sh.toJSON(student)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Printf("学生信息序列化失败: %q", err)
return
}
w.Header().Add("Content-Type", "application/json")
w.Write(data)
}

//增加学生成绩信息
func (sh studentsHandler) addGrade(w http.ResponseWriter, r *http.Request, id int) {
studentsMutex.Lock()
defer studentsMutex.Unlock()

student, err := students.GetByID(id)
if err != nil {
w.WriteHeader(http.StatusNotFound)
log.Println(err)
return
}
//上面的逻辑一样的,没什么好说

//接着要从URL中获取到要传达的学生成绩信息
var g Grade
dec := json.NewDecoder(r.Body)
err = dec.Decode(&g)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
log.Println(err)
return
}
//这部分逻辑是获取成绩信息
//如果学生成绩获取正确,就附加信息
student.Grades = append(student.Grades, g)
w.WriteHeader(http.StatusCreated) //201

data, err := sh.toJSON(g)
if err != nil {
log.Println(err)
return
}
w.Header().Add("Content-Type", "applicaiton/json")
w.Write(data)
}

//转化为JSON
func (sh studentsHandler) toJSON(obj interface{}) ([]byte, error) {
var b bytes.Buffer
enc := json.NewEncoder(&b)
err := enc.Encode(obj)
if err != nil {
return nil, fmt.Errorf("学生信息序列化失败: %q", err)
}
return b.Bytes(), nil
}

这样的话,基础逻辑也已经完善了,接着就是在注册中心里增加服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
package registry

//注册中心
type Registration struct {
ServiceName ServiceName
ServiceURL string
}

type ServiceName string

const (
LogService = ServiceName("LogService")
GradingService = ServiceName("GradingService")
)

最后,得让web服务可以运行,在cmd文件夹下创建一个新的main函数,写入一样的逻辑代码:

服务启动器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package main

import (
"context"
"go_distributed_system_study/grades"
"go_distributed_system_study/registry"
"go_distributed_system_study/service"

"fmt"
stlog "log"
)

func main() {
host, port := "localhost", "6000"
serviceAddress := fmt.Sprintf("http://%v:%v", host, port)

r := registry.Registration{
ServiceName: registry.GradingService,
ServiceURL: serviceAddress,
}
ctx, err := service.Start(context.Background(),
host,
port,
r,
grades.RegisterHandlers)
if err != nil {
stlog.Fatal(err)
}
<-ctx.Done()
fmt.Println("grading service 服务停止了")
}

这么下来,这个业务服务也就完成了,他们可以互相不干扰的进行服务注册,但是现在grade服务还不能去调用日志服务。所以我们还需要服务发现。

服务发现

服务发现作用能让grade服务可以请求log服务

去引用日志服务

首先肯定是要给服务的数据结构增加一些基本信息,这样才能使得服务有这些基本的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package registry

//注册中心
type Registration struct {
ServiceName ServiceName
ServiceURL string

//该服务所依赖的其他服务。用slice去保存
RequiredServices []ServiceName
//向外暴露的服务端口
ServiceUpdateURL string
}

type ServiceName string

const (
LogService = ServiceName("LogService")
GradingService = ServiceName("GradingService")
)

//每一条目
type patchEntry struct {
Name ServiceName
URL string
}

//服务变化
type patch struct {
Added []patchEntry
Removed []patchEntry
}

接下来可以想想,一个服务如果还依赖着其他的服务。那么,当这个服务正要注册的时候,或者说要加入服务群体的时候。就会在:

1
func (registry *registry) add(reg Registration)

进行服务注册,那么这个时候如果服务还依赖其他服务,比如正要注册的grade服务还依赖log服务,这时候就正好可以去获取依赖。

修改后代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package registry

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
)

const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"

type registry struct {
//这个slice,是动态变化的,而且多个线程可能会并发的进行访问,
//为了保证线程安全,需要加锁。
registrations []Registration
mutex *sync.RWMutex
}

//在增加服务的时候是需要加锁的
//add方法表示增加服务
func (registry *registry) add(reg Registration) error {
registry.mutex.Lock()
registry.registrations = append(registry.registrations, reg)
registry.mutex.Unlock()

//正好去获取这个服务所依赖的其他服务
err := registry.sendRequireServices(reg)

return err
}

//获取其他服务
func (registry *registry) sendRequireServices(reg Registration) error {
registry.mutex.RLock()
defer registry.mutex.RUnlock()

//寻找服务
var p patch
for _, serviceReg := range registry.registrations {
for _, reqService := range reg.RequiredServices {
if serviceReg.ServiceName == reqService {
p.Added = append(p.Added, patchEntry{
Name: serviceReg.ServiceName,
URL: serviceReg.ServiceURL,
})
}
}
}
//找到之后,注册
err := registry.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
return err
}
return nil
}

//将需要的服务发送过去注册的过程
func (r registry) sendPatch(p patch, url string) error {
d, err := json.Marshal(p)
if err != nil {
return err
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
if err != nil {
return err
}
return nil
}

//那么肯定要有个方法取消服务
func (registry *registry) remove(url string) error {
//看看服务中台有没有这服务
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
registry.mutex.Lock()
//把后面的接上前面的,自然的去除了,方法其实不唯一
reg.registrations = append(reg.registrations[:i], reg.registrations[:i+1]...)
//reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
registry.mutex.Unlock()
return nil
}
}
return fmt.Errorf("服务地址未发现: %s ", url)
}

var reg = registry{
registrations: make([]Registration, 0),
mutex: new(sync.RWMutex),
}

//创建一个web服务
type RegistryService struct{}

func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("等待接受请求:")
switch r.Method {
case http.MethodPost:
//解码
dec := json.NewDecoder(r.Body)
var r Registration
err := dec.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

log.Printf("增加服务:%v ,该服务的地址是:%s \n",
r.ServiceName, r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("移除服务: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

这是一个找到所需要服务并将其注册的过程。

接着,grade服务会向注册中心请求这些服务,但是注册中心也需要地方去存储这些请求的服务。

log服务就会向grade服务提供服务,那么会需要一些数据结构去存储:

1
2
3
4
5
6
7
//log服务会向多个服务提供服务
type providers struct {
//服务与服务的URL
services map[ServiceName][]string
//互斥锁
mutex *sync.RWMutex
}

然后去实现它的逻辑,总体修改后代码如下:

服务的提供者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package registry

import (
"bytes"
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"sync"
)

//这个函数的目的是给web service发送一个post请求
func RegisterService(r Registration) error {

//服务注册中心要向URL更新一些信息
serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL)
if err != nil {
return err
}
http.Handle(serviceUpdateURL.Path, &serviceUpdateHandler{})
//

buf := new(bytes.Buffer)
enc := json.NewEncoder(buf)
err = enc.Encode(r)
if err != nil {
return err
}

res, err := http.Post(ServicesURL, "application/json", buf)
if err != nil {
return err
}

if res.StatusCode != http.StatusOK {
return fmt.Errorf("服务注册失败 "+"状态码为: %v", res.StatusCode)
}
return nil
}

//更新服务的处理
type serviceUpdateHandler struct{}

func (suh serviceUpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
//先进行解码
dec := json.NewDecoder(r.Body)
var p patch
err := dec.Decode(&p)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}
//fmt.Printf("收到更新: %v\n", p)
prov.Update(p)
}

//结束服务
func ShutdownService(url string) error {
req, err := http.NewRequest(
http.MethodDelete, ServicesURL,
bytes.NewBuffer([]byte(url))) //把string转化为slice
if err != nil {
return err
}

req.Header.Add("Content-Type", "text/plain")
//紧接着发送请求
res, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
if res.StatusCode != http.StatusOK {
return fmt.Errorf("服务取消失败,状态码为:%v",
res.StatusCode)
}
return nil
}

//log服务会向多个服务提供服务
type providers struct {
//服务与服务的URL
services map[ServiceName][]string
//互斥锁
mutex *sync.RWMutex
}

//接受到patch的时候,需要进行更新,
var prov = providers{
services: make(map[ServiceName][]string),
mutex: new(sync.RWMutex),
}

//实现
func (p *providers) Update(pat patch) {
//对传进来的patch更新provider
p.mutex.Lock()
defer p.mutex.Unlock()

//新增的情况
for _, patchEntry := range pat.Added {
//如果这个服务名目前还不存在,就创建新的slice
if _, ok := p.services[patchEntry.Name]; !ok {
p.services[patchEntry.Name] = make([]string, 0)
}
//如果存在的话,就在值后边附加URL
p.services[patchEntry.Name] = append(p.services[patchEntry.Name],
patchEntry.URL)
}
//减少的情况
//遍历,对比,移除
for _, patchEntry := range pat.Removed {
if providerURLs, ok := p.services[patchEntry.Name]; ok {
for i := range providerURLs {
if providerURLs[i] == patchEntry.URL {
p.services[patchEntry.Name] = append(providerURLs[:i],
providerURLs[i+1:]...)
}
}
}
}
}

//然后还需要,根据服务的名称来找到它所依赖服务的url
func (p providers) get(name ServiceName) (string, error) {
providers, ok := p.services[name]
if !ok {
return "", fmt.Errorf("没有可提供服务的提供商: %v", name)
}
//随机数
idx := int(rand.Float32() * float32(len(providers)))
return providers[idx], nil
}

//由于这个get方法是私有的,对外再套一个函数:
func GetProvider(name ServiceName) (string, error) {
return prov.get(name)
}

客户端的client

log服务现在有服务端的逻辑,但是客户端的服务想使用这个client还是比较麻烦的,所以还需要对log服务有一个自己的client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package log

import (
"bytes"
"fmt"
"go_distributed_system_study/registry"
"net/http"

stlog "log"
)

//写日志,把日志写到server
func SetClientLogger(serviceURL string, clientService registry.ServiceName) {
stlog.SetPrefix(fmt.Sprintf("[%v] - ", clientService))
stlog.SetFlags(0)
stlog.SetOutput(&clientLogger{url: serviceURL})
}

type clientLogger struct {
url string
}

func (cl clientLogger) Write(data []byte) (int, error) {
b := bytes.NewBuffer([]byte(data))
//写到服务端
res, err := http.Post(cl.url+"/log", "text/plain", b)
if err != nil {
return 0, err
}
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("Failed to send log message. Service responded with %d - %s", res.StatusCode, res.Status)
}
//如果都没有问题,返回数据
return len(data), nil
}

这样就可以让本地的日志服务写好日志后发送到服务器端保存

使main函数具有服务发现的功能

主要是使得两个启动器拥有新的功能:

grading service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"context"
"go_distributed_system_study/grades"
"go_distributed_system_study/log"
"go_distributed_system_study/registry"
"go_distributed_system_study/service"

"fmt"
stlog "log"
)

func main() {
host, port := "localhost", "6000"
serviceAddress := fmt.Sprintf("http://%v:%v", host, port)

r := registry.Registration{
ServiceName: registry.GradingService,
ServiceURL: serviceAddress,
//添加两个信息
RequiredServices: []registry.ServiceName{registry.LogService},
ServiceUpdateURL: serviceAddress + "/services",
}
ctx, err := service.Start(context.Background(),
host,
port,
r,
grades.RegisterHandlers)
if err != nil {
stlog.Fatal(err)
}

//在服务启动之后使用log服务
if logProvider, err := registry.GetProvider(registry.LogService); err == nil {
fmt.Printf("发现日志服务: %s\n", logProvider)
log.SetClientLogger(logProvider, r.ServiceName)
}

<-ctx.Done()
fmt.Println("grading service 服务停止了")
}
log service
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package main

import (
"context"
"fmt"
"go_distributed_system_study/log"
"go_distributed_system_study/registry"
"go_distributed_system_study/service"
stlog "log"
)

func main() {
//定义日志地址
log.Run("./distribute.log")
//定义具体参数,其实通常应该由配置文件中定义
host, port := "localhost", "4000"

serviceAddress := fmt.Sprintf("http://%s:%s", host, port)
r := registry.Registration{
ServiceName: registry.LogService,
ServiceURL: serviceAddress,
//添加两个信息
RequiredServices: make([]registry.ServiceName, 0),
ServiceUpdateURL: serviceAddress + "/services",
}

ctx, err := service.Start(
context.Background(),
host,
port,
r,
log.RegisterHandler,
)
//如果有错误,就先执行标准库的log打印出结果
if err != nil {
stlog.Fatalln(err)
}
//接需要等待ctx的信号
//如果在启动服务器的时候出现了错误
//或者在按下任意键停止后,就会发送信号
<-ctx.Done()
//接受到信号后,就会继续
fmt.Println("停止服务")
}

接下来便可以测试了。

测试

按照:registryservice,logservice,gradingservice的顺序启动,测试结果如:

1
2
3
4
5
6
7
8
9
10
注册中心 的服务开始。按任意键停止.
等待接受请求:
增加服务:LogService ,该服务的地址是:http://localhost:4000
等待接受请求:
增加服务:GradingService ,该服务的地址是:http://localhost:6000

LogService 服务开始。按任意键停止.

GradingService 服务开始。按任意键停止.
发现日志服务: http://localhost:4000

这么一来就完成了。

依赖变化

重新发现服务

可以从上述的情况下看到一些不那么方便的点,一是:启动必须按照顺序来,不能随意。二是:当log服务下线后,再上线的话不会被再次发现。这都是服务极为脆弱的表现。那么解决这个问题的最好方法是:使服务具有依赖变化时进行通知的功能。

可以在服务中更改,使其具备通知的功能,主要是有notify函数,总体修改如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package registry

import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"sync"
)

const ServerPort = ":3000"
const ServicesURL = "http://localhost" + ServerPort + "/services"

type registry struct {
//这个slice,是动态变化的,而且多个线程可能会并发的进行访问,
//为了保证线程安全,需要加锁。
registrations []Registration
mutex *sync.RWMutex
}

//在增加服务的时候是需要加锁的
//add方法表示增加服务
func (registry *registry) add(reg Registration) error {
registry.mutex.Lock()
registry.registrations = append(registry.registrations, reg)
registry.mutex.Unlock()

//正好去获取这个服务所依赖的其他服务
err := registry.sendRequireServices(reg)

//服务通知,当服务上线,而这个服务又被依赖时,告知依赖服务自己上线了
registry.notify(patch{
Added: []patchEntry{
patchEntry{
Name: reg.ServiceName,
URL: reg.ServiceURL,
},
},
})

return err
}

//获取其他服务
func (registry *registry) sendRequireServices(reg Registration) error {
registry.mutex.RLock()
defer registry.mutex.RUnlock()

//寻找服务
var p patch
for _, serviceReg := range registry.registrations {
for _, reqService := range reg.RequiredServices {
if serviceReg.ServiceName == reqService {
p.Added = append(p.Added, patchEntry{
Name: serviceReg.ServiceName,
URL: serviceReg.ServiceURL,
})
}
}
}
//找到之后,注册
err := registry.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
return err
}
return nil
}

//通知其他服务
func (r registry) notify(fullPatch patch) {
r.mutex.RLock()
defer r.mutex.RUnlock()

//看看服务的依赖在patch里面存不存在
//对已经注册的服务循环遍历
for _, reg := range r.registrations {
//并发的发出通知
go func(reg Registration) {
//对服务所需要的服务进行循环
for _, reqService := range reg.RequiredServices {
p := patch{Added: []patchEntry{}, Removed: []patchEntry{}}
//标志位,为TRUE表示有需要更新的地方
sendUpdate := false
for _, added := range fullPatch.Added {
//如果添加的服务正好是某个服务的依赖项
if added.Name == reqService {
p.Added = append(p.Added, added)
sendUpdate = true
}
}
//看看有哪些服务停止了
for _, removed := range fullPatch.Removed {
///如果停掉的服务正好是所被依赖的服务
if removed.Name == reqService {
p.Removed = append(p.Removed, removed)
sendUpdate = true
}
}
//最后判断标志位,把更新发送到对应的服务
if sendUpdate {
err := r.sendPatch(p, reg.ServiceUpdateURL)
if err != nil {
log.Println(err)
return
}
}

}
}(reg)
}
}

//将需要的服务发送过去注册的过程
func (r registry) sendPatch(p patch, url string) error {
d, err := json.Marshal(p)
if err != nil {
return err
}
_, err = http.Post(url, "application/json", bytes.NewBuffer(d))
if err != nil {
return err
}
return nil
}

//那么肯定要有个方法取消服务
func (registry *registry) remove(url string) error {
//看看服务中台有没有这服务
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {
registry.mutex.Lock()
//把后面的接上前面的,自然的去除了,方法其实不唯一
reg.registrations = append(reg.registrations[:i], reg.registrations[:i+1]...)
//reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
registry.mutex.Unlock()
return nil
}
}
return fmt.Errorf("服务地址未发现: %s ", url)
}

var reg = registry{
registrations: make([]Registration, 0),
mutex: new(sync.RWMutex),
}

//创建一个web服务
type RegistryService struct{}

func (s RegistryService) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Println("等待接受请求:")
switch r.Method {
case http.MethodPost:
//解码
dec := json.NewDecoder(r.Body)
var r Registration
err := dec.Decode(&r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

log.Printf("增加服务:%v ,该服务的地址是:%s \n",
r.ServiceName, r.ServiceURL)
err = reg.add(r)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusBadRequest)
return
}

case http.MethodDelete:
payload, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
url := string(payload)
log.Printf("移除服务: %s", url)
err = reg.remove(url)
if err != nil {
log.Println(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

default:
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
}

测试1

接着再进行测试,可以看到,当log服务下线后,重新上线时,grading 服务就能够发现log服务了。

1
2
3
4
收到更新: {[{LogService http://localhost:4000} {LogService http://localhost:4000}] []}
发现日志服务: http://localhost:4000
收到更新: {[{LogService http://localhost:4000}] []}
收到更新: {[{LogService http://localhost:4000}] []}

服务下线告知

接着也容易,把remove方法里面添加下线告知的功能就行了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (registry *registry) remove(url string) error {
//看看服务中台有没有这服务
for i := range reg.registrations {
if reg.registrations[i].ServiceURL == url {

//下线也告知
registry.notify(patch{
Removed: []patchEntry{
{
Name: registry.registrations[i].ServiceName,
URL: registry.registrations[i].ServiceURL,
},
},
})
//下线告知

registry.mutex.Lock()
//把后面的接上前面的,自然的去除了,方法其实不唯一
reg.registrations = append(reg.registrations[:i], reg.registrations[:i+1]...)
//reg.registrations = append(reg.registrations[:i], reg.registrations[i+1:]...)
registry.mutex.Unlock()
return nil
}
}
return fmt.Errorf("服务地址未发现: %s ", url)
}

这样,这部分逻辑就完成了。接下来测试代码。

测试2

测试的步骤是先开启注册中心,再开启日志服务,后开始grade服务。然后使得日志服务停止,再重启。可以看到一系列的结果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
注册中心 的服务开始。按任意键停止.
等待接受请求:
增加服务:LogService ,该服务的地址是:http://localhost:4000
等待接受请求:
增加服务:GradingService ,该服务的地址是:http://localhost:6000
移除服务: http://localhost:4000
等待接受请求:
增加服务:LogService ,该服务的地址是:http://localhost:4000


LogService 服务开始。按任意键停止.
收到更新: {[] []}
http: Server closed
停止服务
LogService 服务开始。按任意键停止


GradingService 服务开始。按任意键停止.
收到更新: {[{LogService http://localhost:4000}] []}
发现日志服务: http://localhost:4000
收到更新: {[] [{LogService http://localhost:4000}]}
收到更新: {[{LogService http://localhost:4000}] []}

使用解除了耦合的网络接口,这也是Spring Cloud的微服务思想,同时也是分布式的一种类型。所以说分布式也没什么神奇之处,最核心的一处在于:把本地接口转化为了网络接口。能够理解这一过程,也就理解了分布式的思想。