内容介绍
- Pulsar介绍
- Pulsar关键特性
- Pulsar vs Kafka
- Pulsar架构设计
- Pulsar消息机制
- Pulsar Schema
- Pulsar Functions
- Pulsar Connectors
- Pulsar Deployment
- Pulsar Admin
- Pulsar Manager
- Pulsar Flink
- 更多福利
#############################################################################
## Server parameters
#############################################################################
# Directories BookKeeper outputs its write ahead log.
# Multiple directories may be defined to store write ahead logs, separated by ','.
journalDirectories=/data/appData/pulsar/bookkeeper/journal
#############################################################################
## Ledger storage settings
#############################################################################
# Directory Bookkeeper outputs ledger snapshots
# could define multi directories to store snapshots, separated by ','
ledgerDirectories=/data/appData/pulsar/bookkeeper/ledgers
### --- Managed Ledger --- ###
# Number of bookies to use when creating a ledger
managedLedgerDefaultEnsembleSize=2
# Number of copies to store for each message
managedLedgerDefaultWriteQuorum=2
# Number of guaranteed copies (acks to wait before write is complete)
managedLedgerDefaultAckQuorum=2
$ $PULSAR_HOME/bin/pulsar zookeeper-shell
> ls /
[admin, bookies, counters, ledgers, loadbalance, managed-ledgers, namespace, pulsar, schemas, stream, zookeeper]
{persistent|non-persistent}://tenant/namespace/topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
list public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
create persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
lookup persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
delete persistent://public/default/input-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
stats persistent://public/default/input-seed-avro-topic
$ curl http://server-101:8080/admin/v2/persistent/public/default/exclamation-input/stats | python -m json.tool
$ $PULSAR_HOME/bin/pulsar-admin topics \
create-partitioned-topic persistent://public/default/output-seed-avro-topic \
--partitions 2
$ $PULSAR_HOME/bin/pulsar-admin topics \
list-partitioned-topics public/default
$ $PULSAR_HOME/bin/pulsar-admin topics \
get-partitioned-topic-metadata persistent://public/default/output-seed-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin topics \
delete-partitioned-topic persistent://public/default/output-seed-avro-topic
// org.apache.pulsar.client.api.Message: read-only view of a Pulsar message as
// seen by a consumer or reader — payload, metadata, keys, and schema version.
public interface Message<T> {
// Application-defined key/value properties attached by the producer.
Map<String, String> getProperties();
boolean hasProperty(String var1);
String getProperty(String var1);
// Raw, undecoded payload bytes.
byte[] getData();
// Payload decoded through the topic's schema.
T getValue();
// Unique id of this message within its topic.
MessageId getMessageId();
// Broker-assigned publish timestamp.
long getPublishTime();
// Application-supplied event timestamp.
long getEventTime();
// Producer-assigned sequence id.
long getSequenceId();
String getProducerName();
// Partition/routing key, when the producer set one.
boolean hasKey();
String getKey();
boolean hasBase64EncodedKey();
byte[] getKeyBytes();
// Ordering key (used e.g. by key-shared subscriptions).
boolean hasOrderingKey();
byte[] getOrderingKey();
String getTopicName();
// Present when the payload was encrypted end-to-end.
Optional<EncryptionContext> getEncryptionCtx();
// Number of times this message has been redelivered to this consumer.
int getRedeliveryCount();
byte[] getSchemaVersion();
// Geo-replication metadata.
boolean isReplicated();
String getReplicatedFrom();
}
/**
 * Publishes two sample messages to persistent://public/default/topic-sensor-temp:
 * a plain byte payload via send(), then a keyed message with custom properties
 * via the typed message builder. Closes the producer and client when done.
 *
 * @throws PulsarClientException if client/producer creation or a send fails
 */
public void send() throws PulsarClientException {
    final String serviceUrl = "pulsar://server-100:6650";
    // final String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#client
    final PulsarClient client = PulsarClient.builder()
            .serviceUrl(serviceUrl)
            .connectionTimeout(10000, TimeUnit.MILLISECONDS)
            .build();
    final String topic = "persistent://public/default/topic-sensor-temp";
    // http://pulsar.apache.org/docs/en/client-libraries-java/#configure-producer
    // BUG FIX: chunking and batching are mutually exclusive in Pulsar — enabling
    // both makes producer creation fail. Batching is kept (its limits are
    // configured below); enableChunking(true) was removed.
    final Producer<byte[]> producer = client.newProducer()
            .producerName("sensor-temp")
            .topic(topic)
            .compressionType(CompressionType.LZ4)
            .enableBatching(true)
            .batchingMaxBytes(1024)
            .batchingMaxMessages(10)
            .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
            .blockIfQueueFull(true)           // back-pressure instead of exception
            .maxPendingMessages(512)
            .sendTimeout(1, TimeUnit.SECONDS)
            .create();
    MessageId mid = producer.send("sensor-temp".getBytes());
    System.out.printf("\nmessage with ID %s successfully sent", mid);
    mid = producer.newMessage()
            .key("sensor-temp-key")
            .value("sensor-temp-key".getBytes())
            .property("my-key", "my-value")
            .property("my-other-key", "my-other-value")
            .send();
    System.out.printf("message-key with ID %s successfully sent", mid);
    producer.close();
    client.close();
}
/**
 * Connects to a standalone broker, subscribes exclusively to the seed topic
 * starting from the earliest message, then consumes forever: each message is
 * logged (payload decoded with the SeedEvent Avro schema) and acknowledged;
 * a failed acknowledge is turned into a negative acknowledge.
 *
 * @throws PulsarClientException if the client or subscription cannot be created
 */
public void consume() throws PulsarClientException {
    final PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("pulsar://server-101:6650")
            .enableTcpNoDelay(true)
            .build();
    final Consumer<byte[]> seedConsumer = pulsarClient.newConsumer()
            .consumerName("seed-avro-consumer")
            .subscriptionName("seed-avro-subscription")
            .subscriptionType(SubscriptionType.Exclusive)
            .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
            .topic("input-seed-avro-topic")
            .receiverQueueSize(10)
            .subscribe();
    final AvroSchema<SeedEvent> eventSchema = AvroSchema.of(SeedEvent.class);
    for (;;) {
        final Message<byte[]> message;
        try {
            message = seedConsumer.receive();
        } catch (final PulsarClientException receiveError) {
            LOG.error("receive:" + receiveError.getLocalizedMessage(), receiveError);
            continue;
        }
        LOG.info("接收消息:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
                Thread.currentThread().getId(),
                message.getTopicName(),
                message.getMessageId(),
                message.getSequenceId(),
                message.getEventTime(),
                message.getPublishTime(),
                message.getProducerName(),
                message.getKey(), eventSchema.decode(message.getValue()));
        try {
            seedConsumer.acknowledge(message);
        } catch (final PulsarClientException ackError) {
            seedConsumer.negativeAcknowledge(message);
            LOG.error("acknowledge:" + ackError.getLocalizedMessage(), ackError);
        }
    }
}
/**
 * Tails "my-topic" with the non-durable Reader API, starting from the
 * earliest available message, and prints each payload to stdout forever.
 *
 * @throws IOException if the client or reader cannot be created, or a read fails
 */
public void read() throws IOException {
    final PulsarClient pulsarClient = PulsarClient.builder()
            .serviceUrl("pulsar://server-101:6650")
            .build();
    // http://pulsar.apache.org/docs/en/client-libraries-java/#reader
    final Reader<byte[]> topicReader = pulsarClient.newReader()
            .topic("my-topic")
            .startMessageId(MessageId.earliest()) // or MessageId.latest
            .create();
    for (;;) {
        final Message<byte[]> next = topicReader.readNext();
        System.out.println(new String(next.getData()));
    }
}
# Default message retention time
# 默认0, 修改为3天=60*24*3
defaultRetentionTimeInMinutes=4320
# Default retention size
# 默认为0, 修改为10G
defaultRetentionSizeInMB=10240
# Default ttl for namespaces if ttl is not already configured at namespace policies. (disable default-ttl with value 0)
ttlDurationDefaultInSeconds=0
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
get-retention public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/retention | python -m json.tool
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
set-retention public/default \
--size 1024M \
--time 5m
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/retention \
--header "Content-Type:application/json" \
--data '{
"retentionTimeInMinutes" : 5,
"retentionSizeInMB" : 1024
}'
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
get-message-ttl public/default
$ curl -X GET http://server-101:8080/admin/v2/namespaces/public/default/messageTTL
$ $PULSAR_HOME/bin/pulsar-admin namespaces \
set-message-ttl public/default \
--messageTTL 1800
$ curl -X POST http://server-101:8080/admin/v2/namespaces/public/default/messageTTL \
--header "Content-Type:application/json" \
--data '1800'
Producer<String> producer = client.newProducer(Schema.STRING).create();
producer.newMessage().value("Hello Pulsar!").send();
Consumer<String> consumer = client.newConsumer(Schema.STRING).subscribe();
consumer.receive();
Schema<KeyValue<Integer, String>> schema = Schema.KeyValue(
Schema.INT32,
Schema.STRING,
KeyValueEncodingType.SEPARATED
);
// Producer
Producer<KeyValue<Integer, String>> producer = client.newProducer(schema)
.topic(TOPIC)
.create();
final int key = 100;
final String value = "value-100";
producer.newMessage().value(new KeyValue<>(key, value)).send();
// Consumer
Consumer<KeyValue<Integer, String>> consumer = client.newConsumer(schema)
.topic(TOPIC).subscriptionName(SubscriptionName).subscribe();
Message<KeyValue<Integer, String>> msg = consumer.receive();
Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage().value(User.builder().userName("pulsar-user").userId(1L).build()).send();
Consumer<User> consumer = client.newConsumer(Schema.AVRO(User.class)).subscribe();
User user = consumer.receive();
$ $PULSAR_HOME/bin/pulsar-admin schemas \
get persistent://public/default/spirit-avro-topic
$ $PULSAR_HOME/bin/pulsar-admin schemas \
get persistent://public/default/spirit-avro-topic \
--version=2
$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
persistent://public/default/test-topic \
--filename $PULSAR_HOME/connectors/json-schema.json
$ $PULSAR_HOME/bin/pulsar-admin schemas \
extract persistent://public/default/test-topic \
--classname com.cloudwise.modal.Packet \
--jar ~/cloudwise-pulsar-1.0.0-RELEASE.jar \
--type json
/**
 * Prints the schema metadata derived from the SeedEvent POJO via the two
 * equivalent construction paths: AvroSchema.of(...) and Schema.AVRO(...).
 */
public void schemaInfo() {
    final String fromAvroSchema = "AvroSchema:" + AvroSchema.of(SeedEvent.class).getSchemaInfo();
    final String fromSchemaAvro = "Schema.AVRO:" + Schema.AVRO(SeedEvent.class).getSchemaInfo();
    System.out.println(fromAvroSchema);
    System.out.println(fromSchemaAvro);
}
$ $PULSAR_HOME/bin/pulsar-admin schemas \
delete persistent://public/default/spirit-avro-topic
extraServerComponents=org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent
functionsWorkerEnabled=true
pulsarFunctionsCluster: pulsar-cluster
numFunctionPackageReplicas: 2
/**
 * Pulsar window function that counts word occurrences across all records in a
 * window using the built-in state counters.
 */
public class WordCountWindowFunction implements org.apache.pulsar.functions.api.WindowFunction<String, Void> {
    @Override
    public Void process(Collection<Record<String>> inputs, WindowContext context) throws Exception {
        // BUG FIX: the original loop body was empty, so every record in the
        // window was silently dropped. Split each record on whitespace and
        // bump a per-word counter (case-insensitive).
        for (Record<String> input : inputs) {
            for (String word : input.getValue().split("\\s+")) {
                if (!word.isEmpty()) {
                    context.incrCounter(word.toLowerCase(), 1);
                }
            }
        }
        return null;
    }
}
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-api</artifactId>
<version>${pulsar.version}</version>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-functions-local-runner</artifactId>
<version>${pulsar.version}</version>
</dependency>
// Pulsar Function that counts word occurrences (case-insensitive) using the
// built-in per-key state counters.
public class WordCountFunction implements org.apache.pulsar.functions.api.Function<String, Void> {
@Override
public Void process(String input, Context context) throws Exception {
// Split the sentence on single spaces and count each word.
Arrays.asList(input.split(" ")).forEach(word -> {
String counterKey = word.toLowerCase();
// NOTE(review): when the counter is still 0 the key's state is seeded
// with the raw bytes of 100 via project-local ByteUtils.from(100) —
// presumably a putState demo; confirm it is intentional, since it mixes
// a byte-blob state entry with the counter API on the same key.
if (context.getCounter(counterKey) == 0) {
context.putState(counterKey, ByteBuffer.wrap(ByteUtils.from(100)));
}
context.incrCounter(counterKey, 1);
});
return null;
}
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--broker-service-url pulsar://server-101:6650 \
--jar target/cloudwise-pulsar-functions-with-dependencies.jar \
--classname com.cloudwise.quickstart.pulsar.functions.WordCountFunction \
--tenant public \
--namespace default \
--name word-count-function \
--inputs persistent://public/default/sentences \
--output persistent://public/default/wordcount
/**
 * Inspects each message's content and routes it to one of two destination
 * topics depending on whether it matches a user-configured regular expression.
 */
public class RoutingFunction implements org.apache.pulsar.functions.api.Function<String, String> {
    @Override
    public String process(String input, Context context) throws Exception {
        // BUG FIX: getUserConfigValue(...) returns Optional<Object>; calling
        // toString() on the Optional itself yields "Optional[...]" rather than
        // the configured value. Unwrap each Optional before use and fail fast
        // when a required setting is missing.
        String regex = context.getUserConfigValue("regex")
                .map(Object::toString)
                .orElseThrow(() -> new IllegalArgumentException("missing user config: regex"));
        String matchedTopic = context.getUserConfigValue("matched-topic")
                .map(Object::toString)
                .orElseThrow(() -> new IllegalArgumentException("missing user config: matched-topic"));
        String unmatchedTopic = context.getUserConfigValue("unmatched-topic")
                .map(Object::toString)
                .orElseThrow(() -> new IllegalArgumentException("missing user config: unmatched-topic"));
        Pattern pattern = Pattern.compile(regex);
        Matcher matcher = pattern.matcher(input);
        if (matcher.matches()) {
            context.newOutputMessage(matchedTopic, Schema.STRING).value(input).send();
        } else {
            context.newOutputMessage(unmatchedTopic, Schema.STRING).value(input).send();
        }
        return null;
    }
}
// Pulsar Function that logs every message it receives to the function's
// log topic, escalating to WARN when the payload contains "danger".
public class LoggingFunction implements org.apache.pulsar.functions.api.Function<String, Void> {
@Override
public Void process(String s, Context context) throws Exception {
Logger LOG = context.getLogger();
// NOTE(review): getFunctionId() returns the function's own id, not a
// message id, yet it is logged as "message {}" below — confirm whether a
// message-id accessor (e.g. via the current record) was intended here.
String messageId = context.getFunctionId();
if (s.contains("danger")) {
LOG.warn("A warning was received in message {}", messageId);
} else {
LOG.info("Message {} received\nContent: {}", messageId, s);
}
return null;
}
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--jar cloudwise-pulsar-functions-1.0.0.jar \
--classname com.cloudwise.quickstart.pulsar.functions.LoggingFunction \
--log-topic persistent://public/default/logging-function-logs
/**
 * Demonstrates reading a user-supplied configuration value
 * ("word-of-the-day", passed via --user-config) inside a Pulsar Function
 * and logging it.
 */
public class UserConfigFunction implements org.apache.pulsar.functions.api.Function<String, Void> {
    @Override
    public Void process(String s, Context context) throws Exception {
        Logger log = context.getLogger();
        Optional<Object> value = context.getUserConfigValue("word-of-the-day");
        if (value.isPresent()) {
            // BUG FIX: the original logged the Optional wrapper itself, which
            // prints "Optional[verdure]" — log the unwrapped value instead.
            log.info("The word of the day is {}", value.get());
        } else {
            log.warn("No word of the day provided");
        }
        return null;
    }
}
$ $PULSAR_HOME/bin/pulsar-admin functions create \
--broker-service-url pulsar://server-101:6650 \
--jar target/cloudwise-pulsar-functions-with-dependencies.jar \
--classname com.cloudwise.quickstart.pulsar.functions.UserConfigFunction \
--tenant public \
--namespace default \
--name word-count-function \
--inputs persistent://public/default/userconfig \
--user-config '{"word-of-the-day":"verdure"}'
CREATE DATABASE IF NOT EXISTS monitor;
CREATE TABLE IF NOT EXISTS monitor.pulsar_clickhouse_jdbc_sink
(
id UInt32,
name String
) ENGINE = TinyLog;
INSERT INTO monitor.pulsar_clickhouse_jdbc_sink (id, name)
VALUES (1, 'tmp');
SELECT *
FROM monitor.pulsar_clickhouse_jdbc_sink;
$ vi $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml
{
"userName": "sysop",
"password": "123456",
"jdbcUrl": "jdbc:clickhouse://server-101:8123/monitor",
"tableName": "pulsar_clickhouse_jdbc_sink"
}
$ vi $PULSAR_HOME/connectors/json-schema.json
{
"name": "",
"schema": {
"type": "record",
"name": "SeedEvent",
"namespace": "com.cloudwise.quickstart.model",
"fields": [
{
"name": "id",
"type": [
"null",
"int"
]
},
{
"name": "name",
"type": [
"null",
"string"
]
}
]
},
"type": "JSON",
"properties": {
"__alwaysAllowNull": "true",
"__jsr310ConversionEnabled": "false"
}
}
$ $PULSAR_HOME/bin/pulsar-admin schemas upload \
pulsar-postgres-jdbc-sink-topic \
-f $PULSAR_HOME/connectors/json-schema.json
$ $PULSAR_HOME/bin/pulsar-admin sinks create \
--tenant public \
--namespace default \
--name pulsar-clickhouse-jdbc-sink \
--inputs pulsar-clickhouse-jdbc-sink-topic \
--sink-config-file $PULSAR_HOME/connectors/pulsar-clickhouse-jdbc-sink.yaml \
--archive $PULSAR_HOME/connectors/pulsar-io-jdbc-clickhouse-2.6.2.nar \
--processing-guarantees EFFECTIVELY_ONCE \
--parallelism 1
# 前台启动
$ $PULSAR_HOME/bin/pulsar standalone
# 后台启动
$ $PULSAR_HOME/bin/pulsar-daemon start standalone
$ jps | grep -v Jps
1873 PulsarStandaloneStarter
# 后台停止
$ $PULSAR_HOME/bin/pulsar-daemon stop standalone -force
# consumer
$ $PULSAR_HOME/bin/pulsar-client consume \
persistent://public/default/seed-avro-topic \
--subscription-name cli-pack-avro-subscription \
--subscription-type Exclusive \
--subscription-position Latest \
--num-messages 0
# producer
$ $PULSAR_HOME/bin/pulsar-client produce \
persistent://public/default/seed-avro-topic \
--num-produce 100 \
--messages "Hello Pulsar" \
--separator ","
/**
 * Ensures a non-partitioned topic exists in public/default (creating it and
 * registering the SeedEvent Avro schema if absent), printing the namespace's
 * topic list before and after.
 *
 * @throws PulsarClientException if the admin client cannot be built
 */
public void createNonPartitionedTopic() throws PulsarClientException {
    final String serviceHttpUrl = "http://10.2.2.26:8080";
    // try-with-resources: the original only closed the admin client on the
    // success path, leaking the connection when an exception escaped.
    try (final PulsarAdmin admin = PulsarAdmin.builder()
            .serviceHttpUrl(serviceHttpUrl)
            .build()) {
        // BUG FIX: the topic below lives in public/default, but the original
        // listed public/monitor — the existence check could never match, so
        // creation was always attempted. List the topic's own namespace.
        final String namespace = "public/default";
        List<String> topics = admin.topics().getList(namespace);
        topics.forEach(t -> System.err.println("before topic:" + t));
        // The following forms are equivalent (short names default to the
        // public/default namespace):
        // final String topic = "input-5-seed-avro-topic";
        // final String topic = "public/default/input-5-seed-avro-topic";
        final String topic = "persistent://public/default/input-5-seed-avro-topic";
        if (!topics.contains(topic)) {
            admin.topics().createNonPartitionedTopic(topic);
            admin.schemas().createSchema(topic,
                    AvroSchema.of(SeedEvent.class).getSchemaInfo());
        }
        topics = admin.topics().getList(namespace);
        topics.forEach(t -> System.err.println("after topic:" + t));
        System.err.println("schema:" + admin.schemas().getSchemaInfo(topic));
    } catch (final PulsarAdminException e) {
        e.printStackTrace();
    }
}
$ $PULSAR_HOME/bin/pulsar-admin clusters
$ $PULSAR_HOME/bin/pulsar-admin tenants
$ $PULSAR_HOME/bin/pulsar-admin brokers
$ $PULSAR_HOME/bin/pulsar-admin namespaces
$ $PULSAR_HOME/bin/pulsar-admin topics
$ $PULSAR_HOME/bin/pulsar-admin schemas
$ $PULSAR_HOME/bin/pulsar-admin functions
pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<flink.version>1.11.2</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--statebackend -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--pulsar -->
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-${scala.binary.version}-${flink.version}</artifactId>
<version>2.5.4.1</version>
</dependency>
<!-- format -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency><!-- -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
/**
 * Flink streaming job: generates periodic SeedEvent records from an in-memory
 * source, runs them through a keyed stateful pass-through, and writes them to
 * a Pulsar topic via FlinkPulsarSink (streamnative pulsar-flink connector).
 */
public class PulsarSinkJob {
private static final Logger LOG = LoggerFactory.getLogger(PulsarSinkJob.class);
/**
 * Source emitting one SeedEvent per sensor code ("TEM-A-01", "HUM-A-01",
 * "PRS-A-01") each cycle. PeriodicEventSource is project-local — presumably
 * it invokes Creator.build(i) every `interval` ms for up to Integer.MAX_VALUE
 * iterations; confirm against its implementation.
 */
public static SourceFunction<SeedEvent> getSeedSource() {
final int interval = 5000;
return new PeriodicEventSource<>(
Integer.MAX_VALUE, interval, new PeriodicEventSource.Creator<SeedEvent>() {
private static final long serialVersionUID = 1L;
@Override
public Collection<SeedEvent> build(long i) {
// One event per sensor code; payload is the iteration counter.
return Arrays.stream(new String[]{"TEM-A-01", "HUM-A-01", "PRS-A-01"})
.map(code -> {
final SeedEvent event = new SeedEvent(
Instant.now().toEpochMilli(), code, Long.toString(i));
LOG.info("创建消息:[{}] {}", Thread.currentThread().getId(), event);
return event;
})
.collect(Collectors.toList());
}
@Override
public Class<SeedEvent> clazz() {
return SeedEvent.class;
}
});
}
/**
 * Builds the Pulsar sink. Service/admin URLs and the output topic come from
 * CLI parameters with single-node defaults; token auth over TLS is enabled
 * only when -authParams is supplied.
 */
public static FlinkPulsarSink<SeedEvent> getPulsarSink(ParameterTool params) {
// String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");
final String outputTopic = params.get("topic", "output-seed-avro-topic");
final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
final String authParams = params.get("authParams");
final Properties props = new Properties();
// Flush pending writes on every checkpoint.
props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000");
final ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(serviceUrl);
clientConf.setConnectionTimeoutMs(6000);
clientConf.setUseTcpNoDelay(true);
if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
clientConf.setUseTls(true);
clientConf.setAuthPluginClassName(authPlugin);
clientConf.setAuthParams(authParams);
}
// Extracts the per-message key from the sensor code.
final TopicKeyExtractor<SeedEvent> topicKeyExtractor = new TopicKeyExtractor<SeedEvent>() {
private static final long serialVersionUID = 1L;
@Override
public byte[] serializeKey(SeedEvent element) {
LOG.info("serializeKey:[{}] {}", Thread.currentThread().getId(), element);
return element.getCode().getBytes();
}
@Override
public String getTopic(SeedEvent element) {
// null — presumably routes every element to the default topic
// passed to FlinkPulsarSink below; confirm against the connector.
return null;
}
};
final FlinkPulsarSink<SeedEvent> sink = new FlinkPulsarSink<>(
adminUrl, Optional.of(outputTopic), clientConf, props, topicKeyExtractor, SeedEvent.class);
return sink;
}
@SuppressWarnings("deprecation")
public static void main(String[] args) {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
// Retain checkpoint data on job cancellation/failure so the job can be
// restored from a chosen checkpoint when needed.
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Require at least 500 ms between checkpoints (minimum checkpoint interval).
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// A checkpoint must finish within one minute or it is discarded (timeout).
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Only one checkpoint may be in flight at a time.
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getConfig().setGlobalJobParameters(params);
// DataStream<SeedEvent> stream = env.fromCollection(getSeedEvents()).name("Collection");
final DataStream<SeedEvent> stream = env.addSource(getSeedSource()).name("SourceFunction");
final DataStream<SeedEvent> result = stream
.keyBy(new KeySelector<SeedEvent, String>() {
private static final long serialVersionUID = 1L;
@Override
public String getKey(SeedEvent value) throws Exception {
return value.getCode();
}
})
.process(new KeyedProcessFunction<String, SeedEvent, SeedEvent>() {
private static final long serialVersionUID = 1L;
// NOTE(review): `infos` is plain instance state, not Flink managed
// state — it is not checkpointed and is reset on restore.
private Map<String, String> infos;
private transient ListState<String> state;
@Override
public void open(Configuration parameters) throws Exception {
LOG.info("open...");
this.state = getRuntimeContext().getListState(
new ListStateDescriptor<>("state", String.class));
this.infos = new HashMap<>();
this.infos.put("open", LocalDateTime.now().toString());
}
@Override
public void close() throws Exception {
LOG.info("close...");
}
@Override
public void processElement(SeedEvent value,
KeyedProcessFunction<String, SeedEvent, SeedEvent>.Context ctx, Collector<SeedEvent> out)
throws Exception {
LOG.info("processElement...");
// Log the previously stored state for this key, tag the payload,
// then replace the stored state with this element's string form.
final StringBuffer buffer = new StringBuffer();
this.state.get().forEach(t -> buffer.append(t));
LOG.info("CurrentKey:{} Input:{} State:{} Infos:{}",
ctx.getCurrentKey(), value, buffer, this.infos);
value.setPayload("[Prev]" + value.getPayload());
this.state.clear();
this.state.add(value.toString());
out.collect(value);
}
})
.setParallelism(1);
result
.print()
.setParallelism(1);
result
.addSink(getPulsarSink(params))
.name("FlinkPulsarSink")
.setParallelism(2);
LOG.info("ExecutionPlan:{}", env.getExecutionPlan());
try {
env.execute("PulsarSinkJob");
} catch (final Exception e) {
// NOTE(review): printStackTrace swallows the failure; consider
// LOG.error and/or rethrowing so the process exit code reflects it.
e.printStackTrace();
}
}
}
/**
 * Flink streaming job: reads SeedEvent messages from Pulsar via
 * FlinkPulsarSource, maps each to a tagged string, and prints it.
 */
public class PulsarSourceJob {
private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceJob.class);
/**
 * Builds the Pulsar source. Connection URLs, input topics and subscription
 * name come from CLI parameters with single-node defaults; token auth over
 * TLS is enabled only when -authParams is supplied. Consumption starts from
 * the earliest available message.
 */
public static FlinkPulsarSource<SeedEvent> getPulsarSource(ParameterTool params) {
// String serviceUrl = "pulsar://server-101:6650,server-102:6650,server-103:6650";
// String adminUrl = "http://server-101:8080,server-102:8080,server-103:8080";
// final String serviceUrl = params.get("serviceUrl", "pulsar://server-101:6650");
// final String adminUrl = params.get("adminUrl", "http://server-101:8080");
final String serviceUrl = params.get("serviceUrl", "pulsar://10.2.2.26:6650");
final String adminUrl = params.get("adminUrl", "http://10.2.2.26:8080");
// final String inputTopic = params.get("topic", "input-1-seed-avro-topic");
// final String subscription = params.get("subscription", "seed-subscription");
final String inputTopics = params.get("topic", "persistent://public/yang11/zlp.gjsjbz.gjbzcd3");
// final String inputTopics = params.get("topic", "public/monitor/input-0-seed-avro-topic");
final String subscription = params.get("subscription", "mutil-seed-subscription");
// final String inputTopicPatten = params.get("topicPatten", "input-1-seed-avro-topic");
// final String subscription = params.get("subscription", "patten-seed-subscription");
final String authPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
final String authParams = params.get("authParams");
final Properties props = new Properties();
// http://pulsar.apache.org/docs/en/client-libraries-java/#reader
props.setProperty(PulsarOptions.PULSAR_READER_OPTION_KEY_PREFIX + "receiverQueueSize", "2000");
// props.setProperty(PulsarOptions.TOPIC_SINGLE_OPTION_KEY, inputTopic);
props.setProperty(PulsarOptions.TOPIC_MULTI_OPTION_KEY, inputTopics);
// props.setProperty(PulsarOptions.TOPIC_PATTERN_OPTION_KEY, inputTopicPatten);
// org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPartitionDiscoveryIntervalInMillis
props.setProperty(PulsarOptions.PARTITION_DISCOVERY_INTERVAL_MS_OPTION_KEY, "5000"); // topic auto-discovery interval; default -1 (disabled)
// org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getClientCacheSize
props.setProperty(PulsarOptions.CLIENT_CACHE_SIZE_OPTION_KEY, "5");
// org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.flushOnCheckpoint
props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "true");
// org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.failOnWrite
props.setProperty(PulsarOptions.FAIL_ON_WRITE_OPTION_KEY, "false");
// org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getPollTimeoutMs
props.setProperty(PulsarOptions.POLL_TIMEOUT_MS_OPTION_KEY, "120000");
// org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher
props.setProperty(PulsarOptions.SUBSCRIPTION_ROLE_OPTION_KEY, subscription);
// org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils.getCommitMaxRetries
props.setProperty(PulsarOptions.COMMIT_MAX_RETRIES, "3");
// org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher.PulsarFetcher
props.setProperty(PulsarOptions.FAIL_ON_DATA_LOSS_OPTION_KEY, "false");
final ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(serviceUrl);
clientConf.setConnectionTimeoutMs(6000);
if (!StringUtils.isNullOrWhitespaceOnly(authParams)) {
clientConf.setUseTls(true);
clientConf.setAuthPluginClassName(authPlugin);
clientConf.setAuthParams(authParams);
}
// NOTE(review): the wrapper assigned on the next line is immediately
// overwritten by the anonymous schema below — dead store; keep only one.
PulsarDeserializationSchema<SeedEvent> deserializer = null;
deserializer = new PulsarDeserializationSchemaWrapper<>(AvroDeser.of(SeedEvent.class));
deserializer = new PulsarDeserializationSchema<SeedEvent>() {
private static final long serialVersionUID = 1L;
private final DeserializationSchema<SeedEvent> schema = AvroDeser.of(SeedEvent.class);
public void open(DeserializationSchema.InitializationContext context) throws Exception {
this.schema.open(context);
}
@Override
public TypeInformation<SeedEvent> getProducedType() {
return this.schema.getProducedType();
}
@Override
public boolean isEndOfStream(SeedEvent nextElement) {
return this.schema.isEndOfStream(nextElement);
}
@Override
public SeedEvent deserialize(@SuppressWarnings("rawtypes") Message message) throws IOException {
LOG.info("{}", new String(message.getData()));
// NOTE(review): returns an empty SeedEvent and ignores the payload —
// the real Avro decode is commented out below. Debug behavior; confirm
// before using in production.
final SeedEvent value = new SeedEvent();
// final SeedEvent value = this.schema.deserialize(message.getData());
LOG.info("接收消息:[{}] topic:{} mid:{} sid:{} event:{} publish:{} producer:{} key:{} value:{}",
Thread.currentThread().getId(),
message.getTopicName(),
message.getMessageId(),
message.getSequenceId(),
message.getEventTime(),
message.getPublishTime(),
message.getProducerName(),
message.getKey(), value);
return value;
}
};
final FlinkPulsarSource<SeedEvent> source = new FlinkPulsarSource<>(
adminUrl, clientConf, deserializer, props);
source.setStartFromEarliest();
// source.setStartFromSubscription(subscription);
// source.setStartFromLatest();
return source;
}
@SuppressWarnings("deprecation")
public static void main(String[] args) {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStateBackend(new RocksDBStateBackend(new FsStateBackend("file:///tmp/checkpoint/")));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
// Checkpointing must be enabled to resume from the last consumed position;
// otherwise consumption restarts from the beginning.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(1);
final DataStream<String> stream = env
.addSource(getPulsarSource(params))
.name("FlinkPulsarSource")
.uid("PulsarSource")
.setParallelism(1)
.map(new MapFunction<SeedEvent, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map(SeedEvent value) throws Exception {
return "[SourceJob]" + value;
}
});
stream
.print()
.name("[Print]")
.uid("PrintSink")
.setParallelism(1);
try {
env.execute("PulsarSourceJob");
} catch (final Exception e) {
// NOTE(review): printStackTrace swallows the failure; consider
// LOG.error and/or rethrowing so the process exit code reflects it.
e.printStackTrace();
}
}
}
|