<p id="rrtt5"></p><ruby id="rrtt5"></ruby>

      系統城裝機大師 - 固鎮縣祥瑞電腦科技銷售部宣傳站!

      當前位置:首頁 > server > anz > 詳細頁面

      云服務器(Linux)安裝部署Kafka的詳細過程

      時間:2022-11-16來源:www.1999hs.com作者:電腦系統城

      云服務器(Linux)安裝部署Kafka

      前期準備

      kafka的安裝需要依賴于jdk,需要在服務器上提前安裝好該環境,這里使用用jdk1.8。

      下載安裝包

      官網地址:

      較新的版本已自帶Zookeeper,無需額外下載。這里使用3.2.0做演示。

      注意要下載Binary downloads標簽下的tgz包,Source download標簽下的包為源碼。無法直接運行,需要編譯。

      上載安裝包到云服務器

      使用ssh連接工具將kafka_2.12-3.2.0.tgz這個包上傳到云服務器上的一個目錄。

      打開命令行,進入到放有壓縮包的目錄,執行

      1 tar -zxvf kafka_2.12-3.2.0.tgz

      配置kafka

      然后使用cd命令進入到/kafka_2.12-3.2.0/config/下,使用

      1 vi server.properties

      編輯配置文件。

      刪除listeners和advertised前方的#號,改成如下配置:

      1
      2
      3
      4
      5
      listeners=PLAINTEXT://云服務器內網ip:9092(本地訪問用本地ip)
      # 如果要提供外網訪問則必須配置此項
      advertised.listeners=PLAINTEXT://云服務器公網ip:9092(若要遠程訪問需配置此項為云服務器的公網ip)
      # zookeeper連接地址,集群配置格式為ip:port,ip:port,ip:port
      zookeeper.connect=云服務器公網ip:2181

      開放云服務器端口

      在云服務器控制臺內進入安全組頁面,添加兩條新的入站規則,tcp/9092和tcp/2181

      開放linux防火墻端口

      先查看使用的防火墻類型iptables/firewalld

      iptables操作命令

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      1.打開/關閉/重啟防火墻
       
      開啟防火墻(重啟后永久生效):chkconfig iptables on
       
      關閉防火墻(重啟后永久生效):chkconfig iptables off
       
      開啟防火墻(即時生效,重啟后失效):service iptables start
       
      關閉防火墻(即時生效,重啟后失效):service iptables stop
       
      重啟防火墻:service iptables restartd
       
      2.查看打開的端口
       
      /etc/init.d/iptables status
      3.開啟端口
       
      iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
      4.保存并重啟防火墻
      /etc/rc.d/init.d/iptables save
      /etc/init.d/iptables restart

      Centos7默認安裝了firewalld,如果沒有安裝的話,可以使用 yum install firewalld firewalld-config進行安裝。

      操作指令如下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      1.啟動防火墻
       
      systemctl start firewalld
      2.禁用防火墻
       
      systemctl stop firewalld
      3.設置開機啟動
       
      systemctl enable firewalld
      4.停止并禁用開機啟動
       
      sytemctl disable firewalld
      5.重啟防火墻
       
      firewall-cmd --reload
       
      6.查看狀態
       
      systemctl status firewalld或者 firewall-cmd --state
      7.在指定區域打開端口(記得重啟防火墻)
       
      firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

      打開tcp/9092和tcp/2181這兩個端口后,重啟防火墻,并查看開放的端口確實生效。

      啟動kafka服務

      cd命令進入kafka_2.12-3.2.0目錄下,執行

      1 bin/zookeeper-server-start.sh config/zookeeper.properties

      啟動zookeeper,不加-daemon方便排除啟動錯誤,新建一個shell窗口,進入該目錄再執行

      1 bin/kafka-server-start.sh config/server.properties

      啟動kafka,若打印日志未報錯,若未出現error日志,說明啟動成功。

      測試單機連通性

      1
      2
      3
      4
      5
      6
      7
      8
      9
      查詢kafka下所有的topic
      bin/kafka-topics.sh --list --zookeeper ip:port
      因為kafka使用zookeeper作為配置中心,一些topic信息需要查詢該kafka對應的zookeeper
      創建topic
      bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
      開啟生產者
      bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
      開啟消費者
      bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test

      Springboot連接kafak

      在pom.xml文件中引入kafka依賴

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      <dependency>
                  <groupId>org.springframework.kafka</groupId>
                  <artifactId>spring-kafka</artifactId>
                  <version>2.9.0</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>3.2.0</version>
              </dependency>

      在application.yml配置文件中配置kafka

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      server:
        port: 8080
       
      spring:
        kafka:
          bootstrap-servers: 云服務器外網ip地址:9092
          producer: # 生產者
            retries: 3 # 設置大于0的值,則客戶端會將發送失敗的記錄重新發送
            batch-size: 16384
            buffer-memory: 33554432
            acks: 1
            # 指定消息key和消息體的編解碼方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer:
            group-id: default-group
            enable-auto-commit: false
            auto-offset-reset: earliest
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          listener:
            # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交
            # RECORD
            # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后提交
            # BATCH
            # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交
            # TIME
            # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交
            # COUNT
            # TIME | COUNT 有一個條件滿足時提交
            # COUNT_TIME
            # 當每一批poll()的數據被消費者監聽器(ListenerConsumer)處理之后, 手動調用Acknowledgment.acknowledge()后提交
            # MANUAL
            # 手動調用Acknowledgment.acknowledge()后立即提交,一般使用這種
            # MANUAL_IMMEDIATE
            ack-mode: manual_immediate

      生產者

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      @RestController
      public class KafkaController {
          private final static String TOPIC_NAME = "test-topic";
       
          @Autowired
          private KafkaTemplate<String, String> kafkaTemplate;
       
          @RequestMapping("/send")
          public String send(@RequestParam("msg") String msg) {
              kafkaTemplate.send(TOPIC_NAME, "key", msg);
              return String.format("消息 %s 發送成功!", msg);
          }
      }

      消費者

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      @Component
      public class DemoConsumer {
          /**
           * @param record record
           * @KafkaListener(groupId = "testGroup", topicPartitions = {
           * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
           * @TopicPartition(topic = "topic2", partitions = "0",
           * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
           * },concurrency = "6")
           * //concurrency就是同組下的消費者個數,就是并發消費數,必須小于等于分區總數
           */
          @KafkaListener(topics = "test-topic", groupId = "testGroup1")
          public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
              String value = record.value();
              System.out.println("testGroup1 message: " + value);
              System.out.println("testGroup1 record: " + record);
              //手動提交offset,一般是提交一個banch,冪等性防止重復消息
              // === 每條消費完確認性能不好!
              ack.acknowledge();
          }
       
          //配置多個消費組
          @KafkaListener(topics = "test--topic", groupId = "testGroup2")
          public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
              String value = record.value();
              System.out.println("testGroup2 message: " + value);
              System.out.println("testGroup2 record: " + record);
              //手動提交offset
              ack.acknowledge();
          }
      }

      使用swagger測試發送消息

      控制臺打印消息

      分享到:

      相關信息

      • Windows Install Clean Up使用方法

        很多小伙伴都需要對自己的電腦進行清理,這個時候就會需要用到清理工具,很多用戶都不知道Windows Install Clean Up怎么使用,今天就給你們帶來了Windows Install Clean Up使用方法,如果你也需要就來學習下吧。...

        2022-11-10

      • rpc服務器不可用解決方法

        很多用戶都在電腦上遇到過rpc服務器出現錯誤不可用但是不知道怎么去解決,所以今天就給你們帶來了rpc服務器不可用解決方法,有需要的小伙伴快來學習一下操作方法吧。...

        2022-11-10

      系統教程欄目

      欄目熱門教程

      人氣教程排行

      站長推薦

      熱門系統下載

      天堂资源中文WWW,久久精品女人天堂AV免费观看,无码专区一ⅤA亚洲V天堂,免费观看在线AⅤ天堂视频