博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《Spark机器学习》笔记——Spark回归模型(最小二乘回归、决策树回归,模型性能评估、目标变量变换、参数调优)
阅读量:2493 次
发布时间:2019-05-11

本文共 12203 字,大约阅读时间需要 40 分钟。

数据集说明:

数据集下载地址

http://archive.ics.uci.edu/ml/machine-learning-databases/00275/Bike-Sharing-Dataset.zip

=========================================

hour.csv和day.csv都有如下属性,除了hour.csv文件中没有hr属性以外
- instant: 记录ID
- dteday : 时间日期
- season : 季节 (1:春季, 2:夏季, 3:秋季, 4:冬季)
- yr : 年份 (0: 2011, 1:2012)
- mnth : 月份 ( 1 to 12)
- hr : 当天时刻 (0 to 23)
- holiday : 当天是否是节假日(extracted from http://dchr.dc.gov/page/holiday-schedule)
- weekday : 周几
- workingday : 工作日 is 1, 其他 is 0.
+ weathersit : 天气
- 1: Clear, Few clouds, Partly cloudy, Partly cloudy
- 2: Mist + Cloudy, Mist + Broken clouds, Mist + Few clouds, Mist
- 3: Light Snow, Light Rain + Thunderstorm + Scattered clouds, Light Rain + Scattered clouds
- 4: Heavy Rain + Ice Pallets + Thunderstorm + Mist, Snow + Fog
- temp : 气温 Normalized temperature in Celsius. The values are divided to 41 (max)
- atemp: 体感温度 Normalized feeling temperature in Celsius. The values are divided to 50 (max)
- hum: 湿度 Normalized humidity. The values are divided to 100 (max)
- windspeed: 风速Normalized wind speed. The values are divided to 67 (max)
- casual: 临时用户数count of casual users
- registered: 注册用户数count of registered users
- cnt: 目标变量,每小时的自行车的租用量,包括临时用户和注册用户count of total rental bikes including both casual and registered
=========================================

使用sed 1d hour.csv > hour_noheader.csv去掉第一行

使用jupyter notebook进入开发环境

代码:

from pyspark import SparkContext#导入Spark的Python库sc = SparkContext("local","bike")#本地运行。path = "file:///home/chenjie/bike/hour_noheader.csv"#已将数据集放在此路径下,并已去掉第一行的头信息raw_data = sc.textFile(path)#加载数据records = raw_data.map(lambda x: x.split(","))#用,隔开,获得记录num_data = raw_data.count()#得到总记录数first = records.first()#得到第一行print first#打印第一行print num_data#打印总总记录数def get_mapping(rdd, idx):    return rdd.map(lambda fields: fields[idx]).distinct().zipWithIndex().collectAsMap()#以上定义了一个映射函数:首先将第idx列的特征值去重,然后对每个值使用zipWithIndex函数映射到一个唯一的索引,这样就组成了一个RDD的键值映射#键是变量,值是索引print records.map(lambda fields: fields[2]).distinct().collect()#输出结果为[u'1', u'3', u'2', u'4']#表明第三列(季节)只有1,3,2,4四种取值print "Mapping of first categorical feasture column: %s" % get_mapping(records, 2)#Mapping of first categorical feasture column: {u'1': 0, u'3': 1, u'2': 2, u'4': 3}#将1,3,2,4映射到0,1,2,3mappings = [get_mapping (records, i) for i in range(2,10) ]#将记录中的每一列都这样映射cat_len = sum(map(len, mappings))#类型变量的长度num_len = len(records.first()[11:15])#实数变量的长度total_len = num_len + cat_len#总长度print "Feature vector length for categorical features : %d" % cat_lenprint "Feature vector length for numerical features : %d" % num_lenprint "Total feature vector length : %d" % total_len#1、为线性模型创建特征向量from pyspark.mllib.regression import LabeledPointimport numpy as np#以下函数为线性模型创建特征向量def extract_features(record):    cat_vec = np.zeros(cat_len)#先新建一个cat_len长度的0向量    i = 0    step = 0    for field in record[2 : 9]:        m = mappings[i]#某属性的特征map        idx = m[field]#得到下标        cat_vec[idx + step] = 1 #将cat_vec向量的对应位置置为1        i = i + 1        step = step + len(m)#步数往后走    num_vec = np.array([float(field) for field in record[10 : 14]])#取出实数属性组成实数向量    return np.concatenate((cat_vec, num_vec))#将类别向量和实数向量合并def extract_label(record):    return float(record[-1])data = records.map(lambda r : LabeledPoint(extract_label(r), extract_features(r)))first_point = data.first()print "Raw data :" + str(first[2:]) print "Label:" + str(first_point.label)print "Linear Model feature vector:\n" + str(first_point.features)print "Linear Model feature vector length :" + str(len(first_point.features))#以下部分代码创建决策树的特征向量def extract_features_dt(record):    return np.array(map(float, record[2 : 14]))data_dt = records.map(lambda r : LabeledPoint(extract_label(r), extract_features_dt(r)))first_point_dt = data_dt.first()print "Decision Tree feature vector: " + str(first_point_dt.features)print "Decision Tree feature vector length :" + str(len(first_point_dt.features))#以下部分使用线性回归训练模型from pyspark.mllib.regression import LinearRegressionWithSGDfrom pyspark.mllib.tree import DecisionTreelinear_model = LinearRegressionWithSGD.train(data, iterations = 10, step = 0.1, intercept = False)true_vs_predicted = data.map(lambda p : (p.label, linear_model.predict(p.features)))print "Linear Model predictions " + str(true_vs_predicted.take(5))#以下部分使用决策树训练模型dt_model = DecisionTree.trainRegressor(data_dt, {})preds = dt_model.predict(data_dt.map(lambda p : p.features))actual = data.map(lambda p : p.label)true_vs_predicted_dt = actual.zip(preds)print "Decision Tree predictions :" + str(true_vs_predicted_dt.take(5))print "Decision Tree depth :" + str(dt_model.depth())print "Decision Tree number of nodes :" + str(dt_model.numNodes())#均方误差def squared_error(actual, pred):    return (pred - actual) ** 2#平均绝对误差def abs_error(actual, pred):    return np.abs(pred - actual)#均方根对数误差def squared_log_error(actual, pred):    return (np.log(pred + 1) - np.log(actual + 1)) ** 2#以下部分得到线性回归的三个指标mse = true_vs_predicted.map(lambda (t, p): squared_error(t, p)).mean()print "Linear Model - Mean Squared Error : %2.4f" % msemae = true_vs_predicted.map(lambda (t, p): abs_error(t, p)).mean()print "Linear Model - Mean Absolute Error: %2.4f" % maermsle = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle#以下部分得到决策树的三个指标mse_dt = true_vs_predicted_dt.map(lambda (t, p) : squared_error(t, p)).mean()print "Decision Tree - Mean Squared Error : %2.4f" % mse_dtmae_dt = true_vs_predicted_dt.map(lambda (t, p) : abs_error(t, p)).mean()print "Decision Tree - Mean Absolute Error: %2.4f" % mae_dtrmsle_dt = np.sqrt(true_vs_predicted_dt.map(lambda (t, p): squared_log_error(t, p)).mean())print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle_dt#观察目标变量targets = records.map(lambda r : float(r[-1])).collect()from matplotlib import pyplot as pltplt.hist(targets, bins=40, color='green', normed=True)fig = plt.gcf()fig.set_size_inches(16,10)plt.show()#变幻目标变量——取对数log_targets = records.map(lambda r : np.log(float(r[-1]))).collect()plt.hist(log_targets, bins=40, color='green', normed=True)fig = plt.gcf()fig.set_size_inches(16,10)plt.show()#开方sqrt_targets = records.map(lambda r : np.sqrt(float(r[-1]))).collect()plt.hist(sqrt_targets, bins=40, color='green', normed=True)fig = plt.gcf()fig.set_size_inches(16,10)plt.show()data_log = data.map(lambda lp : LabeledPoint(np.log(lp.label), lp.features))model_log = LinearRegressionWithSGD.train(data_log, iterations=10, step=0.1)true_vs_predicted_log = data_log.map(lambda p : (np.exp(p.label), np.exp(model_log.predict(p.features))))mse_log = true_vs_predicted_log.map(lambda (t, p): squared_error(t, p)).mean()print "Linear Model - Mean Squared Error : %2.4f" % mse_logmae_log = true_vs_predicted_log.map(lambda (t, p): abs_error(t, p)).mean()print "Linear Model - Mean Absolute Error: %2.4f" % mae_logrmsle_log = np.sqrt(true_vs_predicted.map(lambda (t, p): squared_log_error(t, p)).mean())print "Linear Model - Root Mean Squared Log Error: %2.4f" % rmsle_logprint "未对原数据进行对数操作时:\n" + str(true_vs_predicted.take(3)) print "对原数据进行对数操作时:\n" + str(true_vs_predicted_log.take(3)) data_dt_log = data_dt.map(lambda lp : LabeledPoint(np.log(lp.label), lp.features))dt_model_log = DecisionTree.trainRegressor(data_dt_log, {})preds_log = dt_model_log.predict(data_dt_log.map(lambda p : p.features))actual_log = data_dt_log.map(lambda p: p.label)true_vs_predicted_dt_log = actual_log.zip(preds_log).map(lambda (t, p) : (np.exp(t), np.exp(p)))mse_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): squared_error(t, p)).mean()print "Decision Tree - Mean Squared Error : %2.4f" % mse_log_dtmae_log_dt = true_vs_predicted_dt_log.map(lambda (t, p): abs_error(t, p)).mean()print "Decision Tree - Mean Absolute Error: %2.4f" % mae_log_dtrmsle_log_dt = np.sqrt(true_vs_predicted_dt_log.map(lambda (t, p): squared_log_error(t, p)).mean())print "Decision Tree - Root Mean Squared Log Error: %2.4f" % rmsle_log_dtprint "未对原数据进行对数操作时:\n" + str(true_vs_predicted_dt.take(3)) print "对原数据进行对数操作时:\n" + str(true_vs_predicted_dt_log.take(3)) data_with_idx = data.zipWithIndex().map(lambda (k, v) : (v, k))test = data_with_idx.sample(False, 0.2, 42)train = data_with_idx.subtractByKey(test)train_data = train.map(lambda (idx, p) : p)test_data = test.map(lambda (idx, p) : p)train_size = train_data.count()test_size = test_data.count()print "训练集大小:%d" % train_sizeprint "测试集大小:%d" % test_sizeprint "总共大小:%d" % num_dataprint "训练集大小 + 测试集大小:%d" %  (train_size + test_size)data_with_idx_dt = data_dt.zipWithIndex().map(lambda (k, v) : (v, k))test_dt = data_with_idx_dt.sample(False, 0.2, 42)train_dt = data_with_idx_dt.subtractByKey(test_dt)train_data_dt = train_dt.map(lambda (idx, p) : p)test_data_dt = test_dt.map(lambda (idx, p) : p)train_size_dt = train_data_dt.count()test_size_dt = test_data_dt.count()print "训练集大小:%d" % train_size_dtprint "测试集大小:%d" % test_size_dtprint "总共大小:%d" % num_dataprint "训练集大小 + 测试集大小:%d" %  (train_size_dt + test_size_dt)def evaluate(train, test, iterations, step, regParam, regType, intercept):    model= LinearRegressionWithSGD.train(train, iterations, step, regParam=regParam, regType=regType, intercept=intercept)    tp = test.map(lambda p : (p.label, model.predict(p.features)))    rmsle = np.sqrt(tp.map(lambda (t, p) : squared_log_error(t, p)).mean())    return rmsle#迭代次数对性能的影响params = [1, 5, 10, 20, 50, 100]metrics = [evaluate(train_data, test_data, param, 0.01, 0.0, 'l2', False) for param in params]#注意这里是字母L的小写l不是1print paramsprint metricsplt.plot(params, metrics)fig = plt.gcf()plt.xlabel("iterations")plt.ylabel("RMSLE")plt.show()params = [0.01, 0.025, 0.05, 0.1, 1.0]metrics = [evaluate(train_data, test_data, 10,param, 0.0, 'l2', False) for param in params]#注意这里是字母L的小写l不是1#步长对性能的影响print paramsprint metricsplt.plot(params, metrics)fig = plt.gcf()plt.xlabel("step")plt.ylabel("RMSLE")plt.show()params = [0.1, 0.01, 0.1, 1.0, 5.0, 10.0, 20.0]metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l2', False) for param in params]#注意这里是字母L的小写l不是1#正则化参数对性能的影响print paramsprint metricsplt.plot(params, metrics)fig = plt.gcf()plt.xlabel("regParam")plt.xscale('log')plt.ylabel("RMSLE")plt.show()params = [0.1, 0.01, 0.1, 1, 10.0, 100.0, 1000.0]metrics = [evaluate(train_data, test_data, 10, 0.1, param, 'l1', False) for param in params]#注意这里是字母L的小写l不是1print paramsprint metricsplt.plot(params, metrics)fig = plt.gcf()plt.xlabel("regParam")plt.xscale('log')plt.ylabel("RMSLE")plt.title("when regType change to l1 from l2")plt.show()model_l1 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=1.0, regType='l1', intercept=False)model_l1_10 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=10.0, regType='l1', intercept=False)model_l1_100 = LinearRegressionWithSGD.train(train_data, 10, 0.1, regParam=100.0, regType='l1', intercept=False)print "L1(1.0)权重向量中0个书目:" + str(sum(model_l1.weights.array == 0))print "L1(10.0)权重向量中0个书目:" + str(sum(model_l1_10.weights.array == 0))print "L1(100.0)权重向量中0个书目:" + str(sum(model_l1_100.weights.array == 0))params = [False, True]metrics = [evaluate(train_data, test_data, 10, 0.1, 1.0, 'l2', param) for param in params]#注意这里是字母L的小写l不是1print paramsprint metricsplt.bar(params, metrics)fig = plt.gcf()plt.xlabel("intercept")plt.ylabel("RMSLE")plt.show()def evaluate_dt(train, test, maxDepth, maxBins):    model = DecisionTree.trainRegressor(train, {}, impurity='variance', maxDepth=maxDepth, maxBins=maxBins)   # print model     preds = model.predict(test.map(lambda p : p.features))    actual = test.map(lambda p : p.label)    tp = actual.zip(preds)    rmsle = np.sqrt(tp.map(lambda (t, p) : squared_log_error(t, p)).mean())    return rmsle    params = [1, 2, 3, 4, 5, 10, 20]metrics = [evaluate_dt(train_data_dt, test_data_dt, param, 32) for param in params]print paramsprint metricsplt.plot(params, metrics)fig = plt.gcf()plt.title("maxDepth's influence on DecisionTree")plt.xlabel("maxDepth")plt.ylabel("RMSLE")plt.show()params = [2, 4, 8, 16, 32, 64, 100]metrics = [evaluate_dt(train_data_dt, test_data_dt, 5, param) for param in params]print paramsprint metricsplt.plot(params, metrics)fig = plt.gcf()plt.title("maxBins's influence on DecisionTree")plt.xlabel("maxBins")plt.ylabel("RMSLE")plt.show()

你可能感兴趣的文章
学习正则表达式
查看>>
linux高级IO
查看>>
angualarjsdemo
查看>>
【C#】解析C#中JSON.NET的使用
查看>>
PyQt中从RAM新建QIcon对象 / Create a QIcon from binary data
查看>>
HTML5拖放API
查看>>
JS中原型链的理解
查看>>
oracle服务器和客户端字符集的查看和修改
查看>>
看完此文再不懂区块链算我输,用Python从零开始创建区块链
查看>>
C/S框架-WebService架构用户凭证(令牌)解决方案
查看>>
UVA 11149.Power of Matrix-矩阵快速幂倍增
查看>>
ajax post 请求415\ 400 错误
查看>>
使用 CSS 用户选择控制选择
查看>>
PHP程序性能优化的50种方法
查看>>
css3 动画的播放、暂停和重新开始
查看>>
IOS 上传ipa文件失败
查看>>
eclipse Android 开发基础 Activity 窗体 界面
查看>>
怎样玩转千万级别的数据
查看>>
input输入框修改后自动跳到最后一个字符
查看>>
Windows与Linux之间海量文件的传输与Linux下大小写敏感问题
查看>>