Bookmark and Share

Sunday, October 4, 2009

XStream Outbound... A sample Java Program

As with XStream Inbound in my previous post, you'll find below a simple Java program for XStream Outbound. This new program subscribes to the changes captured by Streams. Does it sound easy? It is, as you'll see below.

Step 1: Create a Sample Schema

For this sample program, create a schema and a table :
-- Step 1 script: create the SOURCE schema and the T7 demo table.
CONNECT / AS SYSDBA

CREATE USER source
  IDENTIFIED BY source
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp;

GRANT connect, resource TO source;

-- Capture the database unique name into the dbname substitution variable
-- (it is referenced again when the outbound server is created).
COLUMN dbname NEW_VALUE dbname

SELECT value AS dbname
  FROM v$parameter
 WHERE name = 'db_unique_name';

PROMPT &&dbname

CONNECT source/source

CREATE TABLE t7 (
  id    NUMBER PRIMARY KEY,
  text1 VARCHAR2(80),
  text2 VARCHAR2(80));

-- Seed two rows that exist before any change capture is configured.
INSERT INTO t7 (id, text1, text2)
VALUES (1, 'Text 1', 'Text 1');

INSERT INTO t7 (id, text1, text2)
VALUES (2, 'Text 2', 'Text 2');

COMMIT;

Step 2: Create a Streams Administrator

To subscribe to Streams changes, you must create a Streams administrator:
-- Step 2 script: dedicated tablespace, STRMADMIN account, queue and
-- Streams administrator privilege.
CONNECT / AS SYSDBA

-- Adjust the datafile path to your own environment.
CREATE TABLESPACE streams_tbs
  DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
  SIZE 25M AUTOEXTEND ON MAXSIZE 256M;

CREATE USER strmadmin
  IDENTIFIED BY strmadmin
  DEFAULT TABLESPACE streams_tbs
  QUOTA UNLIMITED ON streams_tbs
  TEMPORARY TABLESPACE temp;

GRANT dba TO strmadmin;

-- Create the queue table and queue in the STRMADMIN schema.
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'strmadmin.streams_queue_table',
    queue_name  => 'strmadmin.streams_queue');
END;
/

EXEC DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('strmadmin', TRUE);

-- Check that STRMADMIN is now listed as a Streams administrator.
SELECT *
  FROM dba_streams_administrator;

Step 3: Create a Simple XStream Out Configuration

For our Java program to dequeue messages, you must create an XStream Outbound server:
-- Step 3 script: create the DEMO_SERVER outbound server for SOURCE.T7.
CONNECT strmadmin/strmadmin

-- The dbname substitution variable must already be defined in this
-- SQL*Plus session, otherwise SQL*Plus prompts for it.
BEGIN
  DBMS_XSTREAM_ADM.CREATE_OUTBOUND(
    server_name     => 'DEMO_SERVER',
    source_database => '&&dbname',
    table_names     => 'source.t7',
    schema_names    => NULL,
    capture_user    => NULL,
    connect_user    => NULL,
    comment         => 'XStream OutBound Server Demonstration');
END;
/

-- Inspect the outbound server that was just created.
SELECT *
  FROM dba_xstream_outbound;

SELECT *
  FROM dba_apply
 WHERE purpose = 'XSTREAM OUT';

Step 4: Create a JAVA XStream Outbound Client

Create an XStreamOutDemo.java file like the one below. Change the environment-specific strings (the JDBC URL, the user name and password, and the outbound server name) to match your own configuration:
import java.sql.Connection;
import java.sql.DriverManager;

import oracle.jdbc.internal.OracleConnection;

import oracle.streams.XStreamOut;
import oracle.streams.ChunkColumnValue;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.LCR;
import oracle.streams.RowLCR;

public class XStreamOutDemo {

public static void main(String args[])
{
String out_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";

/*
* Connect to the Database
*/
Connection out_conn = null;
try
{
DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
out_conn=DriverManager.getConnection(out_url, "strmadmin", "strmadmin");
}
catch(Exception e)
{
System.out.println("DB Connection Failed: " + out_url);
e.printStackTrace();
}

/*
* Get a XStream Out Handler
*/
XStreamOut xsOut=null;
byte[] lastPosition = null;

try
{
// when attach to an outbound server, client needs to tell outbound
// server the last position.
xsOut = XStreamOut.attach((OracleConnection) out_conn, "DEMO_SERVER",
lastPosition, XStreamOut.DEFAULT_MODE);
System.out.println("Attached to outbound server: DEMO_SERVER");
System.out.print("Last Position is: ");
if (lastPosition != null) { printHex(lastPosition); }
else { System.out.println("NULL");}
}
catch(Exception e)
{
System.out.println("cannot attach to outbound server: DEMO_SERVER");
System.out.println(e.getMessage());
e.printStackTrace();
}

byte[] processedLowPosition = null;
try
{
while(true)
{
// receive an LCR from outbound server
LCR alcr = xsOut.receiveLCR(XStreamOut.DEFAULT_MODE);

if (xsOut.getBatchStatus() == XStreamOut.EXECUTING) // batch is active
{
assert alcr != null;

// also get chunk data for this LCR if any
if (alcr instanceof RowLCR)
{
// receive chunk from outbound then send to inbound
if (((RowLCR)alcr).hasChunkData())
{
ChunkColumnValue chunk = null;
do
{
chunk = xsOut.receiveChunk(XStreamOut.DEFAULT_MODE);
} while (!chunk.isEndOfRow());
}
}

String command=alcr.getCommandType();
if (!command.equals("COMMIT"))
System.out.print(command+" on "+ alcr.getObjectOwner()+"."+
alcr.getObjectName()+"\n");
else System.out.print(command+"\n");
if (command.equals("INSERT") || command.equals("UPDATE")) {
System.out.print(" -- NEW VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getNewValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getNewValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
if (command.equals("UPDATE") || command.equals("DELETE")) {
System.out.print(" -- OLD VALUES ----------\n");
for (int i=0;i<((RowLCR) alcr).getOldValues().length;i++) {
System.out.print(" Column:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnName(),10)+
"Value:"+
fixsize(((ColumnValue) ((RowLCR) alcr).
getOldValues()[i]).getColumnData().
stringValue(),30)+"\n");
}
}
System.out.print(" -- DML ----------\n");
System.out.println(" "+((DefaultRowLCR) alcr).getStatement(false));
processedLowPosition = alcr.getPosition();
if (null != processedLowPosition)
xsOut.setProcessedLowWatermark(processedLowPosition,
XStreamOut.DEFAULT_MODE);
System.out.print("Last Position is: ");
printHex(processedLowPosition);
} else // batch is end
{ assert alcr == null; }
}
} catch(Exception e) {
System.out.println("exception when processing LCRs");
System.out.println(e.getMessage());
e.printStackTrace();
}
}

public static void printHex(byte[] b) { for (int i = 0; i <>length) {
output=text.substring(0,text.length());
} else {
output=text;
for (int i=0; i<(length-text.length()); i++) {
output=output+" ";
}
}
return output;
}
}
To compile and execute the program, run the script below:
# Classpath order: XStream API jar, Oracle JDBC driver, current directory,
# then whatever CLASSPATH already contained.
CLASSPATH="$ORACLE_HOME/rdbms/jlib/xstreams.jar:$ORACLE_HOME/jdbc/lib/ojdbc6.jar:.:$CLASSPATH"
JAVA_HOME=/opt/jdk1.6.0_13
PATH="$JAVA_HOME/bin:$PATH"
export CLASSPATH JAVA_HOME PATH

# Compile the client, then start it.
javac XStreamOutDemo.java
java XStreamOutDemo
To test the program, open another session to the database and change rows in the table as shown below:
-- One transaction per statement so that each change is followed by its
-- own COMMIT in the client output.
-- The explicit column list keeps the insert valid if T7 gains or reorders columns.
INSERT INTO source.t7 (id, text1, text2)
VALUES (3, 'X', 'X');
COMMIT;

UPDATE source.t7
   SET text2 = 'Y'
 WHERE id = 3;
COMMIT;

DELETE FROM source.t7
 WHERE id = 3;
COMMIT;
The output looks like below:
INSERT on SOURCE.T7
-- NEW VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:X
Last Position is: 0000001d856d00000001000000010000001d856c000000010000000101
COMMIT
Last Position is: 0000001d856d00000001000000010000001d856d000000010000000101
UPDATE on SOURCE.T7
-- NEW VALUES ----------
Column:TEXT2 Value:Y
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT2 Value:X
Last Position is: 0000001d856f00000001000000010000001d856e000000010000000101
COMMIT
Last Position is: 0000001d856f00000001000000010000001d856f000000010000000101
DELETE on SOURCE.T7
-- OLD VALUES ----------
Column:ID Value:3
Column:TEXT1 Value:X
Column:TEXT2 Value:Y
Last Position is: 0000001d857200000001000000010000001d8570000000010000000101
COMMIT
Last Position is: 0000001d857200000001000000010000001d8572000000010000000101
To exit the program, type:
CTRL+C

Step 5: Drop the Configuration

I hope you've found this example useful. To drop the configuration, run the script below:
-- Step 5 script: remove everything the previous steps created.
CONNECT strmadmin/strmadmin

BEGIN
  DBMS_XSTREAM_ADM.DROP_OUTBOUND(
    server_name => 'DEMO_SERVER');
END;
/

-- Check what is left of the XStream and Streams configuration.
SELECT *
  FROM dba_xstream_outbound;

SELECT * FROM dba_apply;
SELECT * FROM dba_capture;
SELECT * FROM dba_streams_table_rules;

CONNECT / AS SYSDBA

DROP USER source CASCADE;
DROP USER strmadmin CASCADE;

-- The tablespace created for STRMADMIN is not removed by DROP USER.
-- Drop it as well so that its datafile does not stay behind.
DROP TABLESPACE streams_tbs INCLUDING CONTENTS AND DATAFILES;
You're done! It's nice to enter the new Oracle Data Integration generation with GoldenGate, don't you think?

1 comment: