Python 结合 Shell 自动创建 Kafka 连接器实战教程

目录

环境

cat /etc/redhat-release 
CentOS Linux release 7.5.1804 (Core) 
[root@localhost ~]# uname -a
Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
python -V
Python 2.7.5

安装连接 Oracle 的 Python 包

pip install cx_Oracle==7.3

获取oracle表信息

cat query_oracle.py 
#!/usr/bin/env python
import cx_Oracle
import sys
import os
import csv
import traceback
# Dump the column metadata of every table owned by the connecting user into
# oracle.txt, one line per table.  Each line is the str() of a Python list:
#   [COLUMN_NAME, DATA_TYPE, DATA_PRECISION, DATA_SCALE, ..., TABLE_NAME]
# i.e. four entries per column followed by the table name.

# Truncate the output file up front so a rerun never mixes in lines left
# over from an earlier run (this happens even if the connection below fails).
open("oracle.txt", 'w').close()

user = "test"
passwd = "test"
listener = '10.0.2.15:1521/orcl'
conn = cx_Oracle.connect(user, passwd, listener)
cursor = conn.cursor()
try:
    # Collect the names of all tables visible in user_tables.
    sql = "select table_name from user_tables"
    cursor.execute(sql)
    LIST1 = [row[0] for row in cursor.fetchall()]

    # The table name is passed as a bind variable instead of being spliced
    # into the statement text, so unusual names cannot alter the SQL.
    sql3 = ("select COLUMN_NAME,DATA_TYPE,DATA_PRECISION,DATA_SCALE "
            "from cols WHERE TABLE_name=upper(:table_name)")

    # Open the output once for the whole run; the with-block guarantees the
    # handle is flushed and closed.
    with open('oracle.txt', 'a') as f:
        for i in LIST1:
            cursor.execute(sql3, {"table_name": i})
            LIST2 = []
            for data in cursor.fetchall():
                LIST2.extend(list(data))
            LIST2.append(i)
            # Same text the Python 2 statement "print >> f, LIST2" produced.
            f.write(str(LIST2) + "\n")
finally:
    cursor.close()
    conn.close()

# Handy ad-hoc queries for checking a single table by hand:
#   select table_name,column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');
#   select column_name,DATA_TYPE from cols WHERE TABLE_name=upper('student');

去掉多余部分

cat auto.sh 
#!/bin/bash
# Flatten the Python list dump in oracle.txt into plain space-separated
# fields, one table per line, and store the result in oracle_tables.txt.
#   1. tr  : turn the list punctuation  [ ] , '  into spaces
#   2. sed : squeeze every run of spaces into one, then drop leading blanks
# oracle_tables.txt is overwritten on every run.
tr "[],'" " " < oracle.txt \
    | sed -e "s#[ ][ ]*# #g" -e 's/^[ \t]*//g' \
    > oracle_tables.txt
cat oracle_tables.txt 
SNO NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT DATE_DATE 
SNO2 NUMBER 19 0 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT2 INPUT_TIME
SNO3 NUMBER 19 2 SNAME VARCHAR2 None None SSEX VARCHAR2 None None SBIRTHDAY DATE None None SCLASS VARCHAR2 None None STUDENT3 DATA_DATE

shell 脚本处理表信息文件

cat connect.sh 
#!/bin/bash
# Build create-connect.sh: one "curl -X POST .../connectors" command for each
# table described in oracle_tables.txt.
#
# Expected layout of an input line (fields separated by blanks):
#   COL TYPE PRECISION SCALE [COL TYPE PRECISION SCALE ...] TABLE_NAME TIME_COLUMN
#
# Side products per table, written to the current directory:
#   <TABLE_NAME>.txt  select-list items, one per line
#   <TABLE_NAME>.sql  the complete query
#
# The double-curly-brace placeholders in the generated text are Ansible
# template variables; this script passes them through untouched.

# Start from an empty generated script.
> create-connect.sh

# Iterate over the data lines themselves (blank lines and lines containing
# '#' are filtered out), so the line being processed is always the line that
# was selected -- counting filtered lines and then indexing the raw file
# would drift as soon as the file contains a comment or an empty line.
while read -r -a FIELDS
do
    # Field count via the positional parameters; the usual array-length
    # expansion is avoided because its brace-hash prefix would be taken as a
    # comment opener if this file is rendered as a Jinja2 template.
    set -- "${FIELDS[@]}"
    COL_NUM=$#

    # A usable line needs at least one 4-field column group plus the table
    # name and the time column.
    if (( COL_NUM < 6 ))
    then
        echo "connect.sh: skipping malformed line: ${FIELDS[*]}" >&2
        continue
    fi

    REAL_COL_NUM=$(( COL_NUM - 2 ))
    TABLE_NAME=${FIELDS[COL_NUM-2]}
    TIME_COLUMN=${FIELDS[COL_NUM-1]}

    # Walk the column groups: name, data type, precision, scale.
    COL_EXPRS=()
    for (( j = 0; j < REAL_COL_NUM; j += 4 ))
    do
        COL_NAME=${FIELDS[j]}
        COL_DATA_TYPE=${FIELDS[j+1]}
        COL_DATA_PRECISION=${FIELDS[j+2]}
        COL_DATA_SCALE=${FIELDS[j+3]}

        # NUMBER columns are selected through an explicit CAST.  When the
        # dictionary reported no precision/scale (the literal "None"), the
        # column is selected as-is, because NUMBER(None,None) is not valid SQL.
        if [ "$COL_DATA_TYPE" = "NUMBER" ] \
            && [ "$COL_DATA_PRECISION" != "None" ] \
            && [ "$COL_DATA_SCALE" != "None" ]
        then
            COL_EXPRS+=("CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME")
        else
            COL_EXPRS+=("$COL_NAME")
        fi
    done

    # Keep the per-table work files the original workflow produced.
    printf '%s\n' "${COL_EXPRS[@]}" > "${TABLE_NAME}.txt"
    SELECT_LIST=$(IFS=,; printf '%s' "${COL_EXPRS[*]}")

    # Query restricted to the day before yesterday, based on the time column.
    QUERY="select ${SELECT_LIST} from ${TABLE_NAME} where ${TIME_COLUMN}>=trunc(sysdate-2) and ${TIME_COLUMN}<trunc(sysdate-1)"
    printf '%s\n' "$QUERY" > "${TABLE_NAME}.sql"

    # Append this table's connector-creation command to the generated script.
    cat >> create-connect.sh << EOF
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_$TABLE_NAME",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:orcl",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_$TABLE_NAME",
"mode": "{{ CONNECT_MODE }}",
"query": "$QUERY"
}
}' >/dev/null 2>&1
EOF
done < <(grep -Ev '#|^$' oracle_tables.txt)

说明:脚本中形如 {{ 变量名 }} 的内容是 Ansible 模板变量,Ansible 渲染模板时会将其替换为实际值;因此该脚本需要配合 Ansible 使用,渲染之后才能执行。

增强版处理表信息脚本

#!/bin/bash
# Generate two connector-creation scripts from oracle_time_tables.txt:
#   create-jdbc-connect-time.sh  tables containing one of the configured time
#                                columns (query limited to a one-day window)
#   create-jdbc-connect.sh       all remaining tables (unfiltered query)
# Per-table work files are kept under ./TABLE_TIME.
#
# Input line layout as read by this script: groups of
#   COL TYPE PRECISION SCALE
# with the table name taken from the LAST field of the line.
#
# The double-curly-brace placeholders are Ansible template variables; this
# file is meant to be rendered by Ansible before it is executed.

#######################################
# Write the common prologue of a generated script (shebang plus the coloured
# "echos" output helper), replacing any previous content of the file.
# Arguments: $1 - path of the script to initialise
#######################################
write_script_header() {
    cat > "$1" << 'EOF'
#!/bin/bash
echos(){
case $1 in
red)    echo -e "\033[31m $2 \033[0m";;
green)  echo -e "\033[32m $2 \033[0m";;
yellow) echo -e "\033[33m $2 \033[0m";;
blue)   echo -e "\033[34m $2 \033[0m";;
purple) echo -e "\033[35m $2 \033[0m";;
*)      echo "$2";;
esac
}
EOF
}

write_script_header create-jdbc-connect.sh
write_script_header create-jdbc-connect-time.sh

# Directory for the per-table work files.
mkdir -p ./TABLE_TIME

# Candidate time-column names, filled in from the Ansible variable.  The list
# does not depend on the table, so it is built once.
TIME_COL=({{ TABLE_TIME_COL }})

# Iterate over the data lines themselves (blank lines and lines containing
# '#' are filtered out) so the processed line is always the selected line;
# counting filtered lines and then indexing the raw file would drift as soon
# as the file contains a comment or an empty line.
while read -r -a FIELDS
do
    # Field count via the positional parameters; the usual array-length
    # expansion is avoided because its brace-hash prefix would be taken as a
    # comment opener when this file is rendered as a Jinja2 template.
    set -- "${FIELDS[@]}"
    COL_NUM=$#

    # At least one 4-field column group plus the table name is required.
    if (( COL_NUM < 5 ))
    then
        echo "skipping malformed line: ${FIELDS[*]}" >&2
        continue
    fi

    TABLE_NAME=${FIELDS[COL_NUM-1]}
    # Upper bound kept from the original script; for lines made of complete
    # 4-field groups plus the table name it visits every group exactly once.
    REAL_COL_NUM=$(( COL_NUM - 2 ))

    COLS_FILE=./TABLE_TIME/${TABLE_NAME}_time.txt
    TIME_SQL_FILE=./TABLE_TIME/${TABLE_NAME}_time.sql
    PLAIN_SQL_FILE=./TABLE_TIME/${TABLE_NAME}.sql
    TIME_COL_FILE=./TABLE_TIME/${TABLE_NAME}_TIME_COL.txt

    # Reset the work files.  The time-column marker is removed as well:
    # a marker left behind by an earlier run would otherwise route a table
    # to the time-window script even though it no longer has such a column.
    : > "$COLS_FILE"
    : > "$TIME_SQL_FILE"
    : > "$PLAIN_SQL_FILE"
    rm -f -- "$TIME_COL_FILE"

    COL_EXPRS=()
    MATCHED_TIME_COL=""
    for (( j = 0; j < REAL_COL_NUM; j += 4 ))
    do
        COL_NAME=${FIELDS[j]}
        COL_DATA_TYPE=${FIELDS[j+1]}
        COL_DATA_PRECISION=${FIELDS[j+2]}
        COL_DATA_SCALE=${FIELDS[j+3]}

        # NUMBER columns are selected through an explicit CAST.  When the
        # dictionary reported no precision/scale (the literal "None"), the
        # column is selected as-is, because NUMBER(None,None) is not valid SQL.
        if [ "$COL_DATA_TYPE" = "NUMBER" ] \
            && [ "$COL_DATA_PRECISION" != "None" ] \
            && [ "$COL_DATA_SCALE" != "None" ]
        then
            COL_EXPRS+=("CAST($COL_NAME AS $COL_DATA_TYPE($COL_DATA_PRECISION,$COL_DATA_SCALE)) AS $COL_NAME")
        else
            COL_EXPRS+=("$COL_NAME")
        fi

        # Remember the column if it is one of the configured time columns
        # (the last match wins, as before).
        for TIME in "${TIME_COL[@]}"
        do
            if [ "$COL_NAME" = "$TIME" ]
            then
                MATCHED_TIME_COL=$COL_NAME
            fi
        done
    done

    printf '%s\n' "${COL_EXPRS[@]}" > "$COLS_FILE"
    SELECT_LIST=$(IFS=,; printf '%s' "${COL_EXPRS[*]}")

    # Assemble the query and pick the connector name / target script.
    if [ -n "$MATCHED_TIME_COL" ]
    then
        printf '%s\n' "$MATCHED_TIME_COL" > "$TIME_COL_FILE"
        QUERY="select ${SELECT_LIST} from {{ ORACLE_TABLES_USER }}.$TABLE_NAME where ${MATCHED_TIME_COL}>=trunc(sysdate-2) and ${MATCHED_TIME_COL}<trunc(sysdate-1)"
        printf '%s\n' "$QUERY" >> "$TIME_SQL_FILE"
        CONNECTOR_NAME=jdbc_time_${TABLE_NAME}
        TARGET_SCRIPT=create-jdbc-connect-time.sh
    else
        QUERY="select ${SELECT_LIST} from {{ ORACLE_TABLES_USER }}.$TABLE_NAME"
        printf '%s\n' "$QUERY" >> "$PLAIN_SQL_FILE"
        CONNECTOR_NAME=jdbc_${TABLE_NAME}
        TARGET_SCRIPT=create-jdbc-connect.sh
    fi

    # Append this table's connector-creation command to the chosen script.
    # --fail makes curl exit non-zero on an HTTP error status, so the
    # generated success check reflects the REST call's outcome rather than
    # only whether the server could be reached.
    cat >> "$TARGET_SCRIPT" << EOF
#创建表 $TABLE_NAME 连接器的命令如下
curl --fail -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "${CONNECTOR_NAME}",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:oracle:thin:@{{ ORACLE_IP }}:{{ ORACLE_PORT }}:{{ ORACLE_SERVER_NAME }}",
"connection.user": "{{ ORACLE_USER }}",
"connection.password": "{{ ORACLE_PASSWD }}",
"topic.prefix": "YC_${TABLE_NAME}_INSERT",
"poll.interval.ms": "86400000",
"mode": "{{ CONNECT_MODE }}",
"numeric.mapping": "best_fit",
"query": "$QUERY"
}
}' >/dev/null 2>&1
#判断连接器是否创建成功
if [ \$? -eq 0 ]
then
    echos green "\$(date +"%F %H:%M:%S") 创建${CONNECTOR_NAME} 连接器成功"
else
    echos red "\$(date +"%F %H:%M:%S") 创建${CONNECTOR_NAME} 连接器失败"
fi
EOF
done < <(grep -Ev '#|^$' oracle_time_tables.txt)
© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容