redirectPort
If this Connector is supporting non-SSL requests, and a request is received for which a matching <security-constraint> requires SSL transport, Catalina will automatically redirect the request to the port number specified here.
# RSA key
[hadoop@cu1 conf]$ openssl genrsa -des3 -out server.key 1024
Generating RSA private key, 1024 bit long modulus
...........++++++
................++++++
e is 65537 (0x10001)
Enter pass phrase for server.key:
Verifying - Enter pass phrase for server.key:
# Make a copy of the key file that does not require a pass phrase
[hadoop@cu1 conf]$ cp server.key server.key.org
[hadoop@cu1 conf]$ openssl rsa -in server.key.org -out server.key
Enter pass phrase for server.key.org:
writing RSA key
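The pass phrase only exists because `-des3` was passed to `genrsa`. As a minimal sketch (the scratch directory and the 2048-bit key size are assumptions of this example, not part of the session above), an unencrypted key can be generated in a single step and then sanity-checked:

```shell
# Scratch directory so the sketch does not touch a real conf/ directory (assumption).
workdir=$(mktemp -d)

# Without a cipher flag such as -des3, genrsa writes the key unencrypted,
# so no pass phrase prompt appears and no server.key.org copy is needed.
openssl genrsa -out "$workdir/server.key" 2048 2>/dev/null
chmod 600 "$workdir/server.key"

# -check verifies the key's internal consistency; -noout suppresses the PEM dump.
key_check=$(openssl rsa -in "$workdir/server.key" -check -noout 2>&1)
echo "$key_check"
```

An unencrypted key lets the server start unattended, so file permissions are its only protection; that is why the sketch restricts the file to mode 600.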
# Generate a certificate signing request (CSR), the file that is submitted to an SSL certificate authority
[hadoop@cu1 conf]$ openssl req -new -key server.key -out server.csr
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [XX]:CN
State or Province Name (full name) []:GD
Locality Name (eg, city) [Default City]:GZ
Organization Name (eg, company) [Default Company Ltd]:Eshore
Organizational Unit Name (eg, section) []:CU
Common Name (eg, your name or your server's hostname) []:cu.esw.cn
Email Address []:ca@esw.cn
Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
# Issue the certificate (self-signed here, in place of a certificate authority)
[hadoop@cu1 conf]$ openssl x509 -req -days 365 -in server.csr -signkey server.key -out server.crt
Signature ok
subject=/C=CN/ST=Guangdong/L=Guangzhou/O=Eshore/OU=CU/CN=cu1
Getting Private key
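The same request-and-sign steps can be run without the interactive prompts. The following is a sketch under assumptions: the scratch directory, the `-subj` string, and the explicit `-sha256` are choices made for this example. It also confirms that the result is self-signed by comparing issuer and subject:

```shell
# Scratch directory for the example files (assumption).
workdir=$(mktemp -d)

openssl genrsa -out "$workdir/server.key" 2048 2>/dev/null

# -subj supplies the Distinguished Name on the command line instead of prompting.
openssl req -new -sha256 -key "$workdir/server.key" \
    -subj "/C=CN/ST=GD/L=GZ/O=Eshore/OU=CU/CN=cu.esw.cn" \
    -out "$workdir/server.csr"

# -signkey signs the request with its own private key, producing a self-signed certificate.
openssl x509 -req -days 365 -sha256 -in "$workdir/server.csr" \
    -signkey "$workdir/server.key" -out "$workdir/server.crt" 2>/dev/null

# In a self-signed certificate the issuer and the subject are the same name.
subject=$(openssl x509 -in "$workdir/server.crt" -noout -subject)
issuer=$(openssl x509 -in "$workdir/server.crt" -noout -issuer)
echo "$subject"
echo "$issuer"
```

Clients do not trust a self-signed certificate by default, which is why the rest of these notes set up a private CA instead.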
[root@cu1 apache-tomcat-8.0.38]# ll /etc/pki/CA
total 16
drwxr-xr-x. 2 root root 4096 Mar 1 2016 certs    # certificates signed (issued) by the CA (certificate backup directory)
drwxr-xr-x. 2 root root 4096 Mar 1 2016 crl      # revoked certificates
drwxr-xr-x. 2 root root 4096 Mar 1 2016 newcerts
drwx------. 2 root root 4096 Mar 1 2016 private  # holds the CA's private key
[root@cu1 apache-tomcat-8.0.38]# ll /etc/pki/tls/
total 24
lrwxrwxrwx 1 root root 19 May 22 2015 cert.pem -> certs/ca-bundle.crt
drwxr-xr-x. 2 root root 4096 Mar 22 2016 certs
drwxr-xr-x. 2 root root 4096 Mar 22 2016 misc
-rw-r--r-- 1 root root 10906 Feb 24 2016 openssl.cnf  # main OpenSSL CA configuration file
drwxr-xr-x. 2 root root 4096 Mar 1 2016 private       # directory for certificate private keys
# Edit the configuration (some changes set default values, some relate to SHA-2)
[root@cu1 pki]# pwd
/etc/pki
[root@cu1 pki]# vi tls/openssl.cnf
Change a few parameters; in the diff below, the lines marked > are the modified values.
[root@cu1 pki]# diff /home/hadoop/openssl.cnf tls/openssl.cnf
50c50
< certificate = $dir/cacert.pem # The CA certificate
---
> certificate = $dir/ca.crt # The CA certificate
55c55
< private_key = $dir/private/cakey.pem# The private key
---
> private_key = $dir/private/ca.key # The private key
74c74
< default_crl_days= 30 # how long before next CRL
---
> default_crl_days= 365 # how long before next CRL
86,87c86,87
< stateOrProvinceName = match
< organizationName = match
---
> stateOrProvinceName = optional
> organizationName = optional
107c107
< default_md = sha1
---
> default_md = sha256
126c126
< # req_extensions = v3_req # The extensions to add to a certificate request
---
> req_extensions = v3_req # The extensions to add to a certificate request
130c130
< countryName_default = XX
---
> countryName_default = CN
135c135
< #stateOrProvinceName_default = Default Province
---
> #stateOrProvinceName_default = GD
# CA certificate
Clean up the old files
[root@cu1 pki]# cd CA
[root@cu1 CA]# rm -rf index.txt* serial*
[root@cu1 CA]# rm cacert.pem private/cakey.pem
Initialize
[root@cu1 CA]# touch index.txt serial
[root@cu1 CA]# echo 01 > serial
[root@cu1 CA]# openssl genrsa -out private/ca.key 2048
Generating RSA private key, 2048 bit long modulus
.........................................................................+++
...........................+++
e is 65537 (0x10001)
[root@cu1 CA]# chmod 600 private/ca.key
[root@cu1 CA]# openssl req -new -x509 -key private/ca.key -out ca.crt
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [CN]:CN
State or Province Name (full name) []:GD
Locality Name (eg, city) [Default City]:GZ
Organization Name (eg, company) [Default Company Ltd]:Eshore
Organizational Unit Name (eg, section) []:CU
Common Name (eg, your name or your server's hostname) []:esw.cn
Email Address []:ca@esw.cn
# Create the application (server) certificate
[root@cu1 conf]# openssl genrsa -out nginx.key 2048
Generating RSA private key, 2048 bit long modulus
...................................................................+++
.....+++
e is 65537 (0x10001)
[root@cu1 conf]#
[root@cu1 conf]# openssl req -new -key nginx.key -out nginx.csr
You are about to be asked to enter information that will be incorporated
into your certificate request.
What you are about to enter is what is called a Distinguished Name or a DN.
There are quite a few fields but you can leave some blank
For some fields there will be a default value,
If you enter '.', the field will be left blank.
-----
Country Name (2 letter code) [CN]:CN
State or Province Name (full name) []:GD
Locality Name (eg, city) [Default City]:GZ
Organization Name (eg, company) [Default Company Ltd]:Eshore
Organizational Unit Name (eg, section) []:Cu
Common Name (eg, your name or your server's hostname) []:cu.esw.cn
Email Address []:cu@esw.cn
Please enter the following 'extra' attributes
to be sent with your certificate request
A challenge password []:
An optional company name []:
Check whether the request is signed with SHA-2
[root@cu1 conf]# openssl req -in nginx.csr -text | grep sha256
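The `grep` above prints nothing when the request was signed with a different digest. As a sketch (the scratch directory and `-subj` value are assumptions for this example), passing `-sha256` on the command line requests the digest explicitly, whatever `openssl.cnf` says, and the signature algorithm line can then be read back:

```shell
# Scratch directory for the example files (assumption).
workdir=$(mktemp -d)

openssl genrsa -out "$workdir/nginx.key" 2048 2>/dev/null

# -sha256 selects the digest explicitly instead of relying on default_md in openssl.cnf.
openssl req -new -sha256 -key "$workdir/nginx.key" \
    -subj "/C=CN/ST=GD/L=GZ/O=Eshore/OU=Cu/CN=cu.esw.cn" \
    -out "$workdir/nginx.csr"

# Keep only the first "Signature Algorithm" line of the decoded request.
sig_alg=$(openssl req -in "$workdir/nginx.csr" -noout -text | grep "Signature Algorithm" | head -n 1)
echo "$sig_alg"
```

For an RSA key signed with SHA-256 the line should name `sha256WithRSAEncryption`.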
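# Issue the application certificate (choose one of the two methods)
* The default method, which does not produce a SHA-2 signature. Send the generated CSR file to the CA server and run the following on the CA:
By default, signing uses the ca.crt and ca.key files specified in the configuration file.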
[root@cu1 conf]# openssl ca -in nginx.csr -out nginx.crt
Using configuration from /etc/pki/tls/openssl.cnf
Check that the request matches the signature
Signature ok
Certificate Details:
Serial Number: 1 (0x1)
Validity
Not Before: Jan 19 07:24:24 2017 GMT
Not After : Jan 19 07:24:24 2018 GMT
Subject:
countryName = CN
stateOrProvinceName = GD
organizationName = Eshore
organizationalUnitName = Cu
commonName = cu.esw.cn
emailAddress = cu@esw.cn
X509v3 extensions:
X509v3 Basic Constraints:
CA:FALSE
Netscape Comment:
OpenSSL Generated Certificate
X509v3 Subject Key Identifier:
7B:3D:E8:18:D9:77:20:8F:A2:76:7F:6C:F2:01:65:49:3F:92:1A:7F
X509v3 Authority Key Identifier:
keyid:5F:8C:1E:3D:F7:A1:86:82:22:F5:88:12:36:16:82:49:B6:9C:84:F1
Certificate is to be certified until Jan 19 07:24:24 2018 GMT (365 days)
Sign the certificate? [y/n]:y
1 out of 1 certificate requests certified, commit? [y/n]y
Write out database with 1 new entries
Data Base Updated
[root@cu1 conf]#
* Signing with the SHA-2 signature algorithm
http://www.ibm.com/support/knowledgecenter/zh/SSPMR3_9.0.0/com.ibm.tivoli.tem.doc_9.0/SUA_9.0/com.ibm.license.mgmt.doc/security/t_generate_certificate_CA.html
[root@cu1 conf]# openssl x509 -req -days 365 -in nginx.csr -CA /etc/pki/CA/ca.crt -CAkey /etc/pki/CA/private/ca.key -CAserial /etc/pki/CA/serial -sha256 -out nginx.crt
Signature ok
subject=/C=CN/ST=GD/L=GZ/O=Eshore/OU=Cu/CN=cu.esw.cn/emailAddress=cu@esw.cn
Getting CA Private Key
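Before deploying the certificate it can be checked against the CA that signed it. The sketch below rebuilds the whole chain in a scratch directory, so the throwaway CA, the paths, and the use of `-CAcreateserial` in place of the existing `/etc/pki/CA/serial` file are all assumptions of this example:

```shell
# Scratch directory standing in for /etc/pki/CA and conf/ (assumption).
workdir=$(mktemp -d)

# Throwaway CA: private key plus self-signed CA certificate.
openssl genrsa -out "$workdir/ca.key" 2048 2>/dev/null
openssl req -new -x509 -sha256 -days 365 -key "$workdir/ca.key" \
    -subj "/C=CN/ST=GD/L=GZ/O=Eshore/OU=CU/CN=esw.cn" -out "$workdir/ca.crt"

# Server key and certificate request.
openssl genrsa -out "$workdir/nginx.key" 2048 2>/dev/null
openssl req -new -sha256 -key "$workdir/nginx.key" \
    -subj "/C=CN/ST=GD/L=GZ/O=Eshore/OU=Cu/CN=cu.esw.cn" -out "$workdir/nginx.csr"

# Sign with the CA; -CAcreateserial creates the serial number file if it is missing.
openssl x509 -req -days 365 -sha256 -in "$workdir/nginx.csr" \
    -CA "$workdir/ca.crt" -CAkey "$workdir/ca.key" -CAcreateserial \
    -out "$workdir/nginx.crt" 2>/dev/null

# Confirm that the certificate chains to the CA and that it carries a SHA-256 signature.
verify_out=$(openssl verify -CAfile "$workdir/ca.crt" "$workdir/nginx.crt")
sig_alg=$(openssl x509 -in "$workdir/nginx.crt" -noout -text | grep "Signature Algorithm" | head -n 1)
echo "$verify_out"
echo "$sig_alg"
```

`openssl verify` reports `OK` after the file name when the chain validates against the given CA file.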
[root@hadoop-master2 nginx]# curl https://www.winse.com
curl: (60) Peer certificate cannot be authenticated with known CA certificates
More details here: http://curl.haxx.se/docs/sslcerts.html
curl performs SSL certificate verification by default, using a "bundle"
of Certificate Authority (CA) public keys (CA certs). If the default
bundle file isn't adequate, you can specify an alternate file
using the --cacert option.
If this HTTPS server uses a certificate signed by a CA represented in
the bundle, the certificate verification probably failed due to a
problem with the certificate (it might be expired, or the name might
not match the domain name in the URL).
If you'd like to turn off curl's verification of the certificate, use
the -k (or --insecure) option.
[root@hadoop-master2 CA]# cp /etc/pki/tls/certs/ca-bundle.crt{,.bak}
[root@hadoop-master2 CA]# cat /etc/pki/CA/ca.crt >> /etc/pki/tls/certs/ca-bundle.crt
[root@hadoop-master2 CA]#
[root@hadoop-master2 CA]# curl https://www.winse.com
curl: (51) SSL: certificate subject name 'winse.com' does not match target host name 'www.winse.com'
[root@hadoop-master2 CA]# curl https://winse.com
...
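The second curl error (51) is a name mismatch rather than a trust problem: the certificate names `winse.com` but the URL asked for `www.winse.com`. The session above works around it by requesting `https://winse.com`. A common alternative, not something the session does, is to list every host name in a `subjectAltName` extension. A minimal sketch, self-signed for brevity and using a scratch directory and file names that are assumptions of this example:

```shell
# Scratch directory for the example files (assumption).
workdir=$(mktemp -d)

openssl genrsa -out "$workdir/site.key" 2048 2>/dev/null
openssl req -new -sha256 -key "$workdir/site.key" \
    -subj "/CN=winse.com" -out "$workdir/site.csr"

# Extension file listing every host name the certificate should be valid for.
printf 'subjectAltName = DNS:winse.com, DNS:www.winse.com\n' > "$workdir/san.ext"

# -extfile adds the extensions from the file's default section to the new certificate.
# With a private CA, replace -signkey with the -CA/-CAkey options used earlier.
openssl x509 -req -days 365 -sha256 -in "$workdir/site.csr" \
    -signkey "$workdir/site.key" -extfile "$workdir/san.ext" \
    -out "$workdir/site.crt" 2>/dev/null

# Show the DNS names recorded in the certificate.
san=$(openssl x509 -in "$workdir/site.crt" -noout -text | grep "DNS:")
echo "$san"
```

With both names present, a client that checks the subject alternative names can match either URL against the same certificate.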
org/apache/kafka/clients/consumer/KafkaConsumer.java
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // lookup any positions for partitions which are awaiting reset (which may be the
    // case if the user called seekToBeginning or seekToEnd. We do this check first to
    // avoid an unnecessary lookup of committed offsets (which typically occurs when
    // the user is manually assigning partitions and managing their own offsets).
    fetcher.resetOffsetsIfNeeded(partitions);
    if (!subscriptions.hasAllFetchPositions()) {
        // if we still don't have offsets for all partitions, then we should either seek
        // to the last committed position or reset using the auto reset policy
        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();
        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}
Kafka-0.10.0.1
org.apache.kafka.clients.consumer.KafkaConsumer#updateFetchPositions
private void updateFetchPositions(Set<TopicPartition> partitions) {
    // refresh commits for all assigned partitions
    coordinator.refreshCommittedOffsetsIfNeeded();
    // then do any offset lookups in case some positions are not known
    fetcher.updateFetchPositions(partitions);
}
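Problem description and explanation
When subscribing to multiple partitions of the same topic, Spark Streaming looks up the offsets to process for every batch.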
org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#latestOffsets
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  c.poll(0)
  val parts = c.assignment().asScala
  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // don't want to consume messages, so pause
  c.pause(newPartitions.asJava)
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}

override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  val untilOffsets = clamp(latestOffsets())
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo = currentOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo)
  }
  val rdd = new KafkaRDD[K, V](
    context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)
  ...
org.apache.kafka.clients.consumer.KafkaConsumer#position
public long position(TopicPartition partition) {
    acquire();
    try {
        if (!this.subscriptions.isAssigned(partition))
            throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
        Long offset = this.subscriptions.position(partition);
        if (offset == null) {
            updateFetchPositions(Collections.singleton(partition));
            offset = this.subscriptions.position(partition);
        }
        return offset;
    } finally {
        release();
    }
}

private void updateFetchPositions(Set<TopicPartition> partitions) {
    // lookup any positions for partitions which are awaiting reset (which may be the
    // case if the user called seekToBeginning or seekToEnd. We do this check first to
    // avoid an unnecessary lookup of committed offsets (which typically occurs when
    // the user is manually assigning partitions and managing their own offsets).
    fetcher.resetOffsetsIfNeeded(partitions);
    if (!subscriptions.hasAllFetchPositions()) {
        // if we still don't have offsets for all partitions, then we should either seek
        // to the last committed position or reset using the auto reset policy
        // first refresh commits for all assigned partitions
        coordinator.refreshCommittedOffsetsIfNeeded();
        // then do any offset lookups in case some positions are not known
        fetcher.updateFetchPositions(partitions);
    }
}
org.apache.kafka.clients.consumer.internals.Fetcher#resetOffsetsIfNeeded
public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
    for (TopicPartition tp : partitions) {
        // TODO: If there are several offsets to reset, we could submit offset requests in parallel
        if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
            resetOffset(tp);
    }
}
org.apache.kafka.clients.consumer.internals.SubscriptionState.TopicPartitionState#seek
private void seek(long offset) {
    this.position = offset;
    this.resetStrategy = null;
}