用户画像和留存预测模型

目标

1.spark从hive获取数据对用户特征进行处理写入hbase
2.保留30天用户特征数据,用于模型训练(每天训练一次,离线预测)

用户画像设计

用户画像通俗说则是根据用户的行为给用户打标签,我们将用户行为划分为以下几个维度,分别是:
基础信息: 用户年龄,性别,等级等相关信息

环境信息: 用户使用app时的地域,设备,网络情况(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)

行为偏好: 用户常玩的app类型, 登录时间,在线时长等(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)

付费偏好: 用户充值额度, 付费次数, 付费商品类型, 付费时间段等等(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)

社交特征: 如果存在社交系统, 可以统计一下好友聊天, 好友开局等信息(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)

策略玩法特征: 具体游戏(或其他)应用核心内容的玩法, 使用等信息(可分为统计日,最近7日,最近30日,累计四个时间维度对特征进行统计)

调整数值特征: 可以通过该一系列特征来调整付费,留存的字段

用户画像Hbase表结构: user_profile

列簇 字段
key (rowKey关键字段) time 统计日 game_id 游戏ID uid用户id
info (基础信息) $game_sex 游戏性别[man, woman] $vip vip等级 $level 等级
env_tag (环境偏好特征) $address 地址分布[.*] $device 设备分布 [huawei, oppo, apple, ...] $os 系统分布[android, ios] $sim 运营商分布 [yidong, liantong, ...]
favor_tag (行为偏好特征) $game 常玩游戏类型[tcg, rpg, mmo, ...] $login_time 登录时段分布[00~23] $online_time 在线时长分布[d+]
pay_tag (付费偏好特征) $pay_amount 充值额度分布[count、max、min、avg] $pay_frequency 付费次数分布[count、max、min、avg] $pay_money 支付方式分布(基于金额) [wechat, applepay, ...] $pay_count 支付方式分布(基于次数) [wechat, applepay, ...]
social_tag (社交偏好特征) $new_friend_count 新增好友[count、max、min、avg] $chat_count 好友聊天次数[count、max、min、avg] $chat_count_friend 游戏好友聊天次数[d+] $chat_count_wechat 微信好友聊天次数[d+]
strategy_tag (策略偏好特征) $pvp_count 竞技次数[count、max、min、avg] $pve_count 副本次数[count、max、min、avg] $role_type 角色类型使用 [ap, ad, other] $fight_type 战斗类型 [pvp, pve]
adjust_tag (调整数值特征) $allowance 破产补贴

[图片上传失败...(image-5e7d98-1643355969865)]

Hive数据结构

hive 
show create table  action  (此为表名, 我这边是action表) 

输入输出格式为ORC, Presto针对这种格式的数据做了优化查询, 如果是impala查询则使用parquet格式。

CREATE TABLE `action`(                             
   `uid` string,                                    
   `uid_type` string,                               
   `agent` string,                                  
   `ip` string,                                     
   `timestamp` timestamp,                           
   `time` timestamp,                                
   `year` string,                                   
   `month` string,                                  
   `week` string,                                   
   `hour` string,                                   
   `minute` string,                                 
   `properties` map<string,string>)                 
 PARTITIONED BY (                                   
   `game_id` int,                                   
   `timezone` string,                               
   `event` string,                                  
   `day` date)                                      
 ROW FORMAT SERDE                                   
   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'      
 WITH SERDEPROPERTIES (                             
   'colelction.delim'=',',                          
   'field.delim'='\t',                              
   'mapkey.delim'=':',                              
   'serialization.format'='\t')                     
 STORED AS INPUTFORMAT                              
   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'  
 OUTPUTFORMAT                                      
   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' 
 LOCATION                                           
   'hdfs://slaves01:8020/warehouse/tablespace/managed/hive/event.db/action' 
 TBLPROPERTIES (                                   
   'auto-compaction'='true',                       
   'bucketing_version'='2',                        
   'compaction.file-size'='128MB',                 
   'sink.partition-commit.delay'='0s',             
   'sink.partition-commit.policy.kind'='metastore,success-file',  
   'sink.partition-commit.trigger'='process-time',  
   'sink.shuffle-by-partition.enable'='true',       
   'transient_lastDdlTime'='1642571371')

Spark2Hbase逻辑处理

1.获取游戏列表和对应时区(或者传入字符串数组)

2.创建用户画像表user_profile(设置TTL为30天,压缩方式为snappy)

3.计算不同特征tag, 写入用户画像表

源代码

├── pom.xml
└── src
    └── main
        ├── resources
        │   ├── application.conf      # 默认配置
        │   ├── code2area.csv
        │   ├── hive-site.xml               # 将/usr/hdp/current/spark2-client/conf/hive-site.xml拷贝过来
        │   └── reference.conf
        └── scala
            ├── com
            │   └── carol
            │       └── bigdata
            │           ├── App.scala               # 程序主入口 
            │           ├── Config.scala        # 配置参数
            │           ├── Task.scala          # 程序主要任务
            │           ├── constant
            │           │   └── KVConstant.scala
            │           ├── task                        # 具体任务包
            │           │   └── feature
            │           │       ├── CalEnvTag.scala                     # 计算环境偏好特征
            │           │       ├── CalFavorTag.scala                   # 计算行为偏好特征
            │           │       ├── CalPayTag.scala                     # 计算付费特征
            │           │       └── cag
            │           │           ├── calAdjustTag.scala    # 计算调整特征(此处由用户自行填写)
            │           │           ├── calSocialTag.scala      # 计算社交特征
            │           │           └── calStrategyTag.scala  # 计算策略特征
                │           │   └── label
            │           │       └── calRetentionLabel.scala   # 计算留存标签
            │           │   └── model   
            │           │       ├── algo           # 算法模型
            │           │       │   ├── DT.scala
            │           │       │   ├── LR.scala
            │           │       │   ├── ModelTrait.scala   # 定义算法模型接口
            │           │       │   ├── ModelUtil.scala
            │           │       │   ├── RF.scala
            │           │       │   └── SVM.scala
            │           │       ├── feature
            │           │       │   └── FeatureUtil.scala  # 特征工程
            │           │       └── train
            │           │           └── TrainRetention.scala   # 留存预测训练Demo
            │           └── utils
            │               ├── FeatureUtil.scala           # 计算特征公共函数
            │               ├── Flag.scala                      # 命令行参数
            │               ├── FuncUtil.scala
            │               ├── HBaseFilter.scala           # hbase建表,读过滤操作
            │               ├── HBaseUtil.scala             # hbase建表,读写操作
            │               ├── RddReader.scala             # 读取hive/hdfs转换为RDD
            │               └── TimeUtil.scala        # 时间处理
            └── org
                └── apache
                    └── hadoop
                        └── hive
                            └── shims
                                └── ShimLoader.java    # hadoop3不支持,在源码93行,加入对case 3 version的支持
                        └── spark
                            └── ml
                                └── feature
                                    └── VectorDisassembler.scala  # 将合并列拆分


样本设计

标签设计

假设此处需要预测活跃留存, 以便调整相关特征提升用户留存率。key为hbase表的rowkey相关字段, retention为标签字段, active_r1代表用户在次日仍然活跃则为1, 否则为0, 因此我们将此模型抽象为一个二分类模型。(TTL 30days)

列簇 字段
key time 统计日 game_id 游戏ID uid 用户ID
retention active_r1 次日活跃留存[0,1] active_r2 2日活跃留存[0,1] active_r3 3日活跃留存[0,1] active_r7 7日活跃留存[0,1] active_r15 15日活跃留存[0,1] active_r30 30日活跃留存[0,1]
prediction pred_active_r1 次日活跃留存[0,1] pred_active_r2 2日活跃留存[0,1] pred_active_r3 3日活跃留存[0,1] pred_active_r7 7日活跃留存[0,1] pred_active_r15 15日活跃留存[0,1] pred_active_r30 30日活跃留存[0,1]
image-20220128132303934.png

样本标签表

根据用户画像和标签表, 合并成样本标签表,进行特征工程处理, 其中合并原则为以每天标签表为左表join用户画像表。(在特征工程中合并训练)

time统计, game_id 游戏ID, uid 用户ID, 用户特征(env_tag, favor_tag,...), 实际标签(active_r1 0/1)

算法模型

特征工程

将特征列类型划分为int, double, map,string列, 分别对不同的列标记tag或分段, 也可以对数值列进行标准化。如图所示, 最后一列是次日留存的标签, 前面是随意摘取的特征列, 这里使用的是spark的hash特征向量化。

image-20220128152812746.png

模型封装

定义模型初始化, 训练, 预测, 保存接口, 具体的算法实现接口即可。

package com.carol.bigdata.task.model.algo

import java.io.File

import org.apache.spark.ml.classification.{Classifier, OneVsRest}
import org.apache.spark.ml.evaluation.{BinaryClassificationEvaluator, MulticlassClassificationEvaluator}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.DataFrame


trait ModelTrait {
    // 未实现函数,子类集成必须实现,已实现函数子类可以直接使用

    def init(params: Map[String, Any]): Unit

    // 构建pipeline模型
    def buildPipeline(featuresCol: String = "features",
                      labelCol: String = "label",
                      rawPredictionCol: String = "rawPrediction",
                      predictionCol: String = "prediction",
                      objective: String = "binary",
                      numClass: Int = 2): Pipeline


    // 交叉验证
    def buildValidator(pipeline: Pipeline,
                       seed: Int = 1,
                       numFolds: Int = 2,
                       parallelNum: Int = 2,
                       objective: String = "binary"): CrossValidator


    // 基于二分类器构建任意分类的PipeLine
    def buildOvrTree(binaryModel: Classifier[_, _, _],
                     featuresCol: String = "features",
                     labelCol: String = "label",
                     rawPredictionCol: String = "rawPrediction",
                     predictionCol: String = "prediction",
                     objective: String = "binary"): OneVsRest = {

        // 构造多分类器
        val ovrTree: OneVsRest = new OneVsRest()
            .setClassifier(binaryModel)
            .setFeaturesCol(featuresCol)
            .setLabelCol(labelCol)
            .setPredictionCol(predictionCol)

        ovrTree

    }


    // 基于pipeline和gridBuilder构建交叉验证器
    def buildValidatorFromGrid(pipeline: Pipeline,
                               gridBuilder: ParamGridBuilder,
                               seed: Int = 1,
                               numFolds: Int = 2,
                               parallelNum: Int = 2,
                               objective: String = "binary"): CrossValidator = {
        // 交叉验证
        val paramGrid: Array[ParamMap] = gridBuilder.build()
        // 评估器
        val evaluator = {
            if (objective == "binary")
                new BinaryClassificationEvaluator
            else new MulticlassClassificationEvaluator
        }
        // 交叉验证模型
        val cv: CrossValidator = new CrossValidator()
            .setEstimator(pipeline)
            .setEstimatorParamMaps(paramGrid)
            .setEvaluator(evaluator)
            .setSeed(seed)
            .setNumFolds(numFolds)          // Use 3+ in practice
            .setParallelism(parallelNum)    // Evaluate up to 2 parameter settings in parallel
            //.setCollectSubModels(true)    // specified to collect all validated models
        cv
    }


    // 获取最优超参
    def getBestParams(crossModel: CrossValidatorModel): Map[String, Any] = {
        val bestParamsMap = crossModel.getEstimatorParamMaps.zip(crossModel.avgMetrics).maxBy(_._2)._1
        bestParamsMap.toSeq.map(pair => (pair.param.name, pair.value)).toMap
    }


    // 保存模型
    def saveModel(pipeline: PipelineModel, modelPath: String = "model"): Unit = {
        // save到本地货HDFS,供PipelineModel加载
        println(s"pipeline model saving...")
        pipeline.write.overwrite.save(modelPath)
        println(s"pipeline model save success to $modelPath")
    }


    // 评估模型
    def evalModel(evalData: DataFrame, objective: String = "binary"): Unit = {
        val evaluator = {
            if (objective == "binary")
                new BinaryClassificationEvaluator
            else new MulticlassClassificationEvaluator
        }
        val accuracy: Double = evaluator.evaluate(evalData)
        println("accuracy:", accuracy)
    }


    // 更新微调参数
    def updateTuneParams(bestParamsMap: ParamMap): Unit
    def updateTuneParamsFromCV(crossModel: CrossValidatorModel,
                               maxBy: Boolean = true,
                               objective: String = "binary"): Unit

}

模型应用

将调整数值特征划分为一系列段位, 将当日活跃用户左连用户画像特征并按段位遍历组成新的特征标签表, 使用训练好的模型进行预测, 选择用户留下来的分段区间, 从小到大进行排列, 写入离线预测表。

系列文章

第一篇: Ambari自动化部署
第二篇: 数据埋点设计和SDK源码
第三篇: 数据采集和验证方案
第四篇: ETL实时方案: Kafka->Flink->Hive
第五篇: ETL用户数据处理: kafka->spark->kudu
第六篇: Presto分析模型SQL和UDF函数
第七篇: 用户画像和留存预测

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,647评论 18 139
  • 转自千峰王溯老师 1、用户画像项目简介 1.1 什么是用户画像 所谓的用户画像就是给用户贴一些标签,通过标签说明用...
    如虎添阅读 4,587评论 0 16
  • 理解数据 对用户行为的分析可以简单地理解为用户画像,关于我自己对用户画像的理解: 我们每个人都有自己的微信成员,给...
    许瀚阅读 757评论 0 1
  • 一. Java基础部分.................................................
    wy_sure阅读 3,810评论 0 11
  • 标签数据开发是用户画像体系中最重要的一环,主要包括离线标签开发、实时标签开发、用户特征库开发、人群计算、打通数据服...
    一只森林鹿Luluzeng阅读 12,125评论 0 55