十年网站开发经验 + 多家企业客户 + 靠谱的建站团队
量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决
这篇文章主要介绍“Oracle数据怎么发送到kafka传输数据”,在日常操作中,相信很多人在Oracle数据怎么发送到kafka传输数据问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Oracle数据怎么发送到kafka传输数据”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
创新互联建站服务项目包括南岸网站建设、南岸网站制作、南岸网页制作以及南岸网络营销策划等。多年来,我们专注于互联网行业,利用自身积累的技术优势、行业经验、深度合作伙伴关系等,向广大中小型企业、政府机构等提供互联网行业的解决方案,南岸网站推广取得了明显的社会效益与经济效益。目前,我们服务的客户以成都为中心已经辐射到南岸省份的部分城市,未来相信会继续扩大服务区域并继续获得客户的支持与信任!
配置
OGG ADPATER FOR KAFKA
需要的kafka包:
Kafka 0.8.2.1
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar
###################################################################配置OGG
主库
dblogin userid goldengate, password oggpassword
add extract EXTJMS,tranlog, threads 3,begin now
add exttrail /data/oggsrc/dirdat/jm, extract EXTJMS megabytes 200
add trandata testKAFKA.*
--add schematrandata testKAFKA
抽取进程:
edit param extjms
EXTRACT EXTJMS
SETENV (ORACLE_SID = "rac3")
SETENV (ORACLE_HOME=/data/app/oracle/product/10.2.0/db_1)
SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")
DBOPTIONS ALLOWUNUSEDCOLUMN, FETCHBATCHSIZE 1500
userid goldengate, password oggpassword
EXTTRAIL /data/oggsrc/dirdat/jm, FORMAT RELEASE 9.5
DISCARDFILE /data/oggsrc/dirtmp/EXTJMS.dsc, APPEND, MEGABYTES 500
tranlogoptions asmuser SYS@rac_asm, ASMPASSWORD oracle_123
THREADOPTIONS MAXCOMMITPROPAGATIONDELAY 90000
WARNLONGTRANS 30MIN, CHECKINTERVAL 3MIN
CHECKPOINTSECS 5
FLUSHCSECS 80
GETUPDATEBEFORES
NOCOMPRESSUPDATES
NOCOMPRESSDELETES
RecoveryOptions OverwriteMode
--DDL INCLUDE ALL
DDL INCLUDE MAPPED &
exclude objname testKAFKA.PK_CATEGORY_RANKLIST &
exclude objtype 'PACKAGE' &
exclude objtype 'PACKAGE BODY' &
exclude INSTR 'REPLACE SYNONYM' &
exclude INSTR 'CREATE OR REPLACE PACKAGE' &
exclude objtype 'PROCEDURE' &
exclude objtype 'FUNCTION' &
exclude objtype 'TYPE' &
exclude objtype 'TRIGGER' &
exclude objtype 'GRANT' &
exclude instr 'GRANT' &
exclude objtype 'DATABASE LINK' &
exclude objtype 'CONSTRAINT' &
exclude objtype 'JOB' &
exclude instr 'ALTER SESSION' &
exclude instr 'MATERIALIZED VIEW' &
exclude INSTR 'AS SELECT' &
exclude INSTR 'REPLACE SYNONYM' &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_CMP" &
EXCLUDE OBJNAME "testKAFKA.DBMS_TABCOMP_TEMP_UNCMP"
--GETUPDATEBEFORES
--ddloptions addtrandata,REPORT
FETCHOPTIONS, USESNAPSHOT, NOUSELATESTVERSION, MISSINGROW REPORT
dynamicresolution
EOFDELAYCSECS 5
TABLEEXCLUDE testKAFKA.RULE_ACTION_LOG;
TABLE testKAFKA.* ;
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY, UNIQUE INDEX) COLUMNS;
Database altered.
SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (all) COLUMNS;
Database altered.
SQL> select SUPPLEMENTAL_LOG_DATA_MIN,SUPPLEMENTAL_LOG_DATA_PK,SUPPLEMENTAL_LOG_DATA_UI ,FORCE_LOGGING from v$database;
SUPPLEME SUP SUP FOR
-------- --- --- ---
YES YES YES YES
SQL>
源端添加新的pump进程:
在testKAFKA源库测试添加pump进程:
添加pump进程:
添加新的pump:
add extract EDPKK,exttrailsource /data/oggsrc/dirdat/jm, begin now
edit param EDPKK
EXTRACT EDPKK
setenv (NLS_LANG = AMERICAN_AMERICA.AL32UTF8)
PASSTHRU
GETUPDATEBEFORES
RecoveryOptions OverwriteMode
RMTHOST 192.168.0.3, MGRPORT 7839
RMTTRAIL /data/ogg_for_bigdata/dirdat/kk
RMTTRAIL /data/ogg_for_kafka/dirdat/kk
DISCARDFILE ./dirrpt/EDPKK.dsc,APPEND,MEGABYTES 5
TABLE testKAFKA.* ;
add rmttrail /data/ogg_for_bigdata/dirdat/kk, extract EDPKK megabytes 200
add rmttrail /data/ogg_for_kafka/dirdat/kk,extract EDPKK megabytes 200
编辑定义文件:
userid goldengate, password oggpassword
defsfile dirdef/testKAFKA.def
TABLEEXCLUDE *.DBMS_TABCOMP_TEMP*;
TABLE testKAFKA.*;
传递定义文件:
./defgen paramfile ./dirprm/defgen.prm
cd dirdef/
[oracle@vm01 dirdef]$ scp dirdef/testKAFKA.def oracle@192.168.0.3:/data/ogg_for_bigdata/dirdef
The authenticity of host '192.168.0.3 (192.168.0.3)' can't be established.
RSA key fingerprint is 46:8c:35:61:74:ca:43:e0:b0:74:d5:ff:0c:2f:67:8a.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '192.168.0.3' (RSA) to the list of known hosts.
reverse mapping checking getaddrinfo for bogon failed - POSSIBLE BREAK-IN ATTEMPT!
oracle@192.168.0.3's password:
testKAFKA.def 100% 899KB 898.7KB/s 00:00
[oracle@vm01 dirdef]$
目标端直接端
mgr:
PORT 7839
DYNAMICPORTLIST 7840-7850
--AUTOSTART replicat *
--AUTORESTART replicat *,RETRIES 5,WAITMINUTES 2
AUTORESTART ER *, RETRIES 3, WAITMINUTES 5, RESETMINUTES 10
PURGEOLDEXTRACTS /data/ogg_for_bigdata/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
PURGEOLDEXTRACTS /data/ogg_for_kafka/dirdat/*, USECHECKPOINTS, MINKEEPHOURS 2
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
添加 UE DATA PUMP:
使用版本:
Version 12.1.2.1.4 20470586 OGGCORE_12.1.2.1.0OGGBP_PLATFORMS_150303.1209
在目标端 新端口7889:
ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_bigdata/dirdat/kk
ADD EXTRACT repkk, EXTTRAILSOURCE /data/ogg_for_kafka/dirdat/kk
edit param repkk
GGSCI (localhost.localdomain) 18> view param repkk
EXTRACT repkk
SETENV (GGS_USEREXIT_CONF ="dirprm/repkk.props")
GetEnv (JAVA_HOME)
GetEnv (PATH)
GetEnv (LD_LIBRARY_PATH)
SourceDefs dirdef/testKAFKA.def
CUserExit libggjava_ue.so CUSEREXIT PassThru IncludeUpdateBefores
GetUpdateBefores
NoCompressDeletes
NoCompressUpdates
NoTcpSourceTimer
TABLEEXCLUDE testKAFKA.MV*;
TABLE testKAFKA.*;
[oracle@repvm dirdef]$ cp testKAFKA.def /data/ogg_for_kafka/dirdef
配置文件:
[oracle@repvm dirprm]$ cat repkk.props
gg.handlerlist =kafkahandler
#gg.handler.kafkahandler.type=oracle.goldengate.handler.kafka.KafkaHandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=core_kafka_producer.properties
gg.handler.kafkahandler.TopicName =zqtest
gg.handler.kafkahandler.format =avro_op
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode =tx
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
#gg.log.level=INFO
gg.log.level=DEBUG
gg.report.time=30sec
#gg.classpath=dirprm/:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar:/data/ogg_for_kafka/dirprm/kafka_jar/*:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
gg.classpath=dirprm:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/kafka_2.10-0.8.2.2/libs/*
javawriter.bootoptions=-Xmx4096m -Xms4096m -Djava.class.path=/data/ogg_for_kafka/ggjava/ggjava.jar:/data/ogg_for_kafka/ggjava/resources/lib/*:/data/jdk1.8.0_60/lib/dt.jar:/data/jdk1.8.0_60/lib/tools.jar -Dlog4j.configuration=/data/ogg_for_bigdata/cfg/log4j.properties
[oracle@repvm dirprm]$
javawriter.bootoptions 必须包含ogg for kafka的lib包
kafka的属性文件:
bootstrap.servers=localhost:9092
acks = 1
compression.type = gzip
reconnect.backoff.ms = 1000
value.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer = org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size = 102400
linger.ms = 10000
max.request.size = 5024000
send.buffer.bytes = 5024000
compression.type 参数默认:
配置kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ pwd
/data/kafka_2.10-0.8.2.2
[oracle@repvm kafka_2.10-0.8.2.2]$
[oracle@repvm kafka_2.10-0.8.2.2]$ grep -v '^$\|^\s*\#' config/server.properties
broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
[oracle@repvm kafka_2.10-0.8.2.2]$
[oracle@repvm libs]$ ll
total 17452
-rw-r--r-- 1 oracle oinstall 53244 Aug 31 2014 jopt-simple-3.2.jar
-rw-r--r-- 1 oracle oinstall 3991269 Sep 3 2015 kafka_2.10-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall 37748 Sep 3 2015 kafka_2.10-0.8.2.2-javadoc.jar
-rw-r--r-- 1 oracle oinstall 2324165 Sep 3 2015 kafka_2.10-0.8.2.2-scaladoc.jar
-rw-r--r-- 1 oracle oinstall 521466 Sep 3 2015 kafka_2.10-0.8.2.2-sources.jar
-rw-r--r-- 1 oracle oinstall 1233391 Sep 3 2015 kafka_2.10-0.8.2.2-test.jar
-rw-r--r-- 1 oracle oinstall 324016 Sep 3 2015 kafka-clients-0.8.2.2.jar
-rw-r--r-- 1 oracle oinstall 481535 Aug 31 2014 log4j-1.2.16.jar
-rw-r--r-- 1 oracle oinstall 165505 Aug 31 2014 lz4-1.2.0.jar
-rw-r--r-- 1 oracle oinstall 82123 Aug 31 2014 metrics-core-2.2.0.jar
-rw-r--r-- 1 oracle oinstall 7126372 Nov 25 2014 scala-library-2.10.4.jar
-rw-r--r-- 1 oracle oinstall 28688 Aug 31 2014 slf4j-api-1.7.6.jar
-rw-r--r-- 1 oracle oinstall 9753 Aug 31 2014 slf4j-log4j12-1.6.1.jar
-rw-r--r-- 1 oracle oinstall 594033 May 29 2015 snappy-java-1.1.1.7.jar
-rw-r--r-- 1 oracle oinstall 64009 Aug 31 2014 zkclient-0.3.jar
-rw-r--r-- 1 oracle oinstall 792964 Aug 31 2014 zookeeper-3.4.6.jar
[oracle@repvm libs]$
kafka-clients-0.8.2.1.jar
lz4-1.2.0.jar
slf4j-api-1.7.6.jar
snappy-java-1.1.1.6.jar
[oracle@repvm bin]$ pwd
/data/kafka_2.10-0.8.2.2/bin
[oracle@repvm bin]$
nohup sh kafka-server-start.sh ../config/server.properties > /tmp/server.log &
先启动zookeeper:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
[1] 18645
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$ tail -f nohup.out
[2016-06-02 12:22:42,981] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:os.version=2.6.32-358.el6.x86_64 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.name=oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.home=/home/oracle (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:42,981] INFO Server environment:user.dir=/data/kafka_2.10-0.8.2.2 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,013] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-06-02 12:22:43,035] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)
启动kafka:
[oracle@repvm kafka_2.10-0.8.2.2]$ nohup bin/kafka-server-start.sh config/server.properties &
[1] 18845
nohup: ignoring input and appending output to `nohup.out'
[oracle@repvm kafka_2.10-0.8.2.2]$
创建topic:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
Created topic "zqtest".
[oracle@repvm kafka_2.10-0.8.2.2]$
查看:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-topics.sh --list --zookeeper localhost:2181
zqtest
[oracle@repvm kafka_2.10-0.8.2.2]$
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic zqtest
测试发送消息:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic zqtest
[2016-06-02 14:33:50,690] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
ds
接收端:
[oracle@repvm kafka_2.10-0.8.2.2]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic zqtest --from-beginning
ds
成功接受!!!
到此,关于“Oracle数据怎么发送到kafka传输数据”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!