使用Go语言编写一个简易的分布式系统 想法 在一开始我要去学习用go语言编写一个分布式系统的时候。我会在想什么是分布式系统,分布式系统又跟以往的系统有什么很大的差异,或者说区别嘛。带着这个好奇,我去搜索一下什么才是真正的分布式系统,我以为是非常高深,又难以明白的一门学科,但是我仔细了解分布式系统的原理后。我发现我好像学过?!
以下资料是来源于我在网上搜索得出的信息:
一.概念 集 群: 同一个业务,部署在多个服务器上
分布式: 同一个业务,拆分成多个子业务,部署在不同的服务器上
微服务: 同一个业务,按照功能模块拆分,每一个服务只对应一个功能模块
二.区别 集群 是多台服务器一起处理同一个业务,可以使用负载均衡使得每一个服务器的负载相对平衡,集群中的一台服务器出现问题,该服务器所负责的业务可以由其他的服务器代为处理.集群是一种物理形态.
分布式 是把一个业务拆分成多个子业务,给不同的服务器去处理,这里的服务器可以是单个的服务器,也可以是多个服务器集群,一旦处理该业务的服务器出现问题,那么该业务就无法实现了.分布式是一种工作方式.
微服务 是把一个业务中的各种功能模块进行拆分,给不同的服务去处理,每个服务只处理一个功能模块,该服务可以是单个服务器也可以是多个服务器集群,每个服务之间都是低耦合的.微服务是一种架构风格.
为什么说分布式不一定是微服务:
假设有一个很大应用,拆分成几个小应用,但还是很庞大,即便使用了分布式,但其依旧不算是微服务,因为微服务的核心要素是微小,简单来说就是这个应用还不够小(嗯..没错就是这样!)
所以我们可以理解为:微服务是分布式的一个子集
三.应用场景 假设有一个业务,该业务有5个功能,每个功能单独处理需要1个小时.
此时,如果只部署一台服务器,则需要5个小时才能处理完该业务,若采用集群或者分布式来处理,结果如下:
1.采用集群处理:提供5台服务器一起处理该业务,则处理每个功能只需12分钟,即处理整个业务只需1个小时
2.采用分布式处理:提供5台服务器,每个服务器处理不同的功能,则一共也只需要一个小时.
该情况下,微服务和分布式的工作原理和最终结果是一样的.
四.总结 分布式中的每一个节点,都可以做集群.而集群并不一定就是分布式的.
微服务肯定是分布式的,但分布式不一定是微服务的.
作者:晔歌歌 链接:https://www.jianshu.com/p/5f157ac8efcf 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
所以说分布式系统其实是一个非常广泛的概念,很多的应用都可以是一个分布式系统,所以我想以我曾经学过的知识微服务,这一方面去了解,或者说是使用:如何用go编写一个微服务 ,也就是分布式系统。
注:我觉得写go还用前后端耦合,并且还用模板,是非常愚蠢的行为。而微服务是天生前后端分离的(战术后仰)。
总体分为三个部分:服务注册,服务发现,状态监控。
服务注册 创建自定义的日志服务 实现基本逻辑 目的在于接受请求,并把请求写入到log里面,是很多应用必备的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package logimport ( "io/ioutil" stlog "log" "net/http" "os" ) var log *stlog.Loggertype fileLog string func (fl fileLog) Write (data []byte ) (int , error) { f, err := os.OpenFile(string (fl), os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0600 ) if err != nil { return 0 , err } defer f.Close() return f.Write(data) } func Run (destination string ) { log = stlog.New(fileLog(destination), "go" , stlog.LstdFlags) } func RegisterHandler () { http.HandleFunc("/log" , func (writer http.ResponseWriter, request *http.Request) { switch request.Method { case http.MethodPost: msg, err := ioutil.ReadAll(request.Body) if err != nil || len (msg) == 0 { writer.WriteHeader(http.StatusBadRequest) } write(string (msg)) default : writer.WriteHeader(http.StatusMethodNotAllowed) return } }) } func write (message string ) { log.Printf("%v\n" , message) }
先写好一个基本的日志服务的逻辑,逻辑较为简单。但还需要完善,接下来就要实现能够运行的日志服务。也就是说,还需要把web服务集中化管理,使其能够正常的运行。
接着创立一个service,去完善服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package serviceimport ( "context" "fmt" "log" "net/http" ) func Start (ctx context.Context, serviceName, host, port string , registerHandlersFunc func () ) (context.Context, error) { registerHandlersFunc() ctx = startService(ctx, serviceName, host, port) return ctx, nil } func startService (ctx context.Context, name string , host string , port string ) context .Context { ctx, cancel := context.WithCancel(ctx) var server http.Server server.Addr = ":" + port go func () { log.Println(server.ListenAndServe()) cancel() }() go func () { fmt.Printf("%v 服务开始。按任意键停止. \n" , name) var s string fmt.Scanln(&s) server.Shutdown(ctx) cancel() }() return ctx }
这里完善了服务启动的逻辑,接着还需要去使这个服务能够正常的运行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 package mainimport ( "context" "fmt" "go_distributed_system_study/log" "go_distributed_system_study/service" stlog "log" ) func main () { log.Run("./distribute.log" ) host, port := "localhost" , "4000" ctx, err := service.Start( context.Background(), "Log service" , host, port, log.RegisterHandler, ) if err != nil { stlog.Fatalln(err) } <-ctx.Done() fmt.Println("停止服务" ) }
测试 接着启动服务,并使用postman进行测试:
1 http://localhost:4000/log
输入任意文字,就会看到在根目录下,有一个日志文件生成了。
服务注册的基本逻辑 注册中心 首先需要去尝试编写一下,一个可以将服务都注册进去的注册中心。
先写一个数据结构,注册中心:
1 2 3 4 5 6 7 8 9 10 11 12 13 package registrytype Registration struct { ServiceName ServiceName ServiceURL string } type ServiceName string const ( LogService = ServiceName("LogService" ) )
注册中心包含了各个服务的名字的地址,紧接着,编写服务注册进去之后的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 package registryimport ( "encoding/json" "log" "net/http" "sync" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" type registry struct { registrations []Registration mutex *sync.Mutex } func (registry *registry) add (reg Registration) error { registry.mutex.Lock() registry.registrations = append (registry.registrations, reg) registry.mutex.Unlock() return nil } var reg = registry{ registrations: make ([]Registration, 0 ), mutex: new (sync.Mutex), } type RegistryService struct {}func (s RegistryService) ServeHTTP (w http.ResponseWriter, r *http.Request) { log.Println("接受请求:" ) switch r.Method { case http.MethodPost: dec := json.NewDecoder(r.Body) var r Registration err := dec.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("增加服务:%v ,该服务的地址是:%s \n" , r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } default : w.WriteHeader(http.StatusMethodNotAllowed) return } }
在之后,需要让服务独立运行
独立服务 接着就需要将之前的服务,注册到服务中心中。
这就需要创建一个服务中心主要运行逻辑了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package mainimport ( "context" "fmt" "go_distributed_system_study/registry" "log" "net/http" ) func main () { http.Handle("/services" , registry.RegistryService{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() var srv http.Server srv.Addr = registry.ServerPort go func () { log.Println(srv.ListenAndServe()) cancel() }() go func () { fmt.Printf("注册中心 的服务开始。按任意键停止. \n" ) var s string fmt.Scanln(&s) srv.Shutdown(ctx) cancel() }() <-ctx.Done() fmt.Println("结束服务注册" ) }
其实和日志服务的注册类似,没什么特别的,接着测试:
测试 1 http://localhost:3000/services
接着输入json
1 2 3 4 5 { "serviceName" : "study service" , "serviceURL" : " http://localhost/5000/study" }
接着显示服务注册成功:
1 2 3 注册中心 的服务开始。按任意键停止. 接受请求: 增加服务:study service ,该服务的//localhost/5000 /study
注册服务 微服务思想 首先微服务的基本含义是:注册中心是一个服务,然后其他的服务注册到注册中心,然后由主要控制台相互控制和调用。
那么现在会需要一个客户端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package registryimport ( "bytes" "encoding/json" "fmt" "net/http" ) func RegisterService (r Registration) error { buf := new (bytes.Buffer) enc := json.NewEncoder(buf) err := enc.Encode(r) if err != nil { return err } res, err := http.Post(ServicesURL, "application/json" , buf) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("服务注册失败 " +"状态码为: %v" , res.StatusCode) } return nil }
接着需要去改一下日志服务的逻辑,使得日志服务会主动去注册自己:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 package serviceimport ( "context" "fmt" "go_distributed_system_study/registry" "log" "net/http" ) func Start (ctx context.Context, host, port string , reg registry.Registration, registerHandlersFunc func () ) (context.Context, error) { registerHandlersFunc() ctx = startService(ctx, reg.ServiceName, host, port) err := registry.RegisterService(reg) if err != nil { return ctx, err } return ctx, nil } func startService (ctx context.Context, name registry.ServiceName, host string , port string ) context .Context { ctx, cancel := context.WithCancel(ctx) var server http.Server server.Addr = ":" + port go func () { log.Println(server.ListenAndServe()) cancel() }() go func () { fmt.Printf("%v 服务开始。按任意键停止. \n" , name) var s string fmt.Scanln(&s) server.Shutdown(ctx) cancel() }() return ctx }
这的registry.RegisterService(reg) 实际上会去调用client的func RegisterService(r Registration) ,这样会向注册中心发送一个post请求,去注册自己。
接着,要去改动日志服务的main函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package mainimport ( "context" "fmt" "go_distributed_system_study/log" "go_distributed_system_study/registry" "go_distributed_system_study/service" stlog "log" ) func main () { log.Run("./distribute.log" ) host, port := "localhost" , "4000" serviceAddress := fmt.Sprintf("http://%s:%s" , host, port) r := registry.Registration{ ServiceName: "log service" , ServiceURL: serviceAddress, } ctx, err := service.Start( context.Background(), host, port, r, log.RegisterHandler, ) if err != nil { stlog.Fatalln(err) } <-ctx.Done() fmt.Println("停止服务" ) }
主要是增加了serviceAddress ,也就是说所有服务都会使用这同一个逻辑。
测试 紧接着两个服务连续启动,先启动注册中心,后启动日志逻辑,结果如下:
1 2 3 4 5 6 7 注册中心 的服务开始。按任意键停止. log service 服务开始。按任意键停止 接受请求: 增加服务:log service ,该服务的地址是:http://localhost:4000
很简单对吧,和Spring cloud的微服务简直一模一样。
取消注册 那么我们把微服务注册进去了,自然能够调用,但是怎么主动去取消微服务呢?它肯定不是说我自己把自己的微服务关了就行了,同时也需要通知注册中心。
修改注册中心 直接在注册中心加上一个方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func (registry *registry) remove (url string ) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { registry.mutex.Lock() reg.registrations = append (reg.registrations[:i], reg.registrations[i+1 :]...) registry.mutex.Unlock() return nil } } return fmt.Errorf("服务地址未发现: %s " , url) }
之后直接在Switch里面增加一个情况Delete:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string (payload) log.Printf("移除服务: %s" , url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return }
代码很清晰,就是一模一样的移除服务,总体代码改动如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 package registryimport ( "encoding/json" "fmt" "io/ioutil" "log" "net/http" "sync" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" type registry struct { registrations []Registration mutex *sync.Mutex } func (registry *registry) add (reg Registration) error { registry.mutex.Lock() registry.registrations = append (registry.registrations, reg) registry.mutex.Unlock() return nil } func (registry *registry) remove (url string ) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { registry.mutex.Lock() reg.registrations = append (reg.registrations[:i], reg.registrations[:i+1 ]...) registry.mutex.Unlock() return nil } } return fmt.Errorf("服务地址未发现: %s " , url) } var reg = registry{ registrations: make ([]Registration, 0 ), mutex: new (sync.Mutex), } type RegistryService struct {}func (s RegistryService) ServeHTTP (w http.ResponseWriter, r *http.Request) { log.Println("接受请求:" ) switch r.Method { case http.MethodPost: dec := json.NewDecoder(r.Body) var r Registration err := dec.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("增加服务:%v ,该服务的地址是:%s \n" , r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string (payload) log.Printf("移除服务: %s" , url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } default : w.WriteHeader(http.StatusMethodNotAllowed) return } }
注册中心取消服务的方法定义好了,那么也就需要在其他可注册服务的函数体中定义方法。为了进一步的解除耦合度,取消服务的方法和建立服务的方法一样,需要在client里面编写。
修改客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 package registryimport ( "bytes" "encoding/json" "fmt" "net/http" ) func RegisterService (r Registration) error { buf := new (bytes.Buffer) enc := json.NewEncoder(buf) err := enc.Encode(r) if err != nil { return err } res, err := http.Post(ServicesURL, "application/json" , buf) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("服务注册失败 " +"状态码为: %v" , res.StatusCode) } return nil } func ShutdownService (url string ) error { req, err := http.NewRequest( http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte (url))) if err != nil { return err } req.Header.Add("Content-Type" , "text/plain" ) res, err := http.DefaultClient.Do(req) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("服务取消失败,状态码为:%v" , res.StatusCode) } return nil }
增加了ShutdownService去结束这个服务。
紧接着,去开始服务注册的函数里边,进行取消注册的修改:
修改服务的注册功能 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package serviceimport ( "context" "fmt" "go_distributed_system_study/registry" "log" "net/http" ) func Start (ctx context.Context, host, port string , reg registry.Registration, registerHandlersFunc func () ) (context.Context, error) { registerHandlersFunc() ctx = startService(ctx, reg.ServiceName, host, port) err := registry.RegisterService(reg) if err != nil { return ctx, err } return ctx, nil } func startService (ctx context.Context, name registry.ServiceName, host string , port string ) context .Context { ctx, cancel := context.WithCancel(ctx) var server http.Server server.Addr = ":" + port go func () { log.Println(server.ListenAndServe()) err := registry.ShutdownService(fmt.Sprintf("http://%s:%s" , host, port)) if err != nil { log.Println(err) } cancel() }() go func () { fmt.Printf("%v 服务开始。按任意键停止. \n" , name) var s string fmt.Scanln(&s) err := registry.ShutdownService(fmt.Sprintf("http://%s:%s" , host, port)) if err != nil { log.Println(err) } server.Shutdown(ctx) cancel() }() return ctx }
主要是两个goroutine的修改,使其具有取消服务的功能。
测试: 那么注册中心和服务的逻辑都修改好了,然后和上面的步骤一样,先启动注册中心,后启动日志逻辑,结果为:
1 2 3 4 5 6 7 8 9 10 11 12 注册中心 的服务开始。按任意键停止. 等待接受请求: 增加服务:log service ,该服务的地址是:http://localhost:4000 log service 服务开始。按任意键停止. q http: Server closed 停止服务 移除服务: http ://localhost :4000 http : Server closed
服务发现 前面的服务注册都是一对一的,还体现不了分布式的特点。接下来进行多服务注册,使得一个学生成绩的服务既要使用日志服务,也要注册到注册中心
业务服务 基本的数据结构与方法 首先要编写一个学生的基础信息的数据结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package gradesimport ( "fmt" "sync" ) type Student struct { ID int FirstName string LastName string Grades []Grade } type Grade struct { Title string Type GradeType Score float32 } type GradeType string const ( GradeQuiz = GradeType("Quiz" ) GradeTest = GradeType("Test" ) GradeExam = GradeType("Exam" ) ) func (s Student) Average () float32 { var result float32 for _, grade := range s.Grades { result += grade.Score } return result / float32 (len (s.Grades)) } type Students []Studentfunc (ss Students) GetByID (id int ) (*Student, error) { for i := range ss { if ss[i].ID == id { return &ss[i], nil } } return nil , fmt.Errorf("学生的ID: %d 未找到" , id) } var ( students Students studentsMutex sync.Mutex )
接着肯定得有一些学生的数据,来做测试,这些数据一开始就会被加载进数据结构中,这暂时是用来代替数据库的方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 package gradesfunc init () { students = []Student{ { ID: 1 , FirstName: "Nick" , LastName: "Carter" , Grades: []Grade{ { Title: "Quiz 1" , Type: GradeQuiz, Score: 85 , }, { Title: "Final Exam" , Type: GradeExam, Score: 94 , }, { Title: "Quiz 2" , Type: GradeQuiz, Score: 82 , }, }, }, { ID: 2 , FirstName: "Roberto" , LastName: "Baggio" , Grades: []Grade{ { Title: "Quiz 1" , Type: GradeQuiz, Score: 100 , }, { Title: "Final Exam" , Type: GradeExam, Score: 100 , }, { Title: "Quiz 2" , Type: GradeQuiz, Score: 81 , }, }, }, { ID: 3 , FirstName: "Emma" , LastName: "Stone" , Grades: []Grade{ { Title: "Quiz 1" , Type: GradeQuiz, Score: 67 , }, { Title: "Final Exam" , Type: GradeExam, Score: 0 , }, { Title: "Quiz 2" , Type: GradeQuiz, Score: 75 , }, }, }, { ID: 4 , FirstName: "Rachel" , LastName: "McAdams" , Grades: []Grade{ { Title: "Quiz 1" , Type: GradeQuiz, Score: 98 , }, { Title: "Final Exam" , Type: GradeExam, Score: 99 , }, { Title: "Quiz 2" , Type: GradeQuiz, Score: 94 , }, }, }, { ID: 5 , FirstName: "Kelly" , LastName: "Clarkson" , Grades: []Grade{ { Title: "Quiz 1" , Type: GradeQuiz, Score: 95 , }, { Title: "Final Exam" , Type: GradeExam, Score: 100 , }, { Title: "Quiz 2" , Type: GradeQuiz, Score: 97 , }, }, }, } }
那么,也肯定要有server,才能正常的启动服务,需要去编写基本的逻辑,比如获取全部学生信息,根据ID进行信息搜索,增加学生信息等功能:
服务的逻辑 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 package gradesimport ( "bytes" "encoding/json" "fmt" "log" "net/http" "strconv" "strings" ) func RegisterHandlers () { handler := new (studentsHandler) http.Handle("/students" , handler) http.Handle("/students/" , handler) } type studentsHandler struct {}func (sh studentsHandler) ServeHTTP (w http.ResponseWriter, r *http.Request) { pathSegments := strings.Split(r.URL.Path, "/" ) switch len (pathSegments) { case 2 : sh.getAll(w, r) case 3 : id, err := strconv.Atoi(pathSegments[2 ]) if err != nil { w.WriteHeader(http.StatusNotFound) return } sh.getOne(w, r, id) case 4 : id, err := strconv.Atoi(pathSegments[2 ]) if err != nil { w.WriteHeader(http.StatusNotFound) return } sh.addGrade(w, r, id) default : w.WriteHeader(http.StatusNotFound) } } func (sh studentsHandler) getAll (w http.ResponseWriter, r *http.Request) { studentsMutex.Lock() defer studentsMutex.Unlock() data, err := sh.toJSON(students) if err != nil { w.WriteHeader(http.StatusInternalServerError) log.Println(err) return } w.Header().Add("Content-Type" , "application/json" ) w.Write(data) } func (sh studentsHandler) getOne (w http.ResponseWriter, r *http.Request, id int ) { studentsMutex.Lock() defer studentsMutex.Unlock() student, err := students.GetByID(id) if err != nil { w.WriteHeader(http.StatusNotFound) log.Println(err) return } data, err := sh.toJSON(student) if err != nil { w.WriteHeader(http.StatusInternalServerError) log.Printf("学生信息序列化失败: %q" , err) return } w.Header().Add("Content-Type" , "application/json" ) w.Write(data) } func (sh studentsHandler) addGrade (w http.ResponseWriter, r *http.Request, id int ) { studentsMutex.Lock() defer studentsMutex.Unlock() student, err := students.GetByID(id) if err != nil { w.WriteHeader(http.StatusNotFound) log.Println(err) return } var g Grade dec := json.NewDecoder(r.Body) err = dec.Decode(&g) if err != nil { w.WriteHeader(http.StatusBadRequest) log.Println(err) return } student.Grades = append (student.Grades, g) w.WriteHeader(http.StatusCreated) data, err := sh.toJSON(g) if err != nil { log.Println(err) return } w.Header().Add("Content-Type" , "applicaiton/json" ) w.Write(data) } func (sh studentsHandler) toJSON (obj interface {}) ([]byte , error) { var b bytes.Buffer enc := json.NewEncoder(&b) err := enc.Encode(obj) if err != nil { return nil , fmt.Errorf("学生信息序列化失败: %q" , err) } return b.Bytes(), nil }
这样的话,基础逻辑也已经完善了,接着就是在注册中心里增加服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 package registrytype Registration struct { ServiceName ServiceName ServiceURL string } type ServiceName string const ( LogService = ServiceName("LogService" ) GradingService = ServiceName("GradingService" ) )
最后,得让web服务可以运行,在cmd文件夹下创建一个新的main函数,写入一样的逻辑代码:
服务启动器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package mainimport ( "context" "go_distributed_system_study/grades" "go_distributed_system_study/registry" "go_distributed_system_study/service" "fmt" stlog "log" ) func main () { host, port := "localhost" , "6000" serviceAddress := fmt.Sprintf("http://%v:%v" , host, port) r := registry.Registration{ ServiceName: registry.GradingService, ServiceURL: serviceAddress, } ctx, err := service.Start(context.Background(), host, port, r, grades.RegisterHandlers) if err != nil { stlog.Fatal(err) } <-ctx.Done() fmt.Println("grading service 服务停止了" ) }
这么下来,这个业务服务也就完成了,他们可以互相不干扰的进行服务注册,但是现在grade服务还不能去调用日志服务。所以我们还需要服务发现。
服务发现 服务发现作用能让grade服务可以请求log服务
去引用日志服务 首先肯定是要给服务的数据结构增加一些基本信息,这样才能使得服务有这些基本的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package registrytype Registration struct { ServiceName ServiceName ServiceURL string RequiredServices []ServiceName ServiceUpdateURL string } type ServiceName string const ( LogService = ServiceName("LogService" ) GradingService = ServiceName("GradingService" ) ) type patchEntry struct { Name ServiceName URL string } type patch struct { Added []patchEntry Removed []patchEntry }
接下来可以想想,一个服务如果还依赖着其他的服务。那么,当这个服务正要注册的时候,或者说要加入服务群体的时候。就会在:
1 func (registry *registry) add (reg Registration)
进行服务注册,那么这个时候如果服务还依赖其他服务,比如正要注册的grade服务还依赖log服务,这时候就正好可以去获取依赖。
修改后代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 package registryimport ( "bytes" "encoding/json" "fmt" "io/ioutil" "log" "net/http" "sync" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" type registry struct { registrations []Registration mutex *sync.RWMutex } func (registry *registry) add (reg Registration) error { registry.mutex.Lock() registry.registrations = append (registry.registrations, reg) registry.mutex.Unlock() err := registry.sendRequireServices(reg) return err } func (registry *registry) sendRequireServices (reg Registration) error { registry.mutex.RLock() defer registry.mutex.RUnlock() var p patch for _, serviceReg := range registry.registrations { for _, reqService := range reg.RequiredServices { if serviceReg.ServiceName == reqService { p.Added = append (p.Added, patchEntry{ Name: serviceReg.ServiceName, URL: serviceReg.ServiceURL, }) } } } err := registry.sendPatch(p, reg.ServiceUpdateURL) if err != nil { return err } return nil } func (r registry) sendPatch (p patch, url string ) error { d, err := json.Marshal(p) if err != nil { return err } _, err = http.Post(url, "application/json" , bytes.NewBuffer(d)) if err != nil { return err } return nil } func (registry *registry) remove (url string ) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { registry.mutex.Lock() reg.registrations = append (reg.registrations[:i], reg.registrations[:i+1 ]...) registry.mutex.Unlock() return nil } } return fmt.Errorf("服务地址未发现: %s " , url) } var reg = registry{ registrations: make ([]Registration, 0 ), mutex: new (sync.RWMutex), } type RegistryService struct {}func (s RegistryService) ServeHTTP (w http.ResponseWriter, r *http.Request) { log.Println("等待接受请求:" ) switch r.Method { case http.MethodPost: dec := json.NewDecoder(r.Body) var r Registration err := dec.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("增加服务:%v ,该服务的地址是:%s \n" , r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string (payload) log.Printf("移除服务: %s" , url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } default : w.WriteHeader(http.StatusMethodNotAllowed) return } }
这是一个找到所需要服务并将其注册的过程。
接着,grade服务会向注册中心请求这些服务,但是注册中心也需要地方去存储这些请求的服务。
log服务就会向grade服务提供服务,那么会需要一些数据结构去存储:
1 2 3 4 5 6 7 type providers struct { services map [ServiceName][]string mutex *sync.RWMutex }
然后去实现它的逻辑,总体修改后代码如下:
服务的提供者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 package registryimport ( "bytes" "encoding/json" "fmt" "log" "math/rand" "net/http" "net/url" "sync" ) func RegisterService (r Registration) error { serviceUpdateURL, err := url.Parse(r.ServiceUpdateURL) if err != nil { return err } http.Handle(serviceUpdateURL.Path, &serviceUpdateHandler{}) buf := new (bytes.Buffer) enc := json.NewEncoder(buf) err = enc.Encode(r) if err != nil { return err } res, err := http.Post(ServicesURL, "application/json" , buf) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("服务注册失败 " +"状态码为: %v" , res.StatusCode) } return nil } type serviceUpdateHandler struct {}func (suh serviceUpdateHandler) ServeHTTP (w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } dec := json.NewDecoder(r.Body) var p patch err := dec.Decode(&p) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } prov.Update(p) } func ShutdownService (url string ) error { req, err := http.NewRequest( http.MethodDelete, ServicesURL, bytes.NewBuffer([]byte (url))) if err != nil { return err } req.Header.Add("Content-Type" , "text/plain" ) res, err := http.DefaultClient.Do(req) if err != nil { return err } if res.StatusCode != http.StatusOK { return fmt.Errorf("服务取消失败,状态码为:%v" , res.StatusCode) } return nil } type providers struct { services map [ServiceName][]string mutex *sync.RWMutex } var prov = providers{ services: make (map [ServiceName][]string ), mutex: new (sync.RWMutex), } func (p *providers) Update (pat patch) { p.mutex.Lock() defer p.mutex.Unlock() for _, patchEntry := range pat.Added { if _, ok := p.services[patchEntry.Name]; !ok { p.services[patchEntry.Name] = make ([]string , 0 ) } p.services[patchEntry.Name] = append (p.services[patchEntry.Name], patchEntry.URL) } for _, patchEntry := range pat.Removed { if providerURLs, ok := p.services[patchEntry.Name]; ok { for i := range providerURLs { if providerURLs[i] == patchEntry.URL { p.services[patchEntry.Name] = append (providerURLs[:i], providerURLs[i+1 :]...) } } } } } func (p providers) get (name ServiceName) (string , error) { providers, ok := p.services[name] if !ok { return "" , fmt.Errorf("没有可提供服务的提供商: %v" , name) } idx := int (rand.Float32() * float32 (len (providers))) return providers[idx], nil } func GetProvider (name ServiceName) (string , error) { return prov.get(name) }
客户端的client log服务现在有服务端的逻辑,但是客户端的服务想使用这个client还是比较麻烦的,所以还需要对log服务有一个自己的client:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package logimport ( "bytes" "fmt" "go_distributed_system_study/registry" "net/http" stlog "log" ) func SetClientLogger (serviceURL string , clientService registry.ServiceName) { stlog.SetPrefix(fmt.Sprintf("[%v] - " , clientService)) stlog.SetFlags(0 ) stlog.SetOutput(&clientLogger{url: serviceURL}) } type clientLogger struct { url string } func (cl clientLogger) Write (data []byte ) (int , error) { b := bytes.NewBuffer([]byte (data)) res, err := http.Post(cl.url+"/log" , "text/plain" , b) if err != nil { return 0 , err } if res.StatusCode != http.StatusOK { return 0 , fmt.Errorf("Failed to send log message. Service responded with %d - %s" , res.StatusCode, res.Status) } return len (data), nil }
这样就可以让本地的日志服务写好日志后发送到服务器端保存
使main函数具有服务发现的功能 主要是使得两个启动器拥有新的功能:
grading service 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 package mainimport ( "context" "go_distributed_system_study/grades" "go_distributed_system_study/log" "go_distributed_system_study/registry" "go_distributed_system_study/service" "fmt" stlog "log" ) func main () { host, port := "localhost" , "6000" serviceAddress := fmt.Sprintf("http://%v:%v" , host, port) r := registry.Registration{ ServiceName: registry.GradingService, ServiceURL: serviceAddress, RequiredServices: []registry.ServiceName{registry.LogService}, ServiceUpdateURL: serviceAddress + "/services" , } ctx, err := service.Start(context.Background(), host, port, r, grades.RegisterHandlers) if err != nil { stlog.Fatal(err) } if logProvider, err := registry.GetProvider(registry.LogService); err == nil { fmt.Printf("发现日志服务: %s\n" , logProvider) log.SetClientLogger(logProvider, r.ServiceName) } <-ctx.Done() fmt.Println("grading service 服务停止了" ) }
log service 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package mainimport ( "context" "fmt" "go_distributed_system_study/log" "go_distributed_system_study/registry" "go_distributed_system_study/service" stlog "log" ) func main () { log.Run("./distribute.log" ) host, port := "localhost" , "4000" serviceAddress := fmt.Sprintf("http://%s:%s" , host, port) r := registry.Registration{ ServiceName: registry.LogService, ServiceURL: serviceAddress, RequiredServices: make ([]registry.ServiceName, 0 ), ServiceUpdateURL: serviceAddress + "/services" , } ctx, err := service.Start( context.Background(), host, port, r, log.RegisterHandler, ) if err != nil { stlog.Fatalln(err) } <-ctx.Done() fmt.Println("停止服务" ) }
接下来便可以测试了。
测试 按照:registryservice,logservice,gradingservice的顺序启动,测试结果如:
1 2 3 4 5 6 7 8 9 10 注册中心 的服务开始。按任意键停止. 等待接受请求: 增加服务:LogService ,该服务的地址是:http://localhost:4000 等待接受请求: 增加服务:GradingService ,该服务的地址是:http://localhost:6000 LogService 服务开始。按任意键停止. GradingService 服务开始。按任意键停止. 发现日志服务: http://localhost:4000
这么一来就完成了。
依赖变化 重新发现服务 可以从上述的情况下看到一些不那么方便的点,一是:启动必须按照顺序来,不能随意。二是:当log服务下线后,再上线的话不会被再次发现。这都是服务极为脆弱的表现。那么解决这个问题的最好方法是:使服务具有依赖变化时进行通知的功能。
可以在服务中更改,使其具备通知的功能,主要是有notify函数,总体修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 package registryimport ( "bytes" "encoding/json" "fmt" "io/ioutil" "log" "net/http" "sync" ) const ServerPort = ":3000" const ServicesURL = "http://localhost" + ServerPort + "/services" type registry struct { registrations []Registration mutex *sync.RWMutex } func (registry *registry) add (reg Registration) error { registry.mutex.Lock() registry.registrations = append (registry.registrations, reg) registry.mutex.Unlock() err := registry.sendRequireServices(reg) registry.notify(patch{ Added: []patchEntry{ patchEntry{ Name: reg.ServiceName, URL: reg.ServiceURL, }, }, }) return err } func (registry *registry) sendRequireServices (reg Registration) error { registry.mutex.RLock() defer registry.mutex.RUnlock() var p patch for _, serviceReg := range registry.registrations { for _, reqService := range reg.RequiredServices { if serviceReg.ServiceName == reqService { p.Added = append (p.Added, patchEntry{ Name: serviceReg.ServiceName, URL: serviceReg.ServiceURL, }) } } } err := registry.sendPatch(p, reg.ServiceUpdateURL) if err != nil { return err } return nil } func (r registry) notify (fullPatch patch) { r.mutex.RLock() defer r.mutex.RUnlock() for _, reg := range r.registrations { go func (reg Registration) { for _, reqService := range reg.RequiredServices { p := patch{Added: []patchEntry{}, Removed: []patchEntry{}} sendUpdate := false for _, added := range fullPatch.Added { if added.Name == reqService { p.Added = append (p.Added, added) sendUpdate = true } } for _, removed := range fullPatch.Removed { if removed.Name == reqService { p.Removed = append (p.Removed, removed) sendUpdate = true } } if sendUpdate { err := r.sendPatch(p, reg.ServiceUpdateURL) if err != nil { log.Println(err) return } } } }(reg) } } func (r registry) sendPatch (p patch, url string ) error { d, err := json.Marshal(p) if err != nil { return err } _, err = http.Post(url, "application/json" , bytes.NewBuffer(d)) if err != nil { return err } return nil } func (registry *registry) remove (url string ) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { registry.mutex.Lock() reg.registrations = append (reg.registrations[:i], reg.registrations[:i+1 ]...) registry.mutex.Unlock() return nil } } return fmt.Errorf("服务地址未发现: %s " , url) } var reg = registry{ registrations: make ([]Registration, 0 ), mutex: new (sync.RWMutex), } type RegistryService struct {}func (s RegistryService) ServeHTTP (w http.ResponseWriter, r *http.Request) { log.Println("等待接受请求:" ) switch r.Method { case http.MethodPost: dec := json.NewDecoder(r.Body) var r Registration err := dec.Decode(&r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } log.Printf("增加服务:%v ,该服务的地址是:%s \n" , r.ServiceName, r.ServiceURL) err = reg.add(r) if err != nil { log.Println(err) w.WriteHeader(http.StatusBadRequest) return } case http.MethodDelete: payload, err := ioutil.ReadAll(r.Body) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } url := string (payload) log.Printf("移除服务: %s" , url) err = reg.remove(url) if err != nil { log.Println(err) w.WriteHeader(http.StatusInternalServerError) return } default : w.WriteHeader(http.StatusMethodNotAllowed) return } }
测试1 接着再进行测试,可以看到,当log服务下线后,重新上线时,grading 服务就能够发现log服务了。
1 2 3 4 收到更新: {[{LogService http://localhost:4000 } {LogService http://localhost:4000 }] []} 发现日志服务: http://localhost:4000 收到更新: {[{LogService http://localhost:4000 }] []} 收到更新: {[{LogService http://localhost:4000 }] []}
服务下线告知 接着也容易,把remove方法里面添加下线告知的功能就行了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func (registry *registry) remove (url string ) error { for i := range reg.registrations { if reg.registrations[i].ServiceURL == url { registry.notify(patch{ Removed: []patchEntry{ { Name: registry.registrations[i].ServiceName, URL: registry.registrations[i].ServiceURL, }, }, }) registry.mutex.Lock() reg.registrations = append (reg.registrations[:i], reg.registrations[:i+1 ]...) registry.mutex.Unlock() return nil } } return fmt.Errorf("服务地址未发现: %s " , url) }
这样,这部分逻辑就完成了。接下来测试代码。
测试2 测试的步骤是先开启注册中心,再开启日志服务,后开始grade服务。然后使得日志服务停止,再重启。可以看到一系列的结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 注册中心 的服务开始。按任意键停止. 等待接受请求: 增加服务:LogService ,该服务的地址是:http://localhost:4000 等待接受请求: 增加服务:GradingService ,该服务的地址是:http://localhost:6000 移除服务: http://localhost:4000 等待接受请求: 增加服务:LogService ,该服务的地址是:http://localhost:4000 LogService 服务开始。按任意键停止. 收到更新: {[] []} http: Server closed 停止服务 LogService 服务开始。按任意键停止GradingService 服务开始。按任意键停止.收到更新: {[{LogService http ://localhost :4000}] []} 发现日志服务: http ://localhost :4000 收到更新: {[] [{LogService http ://localhost :4000}]} 收到更新: {[{LogService http ://localhost :4000}] []}
使用解除了耦合的网络接口,这也是Spring Cloud的微服务思想,同时也是分布式的一种类型。所以说分布式也没什么神奇之处,最核心的一处在于:把本地接口转化为了网络接口 。能够理解这一过程,也就理解了分布式的思想。