
ELK Log System Installation and Tuning

@(ELK Installation and Configuration Manual)[Operations, Basic Operations, Installation]

[TOC]

ELK Log System Installation

zookeeper installation

zookeeper is required by kafka and is deployed on the kafka nodes.

  1. Download zookeeper

    wget https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/stable/apache-zookeeper-3.5.5-bin.tar.gz
  2. Extract the archive

    tar -xzf apache-zookeeper-3.5.5-bin.tar.gz -C /opt/
    ln -sf /opt/apache-zookeeper-3.5.5-bin /opt/zookeeper
  3. Edit the configuration

    cat >/opt/zookeeper/conf/zoo.cfg <<EOF
    tickTime=2000
    dataDir=/data/zookeeper/data
    dataLogDir=/data/zookeeper/logs
    clientPort=2181
    admin.serverPort=2182
    initLimit=10
    syncLimit=5
    server.1=172.22.3.61:2888:3888
    server.2=172.22.3.62:2888:3888
    server.3=172.22.3.63:2888:3888
    EOF
  4. Create a unique myid on each node

    mkdir -p /data/zookeeper/data /data/zookeeper/logs
    echo 1 >/data/zookeeper/data/myid    # on 172.22.3.61 (server.1) only
    echo 2 >/data/zookeeper/data/myid    # on 172.22.3.62 (server.2) only
    echo 3 >/data/zookeeper/data/myid    # on 172.22.3.63 (server.3) only
  5. Start zookeeper with systemd

    cat > /etc/systemd/system/zookeeper.service <<-EOF 
    [Unit]
    Description=zookeeper
    After=network.target remote-fs.target nss-lookup.target

    [Service]
    User=root
    Type=forking
    Environment="JAVA_HOME=/opt/jdk"
    ExecStart=/opt/zookeeper/bin/zkServer.sh start /opt/zookeeper/conf/zoo.cfg
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
systemctl enable zookeeper.service
systemctl start zookeeper.service
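
Once all three nodes are running, the cluster roles can be verified quickly (one node should report Mode: leader and the other two Mode: follower):

    JAVA_HOME=/opt/jdk /opt/zookeeper/bin/zkServer.sh status /opt/zookeeper/conf/zoo.cfg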

kafka installation

A three-node kafka cluster of 4C/8G machines with 100 GB SSD disks is recommended, retaining logs for 12 hours (the example configuration below keeps 24 hours).

  1. Download kafka

    wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
  2. Extract the archive

    tar -xzf kafka_2.12-2.3.0.tgz -C /opt/
    ln -s /opt/kafka_2.12-2.3.0 /opt/kafka
  3. Edit the configuration; broker.id is 1, 2 and 3 on the three nodes respectively

    cat > /opt/kafka/config/server.properties <<-EOF
    broker.id=1
    delete.topic.enable=true
    listeners=PLAINTEXT://0.0.0.0:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=1024000
    socket.receive.buffer.bytes=1024000
    socket.request.max.bytes=104857600
    log.dirs=/data/kafka/logs
    num.partitions=3
    default.replication.factor=2
    num.recovery.threads.per.data.dir=2
    offsets.topic.replication.factor=2
    transaction.state.log.replication.factor=2
    transaction.state.log.min.isr=2
    log.retention.hours=24
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=172.22.3.61:2181,172.22.3.62:2181,172.22.3.63:2181
    zookeeper.connection.timeout.ms=6000
    EOF
  4. Start kafka with systemd

    cat > /etc/systemd/system/kafka.service <<-EOF 
    [Unit]
    Description=kafka
    After=network.target remote-fs.target nss-lookup.target

    [Service]
    User=root
    Type=simple
    Environment="JAVA_HOME=/opt/jdk"
    ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
mkdir -p /data/kafka/data /data/kafka/logs
systemctl enable kafka.service
systemctl start kafka.service
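
With all three brokers up, topic creation and listing can be smoke-tested (the test topic name below is arbitrary):

    /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic smoke-test --partitions 3 --replication-factor 2
    /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --list
    /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic smoke-test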

filebeat installation

filebeat is installed on every node whose logs need to be collected; it ships the logs produced there to kafka for buffering.

  1. Download filebeat

    wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.3.2-linux-x86_64.tar.gz
  2. Extract the archive

    tar -xzf filebeat-7.3.2-linux-x86_64.tar.gz -C /opt/
    ln -s /opt/filebeat-7.3.2-linux-x86_64 /opt/filebeat
  3. filebeat configuration for application logs; the fields.type value must match the environment (line8-applog or tech-applog)

    cat >/opt/filebeat/filebeat.yml <<EOF
    filebeat.inputs:
    - type: log
      paths:
        - /data/logs/*/*.log
      fields:
        type: ${HOSTNAME%%-*}-applog
      multiline:
        pattern: '^\|'
        negate: true
        match: after
      tail_files: true

    output.kafka:
      hosts: ["172.22.3.61:9092", "172.22.3.62:9092", "172.22.3.63:9092"]
      topic: '%{[fields][type]}'
      partition.round_robin:
        reachable_only: false
      compression: gzip
      max_message_bytes: 1000000

    logging.to_files: true
    logging.files:
      path: /var/log/filebeat
      name: filebeat
      rotateeverybytes: 10485760
      keepfiles: 7
    EOF
  4. filebeat configuration for nginx; the fields.type value must match the environment (nginx-access or pre-nginx-access)

cat >/opt/filebeat/filebeat.yml <<EOF
filebeat.inputs:
- type: log
  paths:
    - /data/nginx/logs/access.log
  fields:
    type: nginx-access
  tail_files: true
- type: log
  paths:
    - /data/nginx/logs/error.log
  fields:
    type: nginx-error
  tail_files: true

output.kafka:
  hosts: ["172.22.3.61:9092", "172.22.3.62:9092", "172.22.3.63:9092"]
  topic: '%{[fields][type]}'
  partition.round_robin:
    reachable_only: false
  compression: gzip
  max_message_bytes: 1000000

logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  rotateeverybytes: 10485760
  keepfiles: 7
EOF
  5. Start the service with systemd

    cat >/etc/systemd/system/filebeat.service <<EOF
    [Unit]
    Description=filebeat
    After=network.target

    [Service]
    User=root
    Type=simple
    ExecStart=/opt/filebeat/filebeat -c /opt/filebeat/filebeat.yml
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
    systemctl enable filebeat.service
    systemctl start filebeat.service
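
Before relying on the service, the configuration and the kafka output can be validated with filebeat's built-in test subcommands:

    /opt/filebeat/filebeat test config -c /opt/filebeat/filebeat.yml
    /opt/filebeat/filebeat test output -c /opt/filebeat/filebeat.yml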

elasticsearch installation

For elasticsearch, 16C/64G per node with a 31 GB JVM heap is recommended. Each node can index roughly 10,000 log lines per second, and 100 million log entries occupy about 50 GB of disk; size the cluster from your own volume (for example, one billion lines per day averages about 11,600 lines/s and roughly 500 GB of storage per day).

  1. Download elasticsearch

    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.4.2-linux-x86_64.tar.gz
  2. Extract the archive

    tar -xzf elasticsearch-7.4.2-linux-x86_64.tar.gz -C /opt/
    ln -sf /opt/elasticsearch-7.4.2 /opt/elasticsearch
  3. Edit the configuration (generation of the TLS certificate referenced under certs/ is sketched after this list)

    cat >/opt/elasticsearch/config/elasticsearch.yml<<EOF
    cluster.name: sre-elasticsearch-prod
    node.name: sre-elasticsearch-prod01

    bootstrap.memory_lock: true

    path.data: /data/elasticsearch/data
    path.logs: /data/elasticsearch/logs

    network.host: 0.0.0.0
    http.port: 9200

    discovery.zen.ping.unicast.hosts: ["172.22.3.69", "172.22.3.70", "172.22.3.71", "172.22.3.73", "172.22.3.74"]
    discovery.zen.minimum_master_nodes: 3
    cluster.initial_master_nodes: ["172.22.3.69", "172.22.3.70", "172.22.3.71", "172.22.3.73", "172.22.3.74"]

    gateway.recover_after_nodes: 3
    action.destructive_requires_name: false

    xpack.security.enabled: true
    xpack.security.transport.ssl.enabled: true
    xpack.security.transport.ssl.verification_mode: certificate
    xpack.security.transport.ssl.keystore.path: certs/elastic-certificates.p12
    xpack.security.transport.ssl.truststore.path: certs/elastic-certificates.p12
    EOF
  4. Start elasticsearch with systemd

    cat > /etc/systemd/system/elasticsearch.service <<-EOF 
    [Unit]
    Description=elasticsearch
    After=network.target remote-fs.target nss-lookup.target

    [Service]
    User=akulaku
    Type=simple
    Environment="JAVA_HOME=/opt/jdk"
    ExecStart=/opt/elasticsearch/bin/elasticsearch
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
  5. Adjust the JVM parameters; for the 16C/64G nodes above, set the heap to 31 GB (-Xms31g / -Xmx31g)

    vim /opt/elasticsearch/config/jvm.options
  6. After all nodes are configured, start the service as the ordinary (non-root) user

    mkdir -p /data/elasticsearch/data /data/elasticsearch/logs /opt/elasticsearch/config/certs
    chown akulaku:akulaku -R /data/elasticsearch/ /opt/elasticsearch-7.4.2/
    systemctl enable elasticsearch.service
    systemctl start elasticsearch.service
  7. Raise the systemd default limits (needed for bootstrap.memory_lock and high file-descriptor usage); the new defaults take effect after systemctl daemon-reexec or a reboot, followed by a service restart. Also make sure vm.max_map_count is at least 262144 (sysctl -w vm.max_map_count=262144), or elasticsearch will fail its bootstrap checks.

    vim /etc/systemd/system.conf
    DefaultLimitCORE=infinity
    DefaultLimitNOFILE=655350
    DefaultLimitNPROC=204800
    DefaultLimitMEMLOCK=infinity
  8. Check the cluster status

    GET _cat/health
  9. Configure the index template (how to load it into the cluster is shown after this list)

    {
      "order": 0,
      "version": 60001,
      "index_patterns": [
        "logstash-*"
      ],
      "settings": {
        "index": {
          "number_of_shards": "5",
          "number_of_replicas": "0",
          "refresh_interval": "2s"
        }
      },
      "mappings": {
        "_default_": {
          "_all": {
            "enabled": false
          },
          "dynamic_templates": [
            {
              "message_field": {
                "path_match": "message",
                "match_mapping_type": "string",
                "mapping": {
                  "type": "text",
                  "norms": false
                }
              }
            },
            {
              "string_fields": {
                "match": "*",
                "match_mapping_type": "string",
                "mapping": {
                  "type": "text",
                  "norms": false,
                  "fields": {
                    "keyword": {
                      "type": "keyword",
                      "ignore_above": 256
                    }
                  }
                }
              }
            }
          ],
          "properties": {
            "@timestamp": {
              "type": "date"
            },
            "@version": {
              "type": "keyword"
            },
            "geoip": {
              "dynamic": true,
              "properties": {
                "ip": {
                  "type": "ip"
                },
                "location": {
                  "type": "geo_point"
                },
                "latitude": {
                  "type": "half_float"
                },
                "longitude": {
                  "type": "half_float"
                }
              }
            },
            "message": {
              "type": "text",
              "analyzer": "ik_max_word",
              "search_analyzer": "ik_max_word"
            }
          }
        }
      },
      "aliases": {}
    }
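
The template still has to be loaded into the cluster; a minimal sketch with curl, assuming the JSON above is saved as logstash-template.json (the template name logstash is an arbitrary choice). Note that the body uses the 6.x-style "_default_" type and "_all" field, which the 7.4.2 cluster installed above no longer accepts, so those two levels need to be dropped from the mappings block before loading:

    # prompts for the elastic password
    curl -u elastic -H 'Content-Type: application/json' \
         -X PUT "http://127.0.0.1:9200/_template/logstash" \
         -d @logstash-template.json

The transport-TLS keystore referenced in elasticsearch.yml (certs/elastic-certificates.p12) also has to be created once and copied to every node; a sketch using the certutil tool bundled with elasticsearch, followed by initialising the built-in user passwords after the cluster is up (output paths are assumptions):

    /opt/elasticsearch/bin/elasticsearch-certutil ca --out /tmp/elastic-stack-ca.p12 --pass ""
    /opt/elasticsearch/bin/elasticsearch-certutil cert --ca /tmp/elastic-stack-ca.p12 \
        --out /opt/elasticsearch/config/certs/elastic-certificates.p12 --pass ""
    /opt/elasticsearch/bin/elasticsearch-setup-passwords interactive
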
Install the ik analyzer

  1. Install the ik plugin on every elasticsearch node

    /opt/elasticsearch/bin/elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v7.4.2/elasticsearch-analysis-ik-7.4.2.zip
Install cerebro

  1. Download cerebro to manage the elasticsearch cluster (https://github.com/lmenezes/cerebro/releases)

    wget https://github.com/lmenezes/cerebro/releases/download/v0.8.4/cerebro-0.8.4.tgz

    tar -xzf cerebro-0.8.4.tgz -C /opt/
    ln -s /opt/cerebro-0.8.4 /opt/cerebro

  2. Start cerebro with systemd (point it at the cluster first; see the configuration note after this list)

    cat >/etc/systemd/system/cerebro.service <<EOF
    [Unit]
    Description=cerebro
    After=network.target

    [Service]
    User=root
    Type=simple
    Environment="JAVA_HOME=/opt/jdk"
    ExecStart=/opt/cerebro/bin/cerebro
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill -s QUIT \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
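
cerebro reads the clusters it manages from conf/application.conf; a minimal sketch that appends an entry for the cluster above (address, display name and credentials are assumptions; the rest of the shipped config file is kept as-is):

    cat >>/opt/cerebro/conf/application.conf <<EOF
    hosts = [
      {
        host = "http://172.22.3.69:9200"
        name = "sre-elasticsearch-prod"
        auth = {
          username = "elastic"
          password = "xxx"
        }
      }
    ]
    EOF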

logstash installation

> A single logstash node is recommended to handle all topics, with the JVM heap set to about 90% of the machine's memory, so there is no problem of uneven resource allocation between multiple instances. The settings that need the most attention are pipeline.batch.size and pipeline.batch.delay; test repeatedly to find the highest indexing rate. elasticsearch stores timestamps in UTC by default, so a short ruby filter can be added to derive the Beijing-time date used in the index name, and removing unneeded fields speeds up searches and saves storage.

  1. Download logstash

    wget https://artifacts.elastic.co/downloads/logstash/logstash-7.3.2.tar.gz
  2. Extract the archive

    tar -xzf logstash-7.3.2.tar.gz -C /opt/
    ln -s /opt/logstash-7.3.2 /opt/logstash
  3. Edit the configuration

    cat /opt/logstash/config/logstash.yml 
    pipeline.workers: 32
    pipeline.batch.size: 2500
    pipeline.batch.delay: 20

    http.host: "0.0.0.0"
    http.port: 9600

    path.config: "/opt/logstash/config/sre-logstash-prod.conf"

    xpack.monitoring.enabled: true
    xpack.monitoring.elasticsearch.hosts: "http://172.20.40.20:9200"
    xpack.monitoring.collection.interval: 10s
    xpack.monitoring.collection.pipeline.details.enabled: true

cat /opt/logstash/config/sre-logstash-prod.conf 
input{
  kafka{
    bootstrap_servers => "172.22.3.61:9092,172.22.3.62:9092,172.22.3.63:9092"
    group_id => "sre-logstash-prod"
    topics => ["nginx-access", "nginx-error", "line8-applog", "tech-applog"]
    consumer_threads => 24
    codec => json
    decorate_events => false
    client_id => "sre-logstash-prod01"
  }
}

filter{
  if [fields][type] == "nginx-access" {
    grok{
      patterns_dir => ["/opt/logstash/config/pattarns"]
      match => ["message","%{NGINXACCESSLOG}"]
    }
    geoip{
      source => "client_ip"
    }
    mutate{
      convert => { "request_time" => "float"}
      convert => { "response_time" => "float"}
      convert => { "response_status" => "integer"}
      convert => { "upstream_status" => "integer"}
    }
    ruby{
      code => "
        request = event.get('request')
        if request.include?'?'
          request_path = request.split('?')[0]
          event.set('request_path', request_path)
        else
          event.set('request_path', request)
        end
      "
    }
    date{
      match => ["logtime","dd/MMM/yyyy:HH:mm:ss Z"]
    }
  } else if [fields][type] == "nginx-error" {
    grok{
      patterns_dir => ["/opt/logstash/config/pattarns"]
      match => ["message","%{NGINXERRORLOG}"]
    }
  } else if [fields][type] in ["line8-applog", "tech-applog"] {
    grok{
      patterns_dir => ["/opt/logstash/config/pattarns"]
      match => ["message","%{APPLOG}"]
    }
    ruby{
      code => "event.set('project_name', event.get('[log][file][path]').split('/')[-1].sub('.log', ''))"
    }
  }
  ruby{
    code => "event.set('day', (event.get('@timestamp').time.localtime + 8*60*60).strftime('%Y.%m.%d'))"
  }
  grok{
    overwrite => ["message"]
  }
  mutate{
    add_field => {'topic' => "%{[fields][type]}"}
    remove_field => ["@version","agent","input","ecs","tags","fields"]
  }
}

output {
  elasticsearch {
    hosts=> ["172.22.3.69:9200", "172.22.3.70:9200", "172.22.3.71:9200"]
    index => "logstash-%{topic}-%{day}"
    user => "elastic"
    password => "xxx"
  }
}
  4. Start the service with systemd

    cat >/etc/systemd/system/logstash.service <<EOF
    [Unit]
    Description=logstash
    After=network.target

    [Service]
    User=akulaku
    Type=simple
    ExecStart=/opt/logstash/bin/logstash
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill -s QUIT \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
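
Before enabling the service, the pipeline definition can be syntax-checked:

    /opt/logstash/bin/logstash -f /opt/logstash/config/sre-logstash-prod.conf --config.test_and_exit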

kibana installation

kibana needs very little hardware; it is best to put an nginx in front of it so that access logs can be captured.

  1. Download kibana

    wget https://artifacts.elastic.co/downloads/kibana/kibana-7.3.2-linux-x86_64.tar.gz
  2. Extract the archive

    tar -xzf kibana-7.3.2-linux-x86_64.tar.gz -C /opt/
    ln -s /opt/kibana-7.3.2-linux-x86_64 /opt/kibana
  3. Edit the configuration

    cat >/opt/kibana/config/kibana.yml <<EOF
    server.host: "0.0.0.0"
    server.port: 5601
    server.maxPayloadBytes: 1048576
    server.name: "sre-kibana-prod01"

    elasticsearch.hosts: "http://172.22.3.69:9200"
    elasticsearch.username: "elastic"
    elasticsearch.password: "xxx"
    elasticsearch.pingTimeout: 1500
    elasticsearch.requestTimeout: 60000

    logging.quiet: true
    ops.interval: 5000
    i18n.locale: 'zh-CN'
    EOF
  4. Start the service with systemd

    cat >/etc/systemd/system/kibana.service <<EOF
    [Unit]
    Description=kibana
    After=network.target

    [Service]
    User=akulaku
    Type=simple
    ExecStart=/opt/kibana/bin/kibana
    ExecReload=/bin/kill -s HUP \$MAINPID
    ExecStop=/bin/kill -s QUIT \$MAINPID
    Restart=always

    [Install]
    WantedBy=multi-user.target
    EOF
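
Once started, kibana's health can be confirmed through its status API (local address assumed from the config above):

    curl -s http://127.0.0.1:5601/api/status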

openresty proxy

Putting an openresty proxy in front of kibana and elasticsearch provides request load balancing and access logging.

  1. openresty configuration

    cat /usr/local/openresty/nginx/conf/vhost/localhost.conf 
    upstream yunwei_elasticsearch {
        zone zone_for_yunwei_elasticsearch 2m;
        server 172.20.40.14:9200 weight=1 max_fails=3 fail_timeout=30s;
        server 172.20.40.15:9200 weight=1 max_fails=3 fail_timeout=30s;
        server 172.20.40.16:9200 weight=1 max_fails=3 fail_timeout=30s;
        server 172.20.40.17:9200 weight=1 max_fails=3 fail_timeout=30s;
        server 172.20.40.18:9200 weight=1 max_fails=3 fail_timeout=30s;
        ip_hash;
    }

    upstream yunwei_kibana {
        zone zone_for_yunwei_kibana 2m;
        server 172.20.40.20:5601 weight=1 max_fails=3 fail_timeout=30s;
        ip_hash;
    }

    server {
        listen 80;
        location / {
            proxy_pass http://yunwei_kibana;
        }
    }

    server {
        listen 9200;
        location / {
            proxy_pass http://yunwei_elasticsearch;
        }
    }
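
After changing the vhost, test and reload openresty (binary path assumed from the config location above):

    /usr/local/openresty/nginx/sbin/nginx -t
    /usr/local/openresty/nginx/sbin/nginx -s reload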

elasticsearch usage

SQL queries with the x-pack plugin

x-pack allows querying data from elasticsearch directly with SQL syntax.

POST /_xpack/sql?format=txt
{"query":"select \"@timestamp\",message from \"logstash-architecture-*\" where fields.app='message-sms' and message like '%Notice%' and \"@timestamp\">'2018-08-16T16:00:00.000Z'"}

Change the replica settings for all indices

number_of_shards is best set to the number of data nodes, but it can only be set at index creation time (for example via the template above); for non-critical data such as logs, number_of_replicas can be 0, and if near-real-time search is not needed, a larger refresh_interval reduces the load on elasticsearch. Replicas and refresh interval can be changed on existing indices:

PUT _all/_settings
{
  "index": {
    "number_of_replicas": "0",
    "refresh_interval": "10s"
  }
}

Disable the _all field

_all makes searching convenient at the cost of extra CPU and storage at indexing time (this only applies to elasticsearch 6.x and earlier; the _all field was removed in 7.0).

PUT _all/_mappings
{
  "_default_": {
    "_all": {
      "enabled": false
    }
  }
}

List all indices

GET /_cat/indices

elasticsearch rolling upgrade

  1. Disable shard allocation

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.enable": "primaries"
      }
    }
  2. Stop non-essential indexing and perform a synced flush (optional)

    POST _flush/synced
  3. Stop and upgrade a single node

    systemctl stop elasticsearch.service
  4. Start the upgraded node and confirm it rejoins the cluster

    GET /_cat/nodes
  5. Re-enable shard allocation

    PUT _cluster/settings
    {
      "persistent": {
        "cluster.routing.allocation.enable": "all"
      }
    }
  6. Wait for the node to recover

    GET /_cat/health
    GET /_cat/recovery
  7. Repeat for the remaining nodes

Deleting elasticsearch indices older than n days

import requests
import time

delete_day_ago = 6
elasticsearch_url = 'http://172.22.3.78:9200'
auth_user = ('elastic', '123')

# index names are the third column of the _cat/indices output (10 columns per line)
res = requests.get('{}/_cat/indices'.format(elasticsearch_url), auth=auth_user)
index_list = res.text.split()[2::10]

# any logstash-* index whose date suffix is older than the cutoff gets deleted
index_expired_day = time.strftime("%Y.%m.%d", time.localtime(time.time() - delete_day_ago * 24 * 60 * 60))
index_expired_timestamp = time.mktime(time.strptime(index_expired_day, '%Y.%m.%d'))

for index in index_list:
    if index.startswith('logstash-'):
        index_day = index[-10:]
        index_timestamp = time.mktime(time.strptime(index_day, '%Y.%m.%d'))
        if int(index_timestamp) < index_expired_timestamp:
            requests.delete('{}/{}'.format(elasticsearch_url, index), auth=auth_user)
            print('DELETE: {}'.format(index))
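
To run the cleanup automatically, the script can be scheduled with cron (the script path and python interpreter below are assumptions):

# run the index cleanup every day at 02:00
echo '0 2 * * * root /usr/bin/python /opt/scripts/clean_es_index.py' >/etc/cron.d/clean-es-index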

Using the ik analyzer

GET _analyze?pretty
{
  "analyzer": "ik_max_word",
  "text": "安徽省长江流域"
}

Basic kafka commands

List all available topics

./bin/kafka-topics.sh --list --zookeeper localhost:2181

Create a topic (partitions should be a multiple of the number of kafka brokers)

./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 2 --partitions 12 --create --topic line8-applog

Delete a topic

./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic line8-applog

Completely delete a topic from Kafka (clean up its znodes in zookeeper)

./bin/zkCli.sh
deleteall /brokers/topics/line8-applog
deleteall /admin/delete_topics/line8-applog

View consumer group consumption progress (offsets and lag)

./bin/kafka-consumer-groups.sh --bootstrap-server 172.22.3.61:9092 --describe --group sre-logstash-prod --topic nginx-access

Increase the number of partitions of a topic (partitions can only be increased)

./bin/kafka-topics.sh --alter --topic nginx-access --zookeeper localhost:2181 --partitions 9

Change the retention time of a single topic (retention.ms is in milliseconds; 86400000 = 24 hours)

./bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --alter --entity-name line8-applog --add-config retention.ms=86400000

View a topic's configuration overrides

./bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --describe --entity-name line8-applog

Consume a topic from the beginning on the command line

./bin/kafka-console-consumer.sh --bootstrap-server 172.22.3.61:9092 --topic line8-applog --from-beginning

Change the replication factor of a topic

View the current topic information
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe  --topic line8-applog
Edit the reassignment plan file replication.json
{
  "version": 1,
  "partitions": [
    {
      "topic": "line8-applog",
      "partition": 0,
      "replicas": [1, 2, 3]
    },
    {
      "topic": "line8-applog",
      "partition": 1,
      "replicas": [2, 3, 1]
    },
    {
      "topic": "line8-applog",
      "partition": 2,
      "replicas": [3, 1, 2]
    }
  ]
}

Execute the plan
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication.json --execute
Check progress
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file replication.json --verify
Produce test messages from the command line
./bin/kafka-console-producer.sh --broker-list 172.22.3.61:9092 --topic line8-applog
Test with a python producer
import json
from kafka import KafkaProducer

# kafka-python producer pointed at one of the brokers
kafka_obj = KafkaProducer(bootstrap_servers=['172.22.3.61:9092'])

data = {"id": "11010119900300", "account": "1000"}

# serialise as JSON so logstash's json codec can parse the message
payload = json.dumps(data).encode('utf-8')
print(payload)
res = kafka_obj.send('line8-applog', payload)
print(res)

kafka_obj.flush()
kafka_obj.close()