, you'll find below a simple Java program for XStream Outbound. This new program subscribes to changes captured by Streams. Does it sound easy? It is, as you'll see below.
Step 1: Create a Sample Schema
For this sample program, create a schema and a table:
-- Create the demo schema owner from a privileged session.
CONNECT / AS SYSDBA

CREATE USER source
    IDENTIFIED BY source
    DEFAULT TABLESPACE users
    TEMPORARY TABLESPACE temp;

GRANT connect, resource TO source;

-- Store db_unique_name in the dbname substitution variable
-- (it is reused later as the XStream source database name).
COLUMN dbname NEW_VALUE dbname

SELECT value AS dbname
FROM v$parameter
WHERE name = 'db_unique_name';

PROMPT &&dbname

-- Build and seed the table whose changes will be captured.
CONNECT source/source

CREATE TABLE t7 (
    id    NUMBER PRIMARY KEY,
    text1 VARCHAR2(80),
    text2 VARCHAR2(80)
);

INSERT INTO t7 (id, text1, text2)
VALUES (1, 'Text 1', 'Text 1');

INSERT INTO t7 (id, text1, text2)
VALUES (2, 'Text 2', 'Text 2');

COMMIT;
Step 2: Create a Streams Administrator
To subscribe to Streams changes, you must create a Streams administrator:
-- Create a dedicated tablespace and the Streams administrator account.
CONNECT / AS SYSDBA

CREATE TABLESPACE streams_tbs
    DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
    SIZE 25M
    AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin
    IDENTIFIED BY strmadmin
    DEFAULT TABLESPACE streams_tbs
    QUOTA UNLIMITED ON streams_tbs
    TEMPORARY TABLESPACE temp;

GRANT dba TO strmadmin;

-- Create the queue table and queue in the strmadmin schema.
BEGIN
    dbms_streams_adm.set_up_queue(
        queue_table => 'strmadmin.streams_queue_table',
        queue_name  => 'strmadmin.streams_queue'
    );
END;
/

-- Grant the Streams administrator privileges to strmadmin.
BEGIN
    dbms_streams_auth.grant_admin_privilege('strmadmin', TRUE);
END;
/

-- Check that strmadmin is now listed as a Streams administrator.
SELECT *
FROM dba_streams_administrator;
Step 3: Create a Simple XStream Out Configuration
For our Java program to dequeue messages, you must create an XStream Outbound server:
-- Create the outbound server as the Streams administrator.
CONNECT strmadmin/strmadmin

-- DEMO_SERVER streams the changes made to source.t7 only.
-- The dbname substitution variable must already be defined in this session.
BEGIN
    dbms_xstream_adm.create_outbound(
        server_name     => 'DEMO_SERVER',
        source_database => '&&dbname',
        table_names     => 'source.t7',
        schema_names    => NULL,
        capture_user    => NULL,
        connect_user    => NULL,
        comment         => 'XStream OutBound Server Demonstration'
    );
END;
/

-- Verify the outbound server and its underlying apply process.
SELECT *
FROM dba_xstream_outbound;

SELECT *
FROM dba_apply
WHERE purpose = 'XSTREAM OUT';
Step 4: Create a Java XStream Outbound Client
Create an XStreamOutDemo.java file like the one below. Change the environment-specific strings (shown in red in the original formatting), such as the JDBC URL, to match your own configuration:
import java.sql.Connection;
import java.sql.DriverManager;
import oracle.jdbc.internal.OracleConnection;
import oracle.streams.XStreamOut;
import oracle.streams.ChunkColumnValue;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.LCR;
import oracle.streams.RowLCR;
public class XStreamOutDemo {
public static void main(String args[])
{
String out_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";
/*
* Connect to the Database
*/
Connection out_conn = null;
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
out_conn=DriverManager.getConnection(out_url, "strmadmin", "strmadmin");
}
catch(Exception e)
{
System.out.println("DB Connection Failed: " + out_url);
e.printStackTrace();
}
/*
* Get a XStream Out Handler
*/
XStreamOut xsOut=null;
byte[] lastPosition = null;
try
{
// when attach to an outbound server, client needs to tell outbound
// server the last position.
xsOut = XStreamOut.attach((OracleConnection) out_conn, "DEMO_SERVER",
lastPosition, XStreamOut.DEFAULT_MODE);
System.out.println("Attached to outbound server: DEMO_SERVER");
System.out.print("Last Position is: ");
if (lastPosition != null) { printHex(lastPosition); }
else { System.out.println("NULL");}
}
catch(Exception e)
{
System.out.println("cannot attach to outbound server: DEMO_SERVER");
System.out.println(e.getMessage());
e.printStackTrace();
}
byte[] processedLowPosition = null;
try
{
while(true)
{
// receive an LCR from outbound server
LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);
if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
{
assert alcr != null;
// also get chunk data for this LCR if any
if (alcr instanceof RowLCR)
{
// receive chunk from outbound then send to inbound
if (((RowLCR)alcr).hasChunkData())
{
ChunkColumnValue chunk = null;
do
{
chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
} while (!chunk.isEndOfRow());
}
}
String command=alcr.getCommandType();
if (!command.equals("COMMIT"))
System.out.print(command+" on "+ alcr.getObjectOwner()+"."+
alcr.getObjectName()+"\n");
else System.out.print(command+"\n");
if (command.equals("INSERT") || command.equals("UPDATE")) {
System.out.print(" -- NEW VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getNewValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
if (command.equals("UPDATE") || command.equals("DELETE")) {
System.out.print(" -- OLD VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getOldValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
System.out.print(" -- DML ----------\n");
System.out.println(" "+((DefaultRowLCR) alcr).getStatement(false));
processedLowPosition = alcr.getPosition();
if (null != processedLowPosition)
xsOut.setProcessedLowWatermark(processedLowPosition,
XStreamOut.DEFAULT_MODE);
System.out.print("Last Position is: ");
printHex(processedLowPosition);
} else // batch is end
{ assert alcr == null; }
}
} catch(Exception e) {
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}
public static void printHex(byte[] b) { for (int i = 0; i <>length) {
output=text.substring(0,text.length());
} else {
output=text;
for (int i=0; i<(length-text.length()); i++) {
output=output+" ";
}
}
return output;
}
}
To compile and execute the program, run the script below:
# Classpath: XStream client library, JDBC driver, current directory, then
# whatever was already set.
export CLASSPATH="$ORACLE_HOME/rdbms/jlib/xstreams.jar:$ORACLE_HOME/jdbc/lib/ojdbc6.jar:.:$CLASSPATH"

# Put the JDK tools first on the PATH.
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH="$JAVA_HOME/bin:$PATH"

# Compile, then start the client.
javac XStreamOutDemo.java
java XStreamOutDemo
To test the program, open another session to the database and make some changes to the table, as shown below:
-- Each change is committed separately so that it is streamed as its own
-- transaction. The explicit column list keeps the insert valid if columns
-- are later added to or reordered in source.t7.
INSERT INTO source.t7 (id, text1, text2)
VALUES (3, 'X', 'X');
COMMIT;

UPDATE source.t7
SET text2 = 'Y'
WHERE id = 3;
COMMIT;

DELETE FROM source.t7
WHERE id = 3;
COMMIT;
The output looks like below:
INSERT on SOURCE.T7
-- NEW VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:X
Last Position is: 0000001d856d00000001000000010000001d856c000000010000000101
COMMIT
Last Position is: 0000001d856d00000001000000010000001d856d000000010000000101
UPDATE on SOURCE.T7
-- NEW VALUES ----------
Column:TEXT2 Value:Y
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT2 Value:X
Last Position is: 0000001d856f00000001000000010000001d856e000000010000000101
COMMIT
Last Position is: 0000001d856f00000001000000010000001d856f000000010000000101
DELETE on SOURCE.T7
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:Y
Last Position is: 0000001d857200000001000000010000001d8570000000010000000101
COMMIT
Last Position is: 0000001d857200000001000000010000001d8572000000010000000101
To exit the program, type:
CTRL+C
Step 5: Drop the Configuration
I hope you've found this example useful. To drop the configuration, run the script below:
-- Remove the outbound server as the Streams administrator.
CONNECT strmadmin/strmadmin

BEGIN
    dbms_xstream_adm.drop_outbound(
        server_name => 'DEMO_SERVER'
    );
END;
/

-- Check what is left of the XStream / Streams configuration.
SELECT *
FROM dba_xstream_outbound;

SELECT *
FROM dba_apply;

SELECT *
FROM dba_capture;

SELECT *
FROM dba_streams_table_rules;

-- Drop the demo accounts and everything they own.
CONNECT / AS SYSDBA

DROP USER source CASCADE;

DROP USER strmadmin CASCADE;

-- Also remove the streams_tbs tablespace and its datafile created for the
-- Streams administrator, so that no storage is left behind.
DROP TABLESPACE streams_tbs INCLUDING CONTENTS AND DATAFILES;
You're done! It's nice to enter the new Oracle Data Integration generation with GoldenGate, don't you think?
Thanks for sharing your knowledge!
ReplyDelete