## IDate

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

// Iterates day by day from dt to dt1 (inclusive) and calls onDate once per day.
trait IDate extends Serializable {

  def onDate(dt: String): Unit

  def invoke(dt: String, dt1: String): Unit = {
    if (dt1 < dt) {
      throw new IllegalArgumentException(s"dt1:$dt1 is earlier than dt:$dt")
    }
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    val d1: Date = sdf.parse(dt)
    val d2: Date = sdf.parse(dt1)
    var tmp: Date = d1
    val dd: Calendar = Calendar.getInstance
    dd.setTime(d1)
    while (tmp.getTime <= d2.getTime) {
      tmp = dd.getTime
      System.out.println("IDate visiting date: " + sdf.format(tmp))
      onDate(sdf.format(tmp))
      dd.add(Calendar.DAY_OF_MONTH, 1)
      tmp = dd.getTime
    }
  }
}
```
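A minimal usage sketch (the date range is illustrative); this anonymous-implementation style is exactly how `IMonth` and `TableUtils` below drive the trait:

```scala
object IDateExample {
  def main(args: Array[String]): Unit = {
    new IDate {
      override def onDate(dt: String): Unit = println(s"processing $dt")
    }.invoke("2023-01-30", "2023-02-02")
    // one call per day: 2023-01-30, 2023-01-31, 2023-02-01, 2023-02-02
  }
}
```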
## IMonth

```scala
import com.mingzhi.common.utils.DateUtil

// Iterates month by month: onMonth is called for every month-start date inside [dt, dt1].
trait IMonth extends Serializable {

  def onMonth(dt: String): Unit

  def invoke(dt: String, dt1: String): Unit = {
    if (dt.equals(dt1)) {
      onMonth(DateUtil.getMonthStart(dt))
    } else {
      new IDate {
        override def onDate(dt: String): Unit = {
          val monthStart = DateUtil.getMonthStart(dt)
          if (dt.equalsIgnoreCase(monthStart)) {
            System.out.println("IMonth visiting month start: " + dt)
            onMonth(dt)
          }
        }
      }.invoke(dt, dt1)
    }
  }
}
```
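The monthly counterpart, with the same illustrative range. Note that `onMonth` only fires for month-start dates that fall inside `[dt, dt1]`, so a month whose first day precedes `dt` is skipped unless `dt` equals `dt1`:

```scala
object IMonthExample {
  def main(args: Array[String]): Unit = {
    new IMonth {
      override def onMonth(monthStart: String): Unit = println(s"aggregating month of $monthStart")
    }.invoke("2023-01-15", "2023-03-02")
    // fires for 2023-02-01 and 2023-03-01; January is skipped because 2023-01-01 < dt
  }
}
```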
## MySaveMode

```scala
import org.apache.spark.annotation.InterfaceStability

@InterfaceStability.Stable
object MySaveMode extends Enumeration {
  type MySaveMode = Value
  val OverWriteAllTable, OverWriteByDt, OverWriteByQuarter, OverWriteByMonth, Ignore = Value
}
```
## LoadStrategy

```scala
import org.apache.spark.annotation.InterfaceStability

@InterfaceStability.Stable
object LoadStrategy extends Enumeration {
  type strategy = Value
  val ALL, NEW_AND_CHANGE, NEW = Value
}
```
## HiveUtil

```scala
import org.apache.spark.sql.SparkSession

object HiveUtil {

  // Enable Hive dynamic partitioning for the current session.
  def openDynamicPartition(spark: SparkSession): Unit = {
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set spark.sql.shuffle.partitions=18")
  }

  // Compress MapReduce map and final output with Snappy.
  def useSnappyCompression(spark: SparkSession): Unit = {
    spark.sql("set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
    spark.sql("set mapreduce.output.fileoutputformat.compress=true")
    spark.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
  }
}
```
## SparkUtils

```scala
import com.mingzhi.ConfigUtils
import com.mingzhi.common.utils.StringUtil
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object SparkUtils {

  def getBuilder: SparkSession.Builder = {
    val builder = SparkSession.builder()
      .config("dfs.replication", 1)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", value = true)
      .config("spark.cleaner.periodicGC.interval", "1min")
      .config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")
      .config("hive.exec.dynamic.partition", "true")
      .config("spark.sql.shuffle.partitions", "12")
      .config(ConfigurationOptions.ES_PORT, 9200)
      .config(ConfigurationOptions.ES_MAPPING_DATE_RICH_OBJECT, value = false)
      .config(ConfigurationOptions.ES_SCROLL_SIZE, 10000)
      .config(ConfigurationOptions.ES_MAX_DOCS_PER_PARTITION, 1000000)
      .config(ConfigurationOptions.ES_HTTP_TIMEOUT, "5m")
      .config(ConfigurationOptions.ES_SCROLL_KEEPALIVE, "10m")
      .enableHiveSupport()
    if (ConfigUtils.isWindowEvn) {
      builder.config("spark.ui.port", "4040").master("local[*]")
    }
    builder
  }

  def setCheckPoint(spark: SparkSession, dir: String): Unit = {
    var ckPath = "/ck/" + dir
    if (!ConfigUtils.isWindowEvn) {
      ckPath = s"hdfs://mz-hadoop-01:8020/ck/$dir"
    }
    spark.sparkContext.setCheckpointDir(ckPath)
  }

  // Materialize a DataFrame into a temporary Hive table and read it back,
  // returning the re-read frame together with the temp table name.
  def persistDataFrame(spark: SparkSession, frame: DataFrame): (DataFrame, String) = {
    val db = "temp"
    val table = StringUtil.getRandomString(20)
    val tempTableName = s"${db}.${table}_" + System.currentTimeMillis()
    frame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tempTableName)
    (spark.sql(s"select * from $tempTableName"), tempTableName)
  }

  def unPersistDataFrame(spark: SparkSession, tableName: String): DataFrame = {
    spark.sql(s"drop table $tableName")
  }
}
```
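A sketch of how the builder and the Hive helpers are meant to be combined at the start of a job. The app name is illustrative, and `persistDataFrame` assumes a Hive database named `temp` already exists:

```scala
object SessionExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.getBuilder.appName("example_job").getOrCreate()
    HiveUtil.openDynamicPartition(spark) // dynamic partitions for partitioned inserts
    HiveUtil.useSnappyCompression(spark)
    SparkUtils.setCheckPoint(spark, "example_job")

    // Round-trip a frame through a temp table, then drop it again.
    val (reloaded, tmpTable) = SparkUtils.persistDataFrame(spark, spark.range(10).toDF("id"))
    reloaded.show()
    SparkUtils.unPersistDataFrame(spark, tmpTable)
    spark.stop()
  }
}
```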
## StringUtil

```java
package com.mingzhi.common.utils;

import org.apache.commons.lang3.StringUtils;

import java.util.Objects;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StringUtil {

    private static final String CHARACTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    public static void main(String[] args) {
        String str = "0你好，世界？！¥{}。。。c";
        System.out.println(getCharacter(str));
        System.out.println(getRandomString(10));
    }

    // Keep only Chinese characters and common full-width punctuation, dropping everything else.
    public static String getCharacter(String str) {
        String regEx = "[(\\u4e00-\\u9fa5)(\\u3002|\\uff1f|\\uff01|\\uff0c|\\u3001|\\uff1b|\\uff1a|\\u201c|\\u201d|\\u2018|\\u2019|\\uff08|\\uff09|\\u300a|\\u300b|\\u3010|\\u3011|\\u007e)]";
        return matchResult(Pattern.compile(regEx), str);
    }

    public static boolean isBlank(CharSequence sequence) {
        return StringUtils.isBlank(sequence);
    }

    public static boolean isNotBlank(CharSequence sequence) {
        return StringUtils.isNotBlank(sequence);
    }

    // Fail fast when the argument is null or blank.
    public static void assertNotBlank(String field, String msg) {
        Objects.requireNonNull(field, msg);
        if (StringUtils.isBlank(field)) {
            throw new IllegalArgumentException(msg);
        }
    }

    private static String matchResult(Pattern p, String str) {
        StringBuilder sb = new StringBuilder();
        Matcher m = p.matcher(str);
        while (m.find()) {
            sb.append(m.group());
        }
        return sb.toString();
    }

    public static String getRandomString(int length) {
        StringBuilder sb = new StringBuilder();
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            sb.append(CHARACTERS.charAt(random.nextInt(CHARACTERS.length())));
        }
        return sb.toString();
    }
}
```
## JsonUtil

```java
import com.alibaba.fastjson.JSON;

public class JsonUtil {

    public static <T> T getObjFromJson(String json, Class<T> tClass) {
        try {
            return JSON.parseObject(json, tClass);
        } catch (Exception e) {
            return null;
        }
    }

    public static String getJsonStrFromObj(Object o) {
        return JSON.toJSONString(o);
    }
}
```
## FileUtil

```java
import org.apache.commons.io.FileUtils;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

public class FileUtil {

    public static void main(String[] args) throws IOException {
        write("xxx1.txt", "hello\nmoto1");
        System.out.println(read("xxx1.txt"));
    }

    // Read a file from the local file system as UTF-8, one line per append.
    public static String read(String resourceName) {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(resourceName), StandardCharsets.UTF_8),
                2 * 1024 * 1024)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }

    // Overwrite the file with the given content.
    public static void write(String resourceName, String data) throws IOException {
        File f = new File(resourceName);
        System.out.println("Writing to " + f.getCanonicalPath());
        ArrayList<String> lines = new ArrayList<>();
        lines.add(data);
        FileUtils.writeLines(f, lines, false);
    }

    // Read a file from the classpath (src/main/resources) instead of the file system.
    public static String readFromResource(String resourceName) {
        StringBuilder sb = new StringBuilder();
        InputStream inputStream = ClassLoader.getSystemResourceAsStream(resourceName);
        if (inputStream == null) {
            return sb.toString();
        }
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(inputStream, StandardCharsets.UTF_8), 2 * 1024 * 1024)) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return sb.toString();
    }
}
```
## DateUtil

```java
package com.mingzhi.common.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;

public class DateUtil {

    public static void main(String[] args) throws ParseException {
        System.out.println(getDateStrFromMill(1617701847078L));
        System.out.println("today:" + getToday());
        System.out.println("yesterday:" + getYesterday());
        System.out.println("weekStart:" + getWeekStart());
        System.out.println(getMonthStart());
        System.out.println(getWeek("2021-07-01 01:00:00"));
        System.out.println(getTime("2021-07-01 01:00:00"));
        Long mills = getMillsFromString("2022-04-07 17:01:02");
        System.out.println(mills);
        System.out.println(DateUtil.getDateStrFromMill(mills));
        System.out.println(getLastDay("2022", "02"));
        System.out.println(getQuarterStart("2022-09-18"));
        System.out.println(getQuarterEnd("2022-09-30"));
        System.out.println(getQuarterEnd("2022-09-31"));
    }

    public static boolean isValidDate(String str) {
        boolean valid = true;
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            format.setLenient(false);
            format.parse(str);
        } catch (Exception e) {
            valid = false;
        }
        return valid;
    }

    // All dates from beginDateStr to endDateStr, both inclusive, as yyyy-MM-dd strings.
    public static ArrayList<String> getDateList(String beginDateStr, String endDateStr) throws ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        Date beginDate = dateFormat.parse(beginDateStr);
        Date endDate = dateFormat.parse(endDateStr);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(beginDate);
        final ArrayList<String> list = new ArrayList<>();
        while (beginDate.compareTo(endDate) <= 0) {
            list.add(dateFormat.format(beginDate));
            calendar.add(Calendar.DAY_OF_MONTH, 1);
            beginDate = calendar.getTime();
        }
        return list;
    }

    public static boolean before(String dt, String dt1) throws ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return dateFormat.parse(dt).before(dateFormat.parse(dt1));
    }

    public static String getYesterday() {
        return getDateStrFromMill(System.currentTimeMillis() - 3600 * 1000 * 24).split(" ")[0];
    }

    public static String back1Week(String t) {
        return getDateStrFromMill(getMillsFromString(t) - 60 * 60 * 1000 * 168);
    }

    public static String next1Day(String t) {
        return getDateStrFromMill(getMillsFromString(t) + 60 * 60 * 1000 * 24);
    }

    public static String back1Day(String t) {
        return getDateStrFromMill(getMillsFromString(t) - 60 * 60 * 1000 * 24);
    }

    // Start of the tumbling window (size and offset in millis) that contains the timestamp.
    public static Long getWindowStartWithOffset(Long timestamp, Long offset, Long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // "Quarter" here means a quarter of an hour: roll back 15 minutes.
    public static String back1Quarter(String t) {
        return getDateStrFromMill(getMillsFromString(t) - 15 * 60 * 1000);
    }

    // First day of the calendar quarter that contains dt (yyyy-MM-dd).
    public static String getQuarterStart(String dt) throws ParseException {
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        sdf.setLenient(false);
        final Calendar calendar = Calendar.getInstance();
        calendar.setTime(sdf.parse(dt));
        final int remainder = calendar.get(Calendar.MONTH) % 3;
        int month = (remainder != 0) ? calendar.get(Calendar.MONTH) - remainder : calendar.get(Calendar.MONTH);
        calendar.set(Calendar.MONTH, month);
        calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
        return sdf.format(calendar.getTime());
    }

    // Last day of the calendar quarter that contains dt (yyyy-MM-dd).
    public static String getQuarterEnd(String dt) throws ParseException {
        String year = dt.substring(0, 4);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        sdf.setLenient(false);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(sdf.parse(dt));
        int quarter = calendar.get(Calendar.MONTH) / 3 + 1;
        int lastMonth = quarter * 3 - 1;
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.YEAR, Integer.parseInt(year));
        cal.set(Calendar.MONTH, lastMonth);
        cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH));
        cal.set(Calendar.HOUR, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        return sdf.format(cal.getTime());
    }

    public static Long getMillsFromString(String time) {
        long mill = 0L;
        try {
            mill = ThreadSafeFormatter.dateFormatter.get().parse(time).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return mill;
    }

    public static String getDateStrFromMill(long ts) {
        return ThreadSafeFormatter.dateFormatter.get().format(new Date(ts));
    }

    public static Date getDate(String dateTime) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            return sdf.parse(dateTime);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static String getWeek(String dateTime) {
        String[] weekDays = {"sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"};
        Calendar cal = Calendar.getInstance();
        cal.setTime(DateUtil.getDate(dateTime));
        int w = cal.get(Calendar.DAY_OF_WEEK) - 1;
        if (w < 0) w = 0;
        return weekDays[w];
    }

    public static String getTime(String dateTime) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(DateUtil.getDate(dateTime));
    }

    public static String getToday() {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    }

    // Monday of the current week.
    public static String getWeekStart() {
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.WEEK_OF_MONTH, 0);
        cal.set(Calendar.DAY_OF_WEEK, 2);
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    public static String getWeekEnd() {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.DAY_OF_WEEK, cal.getActualMaximum(Calendar.DAY_OF_WEEK));
        cal.add(Calendar.DAY_OF_WEEK, 1);
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    public static String getMonthStart() {
        Calendar cal = Calendar.getInstance();
        cal.add(Calendar.MONTH, 0);
        cal.set(Calendar.DAY_OF_MONTH, 1);
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    public static String getMonthEnd() {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH));
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    public static String getLastDay(String year, String month) {
        LocalDate first = LocalDate.of(Integer.parseInt(year), Integer.parseInt(month), 1);
        return first.with(TemporalAdjusters.lastDayOfMonth()).toString();
    }

    public static String getMonthStart(String dt) {
        return LocalDate.parse(dt).with(TemporalAdjusters.firstDayOfMonth()).toString();
    }

    public static String getMonthEnd(String dt) {
        return LocalDate.parse(dt).with(TemporalAdjusters.lastDayOfMonth()).toString();
    }
}

// SimpleDateFormat is not thread safe, so keep one instance per thread.
class ThreadSafeFormatter {
    public static ThreadLocal<SimpleDateFormat> dateFormatter =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
```
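Most of these helpers are demonstrated by `main` above; the least obvious one is `getWindowStartWithOffset`, which aligns a timestamp to the start of its tumbling window. A small worked example (values chosen for illustration):

```scala
// 10:07:42 with a 10-minute window and no offset aligns down to 10:00:00.
val ts = DateUtil.getMillsFromString("2022-04-07 10:07:42")
val windowStart = DateUtil.getWindowStartWithOffset(ts, 0L, 10 * 60 * 1000L)
println(DateUtil.getDateStrFromMill(windowStart)) // 2022-04-07 10:00:00
```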
## DruidConnection

```java
import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.SneakyThrows;

import javax.sql.DataSource;
import java.sql.*;
import java.util.Properties;

// Double-checked-locking singleton around a Druid connection pool.
// Note: the pool is built from the first set of parameters and reused for all later calls.
public class DruidConnection {

    private final DataSource dataSource;
    private static volatile DruidConnection instance = null;

    @SneakyThrows
    private DruidConnection(String host, String port, String db, String username, String pwd) {
        Properties props = new Properties();
        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://" + host + ":" + port + "/" + db + "?characterEncoding=UTF-8");
        props.put("username", username);
        props.put("password", pwd);
        dataSource = DruidDataSourceFactory.createDataSource(props);
    }

    public static Connection getConnection(String host, String port, String db, String username, String pwd) throws SQLException {
        if (instance == null) {
            synchronized (DruidConnection.class) {
                if (instance == null) {
                    instance = new DruidConnection(host, port, db, username, pwd);
                }
            }
        }
        return instance.dataSource.getConnection();
    }

    @SneakyThrows
    public static void release(ResultSet resultSet, Statement statement, Connection connection) {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
```
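A sketch of the intended call pattern (host, credentials, and table name are placeholders):

```scala
// One pooled connection, one query, then release everything through the helper.
val conn = DruidConnection.getConnection("127.0.0.1", "3306", "test_db", "root", "secret")
val ps = conn.prepareStatement("SELECT COUNT(*) AS cnts FROM tb_work_order")
val rs = ps.executeQuery()
while (rs.next()) println("rows: " + rs.getInt("cnts"))
DruidConnection.release(rs, ps, conn)
```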
## LogUtils

```java
import com.mingzhi.ConfigUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogUtils {

    public static final Logger logger;

    static {
        logger = Logger.getLogger("logger");
        if (ConfigUtils.IS_TEST) {
            logger.setLevel(Level.ALL);
        } else {
            logger.setLevel(Level.OFF);
        }
    }

    public static void out(String msg) {
        out(LogUtils.class, msg);
    }

    public static <T> void out(Class<T> callerClz, String msg) {
        if (logger.isInfoEnabled()) {
            System.out.println(callerClz.getName() + ":" + msg);
        }
    }
}
```
## ESHighRestUtil

```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.junit.Test;

import java.io.IOException;

public class ESHighRestUtil {

    private static RestHighLevelClient restHighLevelClient;

    public static RestHighLevelClient init(String esHost) {
        HttpHost host1 = new HttpHost(esHost, 9200, "http");
        RestClientBuilder restClientBuilder = RestClient.builder(host1);
        restHighLevelClient = new RestHighLevelClient(restClientBuilder);
        return restHighLevelClient;
    }

    // Delete all documents in indexName where splitFieldName = splitFieldValue
    // and dataPartitionName = dataPartitionValue.
    public static void deleteByQuery(String esHost, String indexName,
                                     String splitFieldName, String splitFieldValue,
                                     String dataPartitionName, String dataPartitionValue) {
        restHighLevelClient = init(esHost);
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        TermQueryBuilder termQueryBuilder1 = new TermQueryBuilder(splitFieldName, splitFieldValue);
        TermQueryBuilder termQueryBuilder2 = new TermQueryBuilder(dataPartitionName, dataPartitionValue);
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder()
                .must(termQueryBuilder1)
                .must(termQueryBuilder2);
        deleteByQueryRequest.setQuery(boolQueryBuilder);
        try {
            restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
            restHighLevelClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void deleteByQueryTest() throws Exception {
        deleteByQuery("192.168.33.163", "ins_ads_ins_record_analysis_202308",
                "signin_date", "2023-02-07", "dt", "2023-02-07");
    }

    @Test
    public void bulkDeleteTest() throws IOException {
        restHighLevelClient = init("192.168.20.175");
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new DeleteRequest("ins_ads_ins_record_analysis_202302", "_doc", "10001"));
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        restHighLevelClient.close();
    }
}
```
## ConfigUtils

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigUtils {

    private static final String fileName = "application.properties";
    private static Config configInfo = ConfigFactory.load(fileName);

    public static Boolean IS_TEST;

    public static String IOT_ES_HOST;
    public static String IOT_ES_HOST2;
    public static String IOT_ES_PORT;
    public static String IOT_ES_PORT2;

    public static String WFS_MYSQL_HOST;
    public static String WFS_MYSQL_USERNAME;
    public static String WFS_MYSQL_PWD;

    public static String ABI_MYSQL_HOST;
    public static String ABI_MYSQL_USERNAME;
    public static String ABI_MYSQL_PWD;

    public static String SEGI_MYSQL_HOST;
    public static String SEGI_MYSQL_USERNAME;
    public static String SEGI_MYSQL_PWD;

    public static String UAC_MYSQL_HOST;
    public static String UAC_MYSQL_USERNAME;
    public static String UAC_MYSQL_PWD;

    public static String IOT_MYSQL_HOST;
    public static String IOT_MYSQL_USERNAME;
    public static String IOT_MYSQL_PWD;

    public static String SDS_MYSQL_HOST;
    public static String SDS_MYSQL_USERNAME;
    public static String SDS_MYSQL_PWD;

    public static String WMP_MYSQL_HOST;
    public static String WMP_MYSQL_USERNAME;
    public static String WMP_MYSQL_PWD;

    public static String AZKABAN_MYSQL_HOST;
    public static String AZKABAN_MYSQL_USERNAME;
    public static String AZKABAN_MYSQL_PWD;

    static {
        load();
    }

    public ConfigUtils() {
        if (configInfo == null) {
            configInfo = ConfigFactory.load(fileName);
        }
        getProperties();
    }

    public static void load() {
        if (configInfo == null) {
            configInfo = ConfigFactory.load(fileName);
        }
        getProperties();
    }

    public static void getProperties() {
        // The ES settings are optional, so they get their own try block.
        try {
            IOT_ES_HOST = configInfo.getString("iot.es.host");
            IOT_ES_PORT = configInfo.getString("iot.es.port");
            IOT_ES_HOST2 = configInfo.getString("iot.es.host2");
            IOT_ES_PORT2 = configInfo.getString("iot.es.port2");
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            IS_TEST = configInfo.getBoolean("isTest");

            WFS_MYSQL_HOST = configInfo.getString("wfs.mysql.host");
            WFS_MYSQL_USERNAME = configInfo.getString("wfs.mysql.username");
            WFS_MYSQL_PWD = configInfo.getString("wfs.mysql.pwd");

            ABI_MYSQL_HOST = configInfo.getString("abi.mysql.host");
            ABI_MYSQL_USERNAME = configInfo.getString("abi.mysql.username");
            ABI_MYSQL_PWD = configInfo.getString("abi.mysql.pwd");

            SEGI_MYSQL_HOST = configInfo.getString("segi.mysql.host");
            SEGI_MYSQL_USERNAME = configInfo.getString("segi.mysql.username");
            SEGI_MYSQL_PWD = configInfo.getString("segi.mysql.pwd");

            UAC_MYSQL_HOST = configInfo.getString("uac.mysql.host");
            UAC_MYSQL_USERNAME = configInfo.getString("uac.mysql.username");
            UAC_MYSQL_PWD = configInfo.getString("uac.mysql.pwd");

            IOT_MYSQL_HOST = configInfo.getString("iot.mysql.host");
            IOT_MYSQL_USERNAME = configInfo.getString("iot.mysql.username");
            IOT_MYSQL_PWD = configInfo.getString("iot.mysql.pwd");

            SDS_MYSQL_HOST = configInfo.getString("sds.mysql.host");
            SDS_MYSQL_USERNAME = configInfo.getString("sds.mysql.username");
            SDS_MYSQL_PWD = configInfo.getString("sds.mysql.pwd");

            WMP_MYSQL_HOST = configInfo.getString("wmp.mysql.host");
            WMP_MYSQL_USERNAME = configInfo.getString("wmp.mysql.username");
            WMP_MYSQL_PWD = configInfo.getString("wmp.mysql.pwd");

            AZKABAN_MYSQL_HOST = configInfo.getString("azkaban.mysql.host");
            AZKABAN_MYSQL_USERNAME = configInfo.getString("azkaban.mysql.username");
            AZKABAN_MYSQL_PWD = configInfo.getString("azkaban.mysql.pwd");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static boolean isWindowEvn() {
        return System.getProperties().getProperty("os.name").toLowerCase().contains("windows");
    }

    public static void main(String[] args) {
        System.out.println("wfs:" + ConfigUtils.WFS_MYSQL_HOST);
        System.out.println("IOT_ES_HOST2:" + ConfigUtils.IOT_ES_HOST2);
        System.out.println("IOT_ES_PORT2:" + ConfigUtils.IOT_ES_PORT2);
        System.out.println("azkaban:" + ConfigUtils.AZKABAN_MYSQL_HOST);
    }
}
```
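For reference, a minimal `application.properties` sketch covering some of the keys read above; every value is a placeholder:

```properties
isTest=true
iot.es.host=192.168.0.10
iot.es.port=9200
wfs.mysql.host=192.168.0.11
wfs.mysql.username=root
wfs.mysql.pwd=secret
azkaban.mysql.host=192.168.0.12
azkaban.mysql.username=root
azkaban.mysql.pwd=secret
```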
## MysqlDao

```java
import com.mingzhi.ConfigUtils;
import com.mingzhi.common.utils.DateUtil;
import com.mingzhi.common.utils.DruidConnection;
import com.mingzhi.common.utils.StringUtil;
import lombok.SneakyThrows;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

public class MysqlDao {

    public static void main(String[] args) throws Exception {
        try {
            String host = ConfigUtils.WFS_MYSQL_HOST;
            String database = "paascloud_wfs";
            String user = ConfigUtils.WFS_MYSQL_USERNAME;
            String password = ConfigUtils.WFS_MYSQL_PWD;
            final Map<String, Integer> resultMonth = new MysqlDao().getResultMonth(host, "3306", database, user, password,
                    "ads_order_overall_cube,ads_order_result_cube", "2020-11-29");
            final Map<String, Integer> result = new MysqlDao().getResultMap(host, "3306", database, user, password,
                    "tbwork_order", null, null);
            final Map<String, Integer> resultQuarter = new MysqlDao().getResultQuarter(host, "3306", database, user, password,
                    "ads_order_overall_cube", "2023-08-29");
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    // Row counts per dt partition (or a single total count when dt is null)
    // for each table in the comma-separated list.
    public Map<String, Integer> getResultMap(String host, String port, String database, String userName, String password,
                                             String tables, String dt, String dt1) throws Exception {
        HashMap<String, Integer> map = new HashMap<>();
        Connection conn = DruidConnection.getConnection(host, port, database, userName, password);
        Arrays.stream(tables.split(",")).forEach(new Consumer<String>() {
            @SneakyThrows
            @Override
            public void accept(String t) {
                String sql;
                if (dt == null) {
                    sql = "SELECT COUNT(*) as cnts FROM " + t + ";";
                } else {
                    sql = "SELECT dt,COUNT(*) as cnts FROM " + t + " WHERE dt between '" + dt + "' and '" + dt1
                            + "' GROUP BY dt ORDER BY dt DESC;";
                }
                System.out.println("sql:" + sql);
                PreparedStatement statement = conn.prepareStatement(sql);
                ResultSet rs = statement.executeQuery();
                String dtx = "1970-01-01";
                int cnts = -1;
                while (rs.next()) {
                    dtx = dt != null ? rs.getString("dt") : dtx;
                    cnts = rs.getInt("cnts");
                    map.put(dtx, cnts);
                    System.out.println("dt is " + dtx + " and cnts is " + cnts);
                }
            }
        });
        DruidConnection.release(null, null, conn);
        return map;
    }

    // Counts for the whole month that contains dt.
    public Map<String, Integer> getResultMonth(String host, String port, String database, String userName, String password,
                                               String tables, String dt) throws Exception {
        StringUtil.assertNotBlank(dt, "dt can not be null");
        final String monthStart = DateUtil.getMonthStart(dt);
        final String monthEnd = DateUtil.getMonthEnd(dt);
        return getResultMap(host, port, database, userName, password, tables, monthStart, monthEnd);
    }

    // Counts for the whole quarter that contains dt.
    public Map<String, Integer> getResultQuarter(String host, String port, String database, String userName, String password,
                                                 String tables, String dt) throws Exception {
        StringUtil.assertNotBlank(dt, "dt can not be null");
        String t_start = DateUtil.getQuarterStart(dt);
        String t_end = DateUtil.getQuarterEnd(dt);
        return getResultMap(host, port, database, userName, password, tables, t_start, t_end);
    }

    // Latest execution of an Azkaban flow; ok only when it finished successfully (status 50) today.
    public AzkabanExecInfo getAzkabanJobExecInfo(String flow_id) throws SQLException {
        String host = ConfigUtils.AZKABAN_MYSQL_HOST;
        String port = "3306";
        String database = "azkaban38";
        String user = ConfigUtils.AZKABAN_MYSQL_USERNAME;
        String password = ConfigUtils.AZKABAN_MYSQL_PWD;
        Connection conn = DruidConnection.getConnection(host, port, database, user, password);
        String sql = "SELECT * FROM execution_flows where flow_id='" + flow_id + "' ORDER BY exec_id DESC limit 1;";
        PreparedStatement statement = conn.prepareStatement(sql);
        ResultSet rs = statement.executeQuery();
        String status = "";
        String submit_time = "00001";
        String end_time = "00001";
        while (rs.next()) {
            status = rs.getString("status");
            submit_time = rs.getString("submit_time");
            end_time = rs.getString("end_time");
        }
        DruidConnection.release(rs, statement, conn);
        String submitDay = DateUtil.getDateStrFromMill(Long.parseLong(submit_time)).split(" ")[0];
        String today = DateUtil.getToday();
        String endTime = DateUtil.getDateStrFromMill(Long.parseLong(end_time));
        String reasonDesc = "";
        boolean okResult = today.equalsIgnoreCase(submitDay) && "50".equalsIgnoreCase(status);
        if (!okResult) {
            if (!Objects.equals(today, submitDay)) {
                reasonDesc = "submitDay is " + submitDay + " and today is " + today;
            }
            if ("30".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is running, please wait for a while...";
            } else if ("40".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is paused in " + endTime + "...";
            } else if ("60".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is killed in " + endTime;
            } else if ("70".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is failed in " + endTime;
            }
        }
        return new AzkabanExecInfo(okResult, status, reasonDesc, endTime);
    }

    public static class AzkabanExecInfo {
        Boolean isOk;
        String status;
        String reason;
        String endTime;

        public AzkabanExecInfo(Boolean isOk, String status, String reason, String endTime) {
            this.isOk = isOk;
            this.status = status;
            this.reason = reason;
            this.endTime = endTime;
        }

        public String getEndTime() { return endTime; }

        public Boolean getOk() { return isOk; }

        public String getStatus() { return status; }

        public String getReason() { return reason; }
    }
}
```
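A sketch of the intended use from a Scala job, e.g. checking row counts after an export (connection details come from ConfigUtils; the table and date are illustrative):

```scala
import scala.collection.JavaConverters._

// Counts per dt partition for the month containing the given date.
val counts = new MysqlDao().getResultMonth(
  ConfigUtils.WFS_MYSQL_HOST, "3306", "paascloud_wfs",
  ConfigUtils.WFS_MYSQL_USERNAME, ConfigUtils.WFS_MYSQL_PWD,
  "ads_order_overall_cube", "2020-11-29")
counts.asScala.foreach { case (dt, cnt) => println(s"$dt -> $cnt rows") }
```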
## TableUtils

```scala
import com.mingzhi.ConfigUtils
import com.mingzhi.common.interf.IDate
import org.apache.spark.sql.functions.{col, desc, length}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

import java.sql.{Connection, DatabaseMetaData, DriverManager, ResultSet}
import java.util
import java.util.function.Consumer
import scala.collection.mutable
import scala. collection. mutable. ListBuffer object TableUtils { private val logger = LoggerFactory . getLogger ( TableUtils . getClass) def show ( frame: DataFrame ) : Unit = { show ( frame, 20 , false ) } def show ( frame: DataFrame , rows: Int , truncate: Boolean ) : Unit = { if ( ConfigUtils . IS_TEST ) { frame. show ( rows, truncate) } } def getFileCount ( spark: SparkSession , db: String , tableName: String ) : Integer = { spark. sql ( s"use $db" ) val f1 = spark. sql ( s"""||show table extended like '$tableName'||""" . stripMargin) val information = f1. select ( "information" ) . collect ( ) . mkString ( "@" ) println ( "information:" + information) 1000 } def delPartitions ( spark: SparkSession , dt: String , dt1: String , to_db: String , to_table: String ) : Unit = { if ( isPartitionedByDt ( spark, to_db, to_table) ) { new IDate { override def onDate ( dt: String ) : Unit = { spark. sql ( s"ALTER TABLE $to_db.$to_table DROP IF EXISTS PARTITION (dt='${dt}')" ) } } . invoke ( dt, dt1) } else { println ( s"$to_table is not partitioned by dt,skip...." ) } } def isPartitionExists ( spark: SparkSession , db: String , table: String , dt: String ) : Boolean = { spark. sql ( s"use $db" ) val frame = spark. sql ( s" show partitions $table" ) frame. show ( false ) import scala. collection. JavaConversions . _val dts: mutable. Seq[ String ] = frame. collectAsList ( ) . map ( { case Row ( ct: String ) = > ct. split ( "=" ) ( 1 ) : String } ) println ( dts. toString ( ) ) val isPartitionExist = dts. contains ( dt) println ( "is partition exist:" + isPartitionExist) isPartitionExist} def getCompressType ( spark: SparkSession , db: String , table: String ) : String = { val frame = spark. sql ( s" show create table $table" ) val createTableStatement = frame. select ( "createtab_stmt" ) . collect ( ) . toSet. mkString ( "," ) if ( createTableStatement. contains ( "USING parquet" ) ) { "parquet" } else if ( createTableStatement. contains ( "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe" ) ) { "HiveFileFormat" } else { throw new UnsupportedOperationException ( "未知的压缩类型" ) } } private def isPartitionedByDt ( spark: SparkSession , db: String , table: String ) : Boolean = { spark. sql ( s"use $db" ) val frame = spark. sql ( s" show create table $table" ) val createTableStatement = frame. select ( "createtab_stmt" ) . collect ( ) . toSet. mkString ( "," ) createTableStatement. contains ( "PARTITIONED BY (`dt` STRING)" ) || createTableStatement. contains ( "PARTITIONED BY (dt" ) } def tableExists ( spark: SparkSession , to_db: String , to_table: String ) : Boolean = { spark. sql ( s"use $to_db" ) val frame_table = spark. sql ( s"show tables" ) val str = frame_table. filter ( r = > { ! r. getAs[ Boolean ] ( "isTemporary" ) } ) . select ( "tableName" ) . collect ( ) . toSet. mkString ( "," ) str. contains ( s"[${to_table.toLowerCase}]" ) } def queryExecuteDate ( spark: SparkSession , sql: String , doDate: String ) : Array [ String ] = { val querySql = sql. replaceAll ( "do_date" , doDate) ; logger. info ( "querySql=" + querySql) val frame_table = spark. sql ( querySql) . distinct ( ) . cache ( ) . filter ( " date is not null " ) var data = frame_table. rdd. map ( line = > line. getAs ( "date" ) . toString) . distinct ( ) . collect ( ) . sortedif ( data. length == 0 ) data = Array ( doDate) data. foreach ( date = > { logger. 
info ( s"$doDate 修改过那一天的数据:" + date) } ) data} def queryExecuteDate ( spark: SparkSession , sql: String , beginDate: String , endDate: String ) : Array [ String ] = { val querySql = sql. replaceAll ( "begin_date" , beginDate) . replaceAll ( "end_date" , endDate) ; logger. info ( "querySql=" + querySql) val frame_table = spark. sql ( querySql) . distinct ( ) . cache ( ) . filter ( " date is not null " ) var data: Array [ String ] = frame_table. rdd. map ( line = > line. getAs ( "date" ) . toString) . distinct ( ) . collect ( ) . sortedif ( data. length == 0 ) { data = DateUtils . getDateList ( beginDate, endDate) } data. foreach ( date = > { logger. info ( s"【$beginDate,$endDate】修改过那一天的数据:" + date) } ) data} def queryExecuteMonth ( spark: SparkSession , sql: String , doDate: String ) : Array [ String ] = { val querySql = sql. replaceAll ( "do_date" , doDate) ; logger. info ( "querySql=" + querySql) val frame_table = spark. sql ( querySql) . distinct ( ) . cache ( ) . filter ( " date is not null " ) var data = frame_table. rdd. map ( line = > line. getAs ( "date" ) . toString) . distinct ( ) . collect ( ) . sortedif ( data. length == 0 ) data = Array ( doDate) data. foreach ( date = > { logger. info ( s"$doDate 修改过那一天的数据:" + date) } ) data} def formattedData ( spark: SparkSession , frame: DataFrame ) : DataFrame = { var schema = frame. schemaval fields: Array [ StructField ] = schema. fieldsvar resultDataFrame = frame. rdd. mapPartitions ( it = > { var result = new ListBuffer [ Row ] ( ) it. foreach ( record = > { var list = new ListBuffer [ Any ] ( ) fields. foreach ( StructField = > { val clunnName = StructField . nameval dateType = StructField . dataTypedateType match { case _: ByteType = > list. append ( record. getAs ( clunnName) ) case _: ShortType = > list. append ( record. getAs ( clunnName) ) case _: IntegerType = > list. append ( record. getAs ( clunnName) ) case _: LongType = > list. append ( record. getAs ( clunnName) ) case _: BooleanType = > list. append ( record. getAs ( clunnName) ) case _: FloatType = > list. append ( record. getAs ( clunnName) ) case _: DoubleType = > list. append ( record. getAs ( clunnName) ) case _: StringType = > if ( record. getAs ( clunnName) == null ) { list. append ( record. getAs ( clunnName) ) } else { list. append ( record. getAs ( clunnName) . toString. replaceAll ( "\r" , "" ) . replaceAll ( "\n" , "" ) ) } case _: TimestampType = > list. append ( record. getAs ( clunnName) ) case _: DateType = > list. append ( record. getAs ( clunnName) ) case _: DecimalType = > list. append ( record. getAs ( clunnName) ) case _ = > list. append ( record. getAs ( clunnName) ) } } ) result. append ( Row . fromSeq ( list) ) } ) result. iterator} ) val targetDataFrame = spark. createDataFrame ( resultDataFrame, schema) . drop ( "@timestamp" , "@version" ) targetDataFrame} def createMysqlTableDDL ( t_name: String , frame: DataFrame ) : String = { var sql = s"create table if not EXISTS $t_name(\n" frame. schema. foreach ( f = > { val col_name = f. namevar tp = "varchar(64)" val comment: Option [ String ] = f. getComment ( ) tp = f. dataType match { case _: StringType = > { val df: Dataset [ Row ] = frame. select ( col_name) . groupBy ( col_name) . agg ( length ( col ( col_name) ) . as ( s"len_${col_name}" ) ) . orderBy ( desc ( s"len_${col_name}" ) )
var len_str = df. select ( s"len_${col_name}" ) . take ( 1 ) . mkString. replace ( "[" , "" ) . replace ( "]" , "" ) if ( "null" . equalsIgnoreCase ( len_str) ) { len_str = "32" } var len = len_str. toIntif ( len == 0 ) { len = 10 } else if ( len > 0 && len < 1024 ) { len = len * 3 } else { len = ( len * 1.5 ) . toInt} if ( len <= 10000 ) { s"varchar($len)" } else { "text" } } case _: DateType | _: TimestampType = > "varchar(64)" case _: IntegerType = > "int" case _: LongType = > "bigint" case _: DoubleType | _: FloatType | _: DecimalType = > "decimal(16,4)" case _ = > "text" } sql = sql + s"`$col_name` $tp COMMENT '${comment.getOrElse(" ") } ', \n"} ) val ddl_create = sql. substring ( 0 , sql. length - 2 ) + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;\n" ddl_create} def updateMysqlTableDDL ( t_name: String , frame: DataFrame , metaData: DatabaseMetaData ) : String = { val targetNameAndLength = new util. HashMap[ String , Integer ] ( ) val sb = new StringBuilder ( ) val rs: ResultSet = metaData. getColumns ( null , "%" , t_name, "%" ) var columnName2 = "" var columnType2 = "" var COLUMN_SIZE2 = "0" val count = rs. getMetaData. getColumnCountwhile ( rs. next ( ) ) { columnName2 = rs. getString ( "COLUMN_NAME" ) COLUMN_SIZE2 = rs. getString ( "COLUMN_SIZE" ) columnType2 = rs. getString ( "TYPE_NAME" ) println ( "columnType2:" + columnType2) targetNameAndLength. put ( columnName2. toLowerCase ( ) , COLUMN_SIZE2 . toInt) } println ( "targetNameAndLength:" + targetNameAndLength) frame. schema. foreach ( f = > { val col_name = f. name. toLowerCase ( ) val lenInMysql: Integer = targetNameAndLength. getOrDefault ( col_name, - 1 ) if ( lenInMysql == - 1 ) { println ( s"add col to mysql: $col_name" ) f. dataType match { case _: StringType = > { } case _: DateType | _: TimestampType = > { sb. append ( s"alter table ${t_name} add column ${col_name} varchar(64);\n" ) } case _: IntegerType = > sb. append ( s"alter table ${t_name} add column ${col_name} int;\n" ) case _: LongType = > sb. append ( s"alter table ${t_name} add column ${col_name} bigint;\n" ) case _: DoubleType | _: FloatType | _: DecimalType = > sb. append ( s"alter table ${t_name} add column ${col_name} decimal(16,4);\n" ) case _ = > sb. append ( s"alter table ${t_name} add column ${col_name} text;\n" ) } } f. dataType match { case _: StringType = > { val df: Dataset [ Row ] = frame. select ( col_name) . groupBy ( col_name) . agg ( length ( col ( col_name) ) . as ( s"len_${col_name}" ) ) . orderBy ( desc ( s"len_${col_name}" ) ) var len_str = df. select ( s"len_${col_name}" ) . take ( 1 ) . mkString. replace ( "[" , "" ) . replace ( "]" , "" ) if ( "null" . equalsIgnoreCase ( len_str) ) { len_str = "99" } val len = len_str. toIntprintln ( s"col ${col_name} max length:" + len) if ( lenInMysql == - 1 ) { sb. append ( s"alter table ${t_name} add column ${col_name} varchar($len);\n" ) } if ( lenInMysql >= 0 && lenInMysql < len) { sb. append ( s"ALTER TABLE ${t_name} MODIFY ${col_name} VARCHAR(${len});\n" ) } } case _ = > { println ( s"other type ${f.dataType},do nothing" ) } } } ) val ddl_update = sb. toString ( ) println ( "ddl update:" + ddl_update) ddl_update} def createCaseClass ( clzName: String , frame: DataFrame ) : String = { var sql = s"case class $clzName(\n" frame. schema. foreach ( s = > { val name = s. namevar tp = "String" tp = s. dataType. 
typeName match { case "string" | "date" | "timestamp" = > "String" case "integer" = > "Integer" case "double" | "float" = > "Double" case "long" = > "Long" case "array" = > "Array[Any]" case "map" = > "Map[String,String]" case _ = > if ( s. dataType. typeName. contains ( "decimal" ) ) { "BigDecimal" } else { throw new IllegalArgumentException ( s"未知的数据类型:${s.dataType},${s.dataType.typeName}" ) } } sql = sql + s"var $name:$tp,\n" } ) sql. substring ( 0 , sql. length - 2 ) + "\n)" } def addComment ( spark: SparkSession , f: DataFrame , commentMap: java. util. Map[ String , String ] ) : DataFrame = { val schema = f. schema. map ( s = > { s. withComment ( commentMap. getOrDefault ( s. name, "" ) ) } ) spark. createDataFrame ( f. rdd, StructType ( schema) ) } def createQuerySql ( url: String , userName: String , pwd: String , t_source: String , dropCols: String ) : String = { val sb = new StringBuilder ( ) val list = new util. ArrayList[ String ] ( ) var conn: Connection = null var querySql = "" try { conn = DriverManager . getConnection ( url, userName, pwd) val rs: ResultSet = conn. getMetaData. getColumns ( null , "%" , t_source, "%" ) while ( rs. next ( ) ) { val columnName = rs. getString ( "COLUMN_NAME" ) println ( "columnName:" + columnName) list. add ( columnName) } dropCols. split ( ",|-" ) . foreach ( col = > { println ( s"drop col====>${col}" ) list. remove ( col) } ) list. forEach ( new Consumer [ String ] { override def accept ( t: String ) : Unit = { sb. append ( "`" + t + "`" ) . append ( "," ) } } ) val cols = sb. substring ( 0 , sb. size - 1 ) querySql = s"select $cols from $t_source" println ( "querySql:" + querySql) } catch { case e: Exception = > println ( e) } finally { DruidConnection . release ( null , null , conn) } querySql} def main ( args: Array [ String ] ) : Unit = { val spark: SparkSession = SparkUtils . getBuilder. master ( "local[*]" ) . getOrCreate ( ) println ( isPartitionedByDt ( spark, "paascloud" , "dwd_order_info_abi" ) ) println ( getCompressType ( spark, "paascloud" , "dwd_order_info_abi" ) ) testGetFileCount ( spark, "common" , "t_uac_user" ) testGetFileCount ( spark, "paascloud" , "dwd_order_info_abi" ) } def testGetFileCount ( spark: SparkSession , db: String , tableName: String ) : Unit = { getFileCount ( spark, db, tableName) } def testCreateQuerySql ( ) : Unit = { var host = "192.168.32.179" var port = "3308" val db = "vwork" val userName = "root" val pwd = "123456" val target_table = "administrative_organization" val url = s"jdbc:mysql://${host}:${port}/${db}?useUnicode=true&characterEncoding=UTF-8&connectionRequestTimout=300000&connectionTimeout=300000&socketTimeout=300000&useSSL=false" createQuerySql ( url, userName, pwd, target_table, "dt,x" ) createQuerySql ( url, userName, pwd, target_table, "org_code_macro" ) } def testGetMysqlMetaData ( frame_result: DataFrame ) : Unit = { var host = "192.168.32.179" var port = "3308" val db = "vwork" val userName = "root" val pwd = "123456" val target_table = "person" val url = s"jdbc:mysql://${host}:${port}/${db}?useUnicode=true&characterEncoding=UTF-8&connectionRequestTimout=300000&connectionTimeout=300000&socketTimeout=300000&useSSL=false" var conn: Connection = null try { conn = DriverManager . getConnection ( url, userName, pwd) println ( "updateMysqlTableDDL:" + updateMysqlTableDDL ( target_table, frame_result, conn. getMetaData) ) } catch { case e: Exception = > logger. error ( "MySQL exception {}:" , e) } finally { if ( null != conn && ! conn. isClosed) { conn. close ( ) } } }
}
```
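A short sketch of how a sync job might use these helpers before exporting a Hive table to MySQL; the database and table names are taken from the utility's own test `main`, the target table and comment are illustrative:

```scala
object TableUtilsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.getBuilder.appName("tableutils_example").getOrCreate()

    val frame = spark.sql("select * from paascloud.dwd_order_info_abi where dt = '2023-01-01'")

    // Only run the export when the source partition really exists.
    if (TableUtils.isPartitionExists(spark, "paascloud", "dwd_order_info_abi", "2023-01-01")) {
      // Attach column comments, then print the CREATE TABLE DDL for the MySQL side.
      val comments = new java.util.HashMap[String, String]()
      comments.put("order_id", "primary key of the order")
      val commented = TableUtils.addComment(spark, frame, comments)
      println(TableUtils.createMysqlTableDDL("ads_order_info", commented))
    }
    spark.stop()
  }
}
```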