Spark Streaming:高级数据源

一.Spark Streaming接收Flume数据

1.基于Flume的Push模式

    Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.

(1)第一步:Flume的配置文件

(2)第二步:Spark Streaming程序

(3)第三步:注意除了需要使用Flume的lib的jar包以外,还需要以下jar包:
1
spark-streaming-flume_2.1.0.jar
(4)第四步:测试
  • 启动Spark Streaming程序
  • 启动Flume
  • 拷贝日志文件到/root/training/logs目录
  • 观察输出,采集到数据

2.基于Custom Sink的Pull模式

    不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。

    这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的sink。

以下为配置步骤:

(1)第一步:Flume的配置文件

(2)第二步:Spark Streaming程序

(3)第三步:需要的jar包
  • 将Spark的jar包拷贝到Flume的lib目录下
  • 下面的这个jar包也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath
1
spark-streaming-flume-sink_2.1.0.jar
(4)第四步:测试
  • 启动Flume
  • 在IDEA中启动FlumeLogPull
  • 将测试数据拷贝到/root/training/logs
  • 观察IDEA中的输出

二.Spark Streaming接收Kafka数据

Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。

Kafka

1.搭建ZooKeeper(Standalone):

(1)配置/root/training/zookeeper-3.4.10/conf/zoo.cfg文件
1
2
dataDir=/root/training/zookeeper-3.4.10/tmp
server.1=spark81:2888:3888
(2)在/root/training/zookeeper-3.4.10/tmp目录下创建一个myid的空文件
1
echo 1 > /root/training/zookeeper-3.4.6/tmp/myid

2.搭建Kafka环境(单机单broker):

(1)修改server.properties文件

(2)启动Kafka
1
bin/kafka-server-start.sh config/server.properties &

出现以下错误:

需要修改bin/kafka-run-class.sh文件,将这个选项注释掉。

(3)测试Kafka
  • 创建Topic
    1
    bin/kafka-topics.sh --create --zookeeper spark81:2181 -replication-factor 1 --partitions 3 --topic mydemo1
  • 发送消息
    1
    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
  • 接收消息
    1
    bin/kafka-console-consumer.sh --zookeeper spark81:2181 --topic mydemo1

3.搭建Spark Streaming和Kafka的集成开发环境

    由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程。
下面是依赖的pom.xml文件:

4.基于Receiver的方式

    这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。

(1)开发Spark Streaming的Kafka Receivers

(2)测试
  • 启动Kafka消息的生产者
    1
    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
  • 在IDEA中启动任务,接收Kafka消息

5.直接读取方式

    和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。

(1)开发Spark Streaming的程序

(2)测试
  • 启动Kafka消息的生产者
    1
    bin/kafka-console-producer.sh --broker-list spark81:9092 --topic mydemo1
  • 在IDEA中启动任务,接收Kafka消息

打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2021 Movle
  • 访问人数: | 浏览次数:

请我喝杯咖啡吧~

支付宝
微信