Bookmark and Share

Sunday, October 4, 2009

XStream Inbound... A Sample Java Program

Oracle XStream is a new feature of Oracle Database 11g Release 2. Though it has been developed by the Streams team and is part of the database, XStream is part of GoldenGate! And now that the GoldenGate deal has been closed, 7 days before Oracle OpenWorld, it's easy to guess why XStream was released a few hours ago.

This post is about XStream; it provides a Java program that uses "XStream In" to enqueue messages to a buffered queue. A Streams apply process applies them to a table named SOURCE.T8.

To know everything about XStream, read the following documentation:

Step 1: Create a Sample Schema

For this example, you'll need a schema and a table:
sqlplus / as sysdba

-- Schema that owns the demo table.
CREATE USER source
  IDENTIFIED BY source
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp;

GRANT connect, resource TO source;

-- Store the database unique name in the dbname substitution variable.
COLUMN dbname NEW_VALUE dbname

SELECT value AS dbname
  FROM v$parameter
 WHERE name = 'db_unique_name';

PROMPT &&dbname

CONNECT source/source

-- Table that receives the rows sent by the XStream In client.
CREATE TABLE t8 (
  id    NUMBER PRIMARY KEY,
  text1 VARCHAR2(80),
  text2 VARCHAR2(80)
);

COMMIT;

Step 2: Create a Streams Administrator

To use XStream, you'll also need a Streams administrator and a queue to stage buffered messages:
CONNECT / AS SYSDBA

-- Dedicated tablespace for the Streams administrator's objects.
CREATE TABLESPACE streams_tbs
  DATAFILE '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
  SIZE 25M
  AUTOEXTEND ON MAXSIZE 256M;

-- Streams administrator account.
CREATE USER strmadmin
  IDENTIFIED BY strmadmin
  DEFAULT TABLESPACE streams_tbs
  TEMPORARY TABLESPACE temp
  QUOTA UNLIMITED ON streams_tbs;

GRANT dba TO strmadmin;

-- Queue table and queue owned by the Streams administrator.
BEGIN
  DBMS_STREAMS_ADM.SET_UP_QUEUE(
    queue_table => 'strmadmin.streams_queue_table',
    queue_name  => 'strmadmin.streams_queue');
END;
/

BEGIN
  DBMS_STREAMS_AUTH.GRANT_ADMIN_PRIVILEGE('strmadmin', TRUE);
END;
/

-- Confirm that strmadmin is now listed as a Streams administrator.
SELECT *
  FROM dba_streams_administrator;

Step 3: Create a Simple XStream In Configuration

You must create and start an XStream In configuration before you use a Java or OCI client to enqueue messages:
CONNECT strmadmin/strmadmin

-- Create the inbound server named xin on queue xin_queue.
begin
  dbms_xstream_adm.create_inbound(
    server_name => 'xin',
    queue_name  => 'xin_queue');
end;
/

select *
  from dba_xstream_inbound;

SET PAGESIZE 1000

-- Show the apply process that backs the inbound server.
select *
  from dba_apply
 where purpose = 'XSTREAM IN';

-- Start the apply process so the inbound server can accept clients.
begin
  dbms_apply_adm.start_apply('xin');
end;
/

Step 4: Create a Java XStream Inbound Client

Create an XStreamInDemo.java file that enqueues messages to the buffered queue. Change the part in red to match your environment:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import java.util.Date;

import oracle.jdbc.internal.OracleConnection;

import oracle.sql.CHAR;
import oracle.sql.DATE;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.RowLCR;
import oracle.streams.XStreamIn;

/**
 * Sample "XStream In" client.
 *
 * Connects to the database through JDBC, attaches to the inbound server
 * "XIN" and then sends an endless series of two-LCR transactions: one
 * INSERT into SOURCE.T8 followed by one COMMIT.
 *
 * Every LCR carries a 5-byte position: the transaction number on 4 bytes
 * (most significant byte first) followed by a 1-byte rank, where rank 1 is
 * the INSERT and rank 2 is the COMMIT of that transaction.
 */
public class XStreamInDemo {

    /** Source database name stamped on every LCR. */
    private static final String SOURCE_DB = "BLACK";

    /** Pause after each LCR, in milliseconds. */
    private static final long PAUSE_MS = 500;

    /**
     * Program entry point.
     *
     * Stops immediately when the connection or the attach fails, and always
     * detaches from the inbound server and closes the connection on exit.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String args[])
    {
        String in_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";

        /*
         * Connect to the Database
         */
        Connection in_conn = null;
        try
        {
            DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
            in_conn = DriverManager.getConnection(in_url, "strmadmin", "strmadmin");
        }
        catch (Exception e)
        {
            System.out.println("DB Connection Failed: " + in_url);
            e.printStackTrace();
            // Nothing below can work without a connection.
            return;
        }

        /*
         * Get a XStream In Handler
         */
        XStreamIn xsIn = null;
        String xsinName = "XIN";
        int transaction = 0;
        int rank = 0;
        try
        {
            xsIn = XStreamIn.attach((OracleConnection) in_conn, xsinName,
                                    "HI2", XStreamIn.DEFAULT_MODE);

            // use last position to decide where should we start sending LCRs
            System.out.println("Attached to inbound server:" + xsinName);
            System.out.print("Inbound Server Last Position is: ");
            byte[] lastPosition = xsIn.getLastPosition();
            if (null == lastPosition)
            {
                // The server has not received anything yet: start from scratch.
                System.out.println("null");
                transaction = 1;
                rank = 1;
            }
            else
            {
                printHex(lastPosition);
                System.out.println("");
                transaction = getTransaction(lastPosition);
                rank = getRank(lastPosition);
                if (rank == 1)
                {
                    // Last LCR received was the INSERT: resume with its COMMIT.
                    rank = 2;
                }
                else
                {
                    // Last transaction is complete: start the next one.
                    rank = 1;
                    transaction++;
                }
            }
        }
        catch (Exception e)
        {
            System.out.println("cannot attach to inbound server: " + xsinName);
            System.out.println(e.getMessage());
            e.printStackTrace();
            // Do not enter the send loop without a usable, positioned handle.
            release(xsIn, in_conn);
            return;
        }

        /*
         * Create and send the LCRs: an INSERT (rank 1) then a COMMIT (rank 2)
         * for each transaction, until an error or an interruption occurs.
         */
        try
        {
            while (true)
            {
                String txId = "X." + Integer.toString(transaction);
                DefaultRowLCR alcr;

                if (rank == 1)
                {
                    System.out.println("-- " +
                        Integer.toString(transaction) +
                        " -------------------------");
                    alcr = new DefaultRowLCR(
                        SOURCE_DB, RowLCR.INSERT, "SOURCE", "T8",
                        txId, null,
                        encode2bytes(transaction, rank), new DATE());
                    ColumnValue[] newcolumn = new ColumnValue[3];
                    newcolumn[0] = new DefaultColumnValue("ID",
                        new oracle.sql.NUMBER(transaction));
                    newcolumn[1] = new DefaultColumnValue("TEXT1",
                        new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
                    newcolumn[2] = new DefaultColumnValue("TEXT2",
                        new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
                    alcr.setNewValues(newcolumn);
                    xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
                    System.out.println(" " + alcr.getStatement(false));
                    xsIn.flush(XStreamIn.DEFAULT_MODE);
                    rank++;
                }
                else
                {
                    alcr = new DefaultRowLCR(
                        SOURCE_DB, RowLCR.COMMIT, null, null,
                        txId, null,
                        encode2bytes(transaction, rank), new DATE());
                    xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
                    xsIn.flush(XStreamIn.DEFAULT_MODE);
                    System.out.println(" " + alcr.getStatement(false));
                    rank = 1;
                    transaction++;
                }
                Thread.sleep(PAUSE_MS);

                printLowWatermark(xsIn);
            }
        }
        catch (Exception e)
        {
            System.out.println("exception when processing LCRs");
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
        finally
        {
            release(xsIn, in_conn);
        }
    }

    /**
     * Prints the processed low watermark reported by the inbound server,
     * both as hex and decoded as (transaction, rank), or "null" when the
     * server reports none.
     */
    private static void printLowWatermark(XStreamIn xsIn) throws Exception
    {
        System.out.print(" ");
        byte[] processedLowPosition = xsIn.getProcessedLowWatermark();
        if (processedLowPosition != null)
        {
            System.out.print("processedLowPosition: ");
            printHex(processedLowPosition);
            System.out.print(" (" +
                Integer.toString(getTransaction(processedLowPosition)) +
                ", " +
                Integer.toString(getRank(processedLowPosition)) +
                ")");
        }
        else
        {
            System.out.print("processedLowPosition: null");
        }
        System.out.println("");
    }

    /**
     * Detaches from the inbound server and closes the JDBC connection.
     * Either argument may be null; failures are reported but not rethrown
     * so that both resources are always attempted.
     */
    private static void release(XStreamIn xsIn, Connection conn)
    {
        if (xsIn != null)
        {
            try
            {
                xsIn.detach(XStreamIn.DEFAULT_MODE);
            }
            catch (Exception e)
            {
                System.out.println("cannot detach from inbound server");
                e.printStackTrace();
            }
        }
        if (conn != null)
        {
            try
            {
                conn.close();
            }
            catch (Exception e)
            {
                System.out.println("cannot close the database connection");
                e.printStackTrace();
            }
        }
    }

    /**
     * Prints a byte array to standard output as lowercase hexadecimal,
     * two digits per byte, with no separator and no trailing newline.
     *
     * @param b bytes to print
     */
    public static void printHex(byte[] b)
    {
        StringBuilder hex = new StringBuilder(b.length * 2);
        for (int i = 0; i < b.length; ++i)
        {
            // OR-ing with 0x100 forces a leading digit so bytes below 0x10 keep their zero.
            hex.append(Integer.toHexString((b[i] & 0xFF) | 0x100).substring(1, 3));
        }
        System.out.print(hex.toString());
    }

    /**
     * Encodes a position as 5 bytes: the transaction number on 4 bytes,
     * most significant byte first, followed by the low 8 bits of the rank.
     *
     * @param transaction transaction number
     * @param rank        rank of the LCR inside the transaction
     * @return the 5-byte position
     */
    public static byte[] encode2bytes(int transaction, int rank)
    {
        byte[] mybyte = new byte[5];
        mybyte[0] = (byte) (transaction >>> 24);
        mybyte[1] = (byte) (transaction >>> 16);
        mybyte[2] = (byte) (transaction >>> 8);
        mybyte[3] = (byte) transaction;
        mybyte[4] = (byte) rank;
        return mybyte;
    }

    /**
     * Decodes the transaction number from the first 4 bytes of a position
     * produced by {@link #encode2bytes(int, int)}.
     *
     * @param mybyte position, at least 4 bytes long
     * @return the transaction number
     */
    public static int getTransaction(byte[] mybyte)
    {
        return ((mybyte[0] & 0xFF) << 24)
             | ((mybyte[1] & 0xFF) << 16)
             | ((mybyte[2] & 0xFF) << 8)
             |  (mybyte[3] & 0xFF);
    }

    /**
     * Decodes the rank from the fifth byte of a position produced by
     * {@link #encode2bytes(int, int)}.
     *
     * @param mybyte position, at least 5 bytes long
     * @return the rank as an unsigned value (0 to 255)
     */
    public static int getRank(byte[] mybyte)
    {
        return mybyte[4] & 0xFF;
    }
}
Notes:
  • getLastPosition should be called once, right after you attach to the inbound server, to get the last message handled by XStream In
  • The client has to be OCI or Java/OCI. You must include the xstreams.jar library in the classpath

To compile the code, set the CLASSPATH, the PATH and the JAVA_HOME. Once done, you can simply run it:
# Classpath order: XStream jar, JDBC driver, current directory, then any existing entries.
export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$ORACLE_HOME/jdbc/lib/ojdbc6.jar:.:$CLASSPATH

# Put the JDK tools first on the PATH.
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH=$JAVA_HOME/bin:$PATH

# Compile, then run the client.
javac XStreamInDemo.java
java XStreamInDemo

Check messages are applied to the SOURCE.T8 table

The program enqueues messages. You can verify that they are applied to the table with the script below:
sqlplus / as sysdba

-- Count the rows applied so far.
SELECT COUNT(*)
  FROM source.t8;

COUNT(*)
----------
1073


/

COUNT(*)
----------
1076

Clean Up the Environment

To clean up the environment, execute the script below:
connect strmadmin/strmadmin

-- Stop the apply process before dropping the inbound server.
exec DBMS_APPLY_ADM.STOP_APPLY('xin');

BEGIN
DBMS_XSTREAM_ADM.DROP_INBOUND(
server_name => 'xin');
END;
/

-- Both queries should now return no row for the xin server.
select *
from DBA_XSTREAM_INBOUND;

select *
from dba_apply
where purpose='XSTREAM IN';

connect / as sysdba

drop user source cascade;
drop user strmadmin cascade;

-- Remove the tablespace and datafile created for the Streams administrator,
-- otherwise rerunning the setup fails because streams_tbs already exists.
drop tablespace streams_tbs including contents and datafiles;

2 comments:

  1. Thank you for an example of using XStream. But is XStream really something new? Maybe it is just a new closed framework from Oracle based on Streams technology, like dbms_cdc* for example, whose functionality could simply be programmed by hand with standard Streams components?

    Thanks,
    Gennadiy.

    ReplyDelete
  2. * Yes, XStream is a new feature of 11g release 2
    * No, it doesn't rely on any already existing features of Streams
    * Yes, XStream provides a very nice extension to Streams that you would probably not be able to build manually, even with months of custom development, except maybe, if you mine the content of the redo/archived logs from the outside like GoldenGate does.

    I guess I should have explained XStream before providing those 2 sample programs. To make it short, up to 11.1, every time you exchanged Streams messages between Streams and the outside, they had to be persistent messages. On the other hand, messages used by Streams internally can be fully memory-based, i.e. far more efficient. With XStream, you can now share non-persistent messages with the outside (it provides the resend/tracking features).

    XStream can really speed up and scale replication streams with non-Oracle components. That's why I'm so enthusiastic about it!

    ReplyDelete