This post is about XStream. It provides a Java program that uses "XStream In" to enqueue messages to a buffered queue; a Streams apply process then applies them to a table named SOURCE.T8.
To know everything about XStream, read the following documentation:
- Oracle® Database XStream Guide
- Oracle® Database XStream Java API Reference
- Oracle GoldenGate 10.4 Licensing Guide
Step 1: Create a Sample Schema
For this example, you'll need a schema and a table:
sqlplus / as sysdba
create user source
identified by source
default tablespace users
temporary tablespace temp;
grant connect,resource to source;
col dbname new_value dbname
select value dbname
from v$parameter
where name='db_unique_name';
prompt &&dbname
connect source/source
create table t8(
id number primary key,
text1 varchar2(80),
text2 varchar2(80));
commit;
Step 2: Create a Streams Administrator
To use XStream, you'll also need a Streams administrator and a queue to stage buffered messages:
connect / as sysdba
create tablespace streams_tbs
datafile '/u01/app/oracle/oradata/BLACK/streams_tbs01.dbf'
size 25M autoextend on maxsize 256M;
CREATE USER strmadmin
IDENTIFIED BY strmadmin
DEFAULT TABLESPACE streams_tbs
QUOTA UNLIMITED ON streams_tbs
temporary tablespace temp;
grant dba to strmadmin;
begin
dbms_streams_adm.set_up_queue(
queue_table => 'strmadmin.streams_queue_table',
queue_name => 'strmadmin.streams_queue');
end;
/
exec dbms_streams_auth.grant_admin_privilege('strmadmin', true);
select *
from dba_streams_administrator;
Step 3: Create a Simple XStream In Configuration
You must create and start an XStream In configuration before you use a Java or OCI client to enqueue messages:
connect strmadmin/strmadmin
BEGIN
DBMS_XSTREAM_ADM.CREATE_INBOUND(
server_name => 'xin',
queue_name => 'xin_queue');
END;
/
SELECT *
FROM DBA_XSTREAM_INBOUND;
set pages 1000
select *
from dba_apply
where purpose='XSTREAM IN';
exec DBMS_APPLY_ADM.START_APPLY('xin');
Step 4: Create a Java XStream Inbound Client
Create an XStreamInDemo.java file that enqueues messages to the buffered queue. Change the environment-specific values, such as the connection URL and the database name, to match your environment:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Date;
import oracle.jdbc.internal.OracleConnection;
import oracle.sql.CHAR;
import oracle.sql.DATE;
import oracle.streams.ColumnValue;
import oracle.streams.DefaultColumnValue;
import oracle.streams.DefaultRowLCR;
import oracle.streams.RowLCR;
import oracle.streams.XStreamIn;
public class XStreamInDemo {
    public static void main(String args[])
    {
        String in_url = "jdbc:oracle:oci:@arkzoyd-easyteam:1521:BLACK";
        /*
         * Connect to the Database
         */
        Connection in_conn = null;
        try
        {
            DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
            in_conn=DriverManager.getConnection(in_url, "strmadmin", "strmadmin");
        }
        catch(Exception e)
        {
            System.out.println("DB Connection Failed: " + in_url);
            e.printStackTrace();
            // Nothing else can work without a connection
            return;
        }
        /*
         * Get an XStream In Handler
         */
        XStreamIn xsIn=null;
        String xsinName="XIN";
        byte[] lastPosition = null;
        int transaction=0;
        int rank=0;
        try
        {
            xsIn = XStreamIn.attach ((OracleConnection)in_conn, xsinName,
                "HI2" , XStreamIn.DEFAULT_MODE);
            // use the last position to decide where we should start sending LCRs
            System.out.println("Attached to inbound server:"+xsinName);
            System.out.print("Inbound Server Last Position is: ");
            lastPosition = xsIn.getLastPosition();
            if (null == lastPosition)
            {
                // nothing received yet: start with the first LCR of transaction 1
                System.out.println("null");
                transaction=1;
                rank=1;
            }
            else {
                printHex(lastPosition); System.out.println("");
                transaction = getTransaction(lastPosition);
                rank = getRank(lastPosition);
                // resume with the LCR that follows the last one received
                if (rank==1) rank=2; else { rank=1; transaction++; }
            }
        }
        catch(Exception e)
        {
            System.out.println("cannot attach to inbound server: "+xsinName);
            System.out.println(e.getMessage());
            e.printStackTrace();
            // Do not enter the send loop without an attached inbound server
            return;
        }
        /*
         * Send LCRs: one INSERT followed by one COMMIT per transaction
         */
        try {
            DATE mydate;
            DefaultRowLCR alcr;
            byte[] processedLowPosition;
            while(true) {
                if (rank==1) {
                    // rank 1: the INSERT row LCR of the transaction
                    mydate = new DATE();
                    System.out.println("-- " +
                        Integer.toString(transaction) +
                        " -------------------------");
                    alcr=new DefaultRowLCR(
                        "BLACK", RowLCR.INSERT, "SOURCE", "T8",
                        "X."+Integer.toString(transaction), null,
                        encode2bytes(transaction, rank), mydate);
                    ColumnValue[] newcolumn= new ColumnValue[3];
                    newcolumn[0]= new DefaultColumnValue("ID",
                        new oracle.sql.NUMBER(transaction));
                    newcolumn[1]= new DefaultColumnValue("TEXT1",
                        new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
                    newcolumn[2]= new DefaultColumnValue("TEXT2",
                        new CHAR("Hello2", CHAR.DEFAULT_CHARSET));
                    alcr.setNewValues(newcolumn);
                    xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
                    System.out.println(" " +alcr.getStatement(false));
                    xsIn.flush(XStreamIn.DEFAULT_MODE);
                    rank++;
                    Thread.sleep(500);
                } else {
                    // rank 2: the COMMIT LCR that ends the transaction
                    mydate = new DATE();
                    alcr=new DefaultRowLCR(
                        "BLACK", RowLCR.COMMIT, null, null,
                        "X."+Integer.toString(transaction), null,
                        encode2bytes(transaction, rank), mydate);
                    xsIn.sendLCR(alcr, XStreamIn.DEFAULT_MODE);
                    xsIn.flush(XStreamIn.DEFAULT_MODE);
                    System.out.println(" " +alcr.getStatement(false));
                    rank=1;
                    transaction++;
                    Thread.sleep(500);
                }
                System.out.print(" ");
                processedLowPosition =
                    xsIn.getProcessedLowWatermark();
                if (processedLowPosition != null) {
                    System.out.print("processedLowPosition: ");
                    printHex(processedLowPosition);
                    System.out.print(" (" +
                        Integer.toString(getTransaction(processedLowPosition))+
                        ", " +
                        Integer.toString(getRank(processedLowPosition)) +
                        ")");
                }
                else {
                    System.out.print("processedLowPosition: null");
                }
                lastPosition =
                    xsIn.getLastPosition();
                System.out.println("");
            }
        } catch (Exception e) {
            System.out.println("exception when processing LCRs");
            System.out.println(e.getMessage());
            e.printStackTrace();
        }
    }
    // Print a byte array as two hex digits per byte
    public static void printHex(byte[] b)
    {
        for (int i = 0; i < b.length; ++i)
        {
            System.out.print(
                Integer.toHexString((b[i]&0xFF) | 0x100).substring(1,3));
        }
    }
    // Build a 5-byte position: 4-byte big-endian transaction number, then 1-byte rank
    public static byte[] encode2bytes(int transaction, int rank)
    {
        byte[] mybyte= new byte[5];
        mybyte[0] =(byte)( transaction >> 24 );
        mybyte[1] =(byte)( (transaction << 8) >> 24 );
        mybyte[2] =(byte)( (transaction << 16) >> 24 );
        mybyte[3] =(byte)( (transaction << 24) >> 24 );
        mybyte[4] =(byte)( rank ) ;
        return mybyte;
    }
    // Read the transaction number back from the first 4 bytes of a position
    public static int getTransaction(byte[] mybyte)
    {
        int i = 0;
        int pos = 0;
        i += ((int) mybyte[pos++] & 0xFF) << 24;
        i += ((int) mybyte[pos++] & 0xFF) << 16;
        i += ((int) mybyte[pos++] & 0xFF) << 8;
        i += ((int) mybyte[pos] & 0xFF);
        return i;
    }
    // Read the rank back from the fifth byte of a position
    public static int getRank(byte[] mybyte)
    {
        int foo;
        foo =((int)mybyte[4] & 0xFF);
        return foo;
    }
}
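The position attached to each LCR in the program above is a 5-byte array: the transaction number on 4 bytes, most significant byte first, followed by the rank on 1 byte. Below is a minimal standalone sketch of that layout; it does not use the XStream library, and the class and method names (PositionCodec, encode, transactionOf, rankOf, compare) are made up for illustration. It assumes positions are compared byte by byte as unsigned values, so the layout keeps them strictly increasing from one LCR to the next.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the XStream API: it only mirrors the
// 5-byte position layout built by encode2bytes() in XStreamInDemo.
final class PositionCodec {
    static final int LENGTH = 5;

    // 4-byte big-endian transaction number followed by a 1-byte rank
    static byte[] encode(int transaction, int rank) {
        return ByteBuffer.allocate(LENGTH)   // ByteBuffer is big-endian by default
                .putInt(transaction)
                .put((byte) rank)
                .array();
    }

    static int transactionOf(byte[] position) {
        return ByteBuffer.wrap(position).getInt();
    }

    static int rankOf(byte[] position) {
        return position[4] & 0xFF;
    }

    // Unsigned byte-by-byte comparison (assumed ordering of positions)
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    static String toHex(byte[] b) {
        StringBuilder sb = new StringBuilder();
        for (byte x : b) {
            sb.append(String.format("%02x", x));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] insert = encode(258, 1);   // INSERT LCR of transaction 258
        byte[] commit = encode(258, 2);   // COMMIT LCR of transaction 258
        byte[] next = encode(259, 1);     // first LCR of the next transaction
        System.out.println(toHex(insert));                 // prints 0000010201
        System.out.println(compare(insert, commit) < 0);   // prints true
        System.out.println(compare(commit, next) < 0);     // prints true
    }
}
```

Putting the most significant byte first is what makes the byte order match the numeric order of (transaction, rank) pairs.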
Notes:
- getLastPosition should be called once, right after you attach to the inbound server, to get the position of the last message handled by XStream In
- The client has to be OCI or Java/OCI. You must include the xstreams.jar library in the classpath
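The way the program uses the last position to decide what to send next can be isolated in a few lines. This is a minimal sketch with no dependency on the XStream library; the ResumePoint class and its after method are hypothetical names, and the byte array passed in stands for the value returned by getLastPosition. It assumes the same convention as the program: rank 1 is the INSERT and rank 2 is the COMMIT of a transaction.

```java
// Hypothetical helper, not part of the XStream API: computes which
// (transaction, rank) pair to send first after attaching.
final class ResumePoint {
    final int transaction;
    final int rank;

    ResumePoint(int transaction, int rank) {
        this.transaction = transaction;
        this.rank = rank;
    }

    // lastPosition is a 5-byte position (4-byte transaction, 1-byte rank),
    // or null when the inbound server has not received anything yet
    static ResumePoint after(byte[] lastPosition) {
        if (lastPosition == null) {
            return new ResumePoint(1, 1);
        }
        int transaction = ((lastPosition[0] & 0xFF) << 24)
                | ((lastPosition[1] & 0xFF) << 16)
                | ((lastPosition[2] & 0xFF) << 8)
                | (lastPosition[3] & 0xFF);
        int rank = lastPosition[4] & 0xFF;
        // Last LCR was the INSERT: send its COMMIT next.
        // Last LCR was the COMMIT: start the next transaction.
        return rank == 1
                ? new ResumePoint(transaction, 2)
                : new ResumePoint(transaction + 1, 1);
    }

    @Override
    public String toString() {
        return transaction + "/" + rank;
    }

    public static void main(String[] args) {
        System.out.println(after(null));                         // prints 1/1
        System.out.println(after(new byte[] {0, 0, 0, 7, 1}));   // prints 7/2
        System.out.println(after(new byte[] {0, 0, 0, 7, 2}));   // prints 8/1
    }
}
```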
To compile the code, set CLASSPATH, PATH and JAVA_HOME. Once done, you can simply run it:
export CLASSPATH=.:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/jdbc/lib/ojdbc6.jar:$CLASSPATH
export CLASSPATH=$ORACLE_HOME/rdbms/jlib/xstreams.jar:$CLASSPATH
export JAVA_HOME=/opt/jdk1.6.0_13
export PATH=$JAVA_HOME/bin:$PATH
javac XStreamInDemo.java
java XStreamInDemo
Check That Messages Are Applied to the SOURCE.T8 Table
The program enqueues messages. You can verify that they are applied to the table with a script like the one below:
sqlplus / as sysdba
select count(*)
from source.T8;
COUNT(*)
----------
1073
/
COUNT(*)
----------
1076
Clean Up the Environment
To clean up the environment, execute the script below:
connect strmadmin/strmadmin
exec DBMS_APPLY_ADM.STOP_APPLY('xin');
BEGIN
DBMS_XSTREAM_ADM.DROP_INBOUND(
server_name => 'xin');
END;
/
select *
from DBA_XSTREAM_INBOUND;
select *
from dba_apply
where purpose='XSTREAM IN';
connect / as sysdba
drop user source cascade;
drop user strmadmin cascade;
Thank you for the example of using XStream. But is XStream really something new? Isn't it just a new closed framework from Oracle based on Streams technology, like dbms_cdc* for example, whose functionality you could program yourself with standard Streams components?
Thanks,
Gennadiy.
* Yes, XStream is a new feature of 11g release 2
* No, it doesn't rely on any already existing features of Streams
* Yes, XStream provides a very nice extension to Streams that you would probably not be able to build manually, even with months of custom development, except maybe, if you mine the content of the redo/archived logs from the outside like GoldenGate does.
I guess I should have explained XStream before providing those 2 sample programs. To make it short: up to 11.1, whenever you exchanged messages between Streams and the outside, they had to be persistent messages. On the other hand, messages used by Streams internally can be fully memory-based, i.e. far more efficient. With XStream, which provides the resend/tracking features, you can now share non-persistent messages with the outside.
XStream can really speed up and scale replication streams with non-Oracle components. That's why I'm so enthusiastic about it!
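To make the resend/tracking idea more concrete, here is a minimal sketch of what a client could keep on its side when messages are not persistent: every message it has sent, until the processed low watermark shows that the message no longer has to be resent. This is plain Java with no XStream dependency; ResendBuffer, sent, acknowledge and pendingCount are hypothetical names, the payload is a simple string standing in for an LCR, and the sketch assumes that everything at or below the low watermark has been processed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical client-side buffer, not part of the XStream API.
final class ResendBuffer {
    private static final class Entry {
        final byte[] position;
        final String payload;

        Entry(byte[] position, String payload) {
            this.position = position;
            this.payload = payload;
        }
    }

    // Entries are kept in send order, which is also position order
    private final Deque<Entry> pending = new ArrayDeque<Entry>();

    // Remember a message that was sent but may still have to be resent
    void sent(byte[] position, String payload) {
        pending.addLast(new Entry(position, payload));
    }

    // Drop everything at or below the low watermark (null means nothing processed yet)
    void acknowledge(byte[] lowWatermark) {
        if (lowWatermark == null) {
            return;
        }
        while (!pending.isEmpty()
                && compare(pending.peekFirst().position, lowWatermark) <= 0) {
            pending.removeFirst();
        }
    }

    int pendingCount() {
        return pending.size();
    }

    // Unsigned byte-by-byte comparison of two positions
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        ResendBuffer buffer = new ResendBuffer();
        buffer.sent(new byte[] {0, 0, 0, 1, 1}, "insert 1");
        buffer.sent(new byte[] {0, 0, 0, 1, 2}, "commit 1");
        buffer.sent(new byte[] {0, 0, 0, 2, 1}, "insert 2");
        buffer.acknowledge(new byte[] {0, 0, 0, 1, 2});
        System.out.println(buffer.pendingCount());   // prints 1
    }
}
```

After a restart, whatever remains in such a buffer is what the client would have to send again.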