IN
- Type of the elements emitted by this sinkpublic class RedisSink<IN> extends RichSinkFunction<IN>
The sink takes two arguments FlinkJedisConfigBase
and RedisMapper
.
When FlinkJedisPoolConfig
is passed as the first argument,
the sink will create connection using JedisPool
. Please use this when
you want to connect to a single Redis server.
When FlinkJedisSentinelConfig
is passed as the first argument, the sink will create connection
using JedisSentinelPool
. Please use this when you want to connect to Sentinel.
Please use FlinkJedisClusterConfig
as the first argument if you want to connect to
a Redis Cluster.
Example:
public static class RedisExampleMapper implements RedisMapper<Tuple2<String, String>> {
private RedisCommand redisCommand;
public RedisExampleMapper(RedisCommand redisCommand){
this.redisCommand = redisCommand;
}
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(redisCommand, REDIS_ADDITIONAL_KEY);
}
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig.Builder()
.setHost(REDIS_HOST).setPort(REDIS_PORT).build();
new RedisSink<String>(jedisPoolConfig, new RedisExampleMapper(RedisCommand.LPUSH));
Constructor and Description |
---|
RedisSink(FlinkJedisConfigBase flinkJedisConfigBase,
RedisMapper<IN> redisSinkMapper)
Creates a new
RedisSink that connects to the Redis server. |
Modifier and Type | Method and Description |
---|---|
void |
close()
Closes commands container.
|
void |
invoke(IN input)
Called when new data arrives to the sink, and forwards it to Redis channel.
|
void |
open(Configuration parameters)
Initializes the connection to Redis by either cluster or sentinels or single server.
|
getIterationRuntimeContext, getRuntimeContext, setRuntimeContext
public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper)
RedisSink
that connects to the Redis server.flinkJedisConfigBase
- The configuration of FlinkJedisConfigBase
redisSinkMapper
- This is used to generate Redis command and key value from incoming elements.public void invoke(IN input) throws Exception
RedisDataType
),
a different Redis command will be applied.
Available commands are RPUSH, LPUSH, SADD, PUBLISH, SET, PFADD, HSET, ZADD.invoke
in interface SinkFunction<IN>
invoke
in class RichSinkFunction<IN>
input
- The incoming dataException
public void open(Configuration parameters) throws Exception
open
in interface RichFunction
open
in class AbstractRichFunction
parameters
- The configuration containing the parameters attached to the contract.IllegalArgumentException
- if jedisPoolConfig, jedisClusterConfig and jedisSentinelConfig are all nullException
- Implementations may forward exceptions, which are caught by the runtime. When the
runtime catches an exception, it aborts the task and lets the fail-over logic
decide whether to retry the task execution.Configuration
public void close() throws IOException
close
in interface RichFunction
close
in class AbstractRichFunction
IOException
- if command container is unable to close.Copyright © 2014–2017 The Apache Software Foundation. All rights reserved.