一、Ignite是什么
Ignite是: 一个以内存为中心的分布式数据库、缓存和处理平台,可以在PB级数据中,以内存级的速度进行事务性、分析性以及流式负载的处理。
先上一张整体架构图红色部分为Ignite自身功能,除了丰富的处理和结算功能外,定位为内存分布式数据库,可以水平扩展以支持PB级数据,支持数据落地存储和第三方数据库数据存储功能。
二、启动Ignite
1、环境要求
JDK:Oracle JDK8及以上,Open JDK8及以上,IBM JDK8及以上
OS:Linux(任何版本),Mac OS X(10.6及以上),Windows(XP及以上),Windows Server(2008及以上),Oracle Solaris
网络:没有限制(建议10G)
架构:x86,x64,SPARC,PowerPC
2、启动一个Ignite节点
通过二进制版本
从官网下载zip格式压缩包;
解压到系统中的一个安装文件夹;
(可选)配置
IGNITE_HOME
环境变量,指向安装文件夹,确保路径以/
结尾。
默认启动
Linux
$ bin/ignite.sh
Windows
$ bin\ignite.bat
输出
[02:49:12] Ignite node started OK (id=ab5d18a6)
[02:49:12] Topology snapshot [ver=1, nodes=1, CPUs=8, heap=1.0GB]
ignite.sh
会使用config/default-config.xml
这个默认配置文件启动节点。
指定配置文件启动
如果要使用一个定制配置文件,可以将其作为参数传给ignite.sh/bat
,如下:
Linux
$ bin/ignite.sh examples/config/example-ignite.xml
Windows
$ bin\ignite.bat examples\config\example-ignite.xml
配置文件的路径,可以是绝对路径,也可以是相对于IGNITE_HOME
(Ignite安装文件夹)的相对路径,也可以是类路径中的META-INF
文件夹。
- Maven方式嵌入项目启动
gnite中只有ignite-core
模块是必须的,一般来说,要使用基于Spring的xml配置,还需要ignite-spring
模块,要使用SQL查询,还需要ignite-indexing
模块。
下面中的${ignite-version}
需要替换为实际使用的版本。
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-spring</artifactId>
<version>${ignite.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-indexing</artifactId>
<version>${ignite.version}</version>
</dependency>
每个发布版中,都会有一个示例工程,在开发环境中打开这个工程,然后转到
{ignite_version}/examples
文件夹找到pom.xml
文件,依赖引入之后,各种示例就可以演示Ignite的各种功能了。
三、Ignite节点启动过程
1、JAVA启动入口
Ignite ignite = Ignition.start(); //寻找IGNITE_HOME下default-config.xml启动
Ignite ignite = Ignition.start("examples/config/example-cache.xml");//指定配置文件启动
2、ignite启动配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd ">
<!--
Alter configuration below as needed.
-->
<bean id="igniteCfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- config log4j2 默认从IGNITE_HOME获取配置记录的是IGNITE自己的日志-->
<property name="gridLogger">
<bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
<constructor-arg type="java.lang.String" value="E:\eclipse\workspace\px-ignite-server\src\main\resources\log4j2-spring.xml"/>
</bean>
</property>
<!-- Consistent globally unique node identifier which survives node restarts.-->
<!-- 设置节点固定的一致性id,使得节点使用专用目录和数据分区 ,集群指定节点名时修改此项-->
<property name="consistentId" value="px-ignite2"/>
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<!-- ignite集群间字节码交换,手动写的代码只发送到一个点,其他节点动态感知,这一块可能有问题实际使用时还是每个节点都发布代码为好 -->
<!-- 生产环境中禁用对等类加载以免影响性能 -->
<property name="peerClassLoadingEnabled" value="false"/>
<!-- 为部署的类和任务设置部署模式-->
<!-- Set deployment mode. -->
<property name="deploymentMode" value="CONTINUOUS"/>
<!-- config client mode 设置为服务端模式-->
<property name="clientMode" value="false"/>
<!-- Disable missed resources caching. -->
<!-- 错过的资源缓存的大小,设为0会避免错过的资源缓存,默认值为100 -->
<property name="peerClassLoadingMissedResourcesCacheSize" value="0"/>
<!-- 数据再平衡线程数 -->
<property name="rebalanceThreadPoolSize" value="4"/>
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>
<!-- page大小默认是4kb -->
<!-- <property name="memoryConfiguration">
<bean class="org.apache.ignite.configuration.MemoryConfiguration">
Setting the page size to 4 KB
<property name="pageSize" value="#{4 * 1024}"/>
</bean>
</property>
-->
<!-- 节点自定义存储配置 -->
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
<!-- Threads that generate dirty pages too fast during ongoing checkpoint will be throttled -->
<property name="writeThrottlingEnabled" value="true"/>
<property name="pageSize" value="4096"/>
<!-- 再平衡线程池大小 ,有新节点加入需要数据迁移时使用-->
<!-- property name="rebalanceThreadPoolSize" value="4"/> -->
<!--Checkpointing frequency which is a minimal interval when the dirty pages will be written to the Persistent Store.-->
<!-- 检查点频率 -->
<!-- <property name="checkpointFrequency" value="180000"/> -->
<!-- Number of threads for checkpointing.-->
<!-- 检查点线程数 -->
<!-- <property name="checkpointThreads" value="4"/> -->
<!-- 在检查点同步完成后预写日志历史保留数量-->
<!-- Number of checkpoints to be kept in WAL after checkpoint is finished.-->
<!-- <property name="maxwalarchivesize" value="20"/> -->
<!-- Redefining the default region's settings -->
<property name="defaultDataRegionConfiguration">
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<property name="name" value="Default_Region"/>
<!-- 设置默认内存区最大内存为 1GB. -->
<property name="maxSize" value="#{1L * 1024 * 1024 * 1024}"/>
<!-- 默认内存区开启持久化. -->
<property name="persistenceEnabled" value="true"/>
</bean>
</property>
<property name="dataRegionConfigurations">
<list>
<!-- 自定义内存区并开启持久化-->
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
<!-- 内存区名. -->
<property name="name" value="tableRegion"/>
<!-- 1GB initial size. -->
<property name="initialSize" value="#{1L * 1024 * 1024 * 1024}"/>
<!-- 2GB maximum size. -->
<property name="maxSize" value="#{2L * 1024 * 1024 * 1024}"/>
<!-- 开启持久化. -->
<property name="persistenceEnabled" value="true"/>
<!-- 检查点缓存区默认256m,可以调大 -->
<!-- Increasing the buffer size to 1 GB. -->
<property name="checkpointPageBufferSize"
value="#{1024L * 1024 * 1024}"/>
</bean>
</list>
</property>
<!-- 设置持久化预写日志模式. -->
<property name="walMode" value="FSYNC"/>
<!-- 持久化文件存储路径. -->
<property name="storagePath" value="E:\\ignite-project\\work\\db" />
<!-- 预写日志存储路径. -->
<property name="walPath" value="E:\\ignite-project\\work\\db\\wal" />
<!-- 预写日志归档路径. -->
<property name="walArchivePath" value="E:\\ignite-project\\work\\db\\wal\\archive" />
<!--walPath与 walArchivePath配置为一样表示禁用wal归档 -->
<!-- <property name="walArchivePath" value="E:\\ignite-project\\work\\db\\wal" />
--> </bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- 本地节点启动时默认端口 -->
<property name="localPort" value="48500"/>
<!-- 本地节点启动端口被占用时默认重试端口数量 -->
<property name="localPortRange" value="10"/>
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead os static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!-- 静态IP发现 -->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<!-- 组播和静态ip发现一起使用 -->
<!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
--> <!-- 配置组名和本机ip地址,使用组播发现 -->
<!-- <property name="multicastGroup" value="127.0.0.1"/>
--> <!-- 配置静态ip地址使用静态发现 -->
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<!-- 这里添加IP地址 -->
<value>192.168.1.212:48500..48509</value>
<value>192.168.1.39:48500..48509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<property name="localPort" value="48100"/>
<property name="localPortRange" value="10"/>
<property name="socketWriteTimeout" value="60000"/>
</bean>
</property>
<!-- 配置缓存 -->
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- 设置sqlSchema为public后jdbc可以直接读取 -->
<property name="sqlSchema" value="PUBLIC"/>
<!-- 设置缓存名与表名一致便于直接使用mybatis直接读取 -->
<property name="name" value="StudentTest"></property>
<!-- 设置缓存模式
--> <property name="cacheMode" value="PARTITIONED"/>
<!--配置事务模式缓存 -->
<property name="atomicityMode" value="ATOMIC"/>
<!-- 设置备份节点数量
--> <property name="backups" value="1"/>
<!-- 同步写模式设置集群间同步 -->
<!-- <property name="writeSynchronizationMode" value="FULL_ASYNC"/>
--> <!-- 启用通读 -->
<property name="readThrough" value="true"></property>
<!-- 启用通写 -->
<property name="writeThrough" value="true"></property>
<!-- 后写缓存 -->
<property name="WriteBehindEnabled" value="true"></property>
<property name="writeBehindFlushThreadCount" value="8"></property>
<!-- 刷新频率设为2s -->
<property name="writeBehindFlushFrequency" value="2000"></property>
<!-- 刷新一次数据大小设为20k -->
<property name="writeBehindFlushSize" value="20048"></property>
<property name="dataRegionName" value="tableRegion"/>
<!--以下两个配置设置再平衡时每100ms发送2m数据 -->
<property name="rebalanceBatchSize" value="#{2 * 1024 * 1024}"/>
<property name="rebalanceThrottle" value="100"/>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="com.pxjy.puxin.ignite.server.store.CacheStudentStore"></constructor-arg>
</bean>
</property>
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.Integer"></property>
<property name="valueType" value="com.pxjy.puxin.entity.StudentTest"></property>
<!-- <property name="tableName" value="student_test"></property>
--> <property name="fields">
<map>
<entry key="id" value="java.lang.Integer"></entry>
<entry key="age" value="java.lang.Integer"></entry>
<entry key="name" value="java.lang.String"></entry>
</map>
</property>
<property name="keyFieldName" value="id"/>
<!-- 定义查询索引 -->
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="id"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</list>
</property>
</beans>
在ignite配置文件中装配IgniteConfiguration对象并注入到Ignite实例中,系统完成启动后即可使用。
四、Ignite集群说明
1、集群化
Ignite具有非常先进的集群能力,包括逻辑集群组和自动发现。Ignite节点之间会自动发现对方,这有助于必要时扩展集群,而不需要重启整个集群。Ignite集群节点关系是一种平等关系无主次之分,节点启动后根据配置的集群发现机制自动发现并加入集群。集群架构图如下:
2、节点类型
Ignite节点有一个客户端模式和服务端模式的概念。
服务端节点参与数据缓存、计算、流处理等为ignite默认的节点。客户端节点也属于集群中的一员可以使用ignite所有接口功能但不缓存数据,设置为客户端节点需要显示设置。
XML配置:
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<!-- Enable client mode. -->
<property name="clientMode" value="true"/>
...
</bean>
Java方式:
Ignition.setClientMode(true);
// Start Ignite in client mode.
Ignite ignite = Ignition.start();
3、集群发现
Ignite的发现机制,根据不同的使用场景,有两种实现:
TCP/IP发现:面向几十以及100-300集群节点设计和优化;
ZooKeeper发现:允许将Ignite集群节点数扩展至百级甚至千级,仍然保证扩展性和性能
-
组播发现
XML配置
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="multicastGroup" value="228.10.10.157"/>
</bean>
</property>
</bean>
</property>
</bean>
Java方式
``
TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
ipFinder.setMulticastGroup("228.10.10.157");
spi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default discovery SPI.
cfg.setDiscoverySpi(spi);
// Start Ignite node.
Ignition.start(cfg);
``
对于组播被禁用的网络组播发现可能无法使用,使用前需了解清楚
静态IP发现
XML
``
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<value>1.2.3.4</value>
<!--
IP Address and optional port range of a remote node.
You can also optionally specify an individual port.
-->
<value>1.2.3.5:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
JAVA
TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryVmIpFinder ipFinder = new TcpDiscoveryVmIpFinder();
// Set initial IP addresses.
// Note that you can optionally specify a port or a port range.
ipFinder.setAddresses(Arrays.asList("1.2.3.4", "1.2.3.5:47500..47509"));
spi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default discovery SPI.
cfg.setDiscoverySpi(spi);
// Start Ignite node.
Ignition.start(cfg);
``
组播和静态IP探测器同时使用
XML
``
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
...
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="multicastGroup" value="228.10.10.157"/>
<!-- list of static IP addresses-->
<property name="addresses">
<list>
<value>1.2.3.4</value>
<!--
IP Address and optional port range.
You can also optionally specify an individual port.
-->
<value>1.2.3.5:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
JAVA
TcpDiscoverySpi spi = new TcpDiscoverySpi();
TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
// Set Multicast group.
ipFinder.setMulticastGroup("228.10.10.157");
// Set initial IP addresses.
// Note that you can optionally specify a port or a port range.
ipFinder.setAddresses(Arrays.asList("1.2.3.4", "1.2.3.5:47500..47509"));
spi.setIpFinder(ipFinder);
IgniteConfiguration cfg = new IgniteConfiguration();
// Override default discovery SPI.
cfg.setDiscoverySpi(spi);
// Start Ignite node.
Ignition.start(cfg);
``
五、Ignite第三方存储
Ignite可以做为第三方数据库的一个缓存层(数据网格),包括RDBMS、Apache Cassandra或者MongoDB,该模式可以对底层数据库进行加速。
JCache规范提供了javax.cache.integration.CacheLoader
和javax.cache.integration.CacheWriter
API,他们分别用于底层持久化存储的通读
和通写
(比如RDBMS中的Oracle或者MySQL,以及NoSQL数据库中的MongoDB或者CouchDB)。除了键-值操作,Ignite还支持INSERT、UPDATE和MERGE操作的通写,但是SELECT查询是无法读取第三方数据库的数据的。
以单独地配置
CacheRLoader
和CacheWriter
,但是在两个单独的类中实现事务化存储是非常尴尬的,因为多个load
和put
操作需要在同一个事务中的同一个连接中共享状态。为了解决这个问题,Ignite提供了org.apacche.ignite.cache.store.CacheStore
接口,他同时扩展了CacheLoader
和CacheWriter
。
事务
CacheStore
是完整事务性的,他会自动地融入当前的缓存事务。CacheJdbcPojoStore
Ignite附带了他自己的CacheJdbcPojoStore
,他会自动地建立Java POJO和数据库模式之间的映射。
1、通读和通写
如果需要通读和通写行为时,就得提供一个正确的CacheStore
实现。通读意味着当缓存无效时会从底层的持久化存储中读取,通写意味着当缓存更新时会自动地进行持久化。所有的通读和通写都会参与整体的缓存事务以及作为一个整体提交或者回滚。
要配置通读和通写,需要实现CacheStore
接口以及设置CacheConfiguration
中cacheStoreFactory
的readThrough
和writeThrough
属性,下面的示例会有说明。
2、后写缓存
在一个简单的通写模式中每个缓存的put和remove操作都会涉及一个持久化存储的请求,因此整个缓存更新的持续时间可能是相对比较长的。另外,密集的缓存更新频率也会导致非常高的存储负载。
对于这种情况,Ignite提供了一个选项来执行异步化的持久化存储更新,也叫做后写,这个方式的主要概念是累加更新操作然后作为一个批量操作异步化地刷入持久化存储中。真实的数据持久化可以被基于时间的事件触发(数据输入的最大时间受到队列的限制),也可以被队列的大小触发(当队列大小达到一个限值),或者通过两者的组合触发,这时任何事件都会触发刷新。
更新顺序
对于后写的方式只有数据的最后一次更新会被写入底层存储。如果键为key1的缓存数据分别依次地更新为值value1、value2和value3,那么只有(key1,value3)对这一个存储请求会被传播到持久化存储。 更新性能
批量的存储操作通常比按顺序的单一存储操作更有效率,因此可以通过开启后写模式的批量操作来利用这个特性。简单类型(put和remove)的简单顺序更新操作可以被组合成一个批量操作。比如,连续地往缓存中加入(key1,value1),(key2,value2),(key3,value3)可以通过一个单一的CacheStore.putAll(...)
操作批量处理。
后写缓存可以通过CacheConfiguration.setWriteBehindEnabled(boolean)
配置项来开启。
3、CacheStore
Ignite中的CacheStore
接口用于向底层的数据存储写入或者加载数据。除了标准的JCache加载和存储方法,他还引入了最终事务划界以及从底层数据存储批量载入数据的能力。
loadCache()
CacheStore.loadCache()
方法可以加载缓存,即使没有传入要加载的所有键,它通常用于启动时缓存的热加载,但是也可以在缓存加载完之后的任何时间点调用。
在每一个相关的集群节点,IgniteCache.loadCache()
方法会分配给CacheStore.loadCache()
方法,如果只想在本地节点上进行加载,可以用IgniteCache.localLoadCache()
方法。
对于分区缓存,不管是主节点还是备份节点,如果键没有被映射到该节点,会被缓存自动丢弃。
load(), write(), delete()
当IgniteCache
接口的get
,put
,remove
方法被调用时,相对应的CacheStore
的load()
,write()
和delete()
方法会被调用,当与单个缓存数据工作时,这些方法会用于启用通读和通写行为。
loadAll(), writeAll(), deleteAll()
当IgniteCache
接口的getAll
,putAll
,removeAll
方法被调用时,相对应的CacheStore
的loadAll()
,writeAll()
和deleteAll()
方法会被调用,当与多个缓存数据工作时,这些方法会用于启用通读和通写行为,他们通常用批量操作的方式实现以提供更好的性能。
CacheStoreAdapter
提供了loadAll()
,writeAll()
和deleteAll()
方法的默认实现,他只是简单地对键进行一个一个地迭代。
sessionEnd()
Ignite有一个存储会话的概念,它可以跨越不止一个CacheStore
操作,会话对于事务非常有用。
对于原子化
的缓存,sessionEnd()
方法会在每个CacheStore
方法完成之后被调用,对于事务化
的缓存,不管是在底层持久化存储进行提交或者回滚多个操作,sessionEnd()
方法都会在每个事务结束后被调用。
CacheStoreAdapater
提供了sessionEnd()
方法的默认的空实现。
4、配置说明
下面的配置参数可以通过CacheConfiguration
用于启用以及配置后写缓存:
setter方法 | 描述 | 默认值 |
---|---|---|
setWriteBehindEnabled(boolean) |
设置后写是否启用的标志 | false |
setWriteBehindFlushSize(int) |
后写缓存的最大值,如果超过了这个限值,所有的缓存数据都会被刷入CacheStore 然后写缓存被清空。如果值为0,刷新操作将会依据刷新频率间隔,注意不能将写缓存大小和刷新频率都设置为0 |
10240 |
setWriteBehindFlushFrequency(long) |
后写缓存的刷新频率,单位为毫秒,该值定义了从对缓存对象进行插入/删除和当相应的操作被施加到CacheStore 的时刻之间的最大时间间隔。如果值为0,刷新会依据写缓存大小,注意不能将写缓存大小和刷新频率都设置为0 |
5000 |
setWriteBehindFlushThreadCount(int) |
执行缓存刷新的线程数 | 1 |
setWriteBehindBatchSize(int) |
后写缓存存储操作的操作数最大值 | 512 |
CacheStore
接口可以在IgniteConfiguration
上通过一个工厂进行设置,就和CacheLoader
和CacheWriter
同样的方式。
六、Ignite瘦客户端
Java瘦客户端将二进制客户端协议暴露给Java开发者。 瘦客户端是一个轻量级的Ignite客户端,通过标准的Socket连接接入集群,不会成为集群拓扑的一部分,也不持有任何数据,也不会参与计算网格的计算。 它所做的只是简单地建立一个与标准Ignite节点的Socket连接,并通过该节点执行所有操作。
如果需要高效使用瘦客户端需要自己创建一个连接池连接Ignite集群
1、maven引入瘦客户端
<properties>
<ignite.version>2.5.0</ignite.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
<version>${ignite.version}</version>
</dependency>
</dependencies>
2、使用举例
public static void main(String[] args) {
ClientConfiguration cfg = new ClientConfiguration().setAddresses("127.0.0.1:10800");
try (IgniteClient igniteClient = Ignition.startClient(cfg)) {
System.out.println();
System.out.println(">>> Thin client put-get example started.");
final String CACHE_NAME = "put-get-example";
ClientCache<Integer, Address> cache = igniteClient.getOrCreateCache(CACHE_NAME);
System.out.format(">>> Created cache [%s].\n", CACHE_NAME);
Integer key = 1;
Address val = new Address("1545 Jackson Street", 94612);
cache.put(key, val);
System.out.format(">>> Saved [%s] in the cache.\n", val);
Address cachedVal = cache.get(key);
System.out.format(">>> Loaded [%s] from the cache.\n", cachedVal);
}
catch (ClientException e) {
System.err.println(e.getMessage());
}
catch (Exception e) {
System.err.format("Unexpected failure: %s\n", e);
}
}
该应用做了如下的工作:
使用
Ignition#startClient(clientCfg)
向运行在本地127.0.0.1
上的Ignite服务端发起了一个瘦客户端连接;使用
IgniteClient#getOrCreateCache(cacheName)
创建了一个指定名字的缓存;使用
ClientCache#put(key, val)}
和ClientCache#get(key)
存储和获取数据。
3、缓存管理
下面的方法可以用于获取表示缓存的CacheClient
实例。IgniteClient#cache(String)
:假定指定名字的缓存已经存在,该方法不会与Ignite通信确认该缓存是否真的存在,后续的缓存操作如果缓存不存在会报错;IgniteClient#getOrCreateCache(String),IgniteClient#getOrCreateCache(ClientCacheConfiguration)
:获取指定名字的缓存,如果不存在会进行创建,缓存创建时会使用默认的配置;IgniteClient#createCache(String), IgniteClient#createCache(ClientCacheConfiguration)
:用指定的名字创建缓存,如果缓存已经存在,会报错。
IgniteClient#cacheNames()
方法可以列出所有的已有缓存。
ClientCacheConfiguration cacheCfg = new ClientCacheConfiguration()
.setName("References")
.setCacheMode(CacheMode.REPLICATED)
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
ClientCache<Integer, String> cache = client.getOrCreateCache(cacheCfg);
4、瘦客户端和JCache
目前,瘦客户端只实现了JCache的一个子集,因此并没有实现javax.cache.Cache
(ClientCacheConfiguration
也没有实现javax.cache.configuration
)。ClientCache<K, V>
目前支持如下的JCache API:
-
V get(K key)
; -
void put(K key, V val)
; -
boolean containsKey(K key)
; -
String getName()
; -
CacheClientConfiguration getConfiguration()
; -
Map<K, V> getAll(Set<? extends K> keys)
; -
void putAll(Map<? extends K, ? extends V> map)
; -
boolean replace(K key, V oldVal, V newVal)
; -
boolean replace(K key, V val)
; -
boolean remove(K key)
; -
boolean remove(K key, V oldVal)
; -
void removeAll(Set<? extends K> keys)
; -
void removeAll()
; -
V getAndPut(K key, V val)
; -
V getAndRemove(K key)
; -
V getAndReplace(K key, V val)
; -
boolean putIfAbsent(K key, V val)
; void clear()
ClientCache<K, V>
暴露了JCache中没有的高级的缓存API:
-
int size(CachePeekMode... peekModes)
5、扫描查询
使用ScanQuery<K, V>
可以在服务端使用Java谓词对数据进行过滤,然后在客户端对过滤后的结果集进行迭代。 过滤后的条目是按页传输到客户端的,这样每次只有一个页面的数据会被加载到客户单的内存,页面大小可以通过ScanQuery#setPageSize(int)
进行配置。
Query<Cache.Entry<Integer, Person>> qry = new ScanQuery<Integer, Person>((i, p) -> p.getName().contains("Smith")).setPageSize(1000);
for (Query<Cache.Entry<Integer, Person>> qry : queries) {
try (QueryCursor<Cache.Entry<Integer, Person>> cur = cache.query(qry)) {
for (Cache.Entry<Integer, Person> entry : cur) {
// Handle the entry ...
SQL查询共有2种:
- 数据定义语言语句:用来管理缓存和索引;
- 数据维护语言语句:用来管理数据。
可以通过如下的方式使用瘦客户端的SQL API:
-
IgniteClient#query(SqlFieldsQuery).getAll()
:执行非SELECT语句(DDL和DML); -
IgniteClient#query(SqlFieldsQuery)
:执行SELECT语句以及获取字段的子集; -
IgniteCache#query(SqlQuery)
:执行SELECT语句,获取整个对象并且将结果集反序列化为Java类型实例。
和扫描查询一样,SELECT查询也是按页返回结果集,这样每次只有一个页面的数据加载到客户端的内存。
client.query(
new SqlFieldsQuery(String.format(
"CREATE TABLE IF NOT EXISTS Person (id INT PRIMARY KEY, name VARCHAR) WITH \"VALUE_TYPE=%s\"",
Person.class.getName()
)).setSchema("PUBLIC")
).getAll();
int key = 1;
Person val = new Person(key, "Person 1");
client.query(new SqlFieldsQuery(
"INSERT INTO Person(id, name) VALUES(?, ?)"
).setArgs(val.getId(), val.getName()).setSchema("PUBLIC")).getAll();
Object cachedName = client.query(
new SqlFieldsQuery("SELECT name from Person WHERE id=?").setArgs(key).setSchema("PUBLIC")
).getAll().iterator().next().iterator().next();
assertEquals(val.getName(), cachedName);
七、JDBC方式连接Ignite
1、用JDBC thin驱动连接Ignite
JDBC Thin模式驱动是默认的,是一个轻量级驱动,要使用这种驱动,只需要将ignite-core-{version}.jar
放入应用的类路径即可。
JDBC连接串可以有两种模式:URL查询模式以及分号模式:
// URL query pattern
jdbc:ignite:thin://<hostAndPortRange0>[,<hostAndPortRange1>]...[,<hostAndPortRangeN>][/schema][?<params>]
hostAndPortRange := host[:port_from[..port_to]]
params := param1=value1[¶m2=value2]...[¶mN=valueN]
// Semicolon pattern
jdbc:ignite:thin://<hostAndPortRange0>[,<hostAndPortRange1>]...[,<hostAndPortRangeN>][;schema=<schema_name>][;param1=value1]...[;paramN=valueN]
-
host
:必需,它定义了要接入的集群节点主机地址; -
port_from
:打开连接的端口范围的起始点,如果忽略此参数默认为10800
; -
port_to
:可选,如果忽略此参数则等同于port_from
; -
schema
:要访问的模式名,默认是PUBLIC
,这个名字对应于SQL的ANSI-99标准,不加引号是大小写不敏感的,加引号是大小写敏感的。如果使用了分号模式,模式可以通过参数名schema
定义; -
<params>
:可选。
驱动类名为org.apache.ignite.IgniteJdbcThinDriver
比如,下面就是如何打开到集群节点的连接,监听地址为192.168.0.50:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open the JDBC connection.
Connection conn = DriverManager.getConnection("jdbc:ignite:thin://192.168.0.50");
下表列出了JDBC连接串支持的所有参数:
属性名 | 描述 | 默认值 |
---|---|---|
user |
SQL连接的用户名,如果服务端开启了认证则此参数为必需。 | ignite |
password |
SQL连接的密码,如果服务端开启了认证则此参数为必需。 | ignite |
distributedJoins |
对于非并置数据是否使用分布式关联 | false |
enforceJoinOrder |
是否在查询中强制表的关联顺序,如果配置为true ,查询优化器在关联中不会对表进行重新排序。 |
false |
collocated |
数据是否并置,当执行分布式查询时,它会将子查询发送给各个节点,如果事先知道要查询的数据在相同的节点是并置在一起的,那么Ignite会有显著的性能提升和网络优化。 | false |
replicatedOnly |
查询是否只包含复制表,这是一个潜在的可能提高性能的提示。 | false |
autoCloseServerCursor |
当拿到最后一个结果集时是否自动关闭服务端游标。开启之后,对ResultSet.close() 的调用就不需要网络访问,这样会改进性能。但是,如果服务端游标已经关闭,在调用ResultSet.getMetadata() 方法时会抛出异常,这时为什么默认值为false 的原因。 |
false |
socketSendBuffer |
发送套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 | 0 |
socketReceiveBuffer |
接收套接字缓冲区大小,如果配置为0,会使用操作系统默认值。 | 0 |
tcpNoDelay |
是否使用TCP_NODELAY 选项。 |
true |
lazy |
查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 |
false |
skipReducerOnUpdate |
开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |
sslMode |
开启SSL连接。可用的模式为:1.require :在客户端开启SSL协议,只有SSL连接才可以接入。2.disable :在客户端禁用SSL协议,只支持普通连接。 |
disable |
sslProtocol |
安全连接的协议名,如果未指定,会使用TLS协议。协议实现由JSSE支持:SSLv3 (SSL), TLSv1 (TLS), TLSv1.1, TLSv1.2
|
TLS |
sslKeyAlgorithm |
创建密钥管理器使用的算法。注意大多数情况使用默认值就可以了。算法实现由JSSE提供:PKIX (X509 or SunPKIX), SunX509
|
|
sslClientCertificateKeyStoreUrl |
客户端密钥存储库文件的url,这是个强制参数,因为没有密钥管理器SSL上下文无法初始化。如果sslMode 为require 并且未通过属性文件指定密钥存储库 URL,那么会使用JSSE属性javax.net.ssl.keyStore 的值。 |
JSSE系统属性javax.net.ssl.keyStore 的值。 |
sslClientCertificateKeyStorePassword |
客户端密钥存储库密码。如果sslMode 为require 并且未通过属性文件指定密钥存储库密码,那么会使用JSSE属性javax.net.ssl.keyStorePassword 的值。 |
JSSE属性javax.net.ssl.keyStorePassword 的值。 |
sslClientCertificateKeyStoreType |
用于上下文初始化的客户端密钥存储库类型。如果sslMode 为require 并且未通过属性文件指定密钥存储库类型,那么会使用JSSE属性javax.net.ssl.keyStoreType 的值。 |
JSSE属性javax.net.ssl.keyStoreType 的值,如果属性未定义,默认值为JKS。 |
sslTrustCertificateKeyStoreUrl |
truststore文件的URL。这是个可选参数,但是sslTrustCertificateKeyStoreUrl 和sslTrustAll 必须配置一个。如果sslMode 为require 并且未通过属性文件指定truststore文件URL,那么会使用JSSE属性javax.net.ssl.trustStore 的值。 |
JSSE系统属性javax.net.ssl.trustStore 的值。 |
sslTrustCertificateKeyStorePassword |
truststore密码。如果sslMode 为require 并且未通过属性文件指定truststore密码,那么会使用JSSE属性javax.net.ssl.trustStorePassword 的值。 |
JSSE系统属性javax.net.ssl.trustStorePassword 的值。 |
sslTrustCertificateKeyStoreType |
truststore类型。如果sslMode 为require 并且未通过属性文件指定truststore类型,那么会使用JSSE属性javax.net.ssl.trustStoreType 的值。 |
JSSE系统属性javax.net.ssl.trustStoreType 的值。如果属性未定义,默认值为JKS。 |
sslTrustAll |
禁用服务端的证书验证。配置为true 信任任何服务端证书(撤销的、过期的或者自签名的SSL证书)。注意,如果不能完全信任网络(比如公共互联网),不要在生产中启用该选项。 |
false |
sslFactory |
Factory<SSLSocketFactory> 的自定义实现的类名,如果sslMode 为require 并且指定了该工厂类,自定义的工厂会替换JSSE的默认值,这时其它的SSL属性也会被忽略。 |
连接串示例
-
jdbc:ignite:thin://myHost
:接入myHost
,其它比如端口为10800
等都是默认值; -
jdbc:ignite:thin://myHost:11900
:接入myHost
,自定义端口为11900
,其它为默认值; -
jdbc:ignite:thin://myHost:11900;user=ignite;password=ignite
:接入myHost
,自定义端口为11900
,并且带有用于认证的用户凭据; -
jdbc:ignite:thin://myHost:11900;distributedJoins=true&autoCloseServerCursor=true
:接入myHost
,自定义端口为11900
,开启了分布式关联和autoCloseServerCursor
优化; -
jdbc:ignite:thin://myHost:11900/myschema;lazy=true
:接入myHost
,自定义端口为11900
,模式为MYSCHEMA
,并且开启了查询的延迟执行; -
jdbc:ignite:thin://myHost:11900/"MySchema";lazy=true
:接入myHost
,自定义端口为11900
,模式为MySchema
(模式名区分大小写),并且开启了查询的延迟执行。
多端点 在连接串中配置多个连接端点也是可以的,这样如果连接中断会开启自动故障转移,JDBC驱动会从列表中随机选择一个地址接入。如果之前的连接中断,驱动会选择另一个地址只到连接恢复,如果所有的端点都不可达,JDBC会停止重连并且抛出异常。 下面的示例会显示如何通过连接串传递三个地址:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
// Open the JDBC connection passing several connection endpoints.
Connection conn = DriverManager.getConnection(
"jdbc:ignite:thin://192.168.0.50:101,192.188.5.40:101, 192.168.10.230:101");
集群配置 为了接收和处理来自JDBC Thin驱动转发过来的请求,一个节点需要绑定到一个本地网络端口10800
,然后监听入站请求。 通过IgniteConfiguration
配置ClientConnectorConfiguration
,可以对参数进行修改: Java:
IgniteConfiguration cfg = new IgniteConfiguration()
.setClientConnectorConfiguration(new ClientConnectorConfiguration());
XML:
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="clientConnectorConfiguration">
<bean class="org.apache.ignite.configuration.ClientConnectorConfiguration" />
</property>
</bean>
其支持如下的参数:
参数名 | 描述 | 默认值 |
---|---|---|
host |
绑定的主机名或者IP地址,如果配置为null ,会使用localHost 。 |
null |
port |
绑定的TCP端口,如果指定的端口已被占用,Ignite会使用portRange 属性来查找其他可用的端口。 |
10800 |
portRange |
定义尝试绑定的端口数量,比如,如果端口配置为10800 并且端口范围为100 ,Ignite会从10800开始,在[10800,10900]范围内查找可用端口。 |
100 |
maxOpenCursorsPerConnection |
每个连接打开的服务端游标的最大数量。 | 128 |
threadPoolSize |
线程池中负责请求处理的线程数量。 | max(8,CPU核数) |
socketSendBufferSize |
TCP套接字发送缓冲区大小,如果配置为0 ,会使用操作系统默认值。 |
0 |
socketReceiveBufferSize |
TCP套接字接收缓冲区大小,如果配置为0 ,会使用操作系统默认值。 |
0 |
tcpNoDelay |
是否使用TCP_NODELAY 选项。 |
true |
idleTimeout |
客户端连接空闲超时时间。在空闲超过配置的超时时间后,客户端与服务端的连接会断开。如果该参数配置为0或者负值,空闲超时会被禁用。 | 0 |
isJdbcEnabled |
是否允许JDBC访问。 | true |
isThinClientEnabled |
是否允许瘦客户端访问。 | true |
sslEnabled |
如果开启SSL,只允许SSL客户端连接。一个节点只允许一种连接模式:SSL或普通,一个节点无法同时接收两种模式的客户端连接,但是这个参数集群中的各个节点可以不同。 | false |
useIgniteSslContextFactory |
在Ignite配置中是否使用SSL上下文工厂(具体可以看IgniteConfiguration.sslContextFactory )。 |
true |
sslClientAuth |
是否需要客户端认证。 | false |
sslContextFactory |
提供节点侧SSL的Factory<SSLContext> 实现的类名。 |
null |
JDBC Thin模式驱动并非线程安全 JDBC对象中的
Connection
、Statement
和ResultSet
不是线程安全的。因此不能在多个线程中使用一个JDBC连接对Statement和ResultSet进行操作。 JDBC Thin模式驱动添加了并发保护,如果检测到了并发访问,那么会抛出SQLException
,消息为:Concurrent access to JDBC connection is not allowed [ownThread=<guard_owner_thread_name>, curThread=<current_thread_name>]", SQLSTATE="08006
。
2、JDBC客户端模式驱动
JDBC客户端节点模式驱动使用自己的完整功能的客户端节点连接接入集群,这要求开发者提供一个完整的Spring XML配置作为JDBC连接串的一部分,然后拷贝下面所有的jar文件到应用或者SQL工具的类路径中:
-
{apache_ignite_release}\libs
目录下的所有jar文件; -
{apache_ignite_release}\ignite-indexing
和{apache_ignite_release}\ignite-spring
目录下的所有jar文件;
这个驱动很重,而且可能不支持Ignite的最新SQL特性,但是因为它底层使用客户端节点连接,它可以执行分布式查询,然后在应用端直接对结果进行汇总。 JDBC连接URL的规则如下:
jdbc:ignite:cfg://[<params>@]<config_url>
-
<config_url>
是必需的,表示指向Ignite客户端节点配置文件的任意合法URL,当驱动试图建立到集群的连接时,这个节点会在Ignite JDBC客户端节点驱动中启动; -
<params>
是可选的,格式如下:
param1=value1:param2=value2:...:paramN=valueN
驱动类名为org.apache.ignite.IgniteJdbcDriver
,比如下面的代码,展示了如何打开一个到集群的连接:
// Register JDBC driver.
Class.forName("org.apache.ignite.IgniteJdbcDriver");
// Open JDBC connection (cache name is not specified, which means that we use default cache).
Connection conn = DriverManager.getConnection("jdbc:ignite:cfg://file:///etc/config/ignite-jdbc.xml");
它支持如下的参数:
属性 | 描述 | 默认值 |
---|---|---|
cache |
缓存名,如果未定义会使用默认的缓存,区分大小写 | |
nodeId |
要执行的查询所在节点的Id,对于在本地查询是有用的 | |
local |
查询只在本地节点执行,这个参数和nodeId 参数都是通过指定节点来限制数据集 |
false |
collocated |
优化标志,当Ignite执行一个分布式查询时,他会向单个的集群节点发送子查询,如果提前知道要查询的数据已经被并置到同一个节点,Ignite会有显著的性能提升和网络优化 | false |
distributedJoins |
可以在非并置的数据上使用分布式关联。 | false |
streaming |
通过INSERT 语句为本链接开启批量数据加载模式,具体可以参照后面的流模式 相关章节。 |
false |
streamingAllowOverwrite |
通知Ignite对于重复的已有键,覆写它的值而不是忽略他们,具体可以参照后面的流模式 相关章节。 |
false |
streamingFlushFrequency |
超时时间,毫秒,数据流处理器用于刷新数据,数据默认会在连接关闭时刷新,具体可以参照后面的流模式 相关章节。 |
0 |
streamingPerNodeBufferSize |
数据流处理器的每节点缓冲区大小,具体可以参照后面的流模式 相关章节。 |
1024 |
streamingPerNodeParallelOperations |
数据流处理器的每节点并行操作数。具体可以参照后面的流模式 相关章节。 |
16 |
transactionsAllowed |
目前已经支持了ACID事务,但是仅仅在键-值API层面,在SQL层面Ignite支持原子性,还不支持事务一致性,这意味着使用这个功能的时候驱动可能抛出Transactions are not supported 这样的异常。但是,有时需要使用事务语法(即使不需要事务语义),比如一些BI工具会一直强制事务行为,也需要将该参数配置为true 以避免异常。 |
false |
multipleStatementsAllowed |
JDBC驱动可以同时处理多个SQL语句并且返回多个ResultSet 对象,如果该参数为false,多个语句的查询会返回错误。 |
false |
lazy |
查询延迟执行。Ignite默认会将所有的结果集放入内存然后将其返回给客户端,对于不太大的结果集,这样会提供较好的性能,并且使内部的数据库锁时间最小化,因此提高了并发能力。但是,如果相对于可用内存来说结果集过大,那么会导致频繁的GC暂停,甚至OutOfMemoryError ,如果使用这个标志,可以提示Ignite延迟加载结果集,这样可以在不大幅降低性能的前提下,最大限度地减少内存的消耗。 |
false |
skipReducerOnUpdate |
开启服务端的更新特性。当Ignite执行DML操作时,首先,它会获取所有受影响的中间行给查询发起方进行分析(通常被称为汇总),然后会准备一个更新值的批量发给远程节点。这个方式可能影响性能,如果一个DML操作会移动大量数据条目时,还可能会造成网络堵塞。使用这个标志可以提示Ignite在对应的远程节点上进行中间行的分析和更新。默认值为false,这意味着会首先获取中间行然后发给查询发起方。 | false |