项目背景
因为onlyoffice不支持集群部署,为了实现集群部署的目的,开发此组件将相同文档的请求定位到同一服务节点。解析http ws请求地址,获取docId,对集群节点作一致性哈希计算获取相同的服务节点,以达到协同编辑、回存等功能正常的目的。
项目功能模块
- 节点测活
- 获取文档唯一标识docId
- 一致性哈希
- 请求转发 http ws
- ssl兼容
- 多平台支持
开发
配置文件:
{
"ssl":false,
"node":[
"10.1.15.66:9060"
],
"path":[
"(?<=/files/)[a-zA-Z._0-9]{1,}(?=\\..*/)",
"(?<=/docs/)[a-zA-Z._0-9]{1,}(?=\\..*/)",
"(?<=/doc/)[a-zA-Z._0-9]{1,}(?=\\..*/)"
],
"header":[
"docId"
],
"checkPath" : "/themes.json",
"checkSeconds": 5,
"port":"8080"
}
参数 | 类型 | 解释 |
---|---|---|
Ssl | bool | 是否使用https wss |
Node | []string | office服务地址 |
Path | []string | 正则解析一致性哈希的key值 |
Header | []string | 请求头哈希《暂不支持》 |
CheckPath | string | 节点测活的API地址 |
CheckSeconds | string | 节点测活的API地址 |
Port | string | 代理服务启动端口 |
依赖库源码修改
fasthttp
//设置host,避免nginx重定向时跳转到的host是内部服务地址。
reverseproxy.go func (p *ReverseProxy) ServeHTTP(ctx *fasthttp.RequestCtx) {}
req.SetHost(string(req.Host()))
// req.SetHost(c.Addr)
调用方法:
ctx.Request.SetHost(string(ctx.Request.Host()))
https://github.com/yeqown/fasthttp-reverse-proxy/pull/35
源代码
main()方法
加载配置文件
根据配置文件判断启动的是http服务还是https服务,如果是https服务需要加载证书文件。
启动定时任务,定时监测节点是否正常( httpget(addr+checkPath) status code==200 )
ProxyHandler()方法,接受请求进行转发。
获取请求地址,进行path路径匹配,截取匹配到的docId。
使用docId进行一致性哈希,从配置文件中的node获取命中节点。由此节点进行转发请求。
如果节点异常返回异常信息。如果状态正常进行转发。
转发需要注意兼容http https ws wss。
main.go
package main
import (
"crypto/tls"
"strconv"
"time"
"github.com/fasthttp/websocket"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
proxy "github.com/yeqown/fasthttp-reverse-proxy/v2"
"stathat.com/c/consistent"
)
func init() {
path := "logs/proxy.log"
/* 日志轮转相关函数
`WithLinkName` 为最新的日志建立软连接
`WithRotationTime` 设置日志分割的时间,隔多久分割一次
WithMaxAge 和 WithRotationCount二者只能设置一个
`WithMaxAge` 设置文件清理前的最长保存时间
`WithRotationCount` 设置文件清理前最多保存的个数
*/
// 下面配置日志每隔 1 分钟轮转一个新文件,保留最近 3 分钟的日志文件,多余的自动清理掉。
writer, _ := rotatelogs.New(
path+".%Y%m%d%H%M",
rotatelogs.WithLinkName(path),
rotatelogs.WithMaxAge(time.Duration(48)*time.Hour),
rotatelogs.WithRotationTime(time.Duration(48)*time.Hour),
)
log.SetOutput(writer)
//log.SetFormatter(&log.JSONFormatter{})
}
var upgraders = &websocket.FastHTTPUpgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// 解决跨域问题
CheckOrigin: func(r *fasthttp.RequestCtx) bool {
return true
},
}
//配置服务节点
var nodes []ServerNode
//路径正则匹配
var paths []string
//一致性哈希
var cons = consistent.New()
//测活检测接口地址,200即成功
var checkPath = ""
//使用协议,是否使用wss https
var ssl = false
func ProxyHandler(ctx *fasthttp.RequestCtx) {
var reqPath string = string(ctx.Path())
log.Info("uri: %s , upgrade: %s ", ctx.URI().String(), ctx.Request.Header.ConnectionUpgrade())
//选取节点
var serverNode = selectNode(reqPath)
if !serverNode.active {
log.Error("serverNode: " + serverNode.addr + " not active")
ctx.Error("Server node is not active", fasthttp.StatusInternalServerError)
return
}
//设置host,避免nginx重定向时跳转到的host是内部服务地址。
ctx.Request.SetHost(string(ctx.Request.Host()))
if ssl {
if ctx.Request.Header.ConnectionUpgrade() {
//wss
proxy.DefaultUpgrader = upgraders
tlsConfig := &tls.Config{InsecureSkipVerify: true}
log.Info("wss://" + serverNode.addr + reqPath)
optionWSURL := proxy.WithURL_OptionWS("wss://" + serverNode.addr + reqPath)
optionWSTLS := proxy.WithDialer_OptionWS(&websocket.Dialer{TLSClientConfig: tlsConfig})
proxyServer, err := proxy.NewWSReverseProxyWith(optionWSURL, optionWSTLS)
if err != nil {
// Error handling
}
proxyServer.ServeHTTP(ctx)
} else {
//https
//忽略证书验证
tlsConfig := &tls.Config{InsecureSkipVerify: true}
opts := proxy.WithTLSConfig(tlsConfig)
log.Info("https://" + serverNode.addr + reqPath)
var proxyServer = proxy.NewReverseProxy(serverNode.addr, opts)
proxyServer.ServeHTTP(ctx)
}
} else {
if ctx.Request.Header.ConnectionUpgrade() {
log.Info("ws://" + serverNode.addr + reqPath)
//ws
proxy.DefaultUpgrader = upgraders
proxyServer := proxy.NewWSReverseProxy(serverNode.addr, reqPath)
proxyServer.ServeHTTP(ctx)
} else {
//http
log.Info("http://" + serverNode.addr + reqPath)
var proxyServer = proxy.NewReverseProxy(serverNode.addr)
proxyServer.ServeHTTP(ctx)
}
}
}
func main() {
//读取配置文件,加载至本地缓存
var proxyConf = ReadFile()
checkPath = proxyConf.CheckPath
var addrs = proxyConf.Node
paths = proxyConf.Path
ssl = proxyConf.Ssl
var len int = len(addrs)
nodes = make([]ServerNode, len)
for i := range addrs {
item := &addrs[i]
cons.Add(*item)
var node = ServerNode{addr: *item, active: false}
nodes[i] = node
}
//启动定时任务,节点测活
crontab := cron.New(cron.WithSeconds()) //精确到秒
//定义定时器调用的任务函数
//定时任务
spec := "*/" + strconv.Itoa(proxyConf.CheckSeconds) + " * * * * ?" //cron表达式,每五秒一次
// 添加定时任务,
crontab.AddFunc(spec, testNode)
// 启动定时器
crontab.Start()
// 定时任务是另起协程执行的,这里使用 select 简答阻塞.实际开发中需要
// 根据实际情况进行控制
// select {} //阻塞主线程停止
//监听端口,转发请求
proxy.DefaultUpgrader = upgraders
if ssl {
log.Info("start ssl server")
//https
server := fasthttp.Server{
// Our handler will be the muxer
Handler: ProxyHandler,
// Setup TLS
TLSConfig: &tls.Config{
MinVersion: tls.VersionTLS12,
},
}
if err := server.ListenAndServeTLS(":"+proxyConf.Port, "certs/site.crt", "certs/site.key"); err != nil {
log.Fatalf("Error in ListenAndServeTLS: %s", err)
}
} else {
log.Info("start server")
//http
// 8080为监听端口
if err := fasthttp.ListenAndServe(":"+proxyConf.Port, ProxyHandler); err != nil {
log.Fatal(err)
}
}
}
conf.go
package main
import (
"encoding/json"
"io/ioutil"
log "github.com/sirupsen/logrus"
)
//父类
type ProxyConf struct {
Ssl bool `json:"ssl"`
Node []string `json:"node"`
Path []string `json:"path"`
Header []string `json:"header"`
CheckPath string `json:"checkPath"`
CheckSeconds int `json:"checkSeconds"`
Port string `json:"port"`
}
func ReadFile() ProxyConf {
var conf ProxyConf
// 读取JSON文件内容 返回字节切片
bytes, _ := ioutil.ReadFile("conf.json")
// 打印时需要转为字符串
log.Errorln("加载配置文件:" + string(bytes))
// 将字节切片映射到指定结构上
json.Unmarshal(bytes, &conf)
return conf
}
type ServerNode struct {
addr string
active bool
}
check.go
package main
import (
"crypto/tls"
"fmt"
"math/rand"
"net/http"
"time"
"github.com/dlclark/regexp2"
log "github.com/sirupsen/logrus"
)
func checkServer(ssl bool, addr string) bool {
//设置超时时间
client := http.Client{
Timeout: 1 * time.Second,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
var checkUrl = addr + checkPath
if ssl {
checkUrl = "https://" + checkUrl
} else {
checkUrl = "http://" + checkUrl
}
//内网检测
resp, err := client.Get(checkUrl) // 注意使用 * 间接引用
// If there is an error or non-200 status, exit with 1 signaling unsuccessful check.
if err != nil || resp.StatusCode != 200 {
return false
}
return true
}
func testNode() {
for i := range nodes {
item := &nodes[i]
//ping port
var active = checkServer(ssl, item.addr)
var node = ServerNode{addr: *&item.addr, active: active}
if !active {
log.Errorln(" 服务不可用:", node.addr)
}
nodes[i] = node
}
}
func selectNode(reqPath string) ServerNode {
if len(nodes) == 1 {
return nodes[0]
}
var addr = nodes[rand.Intn(len(nodes))].addr
for i := range paths {
item := &paths[i]
//ping port
re := regexp2.MustCompile(string(*item), 0)
if m, _ := re.FindStringMatch(reqPath); m != nil {
// the whole match is always group 0
docId := m.String()
if docId != "" {
var err error = nil
addr, err = cons.Get(docId)
log.Infoln("请求地址:%s ,匹配规则:%s ,解析docId:%s ,转发至节点:%s", reqPath, *item, docId, addr)
if err != nil {
fmt.Println(err)
log.Errorln("一致性哈希错误:%s, docId: %s .", err, docId)
}
break
}
}
}
for i := range nodes {
node := &nodes[i]
if node.addr == addr {
return *node
}
}
return ServerNode{}
}