视频回放: https://www.bilibili.com/video/av14672079/
统计数据在 LeanTicket 的价值 00:00:00
通过用户提交工单和技术支持回复工单,我们可以收集一些指标,如:
- 新增工单数和活跃工单数
- 不同分类的工单量和响应速度
- 不同技术支持人员工单量和响应速度
可以通过上面的指标,我们可以做出一些改进:
- 技术支持压力较大(回复较慢)的部分可能是人力或文档投入不足,有针对性的改进可以提高技术支持效率。
- 工单量较大的分类(产品)可能存在改进空间,完善后可以提高产品质量。
统计功能的难点 00:04:14
- 数据量大带来的问题:统计功能经常会查询并计算大量的数据,可能因为查询到内存的数据量过大导致内存溢出(OOM),或者因为 LeanCloud 的 query 一次最多查询 1000 条记录,要遍历全表需要一些技巧。
- 统计需求多样:统计结果经常需要多种纬度,或者多样化的结果,不过这更多是和业务需求紧密相关,本主题的重点不在此,所以不做过多讨论。
本主题希望解决的问题:一个典型规模(如几万用户,单表几十万或百万记录)的 LeanCloud 应用,如何使用存储服务和云引擎,使用较小的代价完成数据统计功能。
基本思路 00:07:36
虽然统计的需求可能是多样的,但是对于一个业务模型相对稳定的应用(比如工单系统),最常用和普遍的统计指标是相对稳定的,我们可以通过程序将这些统计方式固化下来。
并且可以根据一定粒度(比如小时,天)将数据预先汇总并保存,汇总更大粒度的数据时只需要合并粒度较小的汇总数据,避免一次性查询和计算大量原始数据所带来的较高的查询和计算成本。
本主题使用的代码 https://github.com/leancloud/ticket ,版本 a029f58。
案例:每日活跃工单统计 00:10:03
对于工单系统,一个粒度较小的统计汇总指标——每日活跃工单统计。有了这样的数据,计算周、月活跃工单统计就很容易了。
对于「每日活跃工单统计」的关注点:
- 活跃工单的定义:当天如果有回复,即为活跃工单。
- 计算时间:每日凌晨计算前一天的数据,可以由云引擎定时任务触发云函数来执行。
- 统计内容:分别汇总出「分类」,「提交人」,「责任人」,「工单状态」,「回复次数」等数据。
具体实现(代码):
AV.Cloud.define('statsDaily', (req, res) => {
...
return getActiveTicket(start, end, authOptions)
.then((data) => {
return removeDailyStats(start, authOptions)
.then(() => {
data.date = start
return new AV.Object('StatsDaily')
.setACL(getStatsAcl())
.save(data, authOptions)
})
})
.then(res.success)
.catch(res.error)
})
上面定义了一个云函数,由云引擎的定时器每天凌晨触发。
技巧:将统计函数实现为幂等
因为统计函数可能执行耗时较长,涉及到的计算很多,出现异常的可能性较大;或者因为统计算法调整导致之前的计算结果无效。种种原因导致希望统计函数能重复执行,所以在保存数据之前会先尝试清理掉已有的数据:
const removeDailyStats = (date, authOptions) => {
return new AV.Query('StatsDaily')
.equalTo('date', date)
.destroyAll(authOptions)
}
具体的统计活跃工单的方法在 getActiveTicket
方法,在看该方法之前,我们先看一个辅助函数: reduceAVObject
。
技巧:边查询边汇总
reduceAVObject
函数的作用是需要查询大量数据并汇总,在这个过程中尽量减少内存占用,防止 OOM 。基本思想就是查出来一部分数据就汇总一部分数据,然后再查询再汇总。之前查询的结果集因为不再持有引用就可以被垃圾回收,内存中每次只保存一少部分原数据和一个汇总数据。
具体实现(代码):
const reduceAVObject = (query, fn, accumulator, authOptions) => {
query.limit(1000)
.ascending('createdAt')
const innerFn = () => {
return query.find(authOptions)
.then((datas) => {
_.forEach(datas, (data) => {
accumulator = fn(accumulator, data)
})
if (datas.length !== 1000) {
return accumulator
} else {
query.greaterThan('createdAt', _.last(datas).get('createdAt'))
return innerFn()
}
})
}
return innerFn()
}
该方法主要是三个参数(第四个参数只是为了 ACL 鉴权):
-
query
:AV.Query
对象,调用方设置好自己的查询条件,在reduceAVObject
中会重复查询多次,每次查询会重新设置query.greaterThan('createdAt', newTime)
查询范围,递归进行数据遍历。
-
fn
:一个函数,作用是将「汇总对象」和结果集中的一个对象进行合并,结果作为新的「汇总对象」再和遍历结果集中下一个元素进行合并:
_.forEach(datas, (data) => {
accumulator = fn(accumulator, data)
})
整个 reduceAVObject
的处理过程就是:
根据 query 查询 1000 条记录。
遍历结果集,使用 accumulator
不断的和结果集中的元素合并(汇总)。
-
结果集数组长度是否等于 1000:
- 如果不是,说明
query
匹配的结果已经全部查询完成,此时 accumulator
为最终汇总的对象,返回。
- 如果是,说明
query
受到 limit 1000
的限制,修改查询条件,递归调用:
query.greaterThan('createdAt', _.last(datas).get('createdAt'))
return innerFn()
技巧:使用 createdAt 做范围查询代替 skip 参数
在 reduceAVObject
更改查询条件递归调用时,并没有使用 skip
参数,而是用 createdAt
做范围查询,这样对大量数据比较靠后的「分页查询」性能不会恶化。因为在 skip
较大时(比如十几万),虽然跳过的数据不会在最终的结果集中,但是为了确定哪些记录要跳过,在数据库中还是会筛选出来这部分数据,也就意味着大量的数据被查到的意义只是为了「跳过」,跳过的结果集越大,整个查询的效率就会越低。
因为 createdAt
默认有索引,所以如果使用 createdAt
作时间范围查询(用上一次结果集的头或末尾记录的 createdAt
),通过索引可以直接定位到要查询的开始或者结束位置,不存在「先查询再跳过」的过程,所以「页数」较大时查询性能也不会恶化。
了解了 reduceAVObject
的处理方式之后,getActiveTicket
函数就很好理解了(代码):
const getActiveTicket = (start, end, authOptions) => {
const query = new AV.Query('Reply')
.greaterThanOrEqualTo('createdAt', start)
.lessThan('createdAt', end)
.include('ticket')
return reduceAVObject(query, (result, reply) => {
...
return result
}, {
activeTickets: [],
})
.then(({activeTickets}) => {
...
})
}
处理流程:
- 根据业务需求定义
query
对象。
- 调用
reduceAVObject
函数,并定义汇总数据的初始值 activeTickets: []
和数据合并方法来合并数据:
return reduceAVObject(query, (result, reply) => {
const ticket = reply.get('ticket')
let ticketInfo = _.find(result.activeTickets, {objectId: ticket.id})
if (!ticketInfo) {
ticketInfo = {
objectId: ticket.id,
categoryId: ...
...
replyCount: 0
}
result.activeTickets.push(ticketInfo)
}
ticketInfo.replyCount++
return result
}
- 整理合并数据并最终返回:
.then(({activeTickets}) => {
return {
tickets: _.map(activeTickets, 'objectId'),
replyCount: ...
...
}
})
最终,我们会在数据表中保存这样的结果:
{
"assignees": {
"583bc7f061ff4b00xxxxxxxx": 2,
"5757b78b5bbb5000xxxxxxxx": 2,
...
},
"date": "2017-09-05T16:00:00.000Z",
"replyCount": 999,
"ACL": {
"role:customerService": {
"read": true
}
},
"joinedCustomerServices": {
"583bc7f061ff4b00xxxxxxxx": 3,
...
},
"categories": {
"564ea39800b0ee7fxxxxxxxx": 4,
...
},
"authors": {
"58733781128fe100xxxxxxxx": 1,
...
},
"tickets": [
"59873e6cac502e00xxxxxxxx",
...
],
"statuses": {
"120": 99,
"160": 88,
"250": 77,
"280": 66
},
"objectId": "59b03795a0bb9f00xxxxxxxx",
"createdAt": "2017-09-06T17:59:49.519Z",
"updatedAt": "2017-09-06T17:59:49.519Z"
}
案例:单工单统计 00:32:41
对于工单系统,另一个粒度较小的统计汇总指标——单工单统计,得到每个工单的「首次响应时间」、「平均响应时间」等信息,然后就可以通过时间或者分类等条件筛选多个工单,合并这些工单的统计信息即可获取对应维度的统计信息。
对于「每日活跃工单统计」的关注点:
- 统计时间:在每个工单关闭时统计是非常好的时间点,这样可以天然的将统计任务分散开,防止集中计算带来的 CPU 和内存高峰。对于未完成的工单,可以每天定时将所有未完成的工单进行统计,在工单完成时会再次触发统计,将之前统计的数据覆盖。
- 统计内容:
- 首次响应时间:用户提交工单之后,距离客服第一次回复的时间。
- 平均响应时间:在一个工单中,每次用户回复到客服反馈的平均时间。因为可能有多个客服参与,所以根据客服分组。
具体实现(代码):
AV.Cloud.define('statsTicket', (req, res) => {
...
exports.statsTicket(ticketId, authOptions)
.then((data) => {
...
return new AV.Object('StatsTicket')
.setACL(getStatsAcl())
.save(data, authOptions)
...
})
.then(res.success)
.catch(res.error)
})
该云函数会在 Ticket
状态变更为「关闭」时调用(代码):
AV.Cloud.afterUpdate('Ticket', (req) => {
...
if (ticket.updatedKeys.includes('status')
&& ticketClosedStatuses().includes(ticket.get('status'))) {
AV.Cloud.run('statsTicket', {ticketId: ticket.id})
.catch(errorHandler.captureException)
}
..
}
也会通过定时任务触发,将所有「未关闭」的工单进行统计(代码):
AV.Cloud.define('statsOpenedTicket', (req, res) => {
res.success()
forEachAVObject(new AV.Query('Ticket')
.containedIn('status', ticketOpenedStatuses())
, (ticket) => {
return AV.Cloud.run('statsTicket', {ticketId: ticket.id})
.catch((err) => {
console.log('err >>', ticket.id, err)
})
}, {useMasterKey: true})
})
在 statsOpenedTicket
中调用了 forEachAVObject
方法,该方法和 reduceAVObject
方法类似(只是没有「汇总对象」),目标都是用较少的内存占用来遍历一个极大的结果集(代码):
const forEachAVObject = (query, fn, authOptions) => {
query.limit(1000)
.descending('createdAt')
const innerFn = () => {
return query.find(authOptions)
.then((datas) => {
if (datas.length === 0) {
return
}
return Promise.each(datas, fn)
.then(() => {
query.lessThan('createdAt', _.last(datas).get('createdAt'))
return innerFn()
})
})
}
return innerFn()
}
具体统计单个工单信息的实现(代码):
exports.statsTicket = (ticketId, authOptions) => {
return getTicketAndTimeline(ticketId, authOptions)
.then(({ticket, timeline}) => {
const firstReplyStats = new FirstReplyStats(ticket)
const replyTimeStats = new ReplyTimeStats(ticket)
_.forEach(timeline, (replyOrOpsLog) => {
firstReplyStats.forEachReplyOrOpsLog(replyOrOpsLog)
replyTimeStats.forEachReplyOrOpsLog(replyOrOpsLog)
})
return {
ticket,
firstReplyStats: firstReplyStats.result(),
replyTimeStats: replyTimeStats.result(),
}
})
}
这个方法分为两部分:
- 获取指定工单的时间线(所有回复和操作记录)
getTicketAndTimeline
,该函数的操作就是一些普通的查询和数据操作,不详细展开。
- 将工单的回复和操作记录提供给不同的统计项进行统计,比如现有的
firstReplyStats
和 replyTimeStats
就是分别汇总「首次响应时间」和「平均响应时间」的。
每个统计项写成一个 Class,并提供两个方法:
-
forEachReplyOrOpsLog
:外层会遍历一个工单的时间线对象,并调用该方法,该方法内部统计项自己汇总相关的指标。
-
result
:外层通过该方法获取统计结果,该方法还可以做一些计算收尾工作。
统计项具体的汇总逻辑详见 FirstReplyStats
和 ReplyTimeStats
这两个 Class 的定义。这样以后增加新的统计项只要增加一个类似的 Class 即可。这样组织外层负责循环,并调用所有统计项的相关方法,每个统计项自己决定如何统计,代码相对清晰。
最终,我们会在数据表中保存这样的结果:
{
"ACL": {
"role:customerService": {
"read": true
}
},
"ticket": {
"__type": "Pointer",
"className": "Ticket",
"objectId": "59afcdcb128fe1xxxxxxxx"
},
"firstReplyStats": {
"ticketId": "59afcdcb128fe100xxxxxxxx",
"userId": "5753ffc3207703006xxxxxxx",
"firstReplyTime": 1664394
},
"replyTimeStats": [
{
"ticketId": "59afcdcb128fe100xxxxxxxx",
"userId": "5753ffc320770300xxxxxxxx",
"replyCount": 1,
"replyTime": 1664394
},
{
"ticketId": "59afcdcb128fe100xxxxxxxx",
"userId": "583bc7f061ff4b00xxxxxxxx",
"replyCount": 4,
"replyTime": 188141639
}
],
"objectId": "59bf437161ff4b3fxxxxxxxx",
"createdAt": "2017-09-18T03:54:25.364Z",
"updatedAt": "2017-09-18T03:54:25.364Z"
}
案例:周统计 01:00:10
有了上面「每日活跃工单统计」和「单工单统计」,我们可以使用时间维度合并记录,生成「周统计」。
具体实现(代码):
AV.Cloud.define('getStats', (req) => {
let {start, end, timeUnit} = req.params
...
const dateRanges = getDateRanges(start, end, timeUnit)
return Promise.map(dateRanges, ({start, end}) => {
return getDailyAndTicketStatses(start, end, authOptions)
.then(({dailyStatses, ticketStatses}) => {
const result = _.reduce(dailyStatses, (result, statsDaily) => {
// 汇总逻辑
}, {
// 初始化汇总结果
})
result.firstReplyTimeByUser = ...
...
return result
})
})
})
getDateRanges
的作用是把传入的一个大的时间跨度分成不同的时间区间,比如:
console.log(getDateRanges(new Date('2017-09-01'), new Date('2017-09-10'), 'week'))
// output:
// [ { start: 2017-09-01T00:00:00.000Z,
// end: 2017-09-08T00:00:00.000Z },
// { start: 2017-09-08T00:00:00.000Z,
// end: 2017-09-10T00:00:00.000Z } ]
然后通过 getDailyAndTicketStatses
方法获取需要的统计信息,再合并统计信息并生成最终结果。
在 getDailyAndTicketStatses
方法有一个细节(代码):
const getDailyAndTicketStatses = (start, end, authOptions) => {
...
return new AV.Query('StatsDaily')
...
.then((dailyStatses) => {
const ticketIds = _.chain(dailyStatses)
.map(stats => stats.get('tickets'))
.flatten()
.uniq()
.value()
return getTicketStats(ticketIds, authOptions)
.then((ticketStatses) => {
return {dailyStatses, ticketStatses}
})
})
}
上面的代码先通过时间段查询「每日活跃工单统计」数据,再取出其中的 ticketId
列表,通过列表查询所有「单工单统计」数据。此时需要注意,有可能 ticketId
列表非常大,比如几千上万个,不能直接拼接一个 query.containedIn
进行查询,很可能遇到 URI 超长的问题。建议这样处理(代码):
const getTicketStats = (ticketIds, authOptions) => {
return Promise.all(_.map(_.chunk(ticketIds, 50), (ids) => {
return new AV.Query('StatsTicket')
.containedIn('ticket', ids.map(id => new AV.Object.createWithoutData('Ticket', id)))
.find(authOptions)
}))
.then(_.flatten)
}
先通过 _.chunk
方法把一个大数组切分成多个小数组,然后分批查询,再将多个小的结果集通过 _.flatten
合并成一个大结果集返回。
最终,调用 getStats
云函数会获取到类似这样的结果:
{
"result": [
{
"date": "2017-09-07T16:00:00.000Z",
"assignees": {
"583bc7f061ff4b00xxxxxxxx": 9,
"5757b78b5bbb5000xxxxxxxx": 8,
...
},
"authors": {
"5757b7812e958a00xxxxxxxx": 7,
...
},
"categories": {
"564ea39800b0ee7fxxxxxxxx": 6,
...
},
...
"replyCount": 224,
"tickets": [
"59ccccba17d00900xxxxxxxx",
...
],
"firstReplyTimeByUser": [
{
"userId": "5757b7837db2a200xxxxxxxx",
"replyTime": 41615901,
"replyCount": 3
},
...
],
"replyTimeByUser": [ ... ],
"firstReplyTime": 704285510,
"firstReplyCount": 100,
"replyTime": 5381664170
},
...
]
}
客户端展现 01:13:28
客户端会调用 getStats
云函数,获取对应时间区间的统计数据并展现在页面,具体代码在 ./modules/CustomerServiceStats.js ,主要就是绘制表格和图表,这里就不展开介绍了。