1. 请写一个 Python 逻辑,计算一个文件中的大写字母数量
答:读取‘A.txt’中的大写字母数量
with open('A.txt') as f:"""计算一个文件中的大写字母数量"""count = 0for i in f.read():if i.isupper():count += 1
print(count)
运行结果为4。
2.了解数据库的三范式么?
答: 经过研究和对使用中问题的总结,对于设计数据库提出了一些规范,这些规范被称为范式,一般需要遵守下面3范式即可::
第一范式(1NF):强调的是列的原子性,即列不能够再分成其他几列。
第二范式(2NF):首先是 1NF,另外包含两部分内容,一是表必须有一个主键;二是没有包含在主键中的列必须完全依赖于主键,而不能只依赖于主键的一部分。
第三范式(3NF):首先是 2NF,另外非主键列必须直接依赖于主键,不能存在传递依赖。即不能存在:非主键列 A 依赖于非主键列 B,非主键列 B 依赖于主键的情况。
数据库三大范式和五大约束
3.了解分布式锁么
答: 分布式锁是控制分布式系统之间的同步访问共享资源的一种方式。 对于分布式锁的目标,我们必须首先明确三点:
任何一个时间点必须只能够有一个客户端拥有锁。
不能够有死锁,也就是最终客户端都能够获得锁,尽管可能会经历失败。
错误容忍性要好,只要有大部分的Redis实例存活,客户端就应该能够获得锁。
分布式锁的条件:
互斥性:分布式锁需要保证在不同节点的不同线程的互斥
可重入性:同一个节点上的同一个线程如果获取了锁之后,能够再次获取这个锁。
锁超时:支持超时释放锁,防止死锁 高效,
高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
支持阻塞和非阻塞:可以实现超时获取失败,tryLock(long timeOut) 支持公平锁和非公平锁
分布式锁的实现方案 1、数据库实现(乐观锁) 2、基于zookeeper的实现 3、基于Redis的实现(推荐)
添加链接描述
添加链接描述
4.用 Python 实现一个 Reids 的分布式锁的功能
答:REDIS分布式锁实现的方式:SETNX + GETSET,NX是Not eXists的缩写,如SETNX命令就应该理解为:SET if Not eXists。 多个进程执行以下Redis命令:
SETNX lock.foo <current Unix time + lock timeout + 1>
如果 SETNX 返回1,说明该进程获得锁,SETNX将键 lock.foo 的值设置为锁的超时时间(当前时间 + 锁的有效时间)。 如果 SETNX 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX 操作,以获得锁。
//锁定的方法-伪代码
publicbooleanlock(){connection.setAutoCommit(false)for(){result =select* from user where id = 100 for update;if(result){//结果不为空,//则说明获取到了锁return true; }//没有获取到锁,继续获取sleep(1000); }return false;}//释放锁-伪代码connection.commit();
import time
import redis
from conf.config import REDIS_HOST, REDIS_PORT, REDIS_PASSWORDclass RedisLock:def __init__(self):self.conn = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, db=1)self._lock = 0self.lock_key = ""@staticmethoddef my_float(timestamp):"""Args:timestamp:Returns:float或者0如果取出的是None,说明原本锁并没人用,getset已经写入,返回0,可以继续操作。"""if timestamp:return float(timestamp)else:#防止取出的值为None,转换float报错return 0@staticmethoddef get_lock(cls, key, timeout=10):cls.lock_key = f"{key}_dynamic_lock"while cls._lock != 1:timestamp = time.time() + timeout + 1cls._lock = cls.conn.setnx(cls.lock_key, timestamp)# if 条件中,可能在运行到or之后被释放,也可能在and之后被释放# 将导致 get到一个None,float失败。if cls._lock == 1 or (time.time() > cls.my_float(cls.conn.get(cls.lock_key)) andtime.time() > cls.my_float(cls.conn.getset(cls.lock_key, timestamp))):breakelse:time.sleep(0.3)@staticmethoddef release(cls):if cls.conn.get(cls.lock_key) and time.time() < cls.conn.get(cls.lock_key):cls.conn.delete(cls.lock_key)def redis_lock_deco(cls):def _deco(func):def __deco(*args, **kwargs):cls.get_lock(cls, args[1])try:return func(*args, **kwargs)finally:cls.release(cls)return __decoreturn _deco@redis_lock_deco(RedisLock())
def my_func():print("myfunc() called.")time.sleep(20)if __name__ == "__main__":my_func()
5. 请写一段 Python连接Mongo数据库,并查询代码。
答:
import pymongo
db_configs = {'type': 'mongo','host': '地址','port': '端口','user': 'spider_data','passwd': '密码','db_name': 'spider_data'
}class Mongo():def __init__(self, db=db_configs["db_name"], username=db_configs["user"],password=db_configs["passwd"]):self.client = pymongo.MongoClient(f'mongodb://{db_configs["host"]}:db_configs["port"]')self.username = usernameself.password = passwordif self.username and self.password:self.db1 = self.client[db].authenticate(self.username, self.password)self.db1 = self.client[db]def find_data(self):# 获取状态为0的数据data = self.db1.test.find({"status": 0})gen = (item for item in data)return genif __name__ == '__main__':m = Mongo()print(m.find_data())
6.写一段 Python 使用 mongo 数据库创建索引的代码:
答:
import pymongo
db_configs = {'type': 'mongo','host': '地址','port': '端口','user': 'spider_data','passwd': '密码','db_name': 'spider_data'
}class Mongo():def __init__(self, db=db_configs["db_name"], username=db_configs["user"],password=db_configs["passwd"]):self.client = pymongo.MongoClient(f'mongodb://{db_configs["host"]}:{db_configs["port"]}')self.username = usernameself.password = passwordif self.username and self.password:self.db1 = self.client[db].authenticate(self.username, self.password)self.db1 = self.client[db]def add_index(self):"""通过create_index添加索引"""self.db1.test.create_index([('name', pymongo.ASCENDING)], unique=True)def get_index(self,):"""查看索引列表"""indexlist=self.db1.test.list_indexes()for index in indexlist:print(index)if __name__ == '__main__':m = Mongo()m.add_index()print(m.get_index())
7.说一说Redis的基本类型
答: Redis 支持五种数据类型: string(字符串) 、 hash(哈希)、list(列表) 、 set(集合) 及 zset(sorted set: 有序集合)。
8.了解Redis的事务么
答: 简单理解,可以认为 redis 事务是一些列 redis 命令的集合,并且有如下两个特点: 1.事务是一个单独的隔离操作:事务中的所有命令都会序列化、按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。 2.事务是一个原子操作:事务中的命令要么全部被执行,要么全部都不执行。 一般来说,事务有四个性质称为ACID,分别是原子性,一致性,隔离性和持久性。 一个事务从开始到执行会经历以下三个阶段:
开始事务
命令入队
执行事务 代码示例:
import redis
import sys
def run(): try:conn=redis.StrictRedis('192.168.80.41')# Python中redis事务是通过pipeline的封装实现的pipe=conn.pipeline()pipe.sadd('s001','a')sys.exit()#在事务还没有提交前退出,所以事务不会被执行。pipe.sadd('s001','b')pipe.execute()passexcept Exception as err:print(err)pass
if __name__=="__main__":run()
9. 请写一段 Python连接Redis数据库的代码。
答:
from redis import StrictRedis, ConnectionPool
redis_url="redis://:xxxx@112.27.10.168:6379/15"
pool = ConnectionPool.from_url(redis_url, decode_responses=True)
r= StrictRedis(connection_pool=pool)
10. 请写一段 Python连接Mysql数据库的代码。
答:
# 导入pymysql模块
import pymysql
# 连接database
conn = pymysql.connect(host=“你的数据库地址”, user=“用户名”,password=“密码”,database=“数据库名”,charset=“utf8”)
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 定义要执行的SQL语句
sql = """
CREATE TABLE USER1 (
id INT auto_increment PRIMARY KEY ,
name CHAR(10) NOT NULL UNIQUE,
age TINYINT NOT NULL
)ENGINE=innodb DEFAULT CHARSET=utf8;
"""
# 执行SQL语句
cursor.execute(sql)
# 关闭光标对象
cursor.close()
# 关闭数据库连接
conn.close()
推荐文献:Python连接MySQL数据库之pymysql模块使用
谢谢作者分享!