网创优客建站品牌官网
为成都网站建设公司企业提供高品质网站建设
热线:028-86922220
成都专业网站建设公司

定制建站费用3500元

符合中小企业对网站设计、功能常规化式的企业展示型网站建设

成都品牌网站建设

品牌网站建设费用6000元

本套餐主要针对企业品牌型网站、中高端设计、前端互动体验...

成都商城网站建设

商城网站建设费用8000元

商城网站建设因基本功能的需求不同费用上面也有很大的差别...

成都微信网站建设

手机微信网站建站3000元

手机微信网站开发、微信官网、微信商城网站...

建站知识

当前位置:首页 > 建站知识

go语言kafak,go语言kafka

聊聊golang的zap的ZapKafkaWriter

本文主要研究一下golang的zap的ZapKafkaWriter

让客户满意是我们工作的目标,不断超越客户的期望值来自于我们对这个行业的热爱。我们立志把好的技术通过有效、简单的方式提供给客户,将通过不懈努力成为客户在信息化领域值得信任、有价值的长期合作伙伴,公司提供的服务项目有:域名与空间、虚拟空间、营销软件、网站建设、黄冈网站维护、网站推广。

WriteSyncer内嵌了io.Writer接口,定义了Sync方法;Sink接口内嵌了zapcore.WriteSyncer及io.Closer接口;ZapKafkaWriter实现Sink接口及zapcore.WriteSyncer接口,其Write方法直接将data通过kafka发送出去。

golang的回调和接口

最近写了个kafka的接收消息的功能,需要使用回调处理收到的消息。

一个是基本的回调,一个是使用接口功能实现回调,对接口是个很好的学习。

1.正常回调

kafka的接收消息处。收到消息后,使用传入的Onmessage进行处理。

调用kafka接收消息的单元,并在调用方写好回调

在调用方实现回调需要执行的方法

感觉还是使用基本回调相对简单点,接口就当学习了。

另外跨包的接口的方法要大写!定位了好久发现个入门的问题。

Golang kafka简述和操作(sarama同步异步和消费组)

一、Kafka简述

1. 为什么需要用到消息队列

异步:对比以前的串行同步方式来说,可以在同一时间做更多的事情,提高效率;

解耦:在耦合太高的场景,多个任务要对同一个数据进行操作消费的时候,会导致一个任务的处理因为另一个任务对数据的操作变得及其复杂。

缓冲:当遇到突发大流量的时候,消息队列可以先把所有消息有序保存起来,避免直接作用于系统主体,系统主题始终以一个平稳的速率去消费这些消息。

2.为什么选择kafka呢?

这没有绝对的好坏,看个人需求来选择,我这里就抄了一段他人总结的的优缺点,可见原文

kafka的优点:

1.支持多个生产者和消费者2.支持broker的横向拓展3.副本集机制,实现数据冗余,保证数据不丢失4.通过topic将数据进行分类5.通过分批发送压缩数据的方式,减少数据传输开销,提高吞高量6.支持多种模式的消息7.基于磁盘实现数据的持久化8.高性能的处理信息,在大数据的情况下,可以保证亚秒级的消息延迟9.一个消费者可以支持多种topic的消息10.对CPU和内存的消耗比较小11.对网络开销也比较小12.支持跨数据中心的数据复制13.支持镜像集群

kafka的缺点:

1.由于是批量发送,所以数据达不到真正的实时2.对于mqtt协议不支持3.不支持物联网传感数据直接接入4.只能支持统一分区内消息有序,无法实现全局消息有序5.监控不完善,需要安装插件6.需要配合zookeeper进行元数据管理7.会丢失数据,并且不支持事务8.可能会重复消费数据,消息会乱序,可用保证一个固定的partition内部的消息是有序的,但是一个topic有多个partition的话,就不能保证有序了,需要zookeeper的支持,topic一般需要人工创建,部署和维护一般都比mq高

3. Golang 操作kafka

3.1. kafka的环境

网上有很多搭建kafka环境教程,这里就不再搭建,就展示一下kafka的环境,在kubernetes上进行的搭建,有需要的私我,可以发yaml文件

3.2. 第三方库

github.com/Shopify/sarama // kafka主要的库*github.com/bsm/sarama-cluster // kafka消费组

3.3. 消费者

单个消费者

funcconsumer(){varwg sync.WaitGroup  consumer, err := sarama.NewConsumer([]string{"172.20.3.13:30901"},nil)iferr !=nil{      fmt.Println("Failed to start consumer: %s", err)return}  partitionList, err := consumer.Partitions("test0")//获得该topic所有的分区iferr !=nil{      fmt.Println("Failed to get the list of partition:, ", err)return}forpartition :=rangepartitionList {      pc, err := consumer.ConsumePartition("test0",int32(partition), sarama.OffsetNewest)iferr !=nil{        fmt.Println("Failed to start consumer for partition %d: %s\n", partition, err)return}      wg.Add(1)gofunc(sarama.PartitionConsumer){//为每个分区开一个go协程去取值formsg :=rangepc.Messages() {//阻塞直到有值发送过来,然后再继续等待fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))        }deferpc.AsyncClose()        wg.Done()      }(pc)  }  wg.Wait()}funcmain(){  consumer()}

消费组

funcconsumerCluster(){  groupID :="group-1"config := cluster.NewConfig()  config.Group.Return.Notifications =trueconfig.Consumer.Offsets.CommitInterval =1* time.Second  config.Consumer.Offsets.Initial = sarama.OffsetNewest//初始从最新的offset开始c, err := cluster.NewConsumer(strings.Split("172.20.3.13:30901",","),groupID, strings.Split("test0",","), config)iferr !=nil{      glog.Errorf("Failed open consumer: %v", err)return}deferc.Close()gofunc(c *cluster.Consumer){      errors := c.Errors()      noti := c.Notifications()for{select{caseerr := -errors:            glog.Errorln(err)case-noti:        }      }  }(c)formsg :=rangec.Messages() {      fmt.Printf("Partition:%d, Offset:%d, key:%s, value:%s\n", msg.Partition, msg.Offset,string(msg.Key),string(msg.Value))      c.MarkOffset(msg,"")//MarkOffset 并不是实时写入kafka,有可能在程序crash时丢掉未提交的offset}}funcmain(){goconsumerCluster()}

3.4. 生产者

同步生产者

packagemainimport("fmt""github.com/Shopify/sarama")funcmain(){  config := sarama.NewConfig()  config.Producer.RequiredAcks = sarama.WaitForAll//赋值为-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。config.Producer.Partitioner = sarama.NewRandomPartitioner//写到随机分区中,默认设置8个分区config.Producer.Return.Successes =truemsg := sarama.ProducerMessage{}  msg.Topic =`test0`msg.Value = sarama.StringEncoder("Hello World!")  client, err := sarama.NewSyncProducer([]string{"172.20.3.13:30901"}, config)iferr !=nil{      fmt.Println("producer close err, ", err)return}deferclient.Close()  pid, offset, err := client.SendMessage(msg)iferr !=nil{      fmt.Println("send message failed, ", err)return}  fmt.Printf("分区ID:%v, offset:%v \n", pid, offset)}

异步生产者

funcasyncProducer(){  config := sarama.NewConfig()  config.Producer.Return.Successes =true//必须有这个选项config.Producer.Timeout =5* time.Second  p, err := sarama.NewAsyncProducer(strings.Split("172.20.3.13:30901",","), config)deferp.Close()iferr !=nil{return}//这个部分一定要写,不然通道会被堵塞gofunc(p sarama.AsyncProducer){      errors := p.Errors()      success := p.Successes()for{select{caseerr := -errors:iferr !=nil{              glog.Errorln(err)            }case-success:        }      }  }(p)for{      v :="async: "+ strconv.Itoa(rand.New(rand.NewSource(time.Now().UnixNano())).Intn(10000))      fmt.Fprintln(os.Stdout, v)      msg := sarama.ProducerMessage{        Topic: topics,        Value: sarama.ByteEncoder(v),      }      p.Input() - msg      time.Sleep(time.Second *1)  }}funcmain(){goasyncProducer()select{      }}

3.5. 结果展示-

同步生产打印:

分区ID:0,offset:90

消费打印:

Partition:0,Offset:90,key:,value:Hello World!

异步生产打印:

async:7272async:7616async:998

消费打印:

Partition:0,Offset:91,key:,value:async:7272Partition:0,Offset:92,key:,value:async:7616Partition:0,Offset:93,key:,value:async:998


分享标题:go语言kafak,go语言kafka
文章转载:http://bjjierui.cn/article/hddijg.html

其他资讯