Basic Utility Classes

IDate

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

trait IDate extends Serializable {

  def onDate(dt: String): Unit

  def invoke(dt: String, dt1: String): Unit = {
    if (dt1 < dt) {
      throw new IllegalArgumentException(s"dt1:$dt1 is earlier than dt:$dt")
    }
    // Date formatter
    val sdf = new SimpleDateFormat("yyyy-MM-dd")
    // Start date
    val d1: Date = sdf.parse(dt)
    // End date
    val d2: Date = sdf.parse(dt1)
    var tmp: Date = d1
    val dd: Calendar = Calendar.getInstance
    dd.setTime(d1)
    while (tmp.getTime <= d2.getTime) {
      tmp = dd.getTime
      System.out.println("IDate iterating date: " + sdf.format(tmp))
      onDate(sdf.format(tmp))
      // Advance by one day
      dd.add(Calendar.DAY_OF_MONTH, 1)
      tmp = dd.getTime
    }
  }
}
```
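
A minimal usage sketch (the dates are illustrative): implementing `onDate` inline visits every day in the closed range `[dt, dt1]`.

```scala
// Prints and processes every day from 2023-01-30 through 2023-02-02, inclusive.
new IDate {
  override def onDate(dt: String): Unit = {
    // Per-day ETL work goes here, e.g. spark.sql(s"... where dt='$dt'")
    println(s"processing $dt")
  }
}.invoke("2023-01-30", "2023-02-02")
```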

IMonth

```scala
import com.mingzhi.common.utils.DateUtil

/**
 * Invokes the callback with the first day of each month:
 * every month start contained in [dt, dt1].
 */
trait IMonth extends Serializable {

  def onMonth(dt: String): Unit

  def invoke(dt: String, dt1: String): Unit = {
    if (dt.equals(dt1)) {
      onMonth(DateUtil.getMonthStart(dt))
    } else {
      new IDate {
        override def onDate(dt: String): Unit = {
          val monthStart = DateUtil.getMonthStart(dt)
          if (dt.equalsIgnoreCase(monthStart)) {
            System.out.println("IMonth iterating date: " + dt)
            onMonth(dt)
          }
        }
      }.invoke(dt, dt1)
    }
  }
}
```
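
IMonth builds on IDate: it walks the same day range but only fires on month starts. A short sketch with illustrative dates:

```scala
// Fires for 2023-01-01, 2023-02-01 and 2023-03-01 only.
new IMonth {
  override def onMonth(dt: String): Unit = println(s"month start: $dt")
}.invoke("2023-01-01", "2023-03-15")
```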

MySaveMode


```scala
import org.apache.spark.annotation.InterfaceStability

@InterfaceStability.Stable
object MySaveMode extends Enumeration {

  type MySaveMode = Value

  val
  /**
   * Overwrite the whole table.
   */
  OverWriteAllTable,
  /**
   * Sink by dt, overwriting that partition.
   */
  OverWriteByDt,
  /**
   * Overwrite by quarter (used by the democratic-evaluation business).
   */
  OverWriteByQuarter,
  /**
   * Monthly report, or month-partitioned snapshot.
   */
  OverWriteByMonth,
  /**
   * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
   * the save operation is expected to not save the contents of the DataFrame and to not
   * change the existing data.
   *
   * @since 1.3.0
   */
  Ignore = Value
}
```
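
Because MySaveMode is a plain Scala `Enumeration`, it pattern-matches cleanly in a sink routine. A hypothetical dispatch sketch (the package in the import is assumed, mirroring where IDate lives; `resolveMode` is not part of the original code):

```scala
import com.mingzhi.common.interf.MySaveMode // assumed package

// Hypothetical helper showing how a sink might branch on the mode.
def resolveMode(mode: MySaveMode.MySaveMode): String = mode match {
  case MySaveMode.OverWriteAllTable  => "truncate and reload the whole table"
  case MySaveMode.OverWriteByDt      => "overwrite only the dt partitions being written"
  case MySaveMode.OverWriteByQuarter => "overwrite the partitions of the quarter"
  case MySaveMode.OverWriteByMonth   => "overwrite the partitions of the month"
  case MySaveMode.Ignore             => "skip the write if data already exists"
}
```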

LoadStrategy

```scala
import org.apache.spark.annotation.InterfaceStability

@InterfaceStability.Stable
object LoadStrategy extends Enumeration {

  type strategy = Value

  val
  /**
   * Full load.
   */
  ALL,
  /**
   * New and changed records.
   */
  NEW_AND_CHANGE,
  /**
   * New records only.
   */
  NEW = Value
}
```
HiveUtil

```scala
import org.apache.spark.sql.SparkSession

object HiveUtil {

  /**
   * Enable dynamic partitioning in non-strict mode.
   */
  def openDynamicPartition(spark: SparkSession): Unit = {
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set spark.sql.shuffle.partitions=18")
  }

  /**
   * Use Snappy compression for MapReduce output.
   */
  def useSnappyCompression(spark: SparkSession): Unit = {
    spark.sql("set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
    spark.sql("set mapreduce.output.fileoutputformat.compress=true")
    spark.sql("set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec")
  }
}
```
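
These settings apply per SparkSession, so call them once right after the session is created and before the first insert into a dynamically partitioned table. A sketch (the table names in the SQL are illustrative):

```scala
val spark = SparkUtils.getBuilder.appName("demo").getOrCreate()
HiveUtil.openDynamicPartition(spark) // must precede dynamic-partition inserts
HiveUtil.useSnappyCompression(spark)
spark.sql("insert overwrite table dwd.t partition(dt) select *, dt from ods.t") // illustrative SQL
```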

SparkUtils

```scala
import com.mingzhi.ConfigUtils
import com.mingzhi.common.utils.StringUtil
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object SparkUtils {

  def getBuilder: SparkSession.Builder = {
    val builder = SparkSession.builder()
      .config("dfs.replication", 1)
      .config("hive.exec.dynamic.partition.mode", "nonstrict")
      .config("spark.sql.parquet.writeLegacyFormat", "true")
      // Controls whether to clean checkpoint files if the reference is out of scope.
      .config("spark.cleaner.referenceTracking.cleanCheckpoints", value = true)
      .config("spark.cleaner.periodicGC.interval", "1min")
      /**
       * The SQL config 'spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation' was
       * removed in version 3.0.0; it existed to prevent losing user data for non-default values.
       */
      .config("spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation", "true")
      .config("hive.exec.dynamic.partition", "true")
      .config("spark.sql.shuffle.partitions", "12")
      .config(ConfigurationOptions.ES_PORT, 9200)
      .config(ConfigurationOptions.ES_MAPPING_DATE_RICH_OBJECT, value = false)
      .config(ConfigurationOptions.ES_SCROLL_SIZE, 10000)
      .config(ConfigurationOptions.ES_MAX_DOCS_PER_PARTITION, 1000000)
      .config(ConfigurationOptions.ES_HTTP_TIMEOUT, "5m")
      .config(ConfigurationOptions.ES_SCROLL_KEEPALIVE, "10m")
      .enableHiveSupport()
    if (ConfigUtils.isWindowEvn) {
      builder.config("spark.ui.port", "4040").master("local[*]")
    }
    builder
  }

  def setCheckPoint(spark: SparkSession, dir: String): Unit = {
    var ckPath = "/ck/" + dir
    if (!ConfigUtils.isWindowEvn) {
      ckPath = s"hdfs://mz-hadoop-01:8020/ck/$dir"
    }
    spark.sparkContext.setCheckpointDir(ckPath)
  }

  def persistDataFrame(spark: SparkSession, frame: DataFrame): (DataFrame, String) = {
    val db = "temp"
    val table = StringUtil.getRandomString(20)
    val tempTableName = s"${db}.${table}_" + System.currentTimeMillis()
    frame.write.format("parquet").mode(SaveMode.Overwrite).saveAsTable(tempTableName)
    (spark.sql(s"select * from $tempTableName"), tempTableName)
  }

  def unPersistDataFrame(spark: SparkSession, tableName: String): DataFrame = {
    spark.sql(s"drop table $tableName")
  }
}
```
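
`persistDataFrame` materializes an intermediate DataFrame as a temporary parquet table (a checkpoint substitute that truncates lineage), and `unPersistDataFrame` drops it. A usage sketch, assuming a Hive database named `temp` exists and with a hypothetical source table:

```scala
val spark = SparkUtils.getBuilder.appName("etl").getOrCreate()
SparkUtils.setCheckPoint(spark, "etl_job")

val heavy = spark.sql("select * from ods.some_table") // hypothetical source table
val (materialized, tmpName) = SparkUtils.persistDataFrame(spark, heavy)
try {
  materialized.groupBy("dt").count().show()
} finally {
  SparkUtils.unPersistDataFrame(spark, tmpName) // always drop the temp table
}
```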

StringUtil

```java
package com.mingzhi.common.utils;

import org.apache.commons.lang3.StringUtils;

import java.util.Objects;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StringUtil {

    public static void main(String[] args) {
        String regEx2 = "[a-zA-Z]";
        String regEx3 = "[0-9]";
        String str = "0我,你好?!¥{}。。。c";
        String s1 = getCharacter(str);
//        String s2 = matchResult(Pattern.compile(regEx2), str);
//        String s3 = matchResult(Pattern.compile(regEx3), str);
//        System.out.println(s1 + "\n" + s2 + "\n" + s3);
        System.out.println(s1);
        System.out.println(getRandomString(10));
    }

    /**
     * Keep only CJK characters and full-width punctuation.
     */
    public static String getCharacter(String str) {
        String regEx1 = "[(\\u4e00-\\u9fa5)(\\u3002|\\uff1f|\\uff01|\\uff0c|\\u3001|\\uff1b|\\uff1a|\\u201c|\\u201d|\\u2018|\\u2019|\\uff08|\\uff09|\\u300a|\\u300b|\\u3010|\\u3011|\\u007e)]";
        return matchResult(Pattern.compile(regEx1), str);
    }

    public static boolean isBlank(CharSequence sequence) {
        return StringUtils.isBlank(sequence);
    }

    public static boolean isNotBlank(CharSequence sequence) {
        return StringUtils.isNotBlank(sequence);
    }

    public static void assertNotBlank(String field, String msg) {
        Objects.requireNonNull(field, msg);
        if (isBlank(field)) {
            throw new IllegalArgumentException(msg);
        }
    }

    private static String matchResult(Pattern p, String str) {
        StringBuilder sb = new StringBuilder();
        Matcher m = p.matcher(str);
        while (m.find()) {
            sb.append(m.group());
        }
        return sb.toString();
    }

    private static final String CHARACTERS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";

    /**
     * Generate a random string of the given length, drawn from
     * upper/lowercase letters and digits.
     */
    public static String getRandomString(int length) {
        StringBuilder sb = new StringBuilder();
        Random random = new Random();
        for (int i = 0; i < length; i++) {
            int index = random.nextInt(CHARACTERS.length());
            sb.append(CHARACTERS.charAt(index));
        }
        return sb.toString();
    }
}
```
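
`getCharacter` keeps only CJK characters and full-width punctuation, and `getRandomString` backs the temporary-table naming in SparkUtils. A small usage sketch from Scala (the outputs in comments are the expected results):

```scala
val suffix = StringUtil.getRandomString(8)       // e.g. "aZ3kQ9xP" (random)
val cjk = StringUtil.getCharacter("a中b文,c!")   // expected: "中文," -- ASCII characters are dropped
```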

JsonUtil

```java
import com.alibaba.fastjson.JSON;

public class JsonUtil {

    public static <T> T getObjFromJson(String json, Class<T> tClass) {
        try {
            return JSON.parseObject(json, tClass);
        } catch (Exception e) {
            // Swallow parse errors; callers must handle the null.
            return null;
        }
    }

    public static String getJsonStrFromObj(Object o) {
        return JSON.toJSONString(o);
    }
}
```
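
A thin fastjson wrapper that swallows parse errors and returns `null`, so callers must null-check. A usage sketch from Scala with a hypothetical DTO (`@BeanProperty` generates the getters/setters fastjson expects):

```scala
import scala.beans.BeanProperty

// Hypothetical DTO, not part of the original code.
class User {
  @BeanProperty var name: String = _
  @BeanProperty var age: Int = _
}

val user = JsonUtil.getObjFromJson("""{"name":"tom","age":18}""", classOf[User])
if (user != null) println(user.getName) // null means the JSON failed to parse
println(JsonUtil.getJsonStrFromObj(user))
```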

FileUtil

```java
import org.apache.commons.io.FileUtils;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

public class FileUtil {

    public static void main(String[] args) throws IOException {
        write("xxx1.txt", "hello\nmoto1");
        System.out.println(read("xxx1.txt"));
    }

    public static String read(String resourceName) {
        StringBuilder sb = new StringBuilder();
        InputStream inputStream = null;
        BufferedInputStream bufferedInputStream = null;
        BufferedReader bufferedReader = null;
        try {
            File f = new File(resourceName);
            inputStream = new FileInputStream(f);
            bufferedInputStream = new BufferedInputStream(inputStream);
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream, StandardCharsets.UTF_8), 2 * 1024 * 1024);
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                sb.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                // Null-check each resource: the stream may never have been opened.
                if (bufferedReader != null) bufferedReader.close();
                if (bufferedInputStream != null) bufferedInputStream.close();
                if (inputStream != null) inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return sb.toString();
    }

    public static void write(String resourceName, String data) throws IOException {
        File f = new File(resourceName);
        System.out.println("Writing to " + f.getCanonicalPath());
        ArrayList<String> lines = new ArrayList<>();
        lines.add(data);
        FileUtils.writeLines(f, lines, false);
    }

    public static String readFromResource(String resourceName) {
        StringBuilder sb = new StringBuilder();
        InputStream inputStream = null;
        BufferedInputStream bufferedInputStream = null;
        BufferedReader bufferedReader = null;
        try {
//            inputStream = org.apache.commons.io.FileUtils.class.getClassLoader().getResourceAsStream(resourceName);
            inputStream = ClassLoader.getSystemResourceAsStream(resourceName);
            assert inputStream != null;
            bufferedInputStream = new BufferedInputStream(inputStream);
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream, StandardCharsets.UTF_8), 2 * 1024 * 1024);
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                sb.append(line).append("\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (bufferedReader != null) bufferedReader.close();
                if (bufferedInputStream != null) bufferedInputStream.close();
                if (inputStream != null) inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return sb.toString();
    }
}
```

DateUtil

```java
package com.mingzhi.common.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.temporal.TemporalAdjusters;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;

public class DateUtil {

    public static void main(String[] args) throws ParseException {
        System.out.println(getDateStrFromMill(1617701847078L));
        System.out.println("today:" + getToday());
        System.out.println("yesterday:" + getYesterday());
        System.out.println("weekStart" + getWeekStart());
        System.out.println(getMonthStart());
        System.out.println(getWeek("2021-07-01 01:00:00"));
        System.out.println(getTime("2021-07-01 01:00:00"));
        Long mills = getMillsFromString("2022-04-07 17:01:02"); // 1649322062000
        System.out.println(mills);
        System.out.println(DateUtil.getDateStrFromMill(mills));
        System.out.println(getLastDay("2022", "02"));
        System.out.println(getQuarterStart("2022-09-18"));
        System.out.println(getQuarterEnd("2022-09-30"));
        System.out.println(getQuarterEnd("2022-09-31"));
    }

    public static boolean isValidDate(String str) {
        boolean valid = true;
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            // setLenient(false): otherwise SimpleDateFormat validates loosely,
            // e.g. 2007/02/29 would be accepted and converted to 2007/03/01.
            format.setLenient(false);
            format.parse(str);
        } catch (Exception e) {
            // A ParseException or NullPointerException means the format is wrong.
            valid = false;
        }
        return valid;
    }

    public static ArrayList<String> getDateList(String beginDateStr, String endDateStr) throws ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
        int dateFiled = Calendar.DAY_OF_MONTH;
        Date beginDate = dateFormat.parse(beginDateStr);
        Date endDate = dateFormat.parse(endDateStr);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(beginDate);
        final ArrayList<String> list = new ArrayList<>();
        while (beginDate.compareTo(endDate) <= 0) {
            list.add(dateFormat.format(beginDate));
            calendar.add(dateFiled, 1);
            beginDate = calendar.getTime();
        }
        return list;
    }

    public static boolean before(String dt, String dt1) throws ParseException {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dateTime1 = dateFormat.parse(dt);
        Date dateTime2 = dateFormat.parse(dt1);
        return dateTime1.before(dateTime2);
    }

    public static String getYesterday() {
        return getDateStrFromMill(System.currentTimeMillis() - 3600 * 1000 * 24).split(" ")[0];
    }

    public static String back1Week(String t) {
        return getDateStrFromMill(getMillsFromString(t) - 60 * 60 * 1000 * 168);
    }

    /**
     * The day after dt.
     */
    public static String next1Day(String t) {
        return getDateStrFromMill(getMillsFromString(t) + 60 * 60 * 1000 * 24);
    }

    /**
     * The day before dt.
     */
    public static String back1Day(String t) {
        return getDateStrFromMill(getMillsFromString(t) - 60 * 60 * 1000 * 24);
    }

    public static Long getWindowStartWithOffset(Long timestamp, Long offset, Long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /**
     * Note: despite the name, this goes back one quarter of an hour (15 minutes).
     */
    public static String back1Quarter(String t) {
        return getDateStrFromMill(getMillsFromString(t) - 15 * 60 * 1000);
    }

    /**
     * https://blog.csdn.net/liaomingwu/article/details/122498015
     */
    public static String getQuarterStart(String dt) throws ParseException {
        final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        sdf.setLenient(false);
        final Date dBegin = sdf.parse(dt);
        final Calendar calendar = Calendar.getInstance();
        calendar.setTime(dBegin);
        final int remainder = calendar.get(Calendar.MONTH) % 3;
        int month = (remainder != 0) ? calendar.get(Calendar.MONTH) - remainder : calendar.get(Calendar.MONTH);
        calendar.set(Calendar.MONTH, month);
        calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMinimum(Calendar.DAY_OF_MONTH));
        return sdf.format(calendar.getTime());
    }

    public static String getQuarterEnd(String dt) throws ParseException {
        String year = dt.substring(0, 4);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        // setLenient(false): reject loosely valid dates such as 2007/02/29.
        sdf.setLenient(false);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(sdf.parse(dt));
        int quarter = calendar.get(Calendar.MONTH) / 3 + 1;
        int lastMonth = quarter * 3 - 1;
        // Build the last day of the quarter's final month.
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.YEAR, Integer.parseInt(year));
        cal.set(Calendar.MONTH, lastMonth);
        cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH));
        cal.set(Calendar.HOUR, 0);
        cal.set(Calendar.MINUTE, 0);
        cal.set(Calendar.SECOND, 0);
        return sdf.format(cal.getTime());
    }

    /**
     * yyyy-MM-dd HH:mm:ss
     */
    public static Long getMillsFromString(String time) {
        long mill = 0L;
        try {
            mill = ThreadSafeFormatter.dateFormatter.get().parse(time).getTime();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return mill;
    }

    public static String getDateStrFromMill(long ts) {
        return ThreadSafeFormatter.dateFormatter.get().format(new Date(ts));
    }

    public static Date getDate(String dateTime) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            return sdf.parse(dateTime);
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * @param dateTime e.g. 2021-07-07 09:51:30
     */
    public static String getWeek(String dateTime) {
        String[] weekDays = {"sunday", "monday", "tuesday", "wednesday", "thursday", "friday", "saturday"};
        Calendar cal = Calendar.getInstance();
        cal.setTime(DateUtil.getDate(dateTime));
        int w = cal.get(Calendar.DAY_OF_WEEK) - 1;
        if (w < 0) w = 0;
        return weekDays[w];
    }

    /**
     * @param dateTime e.g. 2021-07-07 09:51:30
     */
    public static String getTime(String dateTime) {
        SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
        return sdf.format(DateUtil.getDate(dateTime));
    }

    /**
     * Today's date.
     */
    public static String getToday() {
        return new SimpleDateFormat("yyyy-MM-dd").format(new Date());
    }

    /**
     * First day of the current week.
     */
    public static String getWeekStart() {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.DAY_OF_WEEK, 2);
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    /**
     * Last day of the current week.
     */
    public static String getWeekEnd() {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.DAY_OF_WEEK, cal.getActualMaximum(Calendar.DAY_OF_WEEK));
        cal.add(Calendar.DAY_OF_WEEK, 1);
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    /**
     * First day of the current month.
     */
    public static String getMonthStart() {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.DAY_OF_MONTH, 1);
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    /**
     * Last day of the current month.
     */
    public static String getMonthEnd() {
        Calendar cal = Calendar.getInstance();
        cal.set(Calendar.DAY_OF_MONTH, cal.getActualMaximum(Calendar.DAY_OF_MONTH));
        return new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime());
    }

    /**
     * Last day of the given year and month.
     */
    public static String getLastDay(String year, String month) {
        LocalDate first = LocalDate.of(Integer.parseInt(year), Integer.parseInt(month), 1);
        LocalDate last = first.with(TemporalAdjusters.lastDayOfMonth());
        return last.toString();
    }

    /**
     * First day of the month containing dt.
     */
    public static String getMonthStart(String dt) {
        final LocalDate localDate = LocalDate.parse(dt);
        return localDate.with(TemporalAdjusters.firstDayOfMonth()).toString();
    }

    /**
     * Last day of the month containing dt.
     */
    public static String getMonthEnd(String dt) {
        final LocalDate localDate = LocalDate.parse(dt);
        return localDate.with(TemporalAdjusters.lastDayOfMonth()).toString();
    }
}

class ThreadSafeFormatter {
    // SimpleDateFormat is not thread-safe; give each thread its own instance.
    public static ThreadLocal<SimpleDateFormat> dateFormatter
            = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
}
```
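
`SimpleDateFormat` is not thread-safe, which is why `getMillsFromString` and `getDateStrFromMill` go through ThreadSafeFormatter's `ThreadLocal` instead of a shared static formatter: each thread lazily gets its own instance. A sketch showing it holding up under concurrent use:

```scala
import java.util.concurrent.Executors

// Each worker thread gets its own SimpleDateFormat via the ThreadLocal,
// so concurrent parse/format calls cannot corrupt each other's state.
val pool = Executors.newFixedThreadPool(4)
(1 to 8).foreach { i =>
  pool.submit(new Runnable {
    override def run(): Unit = {
      val ms = DateUtil.getMillsFromString(s"2022-04-0$i 17:01:02")
      println(DateUtil.getDateStrFromMill(ms))
    }
  })
}
pool.shutdown()
```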

DruidConnection

```java
import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.SneakyThrows;

import javax.sql.DataSource;
import java.sql.*;
import java.util.Properties;

public class DruidConnection {

    private final DataSource dataSource;
    private static volatile DruidConnection instance = null;

    @SneakyThrows
    private DruidConnection(String host, String port, String db, String username, String pwd) {
        Properties props = new Properties();
        props.put("driverClassName", "com.mysql.jdbc.Driver");
        props.put("url", "jdbc:mysql://" + host + ":" + port + "/" + db + "?characterEncoding=UTF-8");
        props.put("username", username);
        props.put("password", pwd);
        dataSource = DruidDataSourceFactory.createDataSource(props);
    }

    /**
     * Double-checked-locking singleton. Note: the pool is created once with the
     * first caller's parameters; later calls with a different host or database
     * still return connections from the original pool.
     */
    public static Connection getConnection(String host, String port, String db, String username, String pwd) throws SQLException {
        if (instance == null) {
            synchronized (DruidConnection.class) {
                if (instance == null) {
                    instance = new DruidConnection(host, port, db, username, pwd);
                }
            }
        }
        return instance.dataSource.getConnection();
    }

    @SneakyThrows
    public static void release(ResultSet resultSet, Statement statement, Connection connection) {
        try {
            if (resultSet != null) {
                resultSet.close();
            }
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
}
```
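
A usage sketch from Scala (the host, credentials, and table are hypothetical). Closing the connection via `release` returns it to the Druid pool rather than tearing it down:

```scala
val conn = DruidConnection.getConnection("127.0.0.1", "3306", "demo_db", "root", "secret")
val stmt = conn.prepareStatement("SELECT COUNT(*) AS cnts FROM some_table") // hypothetical table
val rs = stmt.executeQuery()
while (rs.next()) println("rows: " + rs.getInt("cnts"))
DruidConnection.release(rs, stmt, conn) // returns the connection to the pool
```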

LogUtils

```java
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogUtils {

    public static final Logger logger;

    static {
        logger = Logger.getLogger("logger");
        if (ConfigUtils.IS_TEST) {
            logger.setLevel(Level.ALL);
        } else {
            logger.setLevel(Level.OFF);
        }
    }

    public static void out(String msg) {
        out(LogUtils.class, msg);
    }

    public static <T> void out(Class<T> callerClz, String msg) {
        if (logger.isInfoEnabled()) {
            System.out.println(callerClz.getName() + ":" + msg);
        }
    }
}
```

ESHighRestUtil


```java
import org.apache.http.HttpHost;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.junit.Test;

import java.io.IOException;

public class ESHighRestUtil {

    private static RestHighLevelClient restHighLevelClient;

    public static RestHighLevelClient init(String esHost) {
        HttpHost host1 = new HttpHost(esHost, 9200, "http");
        RestClientBuilder restClientBuilder = RestClient.builder(host1);
        restHighLevelClient = new RestHighLevelClient(restClientBuilder);
        return restHighLevelClient;
    }

    public static void deleteByQuery(String esHost, String indexName,
                                     String splitFieldName, String splitFieldValue,
                                     String dataPartitionName, String dataPartitionValue) {
        restHighLevelClient = init(esHost);
        DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(indexName);
        TermQueryBuilder termQueryBuilder1 = new TermQueryBuilder(splitFieldName, splitFieldValue);
        TermQueryBuilder termQueryBuilder2 = new TermQueryBuilder(dataPartitionName, dataPartitionValue);
        BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder().must(termQueryBuilder1).must(termQueryBuilder2);
        deleteByQueryRequest.setQuery(boolQueryBuilder);
        try {
            restHighLevelClient.deleteByQuery(deleteByQueryRequest, RequestOptions.DEFAULT);
            restHighLevelClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delete-by-query test.
     */
    @Test
    public void deleteByQueryTest() throws Exception {
        deleteByQuery("192.168.33.163", "ins_ads_ins_record_analysis_202308",
                "signin_date", "2023-02-07", "dt", "2023-02-07");
    }

    @Test
    public void bulkDeleteTest() throws IOException {
        restHighLevelClient = init("192.168.20.175");
        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.add(new DeleteRequest("ins_ads_ins_record_analysis_202302", "_doc", "10001"));
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        restHighLevelClient.close();
    }
}
```

ConfigUtils

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ConfigUtils {

    private static final String fileName = "application.properties";
    private static Config configInfo = ConfigFactory.load(fileName);

    public static Boolean IS_TEST;
    public static String IOT_ES_HOST;
    public static String IOT_ES_HOST2;
    public static String IOT_ES_PORT;
    public static String IOT_ES_PORT2;
    public static String WFS_MYSQL_HOST;
    public static String WFS_MYSQL_USERNAME;
    public static String WFS_MYSQL_PWD;
    public static String ABI_MYSQL_HOST;
    public static String ABI_MYSQL_USERNAME;
    public static String ABI_MYSQL_PWD;
    public static String SEGI_MYSQL_HOST;
    public static String SEGI_MYSQL_USERNAME;
    public static String SEGI_MYSQL_PWD;
    public static String UAC_MYSQL_HOST;
    public static String UAC_MYSQL_USERNAME;
    public static String UAC_MYSQL_PWD;
    public static String IOT_MYSQL_HOST;
    public static String IOT_MYSQL_USERNAME;
    public static String IOT_MYSQL_PWD;
    public static String SDS_MYSQL_HOST;
    public static String SDS_MYSQL_USERNAME;
    public static String SDS_MYSQL_PWD;
    public static String WMP_MYSQL_HOST;
    public static String WMP_MYSQL_USERNAME;
    public static String WMP_MYSQL_PWD;
    public static String AZKABAN_MYSQL_HOST;
    public static String AZKABAN_MYSQL_USERNAME;
    public static String AZKABAN_MYSQL_PWD;

    static {
        load();
    }

    public ConfigUtils() {
        if (configInfo == null) {
            configInfo = ConfigFactory.load(fileName);
        }
        getProperties();
    }

    public static void load() {
        if (configInfo == null) {
            configInfo = ConfigFactory.load(fileName);
        }
        getProperties();
    }

    public static void getProperties() {
        try {
            IOT_ES_HOST = configInfo.getString("iot.es.host");
            IOT_ES_PORT = configInfo.getString("iot.es.port");
            IOT_ES_HOST2 = configInfo.getString("iot.es.host2");
            IOT_ES_PORT2 = configInfo.getString("iot.es.port2");
        } catch (Exception e) {
            e.printStackTrace();
        }
        try {
            IS_TEST = configInfo.getBoolean("isTest");
            WFS_MYSQL_HOST = configInfo.getString("wfs.mysql.host");
            WFS_MYSQL_USERNAME = configInfo.getString("wfs.mysql.username");
            WFS_MYSQL_PWD = configInfo.getString("wfs.mysql.pwd");
            ABI_MYSQL_HOST = configInfo.getString("abi.mysql.host");
            ABI_MYSQL_USERNAME = configInfo.getString("abi.mysql.username");
            ABI_MYSQL_PWD = configInfo.getString("abi.mysql.pwd");
            SEGI_MYSQL_HOST = configInfo.getString("segi.mysql.host");
            SEGI_MYSQL_USERNAME = configInfo.getString("segi.mysql.username");
            SEGI_MYSQL_PWD = configInfo.getString("segi.mysql.pwd");
            UAC_MYSQL_HOST = configInfo.getString("uac.mysql.host");
            UAC_MYSQL_USERNAME = configInfo.getString("uac.mysql.username");
            UAC_MYSQL_PWD = configInfo.getString("uac.mysql.pwd");
            IOT_MYSQL_HOST = configInfo.getString("iot.mysql.host");
            IOT_MYSQL_USERNAME = configInfo.getString("iot.mysql.username");
            IOT_MYSQL_PWD = configInfo.getString("iot.mysql.pwd");
            SDS_MYSQL_HOST = configInfo.getString("sds.mysql.host");
            SDS_MYSQL_USERNAME = configInfo.getString("sds.mysql.username");
            SDS_MYSQL_PWD = configInfo.getString("sds.mysql.pwd");
            WMP_MYSQL_HOST = configInfo.getString("wmp.mysql.host");
            WMP_MYSQL_USERNAME = configInfo.getString("wmp.mysql.username");
            WMP_MYSQL_PWD = configInfo.getString("wmp.mysql.pwd");
            AZKABAN_MYSQL_HOST = configInfo.getString("azkaban.mysql.host");
            AZKABAN_MYSQL_USERNAME = configInfo.getString("azkaban.mysql.username");
            AZKABAN_MYSQL_PWD = configInfo.getString("azkaban.mysql.pwd");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static boolean isWindowEvn() {
        return System.getProperties().getProperty("os.name").toLowerCase().contains("windows");
    }

    public static void main(String[] args) {
        System.out.println("wfs:" + ConfigUtils.WFS_MYSQL_HOST);
        System.out.println("IOT_ES_HOST2:" + ConfigUtils.IOT_ES_HOST2);
        System.out.println("IOT_ES_PORT2:" + ConfigUtils.IOT_ES_PORT2);
        System.out.println("azkaban:" + ConfigUtils.AZKABAN_MYSQL_HOST);
    }
}
```
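
ConfigUtils loads `application.properties` from the classpath via Typesafe Config. A hedged sample of the keys it reads (the values below are placeholders, not real endpoints; the full key list mirrors the `getString` calls above):

```properties
# application.properties -- placeholder values for illustration only
isTest=true
iot.es.host=127.0.0.1
iot.es.port=9200
iot.es.host2=127.0.0.1
iot.es.port2=9200
wfs.mysql.host=127.0.0.1
wfs.mysql.username=root
wfs.mysql.pwd=secret
# ...the same host/username/pwd triple exists for abi, segi, uac, iot, sds, wmp and azkaban
```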

MysqlDao

```java
import com.mingzhi.ConfigUtils;
import com.mingzhi.common.utils.DateUtil;
import com.mingzhi.common.utils.DruidConnection;
import com.mingzhi.common.utils.StringUtil;
import lombok.SneakyThrows;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;

/**
 * Java calling Scala forces a compile order between the two languages.
 * Keep it simple: other modules avoid pulling in Scala dependencies, and the DAO
 * exposes a Java API to them. The Scala utilities are being phased out gradually.
 */
public class MysqlDao {

    public static void main(String[] args) throws Exception {
        try {
            String host = ConfigUtils.WFS_MYSQL_HOST;
            String database = "paascloud_wfs";
            String user = ConfigUtils.WFS_MYSQL_USERNAME;
            String password = ConfigUtils.WFS_MYSQL_PWD;
            final Map<String, Integer> resultMonth = new MysqlDao().getResultMonth(host, "3306", database, user, password, "ads_order_overall_cube,ads_order_result_cube", "2020-11-29");
            final Map<String, Integer> result = new MysqlDao().getResultMap(host, "3306", database, user, password, "tbwork_order", null, null);
            final Map<String, Integer> resultQuarter = new MysqlDao().getResultQuarter(host, "3306", database, user, password, "ads_order_overall_cube", "2023-08-29");
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    /**
     * Row counts for the given tables; when dt is supplied, counts are grouped
     * by dt over [dt, dt1].
     */
    public Map<String, Integer> getResultMap(String host, String port, String database, String userName, String password, String tables, String dt, String dt1) throws Exception {
        HashMap<String, Integer> map = new HashMap<>();
        Connection conn = DruidConnection.getConnection(host, port, database, userName, password);
        Arrays.stream(tables.split(",")).forEach(new Consumer<String>() {
            @SneakyThrows
            @Override
            public void accept(String t) {
                String sql;
                if (dt == null) {
                    sql = "SELECT COUNT(*) as cnts FROM " + t + ";";
                } else {
                    sql = "SELECT dt,COUNT(*) as cnts FROM " + t +
                            " WHERE dt between '" + dt + "' and '" + dt1 + "' GROUP BY dt ORDER BY dt DESC;";
                }
                System.out.println("sql:" + sql);
                PreparedStatement statement = conn.prepareStatement(sql);
                ResultSet rs = statement.executeQuery();
                String dtx = "1970-01-01";
                int cnts = -1;
                while (rs.next()) {
                    // Without a dt column in the query, keep the placeholder key.
                    dtx = dt != null ? rs.getString("dt") : dtx;
                    cnts = rs.getInt("cnts");
                    map.put(dtx, cnts);
                    System.out.println("dt is " + dtx + " and cnts is " + cnts);
                }
            }
        });
        DruidConnection.release(null, null, conn);
        return map;
    }

    public Map<String, Integer> getResultMonth(String host, String port, String database, String userName, String password, String tables, String dt) throws Exception {
        StringUtil.assertNotBlank(dt, "dt can not be blank");
        final String monthStart = DateUtil.getMonthStart(dt);
        final String monthEnd = DateUtil.getMonthEnd(dt);
        return getResultMap(host, port, database, userName, password, tables, monthStart, monthEnd);
    }

    public Map<String, Integer> getResultQuarter(String host, String port, String database, String userName, String password, String tables, String dt) throws Exception {
        StringUtil.assertNotBlank(dt, "dt can not be blank");
        String t_start = DateUtil.getQuarterStart(dt);
        String t_end = DateUtil.getQuarterEnd(dt);
        return getResultMap(host, port, database, userName, password, tables, t_start, t_end);
    }

    /**
     * Fetch the latest execution record of an Azkaban flow.
     *
     * status 30: running
     * status 50: ok
     * status 60: killed
     * status 70: failed
     */
    public AzkabanExecInfo getAzkabanJobExecInfo(String flow_id) throws SQLException {
        String host = ConfigUtils.AZKABAN_MYSQL_HOST;
        String port = "3306";
        String database = "azkaban38";
        String user = ConfigUtils.AZKABAN_MYSQL_USERNAME;
        String password = ConfigUtils.AZKABAN_MYSQL_PWD;
        Connection conn = DruidConnection.getConnection(host, port, database, user, password);
        String sql = "SELECT * FROM execution_flows where flow_id='" + flow_id + "' ORDER BY exec_id DESC limit 1;";
        PreparedStatement statement = conn.prepareStatement(sql);
        ResultSet rs = statement.executeQuery();
        String status = "";
        String submit_time = "00001";
        String end_time = "00001";
        while (rs.next()) {
            status = rs.getString("status");
            submit_time = rs.getString("submit_time");
            end_time = rs.getString("end_time");
        }
        DruidConnection.release(rs, statement, conn);
        String submitDay = DateUtil.getDateStrFromMill(Long.parseLong(submit_time)).split(" ")[0];
        String today = DateUtil.getToday();
        String endTime = DateUtil.getDateStrFromMill(Long.parseLong(end_time));
        String reasonDesc = "";
        boolean okResult = today.equalsIgnoreCase(submitDay) && "50".equalsIgnoreCase(status);
        if (!okResult) {
            // Did it run today at all?
            if (!Objects.equals(today, submitDay)) {
                reasonDesc = "submitDay is " + submitDay + " and today is " + today;
            }
            // Inspect the status code.
            if ("30".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is running, please wait for a while...";
            } else if ("40".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is paused in " + endTime + "...";
            } else if ("60".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is killed in " + endTime;
            } else if ("70".equalsIgnoreCase(status)) {
                reasonDesc = flow_id + " is failed in " + endTime;
            }
        }
        return new AzkabanExecInfo(okResult, status, reasonDesc, endTime);
    }

    public static class AzkabanExecInfo {
        Boolean isOk;
        String status;
        String reason;
        String endTime;

        public AzkabanExecInfo(Boolean isOk, String status, String reason, String endTime) {
            this.isOk = isOk;
            this.status = status;
            this.reason = reason;
            this.endTime = endTime;
        }

        public String getEndTime() { return endTime; }

        public Boolean getOk() { return isOk; }

        public String getStatus() { return status; }

        public String getReason() { return reason; }
    }
}
```
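
A typical verification call: count rows per dt partition of one or more tables over a date range. Note that the SQL is built by string concatenation, so dt values must come from trusted input. Sketch (the table names are hypothetical; connection details come from ConfigUtils):

```scala
val counts: java.util.Map[String, Integer] = new MysqlDao().getResultMap(
  ConfigUtils.WFS_MYSQL_HOST, "3306", "paascloud_wfs",
  ConfigUtils.WFS_MYSQL_USERNAME, ConfigUtils.WFS_MYSQL_PWD,
  "tbwork_order,tbwork_order_detail", // hypothetical comma-separated table list
  "2023-08-01", "2023-08-07")
counts.forEach((dt, cnt) => println(s"$dt -> $cnt"))
```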

TableUtils

```scala
import com.mingzhi.ConfigUtils
import com.mingzhi.common.interf.IDate
import org.apache.spark.sql.functions.{col, desc, length}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.slf4j.LoggerFactory

import java.sql.{Connection, DatabaseMetaData, DriverManager, ResultSet}
import java.util
import java.util.function.Consumer
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object TableUtils {

  private val logger = LoggerFactory.getLogger(TableUtils.getClass)

  def show(frame: DataFrame): Unit = {
    show(frame, 20, false)
  }

  def show(frame: DataFrame, rows: Int, truncate: Boolean): Unit = {
    if (ConfigUtils.IS_TEST) {
      frame.show(rows, truncate)
    }
  }

  /**
   * Check the file count of a table. Parsing of `information` is not implemented
   * yet; the method currently prints it and returns a placeholder value.
   */
  def getFileCount(spark: SparkSession, db: String, tableName: String): Integer = {
    spark.sql(s"use $db")
    val f1 = spark.sql(
      s"""
         |show table extended like '$tableName'
         |""".stripMargin)
    val information = f1.select("information").collect().mkString("@")
    println("information:" + information)
    1000
  }

  /**
   * Before importing a business table, drop the partitions for the given dt range.
   */
  def delPartitions(spark: SparkSession, dt: String, dt1: String, to_db: String, to_table: String): Unit = {
    if (isPartitionedByDt(spark, to_db, to_table)) {
      new IDate {
        override def onDate(dt: String): Unit = {
          spark.sql(s"ALTER TABLE $to_db.$to_table DROP IF EXISTS PARTITION (dt='$dt')")
        }
      }.invoke(dt, dt1)
    } else {
      println(s"$to_table is not partitioned by dt, skip....")
    }
  }

  def isPartitionExists(spark: SparkSession, db: String, table: String, dt: String): Boolean = {
    spark.sql(s"use $db")
    // Rows look like: dt=2020-08-18, dt=2021-09-01, ...
    val frame = spark.sql(s"show partitions $table")
    frame.show(false)
    import scala.collection.JavaConversions._
    val dts: mutable.Seq[String] = frame.collectAsList().map({ case Row(ct: String) => ct.split("=")(1): String })
    // e.g. ArrayBuffer(2020-08-18, 2021-09-01, ...)
    println(dts.toString())
    val isPartitionExist = dts.contains(dt)
    println("is partition exist:" + isPartitionExist)
    isPartitionExist
  }

  def getCompressType(spark: SparkSession, db: String, table: String): String = {
    spark.sql(s"use $db")
    val frame = spark.sql(s"show create table $table")
    val createTableStatement = frame.select("createtab_stmt").collect().toSet.mkString(",")
    /**
     * Spark table: USING parquet
     * Hive table:  org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
     */
    if (createTableStatement.contains("USING parquet")) {
      "parquet"
    } else if (createTableStatement.contains("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")) {
      "HiveFileFormat"
    } else {
      throw new UnsupportedOperationException("unknown storage/compression type")
    }
  }

  /**
   * Whether table t is partitioned by dt.
   */
  private def isPartitionedByDt(spark: SparkSession, db: String, table: String): Boolean = {
    spark.sql(s"use $db")
    val frame = spark.sql(s"show create table $table")
    val createTableStatement = frame.select("createtab_stmt").collect().toSet.mkString(",")
    createTableStatement.contains("PARTITIONED BY (`dt` STRING)") || createTableStatement.contains("PARTITIONED BY (dt")
  }

  /**
   * Whether a table exists in the database, ignoring temporary views.
   */
  def tableExists(spark: SparkSession, to_db: String, to_table: String): Boolean = {
    spark.sql(s"use $to_db")
    // Columns: database (string), tableName (string), isTemporary (boolean)
    val frame_table = spark.sql("show tables")
    val str = frame_table.filter(r => !r.getAs[Boolean]("isTemporary"))
      .select("tableName").collect().toSet.mkString(",")
    str.contains(s"[${to_table.toLowerCase}]")
  }

  def queryExecuteDate(spark: SparkSession, sql: String, doDate: String): Array[String] = {
    val querySql = sql.replaceAll("do_date", doDate)
    logger.info("querySql=" + querySql)
    val frame_table = spark.sql(querySql).distinct().cache().filter(" date is not null ")
    var data = frame_table.rdd.map(line => line.getAs("date").toString).distinct().collect().sorted
    if (data.length == 0) data = Array(doDate)
    data.foreach(date => {
      logger.info(s"$doDate modified data for date: " + date)
    })
    data
  }

  def queryExecuteDate(spark: SparkSession, sql: String, beginDate: String, endDate: String): Array[String] = {
    val querySql = sql.replaceAll("begin_date", beginDate).replaceAll("end_date", endDate)
    logger.info("querySql=" + querySql)
    val frame_table = spark.sql(querySql).distinct().cache().filter(" date is not null ")
    var data: Array[String] = frame_table.rdd.map(line => line.getAs("date").toString).distinct().collect().sorted
    if (data.length == 0) {
      data = DateUtil.getDateList(beginDate, endDate).toArray(Array.empty[String])
    }
    data.foreach(date => {
      logger.info(s"[$beginDate,$endDate] modified data for date: " + date)
    })
    data
  }

  def queryExecuteMonth(spark: SparkSession, sql: String, doDate: String): Array[String] = {
    val querySql = sql.replaceAll("do_date", doDate)
    logger.info("querySql=" + querySql)
    val frame_table = spark.sql(querySql).distinct().cache().filter(" date is not null ")
    var data = frame_table.rdd.map(line => line.getAs("date").toString).distinct().collect().sorted
    if (data.length == 0) data = Array(doDate)
    data.foreach(date => {
      logger.info(s"$doDate modified data for date: " + date)
    })
    data
  }

  /**
   * Strip \r and \n from String columns so rows survive text sinks intact;
   * all other column types pass through unchanged.
   */
  def formattedData(spark: SparkSession, frame: DataFrame): DataFrame = {
    val schema = frame.schema
    val fields: Array[StructField] = schema.fields
    val resultDataFrame = frame.rdd.mapPartitions(it => {
      val result = new ListBuffer[Row]()
      it.foreach(record => {
        val list = new ListBuffer[Any]()
        fields.foreach(field => {
          val columnName = field.name
          field.dataType match {
            case _: StringType =>
              if (record.getAs(columnName) == null) {
                list.append(record.getAs(columnName))
              } else {
                list.append(record.getAs(columnName).toString.replaceAll("\r", "").replaceAll("\n", ""))
              }
            case _ => list.append(record.getAs(columnName))
          }
        })
        result.append(Row.fromSeq(list))
      })
      result.iterator
    })
    spark.createDataFrame(resultDataFrame, schema).drop("@timestamp", "@version")
  }

  /**
   * Build a MySQL CREATE TABLE DDL from a DataFrame's schema.
   */
  def createMysqlTableDDL(t_name: String, frame: DataFrame): String = {
    var sql = s"create table if not EXISTS $t_name(\n"
    frame.schema.foreach(f => {
      val col_name = f.name
      val comment: Option[String] = f.getComment()
      val tp = f.dataType match {
        case _: StringType =>
          val df: Dataset[Row] = frame.select(col_name)
            .groupBy(col_name)
            .agg(length(col(col_name)).as(s"len_${col_name}"))
            .orderBy(desc(s"len_${col_name}"))
          /**
           * String columns may carry large text that ABI cannot display,
           * so the length range is derived from the data itself.
           */
          var len_str = df.select(s"len_${col_name}").take(1).mkString.replace("[", "").replace("]", "")
          if ("null".equalsIgnoreCase(len_str)) {
            len_str = "32"
          }
          var len = len_str.toInt
          // Reserve some headroom up front to avoid frequent ALTER operations later.
          if (len == 0) {
            len = 10
          } else if (len > 0 && len < 1024) {
            len = len * 3
          } else {
            len = (len * 1.5).toInt
          }
          if (len <= 10000) s"varchar($len)" else "text"
        case _: DateType | _: TimestampType => "varchar(64)"
        case _: IntegerType => "int"
        case _: LongType => "bigint"
        case _: DoubleType | _: FloatType | _: DecimalType => "decimal(16,4)"
        case _ => "text"
      }
      sql = sql + s"`$col_name` $tp COMMENT '${comment.getOrElse("")}',\n"
    })
    sql.substring(0, sql.length - 2) + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;\n"
  }

  /**
   * For an existing table: compare the MySQL metadata against the DataFrame
   * schema and emit the ALTER statements needed to bring the table up to date.
   */
  def updateMysqlTableDDL(t_name: String, frame: DataFrame, metaData: DatabaseMetaData): String = {
    val targetNameAndLength = new util.HashMap[String, Integer]()
    val sb = new StringBuilder()
    val rs: ResultSet = metaData.getColumns(null, "%", t_name, "%")
    while (rs.next()) {
      val columnName2 = rs.getString("COLUMN_NAME")
      val COLUMN_SIZE2 = rs.getString("COLUMN_SIZE")
      val columnType2 = rs.getString("TYPE_NAME")
      println("columnType2:" + columnType2)
      targetNameAndLength.put(columnName2.toLowerCase(), COLUMN_SIZE2.toInt)
    }
    println("targetNameAndLength:" + targetNameAndLength)
    frame.schema.foreach(f => {
      val col_name = f.name.toLowerCase()
      val lenInMysql: Integer = targetNameAndLength.getOrDefault(col_name, -1)
      // Columns newly added on the Hive side are added to MySQL as well.
      if (lenInMysql == -1) {
        println(s"add col to mysql: $col_name")
        f.dataType match {
          case _: StringType =>
          // String columns are handled below, using the actual data length.
          case _: DateType | _: TimestampType =>
            sb.append(s"alter table $t_name add column $col_name varchar(64);\n")
          case _: IntegerType =>
            sb.append(s"alter table $t_name add column $col_name int;\n")
          case _: LongType =>
            sb.append(s"alter table $t_name add column $col_name bigint;\n")
          case _: DoubleType | _: FloatType | _: DecimalType =>
            sb.append(s"alter table $t_name add column $col_name decimal(16,4);\n")
          case _ => sb.append(s"alter table $t_name add column $col_name text;\n")
        }
      }
      f.dataType match {
        case _: StringType =>
          val df: Dataset[Row] = frame.select(col_name)
            .groupBy(col_name)
            .agg(length(col(col_name)).as(s"len_${col_name}"))
            .orderBy(desc(s"len_${col_name}"))
          var len_str = df.select(s"len_${col_name}").take(1).mkString.replace("[", "").replace("]", "")
          if ("null".equalsIgnoreCase(len_str)) {
            len_str = "99"
          }
          val len = len_str.toInt
          println(s"col $col_name max length:" + len)
          // For a newly added String column:
          if (lenInMysql == -1) {
            sb.append(s"alter table $t_name add column $col_name varchar($len);\n")
          }
          /**
           * ALTER on a historical table is a very heavy operation, so compare
           * first and only modify when the column really is too short.
           */
          if (lenInMysql >= 0 && lenInMysql < len) {
            sb.append(s"ALTER TABLE $t_name MODIFY $col_name VARCHAR($len);\n")
          }
        case _ =>
          println(s"other type ${f.dataType}, do nothing")
      }
    })
    val ddl_update = sb.toString()
    println("ddl update:" + ddl_update)
    ddl_update
  }

  /**
   * Build a case-class definition from a DataFrame's schema.
   */
  def createCaseClass(clzName: String, frame: DataFrame): String = {
    var sql = s"case class $clzName(\n"
    frame.schema.foreach(s => {
      val name = s.name
      val tp = s.dataType.typeName match {
        case "string" | "date" | "timestamp" => "String"
        case "integer" => "Integer"
        case "double" | "float" => "Double"
        case "long" => "Long"
        case "array" => "Array[Any]"
        case "map" => "Map[String,String]"
        case _ =>
          if (s.dataType.typeName.contains("decimal")) {
            "BigDecimal"
          } else {
            throw new IllegalArgumentException(s"unknown data type: ${s.dataType},${s.dataType.typeName}")
          }
      }
      sql = sql + s"var $name:$tp,\n"
    })
    sql.substring(0, sql.length - 2) + "\n)"
  }

  /**
   * Attach comments to a DataFrame's fields.
   */
  def addComment(spark: SparkSession, f: DataFrame, commentMap: java.util.Map[String, String]): DataFrame = {
    val schema = f.schema.map(s => {
      s.withComment(commentMap.getOrDefault(s.name, ""))
    })
    spark.createDataFrame(f.rdd, StructType(schema))
  }

  /**
   * Rebuild the query SQL from the source table's metadata, minus dropCols.
   */
  def createQuerySql(url: String, userName: String, pwd: String, t_source: String, dropCols: String): String = {
    val sb = new StringBuilder()
    val list = new util.ArrayList[String]()
    var conn: Connection = null
    var querySql = ""
    try {
      conn = DriverManager.getConnection(url, userName, pwd)
      val rs: ResultSet = conn.getMetaData.getColumns(null, "%", t_source, "%")
      while (rs.next()) {
        val columnName = rs.getString("COLUMN_NAME")
        println("columnName:" + columnName)
        list.add(columnName)
      }
      dropCols.split(",|-").foreach(col => {
        println(s"drop col====>$col")
        list.remove(col)
      })
      list.forEach(new Consumer[String] {
        override def accept(t: String): Unit = {
          // Quote every column in case a business table uses a reserved keyword.
          sb.append("`" + t + "`").append(",")
        }
      })
      val cols = sb.substring(0, sb.size - 1)
      querySql = s"select $cols from $t_source"
      println("querySql:" + querySql)
    } catch {
      case e: Exception => println(e)
    } finally {
      DruidConnection.release(null, null, conn)
    }
    querySql
  }

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkUtils.getBuilder.master("local[*]").getOrCreate()
    println(isPartitionedByDt(spark, "paascloud", "dwd_order_info_abi"))
    println(getCompressType(spark, "paascloud", "dwd_order_info_abi"))
    testGetFileCount(spark, "common", "t_uac_user")
    testGetFileCount(spark, "paascloud", "dwd_order_info_abi")
  }

  def testGetFileCount(spark: SparkSession, db: String, tableName: String): Unit = {
    getFileCount(spark, db, tableName)
  }

  def testCreateQuerySql(): Unit = {
    val host = "192.168.32.179"
    val port = "3308"
    val db = "vwork"
    val userName = "root"
    val pwd = "123456"
    val target_table = "administrative_organization"
    val url = s"jdbc:mysql://$host:$port/$db?useUnicode=true&characterEncoding=UTF-8&connectionRequestTimout=300000&connectionTimeout=300000&socketTimeout=300000&useSSL=false"
    createQuerySql(url, userName, pwd, target_table, "dt,x")
    createQuerySql(url, userName, pwd, target_table, "org_code_macro")
  }

  def testGetMysqlMetaData(frame_result: DataFrame): Unit = {
    val host = "192.168.32.179"
    val port = "3308"
    val db = "vwork"
    val userName = "root"
    val pwd = "123456"
    val target_table = "person"
    val url = s"jdbc:mysql://$host:$port/$db?useUnicode=true&characterEncoding=UTF-8&connectionRequestTimout=300000&connectionTimeout=300000&socketTimeout=300000&useSSL=false"
    var conn: Connection = null
    try {
      conn = DriverManager.getConnection(url, userName, pwd)
      println("updateMysqlTableDDL:" + updateMysqlTableDDL(target_table, frame_result, conn.getMetaData))
    } catch {
      case e: Exception => logger.error("MySQL exception {}:", e)
    } finally {
      if (null != conn && !conn.isClosed) {
        conn.close()
      }
    }
  }
}
```
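
A small end-to-end sketch tying the pieces together: drop the target partitions for a day range before re-importing, then verify that the partition landed after the write. Database and table names are illustrative:

```scala
val spark = SparkUtils.getBuilder.appName("sink").getOrCreate()

// Drop dt partitions in the target table before re-importing
// (a no-op if the table is not partitioned by dt).
TableUtils.delPartitions(spark, "2023-08-01", "2023-08-03", "paascloud", "dwd_order_info_abi")

// ... write the data, then verify:
if (TableUtils.isPartitionExists(spark, "paascloud", "dwd_order_info_abi", "2023-08-01")) {
  println("partition 2023-08-01 written")
}
```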
