您的位置:  首页 > 技术 > 数据库 > 正文

浪潮云溪分布式数据库协议代码解析(1)

2022-05-19 19:00 https://my.oschina.net/u/5148943/blog/5528160 浪潮云溪数据库 次阅读 条评论

云溪数据库支持PostgreSQL protocol 3.0,用于客户端与服务端之间的信息通信,应用于连接认证及数据请求阶段。

PostgreSQL协议的消息通用格式如下图所示,包含1字节的消息类型,4字节的长度(不包括类型的长度),以及消息的内容。由于历史原因,startup消息不包含类型。

 


 

Part 1 - 连接认证阶段

 

 

1.用户使用客户端,通过云溪数据库 sql命令,尝试连接服务端时,客户端会获取连接命令的参数,生成URL,具体格式如下

postgres://<username>:<password>@<host>:<port>/<database>?<parameters>

其中,包含了当前用户的用户名和密码,节点的IP地址和端口,连接的数据库库名,以及额外的连接参数。

2.客户端会根据URL,建立与服务端之间的连接,发送一个startup消息。

func(c *Connector) open(ctx context.Context)(cn *conn, err error){    ...    cn.startup(o)    ...}

上述代码,将构建一个startup消息,该消息没有消息类型,包含了协议版本号,和连接参数等内容。

3.服务端接收解析startup消息,获得连接参数。

func(s *Server) ServeConn(ctx context.Context, conn net.Conn)error{    ...    var buf pgwirebase.ReadBuffer    n, err := buf.ReadUntypedMsg(conn)    if err !=nil{        return err    }    version, err := buf.GetUint32()    if err !=nil{        return err    }    ...    // get connection parameters    if sArgs, err = parseOptions(ctx, buf.Msg); err !=nil{        return sendErr(err)    }    ...}

4.服务端发送AuthenticationRequest消息,要求客户端进一步提供认证信息,以进行用户身份认证。​​​​​​​​​​​​​​​​​​​​​​​​​​​​

func authPassword(    c AuthConn,    tlsState tls.ConnectionState,    insecure bool,    hashedPassword []byte,    validUntil *tree.DTimestamp,    encryption string,    execCfg *sql.ExecutorConfig,    entry *hba.Entry,)(security.UserAuthHook,error){    if err := c.SendAuthRequest(authCleartextPassword,nil); err !=nil{        returnnil,err    }    // recevice password from client    password, err := c.ReadPasswordString()    ...}

认证请求消息中,除了消息类型’R’外,还包含认证方式,目前云溪数据库支持证书、口令和GSSAPI三种认证方式。证书认证不需要额外的认证信息,认证通过后直接发送AuthenticationOk消息,跳过5、6。

5.客户端收到AuthenticationRequest消息后,则会发送对应的认证信息,回应此消息,该回应的消息类型为’p’。​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​

func(cn *conn) startup(o values){    ...    for{        // recevice responses after sending startup        t, r := cn.recv()        switch t {        case'K':            cn.processBackendKeyData(r)        case'S':            cn.processParameterStatus(r)        case'R':            cn.auth(r, o)        case'Z':            cn.processReadyForQuery(r)            return        default:            errorf("unknown response forstartup: %q",t)        }    }}

func(cn *conn) auth(r *readBuf, o values){    switch code := r.int32(); code {    case0:        // OK    case3:        w := cn.writeBuf('p')        w.string(o["password"])        cn.send(w)        ...    case7:// GSSAPI, startup        ...        w := cn.writeBuf('p')        w.bytes(token)        cn.send(w)        ...    }}

 

6.服务端收到认证回应后,进行用户的身份认证。

7.服务端认证完成后,给客户端发送认证结果。成功,发送AuthenticationOk(‘R’),authType为0;失败,则发送ErrorResponse(‘E’),连接过程结束。​​​​​​​​​​​​​​

func(c *conn) handleAuthentication(    ctx context.Context,    insecure bool,    ie *sql.InternalExecutor,    auth *hba.Conf,    execCfg *sql.ExecutorConfig,)(authErr error){    ...    c.msgBuilder.initMsg(pgwirebase.ServerMsgAuth)    c.msgBuilder.putInt32(authOK)    return c.msgBuilder.finishMsg(c.conn)}

 

8.服务端认证完成后,将发送多条参数信息ParameterStatus(‘S’),包括server_version, client_encoding 和 DateStyle 等参数。每个参数,都会发送一条ParameterStatus消息。​​​​​​​

func(c *conn) serveImpl(    ctx context.Context,    draining func()bool,    sqlServer *sql.Server,    reserved mon.BoundAccount,    stopper *stop.Stopper,)error{    ...    sendStatusParam:=func(param, value string)error{        c.msgBuilder.initMsg(pgwirebase.ServerMsgParameterStatus)        c.msgBuilder.writeTerminatedString(param)        c.msgBuilder.writeTerminatedString(value)        return c.msgBuilder.finishMsg(c.conn)    }    ...    for _, param :=range statusReportParams {            value := connHandler.GetStatusParam(ctx, param)            if err := sendStatusParam(param, value); err !=nil{                return err            }        }    }    ...}

 

9.服务端发送ReadyForQuery(‘Z’),表示一切准备就绪,通知客户端可以发送SQL请求了。​​​​​​​

func(c *conn) serveImpl(    ctx context.Context,    draining func()bool,    sqlServer *sql.Server,    reserved mon.BoundAccount,    stopper *stop.Stopper,)error{    ...    // An initial readyForQuery message is part of the handshake.    c.msgBuilder.initMsg(pgwirebase.ServerMsgReady)    c.msgBuilder.writeByte(byte(sql.IdleTxnBlock))    if err := c.msgBuilder.finishMsg(c.conn); err !=nil{        return err    }    ...}​​​​​​​

至此,客户端与服务端之间,已经成功建立起连接,用户可以执行后续操作了。

展开阅读全文
  • 0
    感动
  • 0
    路过
  • 0
    高兴
  • 0
    难过
  • 0
    搞笑
  • 0
    无聊
  • 0
    愤怒
  • 0
    同情
热度排行
友情链接