Fork me on GitHub

读书笔记-《坚持,一种可以养成的习惯》

一.为什么你不能坚持

1.如刷牙般轻松,这就是习惯

所谓习惯就是“不依赖意志或毅力,把自己想要持续的事情引导到如每天刷牙般轻松的状态”

2.大脑自然的习惯

(1)人的意识分为表意识和无意识

(2)习惯就是把重复的行动化为无意识的行动(自动化)。对于重复的行动,我们不使用表意识,而是将其化为无意识的自动状态,这就是所谓的习惯

3.产生“3分钟热度”的“习惯引力”究竟是什么?

(1)为什么我们无法把自己想要持续的事情转化为习惯呢?

  • 因为人类具有“对抗新变化,维持现状的倾向”的特点

(2)习惯引力具有两种功能:

  • 一是抵抗新变化
  • 二是维持现状

4.持续多久之后能够“习惯化”

(1)习惯分为三种
  • 一是 行为习惯
  • 二是 身体习惯
  • 三是 思考习惯
(2)行为习惯
  • 行为习惯即每天规律的行为,
  • 比如读书,记日记,整理,节约,记录家庭收支等
  • 培养行为习惯的时间大约需要一个月
(3)身体习惯
  • 身体习惯是与身体节奏相关的习惯,
  • 例如,减肥,运动,早起,戒烟,肌力训练等
  • 培养身体习惯的时间大约需要三个月
(4)思考习惯
  • 思考习惯是与思考能力相关的习惯,
  • 例如,逻辑性思考能力,创意能力,正面思考等习惯
  • 培养思考习惯的时间大约六个月

5.培养行为习惯的三个阶段

(1)阶段一,反抗期(1~7天):马上就想放弃,42%的人失败
(2)阶段二,不稳定期(8~21天):被预定事项或他人影响:40%的人失败
(3)阶段三,倦怠期(22~30天):逐渐感到厌烦,18%的人失败

6.以长期的眼光培养习惯

(1)按时间分习惯:
  • 短期习惯
  • 中期习惯
  • 长期习惯
(2)短期习惯
  • 培养短期习惯就是培养“马上就可以看到结果的习惯”
  • 例如整理,存钱,不看电视,减少开电子邮件的次数等
  • 必须培养这样让行动变得高效的习惯
(3)中期习惯
  • 中期习惯包括时间管理,写日记,考职业证等
(4)长期习惯
  • 长期习惯包括阅读,拓展人脉,健康管理等

7.七十项习惯清单建立“年度计划”

(1)自我投资

阅读/影音学习/写日记/认识新朋友/学习专业知识/考资格证/参加研讨会、读书会/把在路上的时间转变为学习时间/利用博客、电子报发布信息/订阅刊物/重新检视人生计划/把一年的目标写在纸上

(2)金钱

存钱/节约/投资/填写家庭收支簿/不赌博/请他人吃饭/捐款

(3)心灵成长(压力、动力)

每天要说积极向上的话/冥想/每天写一件感恩的事/早上泡澡/每天都有一件期待的事/每周做一件有趣的事/整理/一天做三次深呼吸/听喜欢的音乐/问有建设性的问题/一天少做一件事(工作清单或备忘录)

(4)运用时间

不看电视/拟定第二天的计划/限定看电子邮件的次数/拒绝聚餐的邀约/杂事统一处理/先处理最重要的三件事/列工作清单/严守下班时间/提早进公司/一次只集中在一件事上/不断改善对时间的运用

(5)人际关系

经常称呼对方的名字/每天都要称赞他人/一天有 40% 的时间保持笑容/大声地与人打招呼/成为倾听的人/原谅他人/写交换日记/每天与重要的人交谈 10 分钟以上/不说抱怨、不满的话/先说结论/以双赢的目标思考

(6)健康、美

吃健康食品/把白米换成糙米/每天刷 3 次牙/吃天然食物/每天睡满 7 个小时/一天喝 2 升水/每天晒太阳 30 分钟/不喝酒/讲究穿着/均衡摄取营养★/饮食以蔬果为主★/戒烟★/肌力训练★/做有氧运动★/限制热量摄取★/按摩/做伸展运动

列表中出现★者,为需要三个月时间培养的身体习惯。

1
2
3
4
5
6
7
8
9
10
11
12
13
填写“习惯的年度计划”
第一个月_____
第二个月_____
第三个月_____
第四个月_____
第五个月_____
第六个月_____
第七个月_____
第八个月_____
第九个月_____
第十个月_____
第十一个月_____
第十二个月_____

8.“培养习惯之旅”的注意事项与指引

(1)前提一 每天坚持行动
(2)前提二 一定要坚持到底

二.顺利培养习惯的三个阶段

1.反抗期:在暴风雨中前行

(1)预防失败的“习惯培养三原则”
  • 原则一 锁定一项习惯(不要同时培养多项习惯)
  • 原则二 坚持有效的行动(行动规则越简单越好),复杂的事情容易失败,简单的事物容易坚持
  • 原则三 不要太在意结果
(2)克服反抗期的具体对策之一:从小地方开始
(a)不论事情有多么复杂,把焦点放在自己能够做的事情上
(b)会失败的人从一开始就很容易把行动的难度定的太高,于是随着热情的下降,行动就会变得麻烦而无趣
(c)效果:从小地方开始会得到两种效果
  • 第一种是行动压力较小,
  • 第二种是引发行动的动力
(d)方法:从小地方开始,有两种方法可行
  • 方法一,细分时间,比如只用5分钟整理,只花15分钟阅读,只花3分钟写日记,只跑15分钟,总的来说,就是在坚持习惯的时候,降低坚持的时间,以降低坚持习惯的难度
  • 方法二,细分步骤,比如只整理一个房间,只读一页书,写一行日记,走路而不是跑步,同理,降低坚持习惯的难度
(e)重点
  • 重点一,设定容易执行的步骤
  • 重点二,抛开不足感
  • 一定要每天执行
(3)克服反抗期的具体对策之二:简单记录
(a)效果:
  • 能够客观的分析并了解问题
  • 减少行动的不确定性
  • 提高动力
(b)方法:
  • 方法一,思考记录内容(要记录哪些项目),比如只记录完成目标o,和未完成目标x,处理o和x之外,记录行动的内容,树枝
  • 思考记录的媒介(要记录到哪里?),比如纸张,电子媒介
(c)重点:
  • 不要过于繁琐
  • 一定要每天记录

2.不稳定期:要建立“持续行动的机制”

(1)在不稳定期会出现以下症状:
  • 在已安排好的时间内插入其他事情而荒废计划
  • 因为加班或个人私事导致计划中断
  • 因为天气或突发事件导致多日无法持续行动”
(2)若想度过不稳定期,需要建立能够持续的机制
(3)提高行动的难度,进入不稳定期后要把难度提高到自己本来要求的高度
(4)不稳定期的三大对策
  • 对策一 行为模式化
  • 对策二 设定例外规则
  • 对策三 设定持续开关
(5)对策一:行为模式化
(a)就是指把自己想要培养的习惯化为固定的模式(固定的时间,固定的地点,固定的做法),并认真执行
(b)效果:
  • 效果一,培养节奏感
  • 效果二,不容易忘记
(c)方法
  • 方法一:时间:决定星期几,几点开始
  • 方法二:内容:决定数量与方法
  • 方法三:地点:决定地点,例如在上班途中,办公室,家中等等
(d)重点
  • 重点一:尽量找不被侵犯的地方
  • 重点二:考虑一举两得的做法
  • 重点三:每天持续行动
(6)对策二:设定例外规则
(a)“例外规则”是对不规律发生的事件预先制定应对规则的弹性应对机制
(b)设定例外规则并不是未来宠溺自己,而是未来让计划保持弹性
(c)效果
  • 效果一,有弹性的执行计划
  • 效果二,减少压力
(d)方法
  • 方法一,考虑例外状况,A:身体状况(疲倦,提不起劲,感觉不舒服,感冒等),B:气候(太热,太冷,下雨,下雪等),C:预定事项(突然加班,聚餐等)
  • 方法二,考虑应对方法,A:从小地方入手(减少习惯难度),B:替换(第二天加班完成目标,或另找一天),C:设定特别的日子
(e)重点:
  • 重点一:假设可能发生的例外状况
  • 重点二:一边尝试,一边变更“例外规则”
(7)对策三:设定持续开关
(a)“持续开关”是善于培养计划的人为了能够持续行动所设计的一些巧妙的方法
(b)积极行动的动力来源分为“产生快感”和“回避痛苦”两种
(c)12种持续开关分为“糖果型开关”和“处罚型开关”两大类
(d)效果:
  • 效果一,能够获得动力
  • 效果二,建立能够持续的机制
(e)糖果型开关:利用快感,推动自己的行动
编号 类型 定义
1 奖励 通过奖励突破眼前的困难
2 被称赞 塑造被称赞的气氛以提升干劲
3 游戏 以游戏开启行动,提升自己的热情
4 理想模式 设定理想目标,让自己进步
5 仪式 举行小小的仪式,驱除怠惰的心情
6 去除障碍 去除阻碍行动的因素,减轻压力
(f)处罚型开关,利用惩罚,推动自己的行动
编号 类型 定义
1 损益计算 投资时塑造失败就会亏损的环境
2 结交朋友 结交培养相同习惯的朋友,不允许自己安逸
3 对大众宣布 对大众发表宣言,塑造后无退路的状态
4 处罚游戏 利用处罚游戏击退借口
5 设定目标 设定目标,引发达成目标的欲望
6 强制力 通过与他人约定,塑造严苛的环境,时间限制等,逼迫自己进入不得不做的状况
(g)重点:
  • 重点一 了解自己擅长的事
  • 重点二 不同习惯有不同的“开关”,比如戒烟的开关事“对大众宣布”,慢跑的开关是“游戏”

3.倦怠期:“习惯引力”最后的反抗

(1)倦怠期会出现以下几种症状:
  • 感觉厌烦提不起劲
  • 感受不到培养习惯的意义
  • 因一成不变而产生空虚感
(2)倦怠期的对策有两个:
  • 对策一:添加变化
  • 对策二:计划培养下一个习惯
(3)对策一:添加变化
(a)效果:
  • 效果一,以崭新的心情重新出发
  • 效果二,产生动力
(b)方法:
  • 方法一,改变内容,环境,例如改变学习教材,改变学习环境等
  • 方法二,使用“持续开关”
(c)重点:
  • 重点一,以“一举两得”的角度思考
  • 重点二,准备多种选择
  • 重点三,不要轻易改变模式或规则
(4)对策二:计划下一项习惯
(a)在倦怠期最后阶段计划下一项习惯:
  • 就是为了建立习惯的连贯
(b)效果
  • 效果一,可以看到现在培养习惯的经过
  • 效果二,提高培养习惯的能力
(c)方法
  • 方法一,把目标倒过来计算,针对优先度高的习惯,对习惯清单排序
  • 方法二,拟定计划
(d)重点:
  • 重点一:排列优先级
  • 重点二:一定是在一个习惯倦怠后期才开始另一个习惯

三.十二个“持续开关”,让你远离失败

1.开关类型

(1)糖果型开关:利用快感,推动自己的行动
编号 类型 定义
1 奖励 通过奖励突破眼前的困难
2 被称赞 塑造被称赞的气氛以提升干劲
3 游戏 以游戏开启行动,提升自己的热情
4 理想模式 设定理想目标,让自己进步
5 仪式 举行小小的仪式,驱除怠惰的心情
6 去除障碍 去除阻碍行动的因素,减轻压力
(2)处罚型开关,利用惩罚,推动自己的行动
编号 类型 定义
1 损益计算 投资时塑造失败就会亏损的环境
2 结交朋友 结交培养相同习惯的朋友,不允许自己安逸
3 对大众宣布 对大众发表宣言,塑造后无退路的状态
4 处罚游戏 利用处罚游戏击退借口
5 设定目标 设定目标,引发达成目标的欲望
6 强制力 通过与他人约定,塑造严苛的环境,时间限制等,逼迫自己进入不得不做的状况

2.糖果型开关一:奖励

(1)重点:
  • 重点一,思考奖励与习惯的相关型,相关性越强,促进行动的力量越大
  • 重点二,设定喜欢的事物,设定培养某个习惯后,就可以得到喜欢的事物

3.糖果型开关二:被称赞

(1)重点:
  • 重点一,选择称赞自己的对象,对不同的习惯,选择不同的适合的对象
  • 重点二,告知对方要求,请对方定期称赞自己,告诉对方这样会给自己带来干劲;自己主动要求对方反馈(“我已经瘦了3斤了,你觉得我又没有改变”)

4.糖果型开关三:游戏

(1)重点:

  • 重点一,想一些有趣的创意
  • 尝试后留下有用的创意

5.糖果型开关四:理想模式

(1)重点:

  • 重点一,找到理想的人物,东西
  • 重点二,视觉化,贴他们的海报,壁纸等,让自己看得见

6.糖果型开关五:仪式

(1)重点:

  • 重点一,设定小小的仪式,把仪式视为一个小行动,先踏出这一步
  • 重点二,思考仪式与习惯的关联性,举行仪式的目的在于调整心态及热身,选择与习惯相关的行动最后
  • 重点三,设定仪式使其成为习惯

7.糖果型开关:去除障碍

(1)重点:

  • 重点一,列出障碍,妨行动的因素(人,活动等)
  • 重点二,思考解决对策
  • 重点三,提出自己的主张,如果其他人是阻碍因素的话,要清楚拒绝或向对方提出要求,让对方了解情况,请对方尊重自己的选择

8.处罚型开关:损益计算

(1)重点:

  • 重点一,下决心投资,至少我们会因为想要回收资本而坚持下去
  • 谨慎选择投资对象
  • 分期付款也有效

9.处罚型开关:结交朋友

(1)重点:

  • 重点一,谨慎选择伙伴,不要选择“马上会放弃”,“不遵守约定”的伙伴
  • 重点二,寻找支持者,选择身边因自己的习惯而受益的人,并请求得到对方的支持,这样会破除自己贪图安逸的心态
  • 重点三,通过教室或社团建立朋友群,找到志同道合的朋友

10.处罚型开关:对大众宣布

(1)重点

  • 重点一,对大众或特定的人“发表宣言”
  • 重点二,意识到“监视点”
  • 重点三,与其他的“开关”并用

11.处罚型开关:处罚游戏

(1)重点:

  • 重点一,下决心处罚自己,一定要重重的的处罚
  • 重点二,与自己约定或与其他人约定
  • 重点三,留下契约书

12.处罚型开关:设定目标

(1)重点:

  • 重点一,设定大目标,长期目标,以5年,10年为期限
  • 重点二,设定小目标,以半年,三个月,一个月,十天为期限,设定小目标
  • 与其他“开关”并用

13.处罚型开关:强制力

(1)重点:

  • 重点一,雇佣专家
  • 塑造有限的环境

四.习惯清单模版

1.五分钟整理

A 先生的习惯清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
内容与目标
习惯内容:每天早上从七点开始,花 15 分钟整理。
目标:三个月后,在整齐清洁的家与办公室中过舒适的生活。
三年后:提高工作效率,提升工作成果,产生自信。
反抗期的对策
①以婴儿学步开始:
每天花 5 分钟整理。
*虽然没有确定固定时段,不过选择早上整理比较好。
②简单记录
在 30 天的确认清单上填“○”。
*将确认清单贴在冰箱上。
不稳定期的对策
①模式化
在早上七点到七点十五分之间执行。
②设定例外规则
疲倦或加班时:早上只花 5 分钟时间整理,或晚上花 15 分钟整理。
③设定持续开关
用计时器、换上专门的服装(仪式)。
打开电视之前整理(去除障碍)。
结束后喝一杯咖啡(奖励)。
播放喜欢听的音乐(游戏)。
倦怠期的对策
①添加变化
一周挑一天作为整理日。
邀请朋友来家里玩(强制力)。
与女朋友一边比赛一边整理(朋友加游戏)。
②计划下一项习惯
睡前冥想 15 分钟(消除压力、追寻自我)。
2.学英语–利用“例外规则”减少行动的变动性

B 先生的习惯清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
内容与目标
内容:每天晚上十一点开始,花 30 分钟进行英文阅读练习。
目标:三个月后,自己可以读英文书。
三年后:取得美国 CPA 资格,成功升职为组长。
反抗期的对策
①以“婴儿学步”开始
每天一定要翻开英文教科书,至少读一页内容。
*就算只读一分钟也好。
②简单记录
在教科书上贴一张 30 天的记录表。
不稳定期的对策
①行为模式化
回家后花 30 分钟学习。
*晚上十一点到十一点半最为理想。
②设定例外规则
若晚上十一点以后才回家:中午用餐时至少要学习 10 分钟(如果事先知
道)/最少要读一页的教科书(如果事先不知道)。
③设定持续开关
不开电视(去除障碍)。
在教材上投资十万日元(损益计算)。
对主管提出宣言“一年后 TOEIC 要考八百分“(对大众宣布)。
倦怠期的对策
①添加变化
每周两天阅读莎士比亚原文(奖励)。
安排英文老师的私人指导课程(强制力)。
②计划下一项习惯
培养听英文的习惯(每天 30 分钟)。
3.节约–“习惯化原则”引领你走向成功

C 小姐的习惯清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
内容与目标
内容:以一天 2000 日元的花费过日子。
目标:三个月后,能够存下 20% 的薪水。
三年后:在夏威夷举办婚礼。
反抗期的对策
①以“婴儿学步”开始
一周的零食花销控制在 500 日元以下。
②简单记录
使用记事本记录花销。
*记录早上出门前钱包里的钱减去回家时钱包里的余额。
不稳定期的对策
①模式化
餐费 1300 日元、零食 300 日元、饮料 400 日元为标准。
②设定例外规则
聚餐:列为其他预算项目。
花费超支:在一星期的预算范围中调整。
③设定持续开关
钱包里只放 2000 日元(强制力)。
放纵日:一周吃一顿 2000 日元的午餐(奖励)。
计算一年后、三个月后、一个月后的节约效果(设定目标)。
倦怠期的对策
①添加变化
成功做到的那天存 500 日元在存钱罐里(游戏)。
如果当周花费超出预算,第二周就不能吃零食(处罚游戏)。
②计划下一项习惯
挑战减肥的习惯。
4.减肥–三个月后苗条不反弹

D 先生的习惯清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
内容与目标
习惯内容:培养一天摄取 2000 卡路里的饮食习惯。
目标:六个月后,体重减轻 10 公斤,健康检查的所有异常数值都回归正常。
三年后:保持 62 公斤的理想体重,通过运动达到身体脂肪含量为 16% 的
健康状态。
反抗期的对策
①以“婴儿学步”开始
每天晚餐的热量控制在 500 卡路里以下。
*早餐、中餐不限制。
②简单记录
记录早餐、午餐、晚餐摄取的热量。
*购买食物热量表,记录大概的目标。
每天记录体重。
*体重计前方贴上记录用的纸张。
不稳定期的对策
①逐渐提高门槛
第 4、第 5 周:2600 卡路里热量。
第 6、第 7 周:每天摄取 2300 卡路里热量。
②模式化
早上七点、中午十二点、晚上七点用餐。
③设定例外规则
聚餐:解禁。
假日:可多摄取 200 卡路里。
④设定持续开关
6 个月体重减少 10 公斤(设定目标)。
贴上具有高人气的艺人相片(理想目标)。
没达到目标那天就要跑步 30 分钟(处罚游戏)。
聚餐只喝乌龙茶(去除障碍)。
稳定期的对策
①遵守自己设定的门槛
“继续维持每天摄取 2000 卡路里热量。
②享受小成长
把体重的变化做成图表。
问旁人自己外表的变化(被称赞)。
倦怠期的对策
①添加变化
制作食谱并自己下厨做饭(游戏)。
购买减肥后才穿得下的西装(损益计算)。
②计划下一项习惯
培养跑步的习惯。
5.早起–把成长“视觉化”

E 先生的习惯清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
内容与目标
习惯内容:每天早上七点半上班。
目标:六个月后,工作效率提高 30%,每天晚上七点下班,在家吃晚饭,
让家庭气氛和谐美好。
三年后:早上五点起床、跑步,然后与太太、小孩悠闲地享用早餐。
反抗期的对策
①以“婴儿学步”开始
每天七点起床。
*晚上 12 点睡觉。
②简单记录
每天在家里的日历上填上“○”和“X”。
填写上班时间和下班时间。
不稳定期的对策
①逐渐提高门槛
第 4、第 5 周:八点上班(六点半起床)。
第 6、第 7 周:七点半上班(六点起床)。
②模式化
早上六点起床、晚上十一点半睡觉(晚上八点前下班)。
③设定例外规则
深夜回家、聚餐时:跟往常一样九点上班。
④设定持续开关
对部下宣布上班时间(对大众宣布)。
每周进行一次早晨会议(强制力)。
请太太配合睡觉、起床的时间(结交朋友)。
如果没做到就付给太太 1000 日元(处罚游戏)。
稳定期的对策
①遵守自己设定的门槛
每天早上一定要七点半上班(六点起床)。
②享受小成长
确实感受到工作效率的提升。
把工作时间的变化视觉化。
倦怠期的对策
①添加变化
让部下也在七点半上班(习惯的朋友)。
每周与妻子吃一次自助早餐(奖励)。
②计划下一项习惯
培养倾听的习惯。
6.戒烟–利用“去除障碍”的方式赶走香烟的诱惑

F 先生的习惯清单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
内容与目标
习惯内容:完全戒烟。
目标:六个月后:身体变得轻盈,健康检查的结果没有让人担忧的指标。
省去抽烟的时间,工作效率提高,减少加班。
三年后:戒烟省下 88 万日元,一家三口一起去夏威夷旅行。
反抗期的对策
①以“婴儿学步”开始
从一天抽 40 根烟减少为一天抽 30 根烟。
②简单记录
每天在记事本上记录早、晚抽烟的数量。
不稳定期的对策
①逐渐提高门槛
第 4、第 5 周:从每天抽 30 根烟减少为每天抽 15 根烟。
第 6、第 7 周:从每天抽 15 根烟减少为每天最多抽 5 根烟。
②模式化
抽烟时间:上午九点到下午六点,不在家里抽烟。
③设定例外规则
如果抽多了的话:第二天的烟量减少。
④设定持续开关
可以吃自己爱吃的东西(奖励)。
对同事/家人发表戒烟宣言(对大众宣布)。
请太太称赞自己(被称赞)。
尽量避免聚会、张贴自己肺部的相片(去除障碍)。
稳定期的对策
①遵守自己制定的门槛
完全断绝香烟。
毫无例外,一根也不行。
②享受小成长
感受身体发生的变化。
倦怠期的对策
①添加变化
用省下来的香烟钱买喜欢的鱼竿(奖励)。
把每天省下的 800 日元存入储蓄罐(游戏)。
找同事一起戒烟(结交朋友)。
②计划下一项习惯
减肥。

读书笔记-《关键20小时 快速学会任何技能》

一.作者自序

1.要“学会”而非“学精”

2.要“质量”而非“数量”

  • 充分练习是快速学习技能的关键

3.什么是“快速技能习得”

快速技能习得分为4个步骤

1
2
3
4
5
6
7
(1)分解步骤:把技能最最大程度的细分,分成若干小步骤

(2)充分学习:对每个小步骤进行充分的学习,以便进行灵活的练习,并在练习中自我修正

(3)克服困难:克服在练习过程中出现的生理,心理或情绪上的障碍

(4)集中练习:至少用20小时集中学习最重要的小步骤

4.技能习得与技能学习

5.技能学习的真正意义

  • 技能学习很重要,关于技能的理论知识是极其重要的,只有学好了理论,才能在实践中自我修改和自我完善

6.技能习得与技能训练

  • 技能习得和技能训练也有很大不同,技能训练是在技能习得之后,重复练习以达到提高技能的目的
  • 真正的技能习得需要达到一定的训练量,否则就没有效果

7.技能习得和教育,资格认证

  • 技能习得需要长时期持续,专注的练习,同时也需要自己创造性的,灵活的,自由的梳理属于自己的目标
  • “创造,灵活,自由”是快速习得技能的关键因素

8.神经生理学:大脑的可塑性和肌肉的记忆

(1)僵固式思维模式:人物天赋就是能力本身
(2)成长式思维模式:真正哺育天赋的是反复的练习和长期的坚持
  • 只要肯练习,人的大脑是具备提高技能的功能的。

9.技能习得分为三大步骤

1
2
3
4
5
(1)早起认知。了解自己即将学习的技能是什么,探索研究,想想整个过程,把技能细分为几个可控的小步骤

(2)中期联想:训练,注意环境反馈,根据反馈调整方法

(3)后期自主训练

二.快速习得技能的10个方法

1.选择方向

  • 优先选择自己感兴趣的方向,项目

2.集中精力

  • 把精力放在一门技能的学习上

3.制定目标

  • 制定学习目标让我们有机会去想象我们学成以后会是什么样子
  • 一旦未来努力的方向确定下来,就更容易达到目标
  • 把目标定的越轻松,掌握相关技能的速度就越快
  • 降低标准,快速习得技能的目的并不是要尽善尽美,成为世界大师,而是要在坚固能力和效率的同时,快速提高技能

4.分解技能

  • 将技能分解成为一个个更小的,更确定的步骤
  • 学会将步骤进行筛选,把关键的步骤先找出来,再集中时间和精力去学习

5.获取工具

  • 设法弄清楚,学习这门技能需要什么样的条件,需要使用什么样的学习用具
  • 只有获得了必备的工具,我们才可能最大限度的利用时间充分学习

6.扫除障碍

(1)有些因素会造成障碍:
  • 训练前的准备工作:训练前找不到工具,或者没有合适的工具
  • 使用临时训练工具
  • 环境干扰:例如开着的电视,突然响起的电话,刚刚收到的电子邮件
  • 情绪障碍:例如害怕,怀疑,害羞
(2)靠意志力去克服这些障碍不见的是明智的做法,最好是事先清除这些障碍

7.腾出时间

  • 筛选出低效的时间,并将它淘汰掉
  • 一旦训练开始,就别停下来

8.及时反馈

  • 及时反馈是指尽可能快的了解你在技能习得中的表现
  • 快速反馈有助于快速习得技能

9.计时训练

  • 分段计时训练,设置好时间,比如20分钟,然后准备开始练习
  • 制定规则,一旦计时开始,中途就不要停下来
  • 持续训练的时间越长,技能习得就越快
  • 每天腾出3~5次这样的分段计时训练,短时间内就会看到明显的进步

10.数量与速度

  • 刚开始习得技能时,不要寻求尽善尽美,因为那样会让人产生挫败感
  • 应该保证训练量和训练速度,这才是重要的
  • 刚开始学习时,不要盲目追求质量,相反,必要的训练量和训练速度才是制胜法宝,练得多,练的快,才能学的快
  • 训练时要报纸良好的状态,只要做的至少在80%~90%的训练时间内持续保持良好的训练状态,就能加快技能习得

11.沉浸法

沉浸法是指通过完全改变周围学习环境的方式,达到专注持久学习的目的,例如想学法语,那就搬到法国住上几个月

三.有效学习的10个方法

1.收集信息

  • 实践前,查阅一下和这门技能相关的信息是十分必要的

2.克服困难

  • 在习得技能过程中会遇到对有的概念,方法理解不了的地方,产生困惑,此时克服困难继续学习,之后会把前面的困惑一一解开的

3.关联类比

  • 在学习的过程中,会出现理论和方法,将它们成为“心智模式”
  • 有时在学习过程中会遇到新的事物好像是自己熟悉的,此时将此事物成为“心智勾连”
  • 自己找到的心智模式和心智勾连越多,就越容易在技能训练中获益

4.逆向思维

  • 学习新技能之前,不幻想自己学的多完美,多设想最坏的结局,这种思维方式称为“违反直觉的思维方式”,它有助于我们深入了解技能

5.咨询交流

  • 在学习技能之前,有必要和内行聊一聊,这样我们可以提前预知技能训练中的每个阶段会遇到的问题,消除对技能学习的疑虑和误解

6.排除干扰

  • 我们应该尽量排除电子干扰和生物干扰
  • 电视,手机,网络属于电子干扰因素
  • 家人,同事,宠物属于生物干扰因素
  • 干扰因素越少,技能习得就越有效

7.间隔重复

  • 间隔重复有助于大脑将所学知识进行巩固并长时保存
  • 对于记忆有难度的信息,更要经常复习

8.创建定式

  • 大多数技能学习都有一套固定的模式:确定项目,着手准备,坚持学习等
  • 建立一套简单的定式可以让自己比较轻松的了解其中关键的环节
  • 罗列清单方便我们记忆学习要点,使技能训练的流程更加系统化

9.预期测试

(1)预期测试是指依靠已知经验,再尝试实践前假设接下来会发生什么变化或者产生的结果
(2)预期测试结果随着四个因素的变化而变化
1
2
3
4
(1)观察:你最近在关注什么
(2)经历:你对这个领域了解多少
(3)假设:怎样做才会更进一步
(4)测试:下一步你有什么新尝试

10.尊重生理

  • 足够的睡眠等等,尊重生理

SparkStreaming实战:处理来自flume pull方式发来的数据

1.需求:

处理来自flume pull方式发来的数据

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>

</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.1.0</version>
</dependency>

</dependencies>
(2)option2

option2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#定义agent名,source,channel,sink的名称
a1.sources=r1
a1.channels =c1
a1.sinks=k1

#具体定义source
a1.sources.r1.type= spooldir
a1.sources.r1.spoolDir= /opt/TestFolder/logs
a1.sources.r1.fileSuffix = .COMPLETED

#具体定义channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity = 100

#具体定义sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channels =c1
a1.sinks.k1.hostname=192.168.31.132
a1.sinks.k1.port=1234

#组装source, channel,sink

a1.sources.r1.channels = c1
a1.sinks.k1.channel =c1
(3)FlumeLogPul.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevel

object FlumeLogPull {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]")
val ssc = new StreamingContext(conf,Seconds(1))

val flumeEvent = FlumeUtils.createPollingStream(ssc, "192.168.31.211", 1234,StorageLevel.MEMORY_ONLY_SER)

val lineDStream = flumeEvent.map( e => {
new String(e.event.getBody.array)
})

lineDStream.print()

ssc.start()
ssc.awaitTermination()
}
}

3.将spark-streaming-flume-sink_2.11-2.1.0.jar拷贝到flume的jar目录下

4.运行:

(1)运行SparkStreaming程序:
(2)开启flume
1
bin/flume-ng agent -n a1 -c conf -f myconf/option2 -Dflume.root.logger=INFO,console

5.结果:

SparkStreaming实战:处理来自flume push方式发来的数据

1.需求:

SparkStreaming处理来自flume push方式发来的数据,即flume将数据推给spark Streaming

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>

</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.1.0</version>
</dependency>

</dependencies>
(2)flume文件option

option

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#bin/flume-ng agent -n a1 -f myconf/option -c conf -Dflume.root.logger=INFO,console
#定义agent名,source,channel,sink的名称
a1.sources=r1
a1.channels =c1
a1.sinks=k1

#具体定义source
a1.sources.r1.type= spooldir
a1.sources.r1.spoolDir= /opt/TestFolder/logs

#具体定义channel1
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity = 100

#具体定义sink
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channels =c1
a1.sinks.k1.hostname=192.168.31.211
a1.sinks.k1.port=1236

#组装source, channel,sink

a1.sources.r1.channels = c1
a1.sinks.k1.channel =c1
(3) MyFlumeStream.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils

object MyFlumeStream {

def main(args: Array[String]): Unit = {

System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf().setAppName("MyFlumeStream").setMaster("local[2]")

val ssc = new StreamingContext(conf,Seconds(3))

//创建 flume event 从 flume中接收push来的数据 ---> 也是DStream
//flume将数据push到了 ip 和 端口中
val flumeEventDstream = FlumeUtils.createStream(ssc, "192.168.1.121", 1236)

val lineDStream = flumeEventDstream.map( e => {
new String(e.event.getBody.array)
})

lineDStream.print()

ssc.start()
ssc.awaitTermination()
}
}

3.运行:

(1)运行SparkStreaming程序:
(2)开启flume
1
bin/flume-ng agent -n a1 -c conf -f jobconf/option -Dflume.root.logger=INFO,console

4.向/opt/TestFolder/logs中添加数据,查看结果:

1
cp a.txt /opt/TestFolder/logs

添加数据

5.查看结果

结果

SparkStreaming实战:处理RDD队列流

1.需求:

利用SparkStreaming处理RDD队列流

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
(2)RDDQueueStream.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD

object RDDQueueStream {

def main(args: Array[String]): Unit = {

System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf().setAppName("RDDQueueStream").setMaster("local[2]")

val ssc = new StreamingContext(conf,Seconds(1))

//需要一个RDD队列
val rddQueue = new Queue[RDD[Int]]()


for( i <- 1 to 3){
rddQueue += ssc.sparkContext.makeRDD(1 to 10)

Thread.sleep(5000)
}

//从队列中接收数据 创建DStream
val inputDStream = ssc.queueStream(rddQueue)

val result = inputDStream.map(x=>(x,x*2))

result.print()

ssc.start()
ssc.awaitTermination()

}
}

3.运行:

4.结果:

SparkStreaming实战:处理文件流

1.需求:

利用SparkStreaming处理文件流:

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
(2)FileStreaming.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package day1211

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

object FileStreaming {
def main(args: Array[String]): Unit = {

System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//local[2]代表开启两个线程
val conf = new SparkConf().setAppName("MyNetwordWordCount").setMaster("local[2]")

//接收两个参数,第一个conf,第二个是采样时间间隔
val ssc = new StreamingContext(conf, Seconds(3))

//监控目录 如果文件系统发生变化 就读取进来
val lines = ssc.textFileStream("/Users/macbook/TestInfo/test_file_stream")

lines.print()

ssc.start()
ssc.awaitTermination()
}
}

3.运行

4.结果:

image.png

SparkStearming实战:集成Spark SQL,使用SQL语句进行WordCount

1.需求:

集成Spark SQL,使用SQL语句进行WordCount

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
(2)MyNetwordWordCountWithSQL.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.SparkSession

object MyNetwordWordCountWithSQL {

def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf().setMaster("local[2]").setAppName("MyNetwordWordCountWithSQL")

val ssc = new StreamingContext(conf,Seconds(5))

val lines = ssc.socketTextStream("192.168.1.121",1235,StorageLevel.MEMORY_ONLY)

val words = lines.flatMap(_.split(" "))

//集成Spark SQL 使用SQL语句进行WordCount
words.foreachRDD( rdd => {

val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()

import spark.implicits._
val df1 = rdd.toDF("word")

df1.createOrReplaceTempView("words")

spark.sql("select word , count(1) from words group by word").show()
})

ssc.start()
ssc.awaitTermination()
}

}

3.运行:

4.结果:

Spark Streaming实战:设置检查点,写一个wordcount程序并计数,计算端口号1235中的信息

1.需求:

用spark Streaming写一个wordcount程序,计算发往端口号1235中的信息(单词计数)

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>

</dependency>
</dependencies>
(2)MyTotalNetworkWordCount.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package day1211

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

object MyTotalNetworkWordCount {

def main(args: Array[String]): Unit = {

System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf().setMaster("local[2]").setAppName("MyTotalNetworkWordCount")

val ssc = new StreamingContext(conf,Seconds(3))

//设置检查点目录,保存之前的状态信息
ssc.checkpoint("hdfs://192.168.1.121:9000/TestFile/chkp0826")

val lines = ssc.socketTextStream("192.168.1.121", 1235, StorageLevel.MEMORY_ONLY)

val words = lines.flatMap(_.split(" "))

val wordPair = words.map((_,1))

/**
* 两个参数:
* 第一个参数:当前的值是多少
* 第二个参数:之前的结果是多少
*/
val addFunc = (curreValues:Seq[Int],previousValues:Option[Int]) => {
//进行累加运算
// 1、把当前值的序列进行累加
val currentTotal = curreValues.sum

//2、在之前的值上再累加

Some( currentTotal + previousValues.getOrElse(0) )

}

//进行累加运算
val total = wordPair.updateStateByKey(addFunc)

total.print()

ssc.start()
ssc.awaitTermination()

}
}

3.运行程序,并往端口号1235发送信息:

4.结果:

Spark Streaming实战:窗口操作,每10秒,把过去30秒的数据取出来(读取端口号1235中的数据)

1.需求:

窗口操作,每10秒,把过去30秒的数据取出来

  • 窗口长度:30秒
  • 滑动距离:10秒

2.代码:

(1)pom.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>
(2)MyNetWorkWordCountByWindow.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package day1211

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

/**
* 需求:每10秒,把过去30秒的数据取出来
*
* 窗口长度:30秒
*
* 滑动距离:10秒
*
*/
object MyNetWorkWordCountByWindow {

def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf().setAppName("MyNetWorkWordCountByWindow").setMaster("local[2]")

val ssc = new StreamingContext(conf,Seconds(1))

val lines = ssc.socketTextStream("192.168.1.121", 1235, StorageLevel.MEMORY_ONLY)

val words = lines.flatMap(_.split(" ")).map((_,1))

/**
* reduce By Key And Window
* 三个参数
* 1、要进行什么操作
* 2、窗口的大小
* 3、窗口滑动的距离
*/

val result = words.reduceByKeyAndWindow((x:Int,y:Int)=>(x+y),Seconds(30),Seconds(10))

result.print()

ssc.start()
ssc.awaitTermination()
}

}

3.运行:往端口中发送数据

4.结果

image.png

Spark Streaming实战:写一个wordcount程序,统计从netcat中向端口发送的数据

1.需求:

通过spark streaming统计端口号1234中的信息

2.编写代码:

(1)添加依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.1.0</version>
</dependency>
</dependencies>

记得去掉spark-streaming中的provided

(2)MyNetwordWordCount.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package day1211
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.storage.StorageLevel

/**
* 知识点汇总:
* 1、创建StreamingContext-->本质,核心:创建DStream
*
* 2、DStream的表现形式:就是一个RDD
* 操作DSteam和操作RDD是一样的。
*
* 3、使用DStream把连续的数据流编程不连续的RDD
*/
object MyNetwordWordCount {

def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "/Users/macbook/Documents/hadoop/hadoop-2.8.4")
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

//local[2]代表开启两个线程
val conf = new SparkConf().setAppName("MyNetwordWordCount").setMaster("local[2]")

//接收两个参数,第一个conf,第二个是采样时间间隔
val ssc = new StreamingContext(conf,Seconds(3))

//创建DStream 从netcat服务器上接收数据 因为接收字符串,所以使用textStream
val lines = ssc.socketTextStream("192.168.1.121", 1234, StorageLevel.MEMORY_ONLY)

val words = lines.flatMap(_.split(" "))

val wordCount = words.map((_,1)).reduceByKey(_+_)
// val wordCount = words.transform(x => x.map((_,1))).reduceByKey(_+_)

wordCount.print()

ssc.start()

//等待任务结束
ssc.awaitTermination()

}

}

3.运行此程序,同时用netcat向1234端口号中发送信息,

1

4.结果:

1

  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信