阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Riak集群监控详解

114次阅读
没有评论

共计 13914 个字符,预计需要花费 35 分钟才能阅读完成。

公司的 Riak 版本是 2.0.4,目前已根据 CMDB 三级业务部署了十几套集群,大部分是跨机房部署。监控采集分为两个大的维度,第一个维度是单机,也就是「IP: 端口」;第二个维度是集群,也就是所有节点指标的统计结果。本文主要介绍采集的指标和采集程序。

一、采集的指标

1、吞吐量指标

1.1 单机

采集方法:

/usr/sbin/riak-admin status
指标 功能
node_gets 某节点前一分钟处理的 GET 请求数量,包括该节点上非本地虚拟节点处理的 GET 请求
node_puts 某节点前一分钟处理的 PUT 请求数量,包括该节点上非本地虚拟节点处理的 PUT 请求

 

 

 

1.2 集群

指标 功能 统计方法
node_gets_total 集群前一分钟处理的 GET 请求数量 SUM(node_gets)
node_puts_total 集群前一分钟处理的 PUT 请求数量 SUM(node_puts)

 

 

 

2、延迟指标

2.1 单机

采集方法:

/usr/sbin/riak-admin status
指标 功能
node_get_fsm_time_mean 客户端发起 GET 请求到收到响应时间间隔的均值
node_get_fsm_time_median 客户端发起 GET 请求到收到响应时间间隔的中值
node_get_fsm_time_95 客户端发起 GET 请求到收到响应时间间隔的 95 百分位值
node_get_fsm_time_100 客户端发起 GET 请求到收到响应时间间隔的 100 百分位值
node_put_fsm_time_mean 客户端发起 PUT 请求到收到响应时间间隔的均值
node_put_fsm_time_median 客户端发起 PUT 请求到收到响应时间间隔的中值
node_put_fsm_time_95 客户端发起 PUT 请求到收到响应时间间隔的 95 百分位值
node_put_fsm_time_100 客户端发起 PUT 请求到收到响应时间间隔的 100 百分位值

 

 

 

 

 

 

2.2 集群

指标 功能 统计方法
node_get_fsm_time_mean_avg 客户端发起 GET 请求到收到响应时间间隔的均值 AVG(node_get_fsm_time_mean)
node_put_fsm_time_mean_avg 客户端发起 PUT 请求到收到响应时间间隔的均值 AVG(node_put_fsm_time_mean)

 

3、Erlang 资源使用情况指标(单机)

采集方法:

/usr/sbin/riak-admin status
指标 功能
sys_process_count Erlang 进程的数量
memory_processes 分配给 Erlang 进程的内存总量(单位 bytes)
memory_processes_used Erlang 进程使用的内存总量(单位 bytes)

 

 

 

4、Riak 负荷 / 健康指标

4.1 单机

采集方法:

/usr/sbin/riak-admin status
指标 功能
read_repairs 某节点前一分钟处理的读取修复操作数量
node_get_fsm_siblings_mean 某节点前一分钟所有 GET 操作处理的兄弟数据数量均值
node_get_fsm_siblings_median 某节点前一分钟所有 GET 操作处理的兄弟数据数量中值
node_get_fsm_siblings_95 某节点前一分钟所有 GET 操作处理的兄弟数据数量 95 百分位值
node_get_fsm_siblings_100 某节点前一分钟所有 GET 操作处理的兄弟数据数量 100 百分位值
node_get_fsm_objsize_mean 某节点前一分钟流经 GET_FSM 的对象大小均值
node_get_fsm_objsize_median 某节点前一分钟流经 GET_FSM 的对象大小中值
node_get_fsm_objsize_95 某节点前一分钟流经 GET_FSM 的对象大小 95 百分位值
node_get_fsm_objsize_100 某节点前一分钟流经 GET_FSM 的对象大小 100 百分位值

 

 

 

 

 

 

 

4.2 集群

指标 功能 统计方法
read_repairs_total 集群前一分钟处理的读取修复操作数量 SUM(read_repairs)
node_get_fsm_siblings_mean_avg 集群前一分钟所有 GET 操作处理的兄弟数据数量均值 AVG(node_get_fsm_siblings_mean)
node_get_fsm_objsize_mean_avg 集群前一分钟流经 GET_FSM 的对象大小均值 AVG(node_get_fsm_objsize_mean)

 

5、其他

5.1 LevelDB 合并错误(单机)

采集方法:

find /data1/riak/data/leveldb -name "LOG" -exec grep -l 'Compaction error' {} \; | wc -l

5.2 LevelDB 读取块操作错误(单机)

采集方法:

/usr/sbin/riak-admin status
指标 功能
leveldb_read_block_error LevelDB 读取块操作错误数量

 

 

5.3 节点存活状态(单机)

采集方法:

/usr/sbin/riak-admin member-status | grep `ifconfig | grep "inet addr:10" | awk -F':' '{print $2}' | awk '{print $1}'`

输出如下,valid 表示节点正常

valid       9.0%      --      'riak@10.1.80.114'

5.4 Riak Error Log(单机)

Riak 日志路径:/data1/riak/logs 
采集文件:/data1/riak/logs/* 
采集时间段:最近一分钟 
采集内容:最近一分钟发生的错误数 
采集示例:grep error -rn /data1/riak/logs | wc -l 
说明:这个采集需要程序处理下逻辑,在此不给出完整的采集方法

  二、采集程序

1、Riak 监控系统设计

 DBA 通过前台页面根据 CMDB 三级业务添加 / 卸载 Riak 集群监控,根据 CMDB 的 ip 添加 Riak 单机监控(单机属于集群,不能单独存在,可增量添加单机监控),填写 ip 和端口,配置阈值、负责人等信息

1)数据库设计

mysql> use riakMonitor
show tabReading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+---------------------------+
| Tables_in_riakMonitor     |
+---------------------------+
| riakClusterConf           | 
| riakClusterDisplay        | 
| riakClusterStatus         | 
| riakClusterStatusTemplate | 
| riakSingleConf            | 
| riakSingleDisplay         | 
| riakSingleStatus          | 
| riakSingleStatusTemplate  | 
+---------------------------+
8 rows in set (0.00 sec)

Template 表作为历史库表模板,历史库按月分库,按 ip 分表

2) 单机 Agent 设计

  • Agent 会通过自动调度平台下发到目标机器,Crond 周期是 1 分钟,直接上报到 mysql 数据库。运行时间超过 45s 会被调度平台 kill
  • 如果检测不到 riak 或者命令出错则会发送 rtx 告警给 admins + dba, 系统错误会发送给 admins

3) 集群汇聚设计

  • 集群数据根据节点 agent 上报数据在 50s 的时候 select 出当前一分钟的数据计算汇聚入库
  • 程序每分钟都会清除 clusterStatus 的数据,如果 agent 在本分钟上报心跳异常或者上报时间不在集群程序运行前(50s),cluster 则不会统计该 ip 数据,但平均值计算时的除数会算上该 ip(+1)
  • 集群计算同时会写进历史库,并创建历史表

4) CGI 接口设计(NodeJs)

  • 异步接收 agent 上报的数据,根据 redis 的 ip 列表转换成 ip1
  • 如果 redis 获取的 ip1 不存在 singleConf 表中则会拒绝上报,返回 3003 错误
  • 上报成功会入 singleStatus 和历史库,并创建历史表

5) 代码列表

CGI :  /data/riakMonitor   # daemon
agent: /home/opd/script/riakMonitor  #  crond
analyzer: /opdData/opdOnline/script/kmc/riakMonitor/analyzer  # crond
1、从 CMDB 更新 single/cluster conf 数据  
2、同步 conf 和 display
3、解析 status 数据到 display
4、异常数据写入
5、告警
riakTool: /opdData/opdOnline/script/kmc/riakMonitor/riakTool  # daemon
每分钟第 50s 运行一次
1、获取监控集群和集群的 ip,计算结果并汇聚
2、操作 redis,将集群数据入历史库

2、采集程序部分代码 (单机,python2.4)

1) 采集指标函数

def getRiakMeta():
    thisFuncName = str(sys._getframe().f_code.co_name)
    cmdStr = "/usr/sbin/riak-admin status"
    cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
    if 0 != cmdCode:
        msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" %s Fail:" % cmdStr)
        return 1

    data["node_gets"] = data["node_puts"] = data["node_get_fsm_time_mean"] = data["node_get_fsm_time_median"] = 0
    data["node_get_fsm_time_95"] = data["node_get_fsm_time_100"] = data["node_put_fsm_time_mean"] = 0
    data["node_put_fsm_time_median"] = data["node_put_fsm_time_95"] = data["node_put_fsm_time_100"] = 0
    data["sys_process_count"] = data["memory_processes"] = data["memory_processes_used"] = 0
    data["read_repairs"] = data["node_get_fsm_siblings_mean"] = data["node_get_fsm_siblings_median"] = 0
    data["node_get_fsm_siblings_95"] = data["node_get_fsm_siblings_100"] = data["node_get_fsm_objsize_mean"] = 0
    data["node_get_fsm_objsize_median"] = data["node_get_fsm_objsize_95"] = data["node_get_fsm_objsize_100"] = 0
    data["leveldb_read_block_error"] = 0

    riakItemInfo = cmdStdout.split('\n')
    for each in riakItemInfo:
        eachInfo = each.split(" : ")
        if 2 == len(eachInfo):
            itemKey = eachInfo[0]
            itemValue = eachInfo[1].replace('<<"', '').replace('">>', '')
            if itemKey in data: 
                logger.debug("%s:%s" % (itemKey, itemValue))
                try:
                    data[itemKey] = str(round(float(itemValue), 2))
                except ValueError:
                    data[itemKey] = itemValue
                except:
                    raise
    
    cmdStr = """ find /data1/riak/data/leveldb -name"LOG"-exec grep -l'Compaction error'{} \; | wc -l """
    cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
    if 0 != cmdCode:
        msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
        return 1
    data["leveldb_compaction_error"] = cmdStdout #不用转 int
    
    cmdStr = "/usr/sbin/riak-admin member-status | grep %s" % data["mainIp"]
    cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
    logger.debug(cmdStdout)
    if 0 != cmdCode:
        msgTxt = "[%s] %s failed" % (thisFuncName, cmdStr)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
        return 1
    if cmdStdout.strip().startswith('valid'):
        data["is_active"] = 1
    else:
        data["is_active"] = 0

    data["riak_error_log"] = 0
    riakLogPath = "/data1/riak/logs/"
    if not os.path.isdir(riakLogPath):
        msgTxt = "[%s] %s not exists" % (thisFuncName, riakLogPath)
        logger.error(msgTxt)
        sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
        return 1
    riakLogInfo = os.listdir(riakLogPath)
    reportTimeSec = time.mktime(time.strptime(data["report_time"], "%Y-%m-%d %H:%M:%S"))
    for each in riakLogInfo:
        logger.debug("fileName: "+each)
        eachFile = os.path.join(riakLogPath, each)
        if os.path.isfile(eachFile):
            try:
                eachFd = open(eachFile, 'r')
            except IOError, e:
                msgTxt = "I/O error({}): {}".format(e.errno, e.strerror)
                logger.error(msgTxt)
                sendRtx(MYCONF.riakAdmins, thisFuncName+" Fail:" + msgTxt)
                return 1
            else:
                for eachLine in eachFd: #从头读,怕文件太大撑爆内存
                    if "error" in eachLine: #2016-03-20 04:57:09.704 [info] <0.19012.49>@riak_kv_index_h
                        eachInfo = eachLine.split(' ')
                        try:
                            eachTimeStr = "%s %s" % (eachInfo[0], eachInfo[1][:-4])
                            eachTimeSec = time.mktime(time.strptime(eachTimeStr, "%Y-%m-%d %H:%M:%S"))
                            if reportTimeSec - 60 <= eachTimeSec < reportTimeSec:
                                logger.debug(eachLine)
                                data["riak_error_log"] += 1
                            elif eachTimeSec >= reportTimeSec:
                                break
                        except:
                            msgTxt = "file(%s) format wrong " % eachFile
                            logger.error(msgTxt)
                            break
                            #sendRtx(MYCONF.riakAdmins, thisFuncName+"Fail:" + msgTxt)
                            #eachFile.close()
                            #return 1
                eachFd.close()
    return 0

2)  上报和失败重传函数

def report2server(content, retry):
    '''上报到入库程序,根据 ip 求余获取优先的 server,如果上报失败会遍历 server 列表'''
    thisFuncName = ""
    try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        pos = data["ip"] % len(MYCONF.reportServer)
        serverKeys = MYCONF.reportServer.keys()
        serverKeys.sort()
        serverKeys = serverKeys[pos:] + serverKeys[:pos]
        for serverId in serverKeys:
            cmdStr = "/usr/bin/curl -s --connect-timeout %d -m %d -d'%s&reTry=%d'%s" %(MYCONF.curlConnectTimeout, MYCONF.curlMaxTimeout, content, retry, MYCONF.reportServer[serverId])
            cmdCode, cmdStdout, cmdStderr = getCmdResult(cmdStr)
            logger.info(cmdStr + "\ncmdCode:" + str(cmdCode) + "\n" + cmdStdout + cmdStderr)
            if 0 == cmdCode:
                return 0
        return 1
    except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
        return 1

def reportScheduler(reportRecord=0):
    '''reportRecord = 0 表示上报 data 中采集的新数据,reportRecord = 1 表示从 reportFailFile 里面获取最新的一条数据上报到 server,然后需要处理 reportFailFile'''
    thisFuncName = ""
    try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        if 1 == reportRecord: # 从上报失败文件中获取最后一条数据,上报之
            if not reportFail.has_section("index"): #这里不要去 add_section("index") 该谁 add 谁 add 去
                return 0
            if not reportFail.has_option("index", "index") or "" == reportFail.get("index", "index").strip():
                return 0
            indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
            index = indexVec[-1]
            if "" == index:
                msgTxt = reportFail.get("index", "index").strip()
                sendRtx(MYCONF.admins, thisFuncName + "[系统错误] index.index 末尾有多余的逗号 " + msgTxt)
                return 1
            if not reportFail.has_option("content", index + "_c") or not reportFail.has_option("content", index + "_t"): 
                # _c 是内容  _t 是重试次数
                msgTxt = "content sector 缺少 %s_c 或 %s_t" %(index, index)
                sendRtx(MYCONF.admins, thisFuncName + "[系统错误] " + msgTxt)
                return 1
            content = reportFail.get("content", index + "_c")
            retry = reportFail.getint("content", index + "_t")
            retry += 1
            code = report2server(content, retry)
            if 0 == code: # 发送成功
                indexVec.remove(index)
                if indexVec:
                    reportFail.set("index", "index", ",".join(indexVec))
                else:
                    reportFail.set("index", "index", "")
                reportFail.remove_option("content", index + "_c")
                reportFail.remove_option("content", index + "_t")
            elif retry > MYCONF.maxRetry: # 重发失败,且超过最大重试次数
                indexVec.remove(index)
                if indexVec:
                    reportFail.set("index", "index", ",".join(indexVec))
                else:
                    reportFail.set("index", "index", "")
                reportFail.remove_option("content", index + "_c")
                reportFail.remove_option("content", index + "_t")
            else: # 重发失败,更新 _t (retry) 字段
                reportFail.set("content", index + "_t", retry)
        else: # 发送新数据
            index = data["report_time"].replace(" ", "").replace("-", "").replace(":", "")
            content = urllib.urlencode(data)
            retry = 0
            code = report2server(content, retry)
            if 0 == code:
                return 0
            if not reportFail.has_section("index"):
                reportFail.add_section("index")
                reportFail.set("index", "index", index)
                reportFail.add_section("content")
                reportFail.set("content", index + "_c", content)
                reportFail.set("content", index + "_t", retry)
            else:
                indexVec = MYCONF.splitRe.split(reportFail.get("index", "index").strip())
                indexVec.append(index)
                if len(indexVec) > MYCONF.maxFailRecord: # 超过最大 fail record 数
                    reportFail.set("index", "index", ",".join(indexVec[len(indexVec) - MYCONF.maxFailRecord:]))
                    reportFail.set("content", index + "_c", content)
                    reportFail.set("content", index + "_t", retry)
                    for i in range(0, len(indexVec) - MYCONF.maxFailRecord):
                        delIndex = indexVec[i]
                        reportFail.remove_option("content", delIndex + "_c")
                        reportFail.remove_option("content", delIndex + "_t")
                else:
                    reportFail.set("index", "index", ",".join(indexVec))
                    reportFail.set("content", index + "_c", content)
                    reportFail.set("content", index + "_t", retry)
        return 0
    except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
        return 1

3) 获取 shell 命令输出函数

def getCmdResult(cmdStr):
    '''获取 shell 命令的返回码,标准输出,标准错误'''
    #child = subprocess.Popen(cmdStr, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True)
    #cmdStdout, cmdStderr = child.communicate()
    #cmdCode = child.wait() 
    #return (cmdCode, cmdStdout, cmdStderr)
    thisFuncName = str(sys._getframe().f_code.co_name)
    nowTime = int(time.time())
    tmpstdout = os.path.join(MYCONF.basePath, "cmd.stdout.%d.tmp" % nowTime)
    tmpstderr = os.path.join(MYCONF.basePath, "cmd.stderr.%d.tmp" % nowTime)
    if "debug" == MYCONF.role:
        msgTxt = "[%d]Run Cmd: %s" % (nowTime, cmdStr)
        logger.debug(msgTxt)

    cmdStr = "(%s) 1>%s 2>%s" %(cmdStr, tmpstdout, tmpstderr)
    cmdCode = os.system(cmdStr) >> 8
    cdmStdout = cmdStderr = ""
    try:
        outfd = open(tmpstdout)
        cmdStdout = outfd.read()
        errfd = open(tmpstderr)
        cmdStderr = errfd.read()
    except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        sendRtx(MYCONF.admins, thisFuncName + " Fail:" + msgTxt)
        cmdCode = 110
    else:
        outfd.close()
        errfd.close()
        os.remove(tmpstderr)
        os.remove(tmpstdout)
    return (cmdCode, cmdStdout, cmdStderr)

4) 读 / 写 Cache 函数

def readLastCache():
    global lastCache
    lastCache = ConfigParser.ConfigParser()
    if not os.path.isfile(MYCONF.lastCacheFile):
        try:
            fd = open(MYCONF.lastCacheFile, "w")
        except IOError, e:
            logger.error("I/O error({}): {}".format(e.errno, e.strerror))
            return 1
        else:
            fd.close()
    lastCache.readfp(open(MYCONF.lastCacheFile), "rb")
    return 0

def writeCache():
    thisFuncName = ""
    try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        lastCache.write(open(MYCONF.lastCacheFile, 'w'))
        return 0
    except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        logger.error(msgTxt)
    return 1

5)  读 / 写失败记录

def readFailRecord():
    global reportFail
    reportFail = ConfigParser.ConfigParser()
    if not os.path.isfile(MYCONF.lastReportFailFile):
        try:
            fd = open(MYCONF.lastReportFailFile, "w")
        except IOError, e:
            logger.error("I/O error({}): {}".format(e.errno, e.strerror))
            return 1
        else:
            fd.close()
    reportFail.readfp(open(MYCONF.lastReportFailFile), "rb")
    return 0

def writeFailRecord():
    thisFuncName = ""
    try:
        thisFuncName = str(sys._getframe().f_code.co_name)
        reportFail.write(open(MYCONF.lastReportFailFile, 'w'))
        return 0
    except:
        exceptmsg = StringIO.StringIO()
        traceback.print_exc(file=exceptmsg)
        msgTxt = exceptmsg.getvalue()
        logger.error(msgTxt)
    return 1

6)main 函数

def main():
    data["osType"] = 0 # 0 表示 linux
    data["version"] = MYCONF.version # 当前程序的自定义版本号
    data["report_time"] = time.strftime("%Y-%m-%d %H:%M:00") #上报时间,由于目前基础监控是分钟级监控粒度,因此秒取 00
    
    initLog()
    logger.info('='*80)
    if 0 == checkLastPid() and 0 == readLastCache() and 0 == getLoginIp():
        readFailRecord() # 读取早迁采集周期上报失败,需要重传的数据
        reportScheduler(reportRecord=1) #从 fail record 中选取最近的一条信息上报给服务器
        if 0 == getRiakMeta():
            reportScheduler(reportRecord=0)
        writeFailRecord()
        writeCache()
    logger.info('='*80)
    logging.shutdown()
    return

3、添加 / 卸载监控

1) 添加监控

添加监控需要先添加集群(不支持先添加 IP),添加集群会默认把所有 IP 都添加监控(前台将在 clusterConf 新增记录, 并在 singleConf 增加对应的 ip 记录,然后调用调度平台,检测 ip 是否已经安装)如果该集群在 CMDB 里面新增 Ip,则需要手动添加监控(前台提供新增监控节点,插入 singleConf)

2) 卸载监控

(1) 卸载监控可以卸载整个集群的监控(将 clusterConf needMonitor 置 0,同步将 singleConf 的 needMonitor 都置 0,然后调用
调度平台  卸载集群下的所有机器,如果该 ip 存在其他集群并且需要监控,则不用调用  调度平台  卸载)也可以卸载单个节点的监控 (前台将 singleConf 的 needMonitor 置 0,调用 调度平台,同样判断 ip 是否存在其他集群)(2) 添加卸载监控部由前台调用
调度平台  接口,并修改数据库(插入数据或者更新 need_monitor) (3) Single/cluster dislplay 表会同步 conf 表的数据,只保留 need_monitor= 1 的数据

4、CMDB 数据同步

后台一直同步 CMDB 的数据和 conf 表的数据,如果不在 CMDB 的则需要删掉 conf 里面的数据,不管 needMonitor 的值为多少。删除三级业务的话只需要删除 clusterConf 表对应的记录,single 会自动同步外键(尝试调用
调度平台卸载接口,卸载掉被删除的三级业务 ID 下面的所有已安装监控的 IP)

5、前台展示

1)集群状态展示

Riak 集群监控详解

 

Riak 集群监控详解

2) 单机节点状态展示

Riak 集群监控详解

 

Riak 集群监控详解

 

本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-11/136829.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-22发表,共计13914字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中