环境:CentOS 7.6 64bit
,装在了VM虚拟机中。
00、配置网络环境(因为使用了vm虚拟机。非VM环境就无需)
#第一步:修改网络配置(使用NAT模式),打开以下网络配置文件,路径和修改内容如下:
vi /etc/sysconfig/network-scripts/ifcfg-ens33
--修改
ONBOOT=yes
BOOTPROTO=static //静态网络IP dhcp 动态获取网络IP
--添加
IPADDR=192.168.58.xxx
NETMASK=255.255.255.0
GATEWAY=192.168.58.xxx
DNS1=114.114.114.114
--删除
UUID
--进行网络测试
ip addr
systemctl restart network.service
ping www.baidu.com
#第二步:关闭网络防火墙:
systemctl stop firewalld (本次服务内关闭防火墙)
systemctl disable firewalld(禁用防火墙服务,服务器重启后防火墙禁用)
#第三步:关闭软件安装限制:
vi /etc/selinux/config
--修改配置
SELINUX=disabled
01、MySQL安装
一、下载并安装mysql:
wget http://dev.mysql.com/get/mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql57-community-release-el7-10.noarch.rpm
yum -y install mysql-community-server
#遇到错误:
失败的软件包是:mysql-community-client-5.7.41-1.el7.x86_64
GPG 密钥配置为:file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
#依次执行以下命令
rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
yum install -y mysql-community-server
二、启动并查看状态MySQL:
systemctl start mysqld.service
systemctl status mysqld.service
三、查看MySQL的默认密码:
grep "password" /var/log/mysqld.log
#如下结果,最后类似的随机字符串“3Xn1mKp.TnQ(”就是默认密码
2023-03-16T00:29:16.010934Z 1 [Note] A temporary password is generated for root@localhost: 3Xn1mKp.TnQ(
四、登录进MySQL
mysql -uroot -p
五、修改默认密码(设置密码需要有大小写符号组合---安全性),把下面的my passrod
替换成自己的密码
ALTER USER 'root'@'localhost' IDENTIFIED BY 'my password';
#如下设置会报错,因为密码太简单;这个与验证密码策略validate_password_policy的值有关。默认是1,
#所以刚开始设置的密码必须符合长度,且必须含有数字,小写或大写字母,特殊字符。如果不想设置8位,或者想设置简单点,可以选择Policy0.
ALTER USER 'root'@'localhost' IDENTIFIED BY '123456';
Your password does not satisfy the current policy requirements
#修改validate_password_policy参数的值(等级为0)
mysql> set global validate_password_policy=0;
ALTER USER 'root'@'localhost' IDENTIFIED BY 'Aa123456';
六、开启远程访问 (把下面的my passrod
替换成自己的密码)
grant all privileges on *.* to 'root'@'%' identified by 'my password' with grant option;
flush privileges;
exit
七、在云服务上增加MySQL的端口:
02、Docker环境
首先我们需要安装GCC相关的环境:
yum -y install gcc
yum -y install gcc-c++
安装Docker需要的依赖软件包:
yum install -y yum-utils device-mapper-persistent-data lvm2
设置国内的镜像(提高速度)
yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
更新yum软件包索引:
yum makecache fast
安装DOCKER CE(注意:Docker分为CE版和EE版,一般我们用CE版就够用了)
yum -y install docker-ce
启动Docker:
systemctl start docker
下载回来的Docker版本::
docker version
来一发HelloWorld:
docker run hello-world
03、Docker compose环境
Compose 是用于定义和运行多容器 Docker 应用程序的工具。通过 Compose,您可以使用 YML 文件来配置应用程序需要的所有服务。然后,使用一个命令,就可以从 YML 文件配置中创建并启动所有服务.
运行以下命令以下载 Docker Compose 的当前稳定版本:
sudo curl -L "https://github.com/docker/compose/releases/download/1.24.1/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
将可执行权限应用于二进制文件:
sudo chmod +x /usr/local/bin/docker-compose
创建软链:
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
测试是否安装成功:
docker-compose --version
04、kafka
kafka >>> compose 文件
新建搭建kafka环境的docker-compose.yml文件,内容如下:
文件内TODO 中的ip需要改成自己的,并且如果你用的是云服务器,那需要把端口给打开。
version: '3'
services:
zookepper:
image: wurstmeister/zookeeper # 原镜像`wurstmeister/zookeeper`
container_name: zookeeper # 容器名为'zookeeper'
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
ports: # 映射端口
- "2181:2181"
kafka:
image: wurstmeister/kafka # 原镜像`wurstmeister/kafka`
container_name: kafka # 容器名为'kafka'
volumes: # 数据卷挂载路径设置,将本机目录映射到容器目录
- "/etc/localtime:/etc/localtime"
environment: # 设置环境变量,相当于docker run命令中的-e
KAFKA_BROKER_ID: 0 # 在kafka集群中,每个kafka都有一个BROKER_ID来区分自己
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://ip:9092 # TODO 将kafka的地址端口注册给zookeeper
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 # 配置kafka的监听端口
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 # zookeeper地址
KAFKA_CREATE_TOPICS: "hello_world"
ports: # 映射端口
- "9092:9092"
depends_on: # 解决容器依赖启动先后问题
- zookepper
kafka-manager:
image: sheepkiller/kafka-manager # 原镜像`sheepkiller/kafka-manager`
container_name: kafka-manager # 容器名为'kafka-manager'
environment: # 设置环境变量,相当于docker run命令中的-e
ZK_HOSTS: zookeeper:2181 # zookeeper地址
APPLICATION_SECRET: xxxxx
KAFKA_MANAGER_AUTH_ENABLED: "true" # 开启kafka-manager权限校验
KAFKA_MANAGER_USERNAME: admin # 登陆账户
KAFKA_MANAGER_PASSWORD: 123456 # 登陆密码
ports: # 映射端口
- "9000:9000"
depends_on: # 解决容器依赖启动先后问题
- kafka
kafka >>> 启动kafka
在存放docker-compose.yml的目录下执行启动命令:
#格式为docker-compose up [options] [SERVICE...],
#该命令可以自动完成包括构建镜像,(重新)创建服务,启动服务,并关联服务相关容器的一系列操作。
#-d表示后台执行
docker-compose up -d
可以查看下docker镜像运行的情况:
docker ps
进入kafka 的容器:
docker exec -it kafka sh
创建一个topic(这里我的topicName就叫austin,你们可以改成自己的)
$KAFKA_HOME/bin/kafka-topics.sh --create --topic austin --partitions 4 --zookeeper zookeeper:2181 --replication-factor 1
查看刚创建的topic信息:
$KAFKA_HOME/bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic austin
启动一个消费者:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --topic austin
新增一个窗口,启动一个生产者:
docker exec -it kafka sh
$KAFKA_HOME/bin/kafka-console-producer.sh --topic=austin --broker-list kafka:9092
kafka >>> Java程序验证
引入Kafka依赖(SpringBoot有默认的版本,不需要写version)
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
yml配置文件
# 以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接
spring.kafka.bootstrap-servers=xx.xx.xx.xx:9092
#如果该值大于零时,表示启用重试失败的发送次数
spring.kafka.producer.retries=3
#每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
#这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
spring.kafka.producer.batch-size=16384
#生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432
spring.kafka.producer.buffer-memory=33554432
#procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
#acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
#acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
#acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
#可以设置的值为:all, -1, 0, 1
spring.kafka.producer.acks=1
#key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#用于标识此使用者所属的使用者组的唯一字符串。
spring.kafka.consumer.group-id=default‐group
#如果为true,则消费者的偏移量将在后台定期提交,默认值为true
spring.kafka.consumer.enable-auto-commit=false
#当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
#可选的值为latest, earliest, none
spring.kafka.consumer.auto-offset-reset=earliest
#值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#侦听器的AckMode,参见https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets
#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
spring.kafka.listener.ack-mode=manual_immediate
定义一个实体类:
@Data
@Accessors(chain = true)
public class UserLog {
private String username;
private String userid;
private String state;
}
定义生产者(austin是topicName):
@Component
public class UserLogProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送数据
* @param userid
*/
public void sendLog(String userid){
UserLog userLog = new UserLog();
userLog.setUsername("jhp").setUserid(userid).setState("0");
System.err.println("发送用户日志数据:"+userLog);
kafkaTemplate.send("austin", JSON.toJSONString(userLog));
}
}
定义消费者:
@Component
@Slf4j
public class UserLogConsumer {
@KafkaListener(topics = {"austin"},groupId = "austinGroup2")
public void consumer(ConsumerRecord<?,?> consumerRecord){
//判断是否为null
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
log.error(">>>austinGroup2>>>>>>> record =" + kafkaMessage);
if(kafkaMessage.isPresent()){
//得到Optional实例中的值
Object message = kafkaMessage.get();
System.err.println("消费消息:"+message);
}
}
/**
* 不指定消费组消费
*
* @param record
* @param ack
*/
@KafkaListener(topics = "austin",groupId = "austinGroup1")
public void listenerOne(ConsumerRecord<?,?> record, Acknowledgment ack) {
//判断是否为null
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
log.error(">>>austinGroup1>>>>>>> record =" + kafkaMessage);
if(kafkaMessage.isPresent()){
//得到Optional实例中的值
Object message = kafkaMessage.get();
System.err.println("消费消息:"+message);
}
//手动提交offset
ack.acknowledge();
}
}
定义接口:
@RestController
public class KafkaTestController {
@Autowired
private UserLogProducer userLogProducer;
/**
* test insert
*/
@GetMapping("/kafka/insert")
public String insert(String userId) {
userLogProducer.sendLog(userId);
return null;
}
}
测试:http://localhost:8080/kafka/insert?userId=33
发送用户日志数据:UserLog(username=jhp, userid=33, state=0)
消费消息:{"state":"0","userid":"33","username":"jhp"}
消费消息:{"state":"0","userid":"33","username":"jhp"}
2023-03-20 02:00:59.269 ERROR 56940 --- [ntainer#0-0-C-1] c.e.kafkademo.config.UserLogConsumer : >>>austinGroup2>>>>>>> record =Optional[{"state":"0","userid":"33","username":"jhp"}]
2023-03-20 02:00:59.270 ERROR 56940 --- [ntainer#1-0-C-1] c.e.kafkademo.config.UserLogConsumer : >>>austinGroup1>>>>>>> record =Optional[{"state":"0","userid":"33","username":"jhp"}]
05、Redis
Redis >>> compose 文件
安装Redis的环境跟上次Kafka是一样的,为了方便我就继续用docker-compose的方式来进行
首先,我们新建一个文件夹redis,然后在该目录下创建出data文件夹、redis.conf文件和docker-compose.yaml文件
redis.conf文件的内容如下(后面的配置可在这更改,比如requirepass 我指定的密码为123456)
protected-mode no
port 6379
timeout 0
save 900 1
save 300 10
save 60 10000
rdbcompression yes
dbfilename dump.rdb
dir /data
appendonly yes
appendfsync everysec
requirepass 123456
docker-compose.yaml的文件内容如下:
version: '3'
services:
redis:
image: redis:latest
container_name: redis
restart: always
ports:
- 6379:6379
volumes:
- ./redis.conf:/usr/local/etc/redis/redis.conf:rw
- ./data:/data:rw
command:
/bin/bash -c "redis-server /usr/local/etc/redis/redis.conf "
配置的工作就完了,如果是云服务器,记得开redis端口6379
Redis >>> 启动Redis
启动Redis跟之前安装Kafka的时候就差不多
# docker-compose up解决错误ERROR: Couldn't connect to Docker daemon at http+docker://localhost - is it running?
# If it's at a non-standard location, specify the URL with the DOCKER_HOST environment variable.
#如果发生以上错误提示,可能是:docker服务没启动,那就启动(命令如下:sudo systemctl start docker)
docker-compose up -d
docker ps
docker exec -it redis redis-cli
进入redis客户端了之后,我们想看验证下是否正常。(在正式输入命令之前,我们需要通过密码校验,在配置文件下配置的密码是123456)
auth 123456
然后随意看看命令是不是正常
set 1 1
get 1
keys *
Redis >>> Java程序验证
在SpringBoot环境下,使用Redis就非常简单了(再次体现出使用SpringBoot的好处)。我们只需要在pom文件下引入对应的依赖,并且在配置文件下配置host/port和password就搞掂了。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>