Laboratorio de Replicación de datos desde BD Oracle hacia ambiente Bigdata (Kafka), utilizando Golden Gate.
INDICE
Introducción
El presente documento tiene por objetivo describir los pasos necesarios para realizar un laboratorio de pruebas, para replicar una base de datos oracle hacia un ambiente BigData (kafka) utilizando como software de Replicación, Golden Gate.
A continuación las versiones utilizadas en el presente laboratorio son las siguientes:
1.- Golden Gate servidor origen (o servidor source): 11.2.1.0.1
2.- Golden Gate servidor destino (o servidor target): 12.3.2.1.1
3.- Kafka 2.0 (Scala 2.11) (el cual será instalado en el servidor de destino)
Todas estas Máquinas virtuales se encuentran implementadas sobre platforma Virtual Box (ver 5.2.18)
A continuación algunas propiedades de las máquinas Virtuales utilizadas:
Link de descarga de máquinas Virtuales ya terminadas:
https://drive.google.com/file/d/1_BadxTRmzAJTXc8tWS_bYWCmEn5ChCBS/view?usp=sharing
Hostnames
|
SOURCE
|
TARGET
|
Ips
|
10.10.10.101
|
10.10.10.102
|
S.O.
|
Oracle Linux 6.9
|
Oracle Linux 7.4
|
Passwords root
|
root
|
root
|
Passwords oracle
|
oracle
|
oracle
|
Oracle_SID
|
DB11G
|
N/A
|
Versión de BD
|
11.2.0.1
|
N/A
|
Versión de Golden Gate
|
11.2.1.0.1
|
12.3.2.1.1
|
Path BD
|
/u01/app/oracle/product/11.2.0/db_1
|
N/A
|
Path GoldenGate
|
/gg
|
/gg
|
Path de Kafka
|
N/A
|
/root/kafka_2.11-2.0.1
|
Link de descarga de máquinas Virtuales ya terminadas:
https://drive.google.com/file/d/1_BadxTRmzAJTXc8tWS_bYWCmEn5ChCBS/view?usp=sharing
Links de Descarga de Herramientas
A continuación los link de descarga de las herramientas ocupadas en este laboratorio:
- Dirección desde dónde descargar Golden Gate para el servidor de origen y el adapter para BigData:
2.- Ubicación desde dónde descargar Big Data (Kafka para el presente laboratorio)
3.- Link con la Instalación de Golden Gate 11g:
Instalación y configuración de Golden Gate en servidor de Origen
Desde el link anterior se descargará el producto de Golden Gate 11.2.1.0 y se procederá a instalar sobre el servidor Source (u origen como le llamaremos también de ahora en adelante). Los siguientes pasos los llevaremos a cabo usando la herramienta sqlplus (obviamente con el usuario de sistema operativo oracle).
Recordar que el software de Instalación de golden gate en el servidor de origen se encuentra instalado en la ruta /gg (Misma path que es ocupado en el server de destino dónde activaremos la replica).
Como aclaración no es materia de este laboratorio mostrar la instalación de Golden Gate (de por si la instalación es muy simple). Para más información al respecto recomendamos seguir el paso a paso decrito en el link: https://www.oracle.com/webfolder/technetwork/tutorials/obe/fmw/goldengate/11g/orcl_orcl/index.html. En Dicho Link pueden seguir el apartado “1.2 Installing Oracle GoldenGate on Linux” dónde se encuentran los screenshot con las distintas pantallas de Instalación de GG.
Observación: Luego de haber instalado GoldenGate sobre un motor de base de datos 11g, no se olviden de crear el link simbólico de la librería libnnz11.so con el nombre libnnz10.so
Ejemplo:
ln -s /u01/app/oracle/product/11.2.0/db_1/lib/libnnz11.so /u01/app/oracle/product/11.2.0/db_1/lib/libnnz10.so
Lo anterior es para evitar errores al levantar la herramienta ggsci.
1.- Sobre el motor de base de datos de origen ejecutaremos los siguientes comandos:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
ALTER DATABASE FORCE LOGGING;
ALTER SYSTEM SWITCH all LOGFILE;
Posteriormente ejecutaremos el siguiente comando que nos debe devolver como resultado: YES
SELECT supplemental_log_data_min, force_logging FROM v$database;
2.- Ejecución de los siguientes comandos a través de sqlplus “/as sysdba”:
CREATE TABLESPACE ogg_data LOGGING DATAFILE ‘/u01/oradata/DB11G/ogg_data.dbf’ SIZE 1024M AUTOEXTEND ON NEXT 10M MAXSIZE UNLIMITED;
--Creacion del usuario de Golden gate
CREATE USER ogg_user IDENTIFIED BY oracle
DEFAULT TABLESPACE ogg_data
TEMPORARY TABLESPACE temp;
--Asignacion de privilegios para la cuenta antiormente creada
GRANT CONNECT, RESOURCE TO ogg_user;
GRANT SELECT ANY DICTIONARY, SELECT ANY TABLE TO ogg_user;
GRANT CREATE TABLE TO ogg_user;
GRANT FLASHBACK ANY TABLE TO ogg_user;
GRANT EXECUTE ON dbms_flashback TO ogg_user;
GRANT EXECUTE ON utl_file TO ogg_user;
GRANT CREATE ANY TABLE TO ogg_user;
GRANT INSERT ANY TABLE TO ogg_user;
GRANT UPDATE ANY TABLE TO ogg_user;
GRANT DELETE ANY TABLE TO ogg_user;
GRANT DROP ANY TABLE TO ogg_user;
GRANT ALTER ANY TABLE TO ogg_user;
GRANT ALTER SYSTEM TO ogg_user;
GRANT LOCK ANY TABLE TO ogg_user;
GRANT SELECT ANY TRANSACTION to ogg_user;
ALTER USER ogg_user QUOTA UNLIMITED ON ogg_data;
--Finalmente ejecutar lo siguiente:
exec DBMS_GOLDENGATE_AUTH.GRANT_ADMIN_PRIVILEGE('ogg_user');
ALTER SYSTEM SET enable_goldengate_replication=TRUE scope=both sid='*';
3.- Posteriormente ejecutaremos los siguientes comandos también vía sqlplus “/as sysdba” con la salvedad que nos debemos encontrar en el directorio dónde se instaló goldengate, pues allí se encuentran los scripts.
cd /gg/
sqlplus “/as sysdba” @marker_setup.sql
sqlplus “/as sysdba” @ddl_setup.sql
sqlplus “/as sysdba” @role_setup.sql
--Este commando es para asignar un rol fundamental
--al usuario de golden gate que no puede faltar
GRANT GGS_GGSUSER_ROLE TO ogg_user;
sqlplus “/as sysdba” @ddl_enable.sql
sqlplus “/as sysdba” @ddl_pin ogg_user
4.- Posteriormente continuaremos con la creación de directorios usando la herramienta /gg/ggsci
[oracle@source gg]$ ggsci
Oracle GoldenGate Command Interpreter for Oracle
Version 11.2.1.0.1 OGGCORE_11.2.1.0.1_PLATFORMS_120423.0230_FBO
Linux, x64, 64bit (optimized), Oracle 10g on Apr 23 2012 07:30:46
Copyright (C) 1995, 2012, Oracle and/or its affiliates. All rights reserved.
GGSCI (source) 1> create subdirs
Creating subdirectories under current directory /gg
Parameter files /gg/dirprm: already exists
Report files /gg/dirrpt: created
Checkpoint files /gg/dirchk: created
Process status files /gg/dirpcs: created
SQL script files /gg/dirsql: created
Database definitions files /gg/dirdef: created
Extract data files /gg/dirdat: created
Temporary files /gg/dirtmp: created
Stdout files /gg/dirout: created
GGSCI (source) 2>
5.- Editaremos el parámetro GLOBAL usando la herramienta ggsci, para agregar entre otras cosas el owner de base de datos para el esquema GOLDEN GATE:
edit params ./GLOBAL
--Agregar lo siguiente al archivo de parámetro GLOBAL y guardarlo
GGSCHEMA ogg_user
6.- Editaremos los parámetro del manager usando la herramienta ggsci:
edit param mgr
--Añadiremos lo siguiente
PORT 7809
dynamicportlist 7900-7950
lagreportminutes 5
laginfominutes 1
lagcriticalminutes 2
purgeoldextracts ./dirdat/*, minkeepdays 2, usecheckpoints
AUTORESTART EXTRACT *, RETRIES 10, WAITMINUTES 5, RESETMINUTES 60
AUTORESTART REPLICAT *, RETRIES 10, WAITMINUTES 5, RESETMINUTES 60
7.- Levantaremos el manager y visualizaremos su estado con los comandos:
start mgr
info all
GGSCI (source) 5> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
Instalación y configuración de Apache Kafka
Sobre el ambiente Virtual llamado TARGET (o el destino como se le conocerá también) procederemos a instalar kafka usando el usuario root de la máquina target (es sólo para efectos de este laboratorio lo más normal es que Kafka se ejecuté con otro usurario distinto del Sistema Operativo). Los pasos son los siguientes (Se deben realizar sobre el servidor de destino NO en el origen):
1.- Revisamos que tengamos java instalado (versión 1.8)
[root@target ~]# java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-b12)
OpenJDK 64-Bit Server VM (build 25.131-b12, mixed mode)
[root@target ~]#
2.- Descomprimimos el software de Kafka anteriormente descargado (Desde el link mencionado al comienzo):
3.- Dejaremos la descarga en la siguiente ruta:
/root/kafka_2.11-2.0.1
4.- Subida de Zookeeper Server:
[root@target bin]# cd /root/kafka_2.11-2.0.1/bin
[root@target bin]# nohup sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties &
[root@target bin]#
4.- Ahora configuraremos kafka server. En el archivo ../config/server.properties (tomar en cuenta que que los caracteres “..” hacen referencia a la carpeta de instalación /root/kafka_2.11-2.0.1/ ) habilitaremos la siguiente linea:
listeners=PLAINTEXT://localhost:9092
5.- Subida del servicio Kafka
[root@target bin]# cd /root/kafka_2.11-2.0.1/bin
[root@target bin]# nohup sh kafka-server-start.sh ../config/server.properties &
6.- Creamos un tópico de ejemplo en Kafka:
[root@target bin]# cd /root/kafka_2.11-2.0.1/bin
[root@target bin]# sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic empleado
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Created topic "empleado".
(para visualizer el tópico creado usamos el commando:
sh kafka-topics.sh --list --zookeeper localhost:2181
)
7.- Finalmente daremos acceso chmod 755 a la carpeta /root (tener en cuenta que esto es solo un laboratorio). Nunca se debe realizar una operación de este tipo en producción, ni menos instalar el servicio de kafka en un directorio de /root. Todo esto es sólo con efectos de laboratorio y pruebas. Estos permisos son para que el replicador de golden gate posteriormente pueda usar las librerias de Kafka:
chmod 755 /root
Como tips adicional los comandos para levantar todos los servicios de Kafka anteriores es el siguiente a modo de resumen:
cd /root/kafka_2.11-2.0.1/bin
nohup sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties &
nohup sh kafka-server-start.sh ../config/server.properties &
Creación de extractores en ambiente golden gate de origen
A través de la herramienta de ggsci sobre el servior de origen crearemos los siguientes extractores (EXT y DMP). No olviden levantar un listener y crear un alias de tnsnames que apunte a la base de datos DB11G (con el mismo nombre DB11G). Los comandos de acontinuación deben ser ejecutados con el usuario oracle;
1.- Crear el siguiente directorio: /gg/dirdat/E_01
2.- A nivel de SQLPLUS activaremos el supplemental log de la tabla scott.emp que es la que usaremos como ejemplo:
SQL> alter table scott.EMP ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
Table altered.
3.- Procedemos a crear los extractores en la herramienta ggsci sobre el servidor source:
edit param e_01
EXTRACT E_01
SETENV(ORACLE_HOME = "/u01/app/oracle/product/11.2.0/db_1")
USERID ogg_user@DB11G, PASSWORD oracle
EXTTRAIL ./dirdat/E_01/et
DISCARDFILE ./dirrpt/E_01.dsc, MEGABYTES 2048, APPEND
TABLE SCOTT.EMP, KEYCOLS(EMPNO);
edit param p_01
EXTRACT P_01
passthru
RMTHOST 10.10.10.102, MGRPORT 7809
RMTTRAIL /gg/dirdat/P_01/rt
DISCARDFILE ./dirrpt/P_01.dsc, MEGABYTES 2048, APPEND
TABLE SCOTT.EMP;
--, KEYCOLS(EMPNO) ESTO FUE COMENTADO
-- YA QUE KEYCOLS NO PUEDE SER USADO CUANDO SE OCUPA passthru
dblogin USERID ogg_user@DB11G, PASSWORD oracle
ADD EXTRACT E_01, TRANLOG, BEGIN NOW
ADD EXTTRAIL ./dirdat/E_01/et, EXTRACT E_01 , MEGABYTES 500
ADD EXTRACT P_01, EXTTRAILSOURCE ./dirdat/E_01/et
ADD RMTTRAIL /gg/dirdat/P_01/rt EXTRACT P_01 MEGABYTES 500
register extract E_01 logretention
start E_01
Por Ahora solo dejar arriba levantado el E_01 ya que el PUMP si se es levantado dará error pues aún no se encuentra el golden gate de destino levantado.
4.- Revisamos el status del extractor E_01
GGSCI (source) 13> info E_01
EXTRACT E_01 Last Started 2018-11-11 17:39 Status RUNNING
Checkpoint Lag 00:00:00 (updated 00:00:01 ago)
Log Read Checkpoint Oracle Redo Logs
2018-11-11 17:40:37 Seqno 4, RBA 6032896
SCN 0.995639 (995639)
GGSCI (source) 14>
5.- Creación del archivo de definición dentro de la carpeta /gg. Crearemos el siguiente archivo ./dirprm/scott.prm con el contenido siguiente:
DEFSFILE ./dirdef/scott.def
userid ogg_user@DB11G password oracle
TABLE SCOTT.*;
Posteriormente generamos el archivo de definición de esta manera desde el directorio de /gg:
./defgen paramfile ./dirprm/scott.prm
Instalación de Golden Gate for Big Data en Target (Destino)
Ahora se procede a copiar el archivo zip OGG_BigData_Linux_x64_12.3.2.1.1.zip (El cual está mencionado al comienzo como uno de los archivos a descargar) y lo descomprimimos en la carpeta /gg del server destino (en el interior del archivo ZIP se encuentra el archivo tar final.)
1.- Un dato importante es que al archivo bash_profile del usuario oracle añadamos lo siguiente:
LD_LIBRARY_PATH=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-11.b12.el7.x86_64/jre/lib/amd64/server:/lib:/usr/lib; export LD_LIBRARY_PATH
Con respect a lo anterior variará el nombre de la carpeta puesto que depende de la versión de java instalado. Esa carpeta será necesaria puesto que allí se encuentra un archivo libjvm.so que será requerido por la herramienta de ggsci de golden gate for big data. Si esto no se realiza el ejecutable ggsci dará el siguiente error:
[oracle@target gg]$ ./ggsci
./ggsci: error while loading shared libraries: libjvm.so: cannot open shared object file: No such file or directory
2.- Procedemos con las configuraciones adicionales:
[oracle@target gg]$ ./ggsci
Oracle GoldenGate for Big Data
Version 12.3.2.1.1 (Build 005)
Oracle GoldenGate Command Interpreter
Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
Operating system character set identified as UTF-8.
Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
GGSCI (target) 1> create subdirs
Creating subdirectories under current directory /gg
Parameter file /gg/dirprm: created.
Report file /gg/dirrpt: created.
Checkpoint file /gg/dirchk: created.
Process status files /gg/dirpcs: created.
SQL script files /gg/dirsql: created.
Database definitions files /gg/dirdef: created.
Extract data files /gg/dirdat: created.
Temporary files /gg/dirtmp: created.
Credential store files /gg/dircrd: created.
Masterkey wallet files /gg/dirwlt: created.
Dump files /gg/dirdmp: created.
GGSCI (target) 2>
3.- Configuración del componente Manager de Golden Gate for Big Data (usando la herramienta ggsci)
edit param mgr
Port 7809
Start mgr
GGSCI (target) 6> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
4.- En este mismo servidor procedemos a crear la carpeta que contendrás los trail que el proceso PUMP desde el servidor ORIGEN o Source copiará sobre este servidor (con la cuenta oracle).
mkdir /gg/dirdat/P_01
5.- Desde el servidor de ORIGEN procedemos a levanter el pump para que el proceso respectivo comience a copiar los trail hacia este servidor:
Start P_01
6.- Copiaremos el archivo de definición desde el Servidor de ORIGEN hacia este servidor (target) manteniendo la misma ruta:
scp ./dirdef/scott.def 10.10.10.102:/gg/dirdef/scott.def
Configuración del proceso de replicación en Golden Gate BigData
Procedemos a configurar (en el mismo ambiente target o destino) el replicador para esto seguimos los siguientes pasos:
1.- Copiar archivos de Adapter de ejemplo de BigData hacia la carpeta de parámetros de GoldenGate:
[oracle@target gg]$ cp /gg/AdapterExamples/big-data/kafka/* /gg/dirprm/
[oracle@target gg]$
2.- En el archivo /gg/dirprm/custom_kafka_producer.properties debemos declarar el nodo de Kafka por lo que deberia quedar de esta manera:
[oracle@target dirprm]$ cat custom_kafka_producer.properties
bootstrap.servers=localhost:9092
acks=1
reconnect.backoff.ms=1000
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 100KB per partition
batch.size=16384
linger.ms=0
[oracle@target dirprm]$
3.- En el archive /gg/dirprm/kafka.props debemos tener configuradas las siguientes variables:
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
#The following resolves the topic name using the short table name
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
#The following selects the message key using the concatenated primary keys
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.kafkahandler.format=json
gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
gg.handler.kafkahandler.BlockingSend =false
gg.handler.kafkahandler.includeTokens=false
gg.handler.kafkahandler.mode=op
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
#Sample gg.classpath for Apache Kafka
gg.classpath=dirprm/:/root/kafka_2.11-2.0.1/libs/*
#Sample gg.classpath for HDP
#gg.classpath=/etc/kafka/conf:/usr/hdp/current/kafka-broker/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
4.- Posteriormente con la herramienta ggsci crearemos el replicador
Edit param R_01
Replicat R_01
targetdb libfile libggjava.so SET property=dirprm/kafka.props
SOURCEDEFS ./dirdef/scott.def
DISCARDFILE ./dirrpt/R_01.dsc, MEGABYTES 2048, APPEND
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 1000
MAP SCOTT.EMP, TARGET mySchemaTopic.empleado;
add replicat R_01 exttrail ./dirdat/P_01/rt begin now
start replicat R_01
GGSCI (target) 4> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING R_01 00:00:00 00:00:09
Revisando que la repliación desde Origen hacia el Destino BigData funcione
Una vez terminado los pasos en la replicación debemos verificar que los cambios generados en el origen estén siendo replicados perfectamente al ambiente de destino. Sigamos los siguientes pasos para tal objetivo:
1.- Desde el servidor de ORIGEN insertamos una o algunas filas a la tabla scott.emp:
insert into scott.emp values (157,'test','DBA',4,null,400,null,null);
commit;
(Importante: NO olvidar el commit ☺ )
2.- Verificamos que el extractor y el Pump hayan capturado los cambios con los siguientes comandos (usando la herramienta ggsci en el server de Origen):
stats E_01
stats P_01
Ejemplo
GGSCI (source) 1> stats E_01
Sending STATS request to EXTRACT E_01 ...
Start of Statistics at 2018-11-11 20:37:52.
Output to ./dirdat/E_01/et:
Extracting from SCOTT.EMP to SCOTT.EMP:
*** Total statistics since 2018-11-11 18:57:23 ***
Total inserts 12.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 12.00
3.- Ahora desde el server de DESTINO a nivel de ggsci verificamos que el replicador haya aplicado el cambio (Pueden verificar que hay un nuevo archivo llamado ./dirrpt/R_01.log que contiene más detalle acerca de lo que está haciendo el adapter para kafka):
GGSCI (target) 2> stats R_01
Sending STATS request to REPLICAT R_01 ...
Start of Statistics at 2018-11-11 20:39:03.
Replicating from SCOTT.EMP to mySchemaTopic.empleado:
*** Total statistics since 2018-11-11 20:20:00 ***
Total inserts 12.00
Total updates 0.00
Total deletes 0.00
Total discards 0.00
Total operations 12.00
4.- Posteriormente si lo deseamos podemos hacer uso de la consola de kafka para revisar los cambios que se han implementado usando el comando de la carpeta bin de kafka kafka-console-consumer.sh (También desde el server de destino usando la cuenta root)
[root@target bin]# sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic EMPLEADO --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 19:52:40.336228","current_ts":"2018-11-11T19:52:47.878000","pos":"000000000000000000-2","after":{"EMPNO":150,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 20:04:15.720504","current_ts":"2018-11-11T20:04:21.384000","pos":"00000000000000002405","after":{"EMPNO":151,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 20:05:07.713534","current_ts":"2018-11-11T20:05:11.701000","pos":"00000000000000002633","after":{"EMPNO":152,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 20:06:00.752509","current_ts":"2018-11-11T20:06:06.798000","pos":"00000000000000002860","after":{"EMPNO":153,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 20:19:53.414611","current_ts":"2018-11-11T20:20:00.759000","pos":"00000000010000001110","after":{"EMPNO":155,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 20:22:25.470913","current_ts":"2018-11-11T20:22:31.290000","pos":"00000000010000001340","after":{"EMPNO":156,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
{"table":"MYSCHEMATOPIC.EMPLEADO","op_type":"I","op_ts":"2018-11-11 20:24:07.529164","current_ts":"2018-11-11T20:24:13.480000","pos":"00000000010000001569","after":{"EMPNO":157,"ENAME":"test","JOB":"DBA","MGR":4,"HIREDATE":null,"SAL":400.00,"COMM":null,"DEPTNO":null}}
Espero que les haya sido de utilidad este pequeño y humilde tutorial.
Saludos. Felipe ☺ .