项目背景

因为onlyoffice不支持集群部署,为了实现集群部署的目的,开发此组件将相同文档的请求定位到同一服务节点。解析http ws请求地址,获取docId,对集群节点作一致性哈希计算获取相同的服务节点,以达到协同编辑、回存等功能正常的目的。

项目功能模块

  • 节点测活
  • 获取文档唯一标识docId
  • 一致性哈希
  • 请求转发 http ws
  • ssl兼容
  • 多平台支持

开发

配置文件:

{
    "ssl":false,
    "node":[
        "10.1.15.66:9060"
    ],
    "path":[
        "(?<=/files/)[a-zA-Z._0-9]{1,}(?=\\..*/)",
        "(?<=/docs/)[a-zA-Z._0-9]{1,}(?=\\..*/)",
        "(?<=/doc/)[a-zA-Z._0-9]{1,}(?=\\..*/)"
    ],
    "header":[
        "docId"
    ],
    "checkPath" : "/themes.json",
    "checkSeconds": 5,
    "port":"8080"
}
参数类型解释
Sslbool是否使用https wss
Node[]stringoffice服务地址
Path[]string正则解析一致性哈希的key值
Header[]string请求头哈希《暂不支持》
CheckPathstring节点测活的API地址
CheckSecondsstring节点测活的API地址
Portstring代理服务启动端口

依赖库源码修改

fasthttp
//设置host,避免nginx重定向时跳转到的host是内部服务地址。
reverseproxy.go func (p *ReverseProxy) ServeHTTP(ctx *fasthttp.RequestCtx) {}

req.SetHost(string(req.Host()))
// req.SetHost(c.Addr)

调用方法:
ctx.Request.SetHost(string(ctx.Request.Host()))

https://github.com/yeqown/fasthttp-reverse-proxy/pull/35

源代码

main()方法

加载配置文件

根据配置文件判断启动的是http服务还是https服务,如果是https服务需要加载证书文件。

启动定时任务,定时监测节点是否正常( httpget(addr+checkPath) status code==200 )

ProxyHandler()方法,接受请求进行转发。

获取请求地址,进行path路径匹配,截取匹配到的docId。
使用docId进行一致性哈希,从配置文件中的node获取命中节点。由此节点进行转发请求。
如果节点异常返回异常信息。如果状态正常进行转发。
转发需要注意兼容http https ws wss。

main.go

package main

import (
	"crypto/tls"
	"strconv"
	"time"

	"github.com/fasthttp/websocket"
	rotatelogs "github.com/lestrrat-go/file-rotatelogs"
	"github.com/robfig/cron/v3"
	log "github.com/sirupsen/logrus"
	"github.com/valyala/fasthttp"
	proxy "github.com/yeqown/fasthttp-reverse-proxy/v2"
	"stathat.com/c/consistent"
)

func init() {
	path := "logs/proxy.log"
	/* 日志轮转相关函数
	`WithLinkName` 为最新的日志建立软连接
	`WithRotationTime` 设置日志分割的时间,隔多久分割一次
	WithMaxAge 和 WithRotationCount二者只能设置一个
	  `WithMaxAge` 设置文件清理前的最长保存时间
	  `WithRotationCount` 设置文件清理前最多保存的个数
	*/
	// 下面配置日志每隔 1 分钟轮转一个新文件,保留最近 3 分钟的日志文件,多余的自动清理掉。
	writer, _ := rotatelogs.New(
		path+".%Y%m%d%H%M",
		rotatelogs.WithLinkName(path),
		rotatelogs.WithMaxAge(time.Duration(48)*time.Hour),
		rotatelogs.WithRotationTime(time.Duration(48)*time.Hour),
	)
	log.SetOutput(writer)
	//log.SetFormatter(&log.JSONFormatter{})
}

var upgraders = &websocket.FastHTTPUpgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// 解决跨域问题
	CheckOrigin: func(r *fasthttp.RequestCtx) bool {
		return true
	},
}

//配置服务节点
var nodes []ServerNode

//路径正则匹配
var paths []string

//一致性哈希
var cons = consistent.New()

//测活检测接口地址,200即成功
var checkPath = ""

//使用协议,是否使用wss https
var ssl = false

func ProxyHandler(ctx *fasthttp.RequestCtx) {
	var reqPath string = string(ctx.Path())
	log.Info("uri: %s , upgrade: %s ", ctx.URI().String(), ctx.Request.Header.ConnectionUpgrade())
	//选取节点
	var serverNode = selectNode(reqPath)
	if !serverNode.active {
		log.Error("serverNode: " + serverNode.addr + " not active")
		ctx.Error("Server node is not active", fasthttp.StatusInternalServerError)
		return
	}
	//设置host,避免nginx重定向时跳转到的host是内部服务地址。
	ctx.Request.SetHost(string(ctx.Request.Host()))
	if ssl {
		if ctx.Request.Header.ConnectionUpgrade() {

			//wss
			proxy.DefaultUpgrader = upgraders

			tlsConfig := &tls.Config{InsecureSkipVerify: true}
			log.Info("wss://" + serverNode.addr + reqPath)
			optionWSURL := proxy.WithURL_OptionWS("wss://" + serverNode.addr + reqPath)
			optionWSTLS := proxy.WithDialer_OptionWS(&websocket.Dialer{TLSClientConfig: tlsConfig})

			proxyServer, err := proxy.NewWSReverseProxyWith(optionWSURL, optionWSTLS)

			if err != nil {
				// Error handling
			}
			proxyServer.ServeHTTP(ctx)
		} else {
			//https
			//忽略证书验证
			tlsConfig := &tls.Config{InsecureSkipVerify: true}
			opts := proxy.WithTLSConfig(tlsConfig)
			log.Info("https://" + serverNode.addr + reqPath)

			var proxyServer = proxy.NewReverseProxy(serverNode.addr, opts)

			proxyServer.ServeHTTP(ctx)
		}
	} else {
		if ctx.Request.Header.ConnectionUpgrade() {
			log.Info("ws://" + serverNode.addr + reqPath)
			//ws
			proxy.DefaultUpgrader = upgraders
			proxyServer := proxy.NewWSReverseProxy(serverNode.addr, reqPath)
			proxyServer.ServeHTTP(ctx)
		} else {
			//http
			log.Info("http://" + serverNode.addr + reqPath)
			var proxyServer = proxy.NewReverseProxy(serverNode.addr)
			proxyServer.ServeHTTP(ctx)
		}

	}
}

func main() {
	//读取配置文件,加载至本地缓存
	var proxyConf = ReadFile()
	checkPath = proxyConf.CheckPath
	var addrs = proxyConf.Node
	paths = proxyConf.Path
	ssl = proxyConf.Ssl

	var len int = len(addrs)
	nodes = make([]ServerNode, len)
	for i := range addrs {
		item := &addrs[i]
		cons.Add(*item)
		var node = ServerNode{addr: *item, active: false}
		nodes[i] = node
	}

	//启动定时任务,节点测活
	crontab := cron.New(cron.WithSeconds()) //精确到秒
	//定义定时器调用的任务函数
	//定时任务
	spec := "*/" + strconv.Itoa(proxyConf.CheckSeconds) + " * * * * ?" //cron表达式,每五秒一次
	// 添加定时任务,
	crontab.AddFunc(spec, testNode)
	// 启动定时器
	crontab.Start()
	// 定时任务是另起协程执行的,这里使用 select 简答阻塞.实际开发中需要
	// 根据实际情况进行控制
	// select {} //阻塞主线程停止

	//监听端口,转发请求
	proxy.DefaultUpgrader = upgraders

	if ssl {
		log.Info("start ssl server")
		//https
		server := fasthttp.Server{
			// Our handler will be the muxer
			Handler: ProxyHandler,
			// Setup TLS
			TLSConfig: &tls.Config{
				MinVersion: tls.VersionTLS12,
			},
		}

		if err := server.ListenAndServeTLS(":"+proxyConf.Port, "certs/site.crt", "certs/site.key"); err != nil {

			log.Fatalf("Error in ListenAndServeTLS: %s", err)
		}
	} else {
		log.Info("start server")
		//http
		// 8080为监听端口
		if err := fasthttp.ListenAndServe(":"+proxyConf.Port, ProxyHandler); err != nil {
			log.Fatal(err)
		}
	}
}


conf.go

package main

import (
	"encoding/json"
	"io/ioutil"

	log "github.com/sirupsen/logrus"
)

//父类
type ProxyConf struct {
	Ssl          bool     `json:"ssl"`
	Node         []string `json:"node"`
	Path         []string `json:"path"`
	Header       []string `json:"header"`
	CheckPath    string   `json:"checkPath"`
	CheckSeconds int      `json:"checkSeconds"`
	Port         string   `json:"port"`
}

func ReadFile() ProxyConf {
	var conf ProxyConf

	// 读取JSON文件内容 返回字节切片
	bytes, _ := ioutil.ReadFile("conf.json")

	// 打印时需要转为字符串
	log.Errorln("加载配置文件:" + string(bytes))

	// 将字节切片映射到指定结构上
	json.Unmarshal(bytes, &conf)

	return conf
}

type ServerNode struct {
	addr   string
	active bool
}

check.go

package main

import (
	"crypto/tls"
	"fmt"
	"math/rand"
	"net/http"
	"time"

	"github.com/dlclark/regexp2"
	log "github.com/sirupsen/logrus"
)

func checkServer(ssl bool, addr string) bool {
	//设置超时时间
	client := http.Client{
		Timeout: 1 * time.Second,
		Transport: &http.Transport{
			TLSClientConfig: &tls.Config{
				InsecureSkipVerify: true,
			},
		},
	}
	var checkUrl = addr + checkPath
	if ssl {
		checkUrl = "https://" + checkUrl
	} else {
		checkUrl = "http://" + checkUrl
	}
	//内网检测
	resp, err := client.Get(checkUrl) // 注意使用 * 间接引用

	// If there is an error or non-200 status, exit with 1 signaling unsuccessful check.
	if err != nil || resp.StatusCode != 200 {
		return false
	}
	return true
}

func testNode() {
	for i := range nodes {
		item := &nodes[i]
		//ping port
		var active = checkServer(ssl, item.addr)
		var node = ServerNode{addr: *&item.addr, active: active}
		if !active {
			log.Errorln(" 服务不可用:", node.addr)
		}
		nodes[i] = node
	}
}

func selectNode(reqPath string) ServerNode {
	if len(nodes) == 1 {
		return nodes[0]
	}
	var addr = nodes[rand.Intn(len(nodes))].addr
	for i := range paths {
		item := &paths[i]
		//ping port
		re := regexp2.MustCompile(string(*item), 0)
		if m, _ := re.FindStringMatch(reqPath); m != nil {
			// the whole match is always group 0
			docId := m.String()
			if docId != "" {
				var err error = nil
				addr, err = cons.Get(docId)
				log.Infoln("请求地址:%s ,匹配规则:%s ,解析docId:%s ,转发至节点:%s", reqPath, *item, docId, addr)
				if err != nil {
					fmt.Println(err)
					log.Errorln("一致性哈希错误:%s, docId: %s .", err, docId)
				}
				break
			}
		}
	}
	for i := range nodes {
		node := &nodes[i]
		if node.addr == addr {
			return *node
		}
	}
	return ServerNode{}
}