符合中小企业对网站设计、功能常规化式的企业展示型网站建设
本套餐主要针对企业品牌型网站、中高端设计、前端互动体验...
商城网站建设因基本功能的需求不同费用上面也有很大的差别...
手机微信网站开发、微信官网、微信商城网站...
本篇文章为大家展示了如何实现Spark Streaming和Kafka整合,内容简明扼要并且容易理解,绝对能使你眼前一亮,通过这篇文章的详细介绍希望你能有所收获。
我们提供的服务有:成都做网站、网站制作、成都外贸网站建设、微信公众号开发、网站优化、网站认证、渝北ssl等。为超过千家企事业单位解决了网站和推广的问题。提供周到的售前咨询和贴心的售后服务,是有科学管理、有技术的渝北网站制作公司
最近完成了Spark Streaming和Kafka的整合工作,耗时虽然不长,但是当中还是遇到了不少的坑,记录下来,大家方便绕行。
先说一下环境:
Spark 2.0.0 kafka_2.11-0.10.0.0
之前的项目当中,已经在pom当中添加了需要的Spark Streaming的依赖,这次只需要添加Spark Streaming Kafka的以来就行了,问题来了。首先是我之前添加的Spark Streaming的依赖:
然后是找到的spark streaming对kafka的支持依赖:
请注意2个version部分,好像差的有点多。不管了,照着例子写写看,果然报了各种class not found的错误。基本可以判断是版本差异造成的问题。
可是,在http://mvnrepository.com上找不到更高版本的依赖怎么办呢?
考虑了一下,只有一个办法了,下载spark源码,自行编译打包需要的jar包。
在github上找到spark项目,clone下来,懒病又犯了,也没仔细看当中的说明,直接就clean compile等等。结果又是各种报错。好吧,好好看看吧,github上给了个地址:http://spark.apache.org/docs/latest/building-spark.html,照着做就没问题了。
然后把项目当中pom里面对streaming kafka的依赖删掉,引入我们自己生成的jar包:
spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar
然后贴上代码:
val conf = new SparkConf().setAppName("kafkastream").setMaster("spark://master:7077").
set("spark.driver.host", "192.168.1.142").
setJars(List("/src/git/msgstream/out/artifacts/msgstream_jar/msgstream.jar",
"/src/git/msgstream/lib/kafka-clients-0.10.0.0.jar",
"/src/git/msgstream/lib/kafka_2.11-0.10.0.0.jar",
"/src/git/msgstream/lib/spark-streaming-kafka-0-10_2.11-2.1.0-SNAPSHOT.jar"))
val ssc = new StreamingContext(conf, Seconds(2))
val topics = List("woozoom")
val kafkaParams = Map(("bootstrap.servers", "master:9092,slave01:9092,slave02:9092"),
("group.id", "sparkstreaming"), ("key.deserializer", classOf[StringDeserializer]),
("value.deserializer", classOf[StringDeserializer]))
val preferredHosts = LocationStrategies.PreferConsistent
val offsets = Map(new TopicPartition("woozoom", 0) -> 2L)
val lines = KafkaUtils.createDirectStream[String, String](
ssc,
preferredHosts,
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offsets))
lines.foreachRDD(rdd => {
rdd.foreach(x => {
println(x)
})
})
ssc.start()
ssc.awaitTermination()
上面标红的部分,是需要注意的,而这些本来我也是不会写的,后来去到spark源码找到test代码
/src/git/spark/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
测试,通过!!!
上述内容就是如何实现Spark Streaming和Kafka整合,你们学到知识或技能了吗?如果还想学到更多技能或者丰富自己的知识储备,欢迎关注创新互联行业资讯频道。