上下文管理、redis发布订阅、RabbitMQ发布订阅、SQLAlchemy

一、上下文管理

 

import  contextlib
@contextlib.contextmanager
def work_state(state_list,worker_thread):state_list.append(worker_thread)try:yieldfinally:state_list.remove(worker_thread)
free_list=[]
current_thread="alex"
with work_state(free_list,current_thread):print(123)print(456)#以下为执行结果:
123
456

 

代码执行步骤

 

 

上下文用于需要 close()方法的模块

 

import  contextlib
import  socket@contextlib.contextmanager
def context_socket(host,port):sk=socket.socket()sk.bind((host,port))sk.listen(5)try:yield skfinally:sk.close()
with context_socket('127.0.0.1',8888) as sock:print(sock)#以下为执行结果:
<socket.socket fd=224, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 8888)>

 

 

 

二、redis 发布订阅

#redis2.py 主程序import  redis
class RedisHelper:def __init__(self):self.__conn=redis.Redis(host='192.168.11.87')def public(self,msg,chan):self.__conn.publish(chan,msg)return  Truedef subscribe(self,chan):pub=self.__conn.pubsub()pub.subscribe(chan)pub.parse_response()return  pub

 

 

订阅

import redis2obj= redis2.RedisHelper()
data=obj.subscribe('fm111.7')
print(data.parse_response())#接收到发布信息:
[b'message', b'fm111.7', b'aaaaaa']

 

发布

import redis2obj= redis2.RedisHelper()
obj.public('alex_db','f111.7')

 

 

三、RabbitMQ

import pika#生产者 发布
connection =pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))channel = connection.channel()
channel.queue_declare(queue='hello_wuwenyu')                 #创建队列,存在则忽略
channel.basic_publish(exchange='', routing_key='hello_wuwenyu', body='Hello World') print("[x] Sent 'Hello World!'") connection.close


 

 

import pika#消费者 订阅
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()
channel.queue_declare(queue='hello_wuwenyu')  #
def callback(ch,method,properties,body):print(" [x] Received %r" % body)
channel.basic_consume(callback,queue='hello_wuwenyu',no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()#接收到生产者发来的消息:
[*] Waiting for messages. To exit press CTRL+C
[x] Received b'Hello World'

 

  2 exchange 绑定多个队列

#

import pika#生产者 发布
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',type='fanout')message = '456'
channel.basic_publish(exchange='logs_fanout',routing_key='',body=message)
print(" [x] Sent %r" % message)
connection.close()

 

import pika#订阅
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',type='fanout')# 随机创建队列
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# 绑定
channel.queue_bind(exchange='logs_fanout',queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r" % body)channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()#执行多次消费端,随机产生多个队列,每个队列都接收到消息:
[*] Waiting for logs. To exit press CTRL+C[x] b'456'

 

 关键字

#生产者  severity = 'info'      severity = 'errer'   执行两次
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')severity = 'info'     
# severity = 'errer' message = '123' channel.basic_publish(exchange='direct_logs_wuwenyu',routing_key=severity,body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()

 

 

#订阅 消费 客户端1
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queueseverities =  ['error','info','warning']for severity in severities:channel.queue_bind(exchange='direct_logs_wuwenyu',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()#接受到的消息:[*] Waiting for logs. To exit press CTRL+C[x] 'error':b'123'[x] 'info':b'123'

 

 

#订阅 消费 客户端2
import pika
import sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.87'))
channel = connection.channel()channel.exchange_declare(exchange='direct_logs_wuwenyu',type='direct')result = channel.queue_declare(exclusive=True)
queue_name = result.method.queueseverities =  ['error',]for severity in severities:channel.queue_bind(exchange='direct_logs_wuwenyu',queue=queue_name,routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,queue=queue_name,no_ack=True)channel.start_consuming()
#接受到的消息:[*] Waiting for logs. To exit press CTRL+C[x] 'info':b'123'

 

四、SQLAlchemy

SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

MySQL-Python
mysql+mysqldb://:@[:]/pymysql
mysql+pymysql://:@/[?]MySQL-Connector
mysql+mysqlconnector://:@[:]/cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]更多详见:http://docs.sqlalchemy.org/en/latest/dialects/index.html

 


步骤一:

使用 Engine/ConnectionPooling/Dialect 进行数据库操作,Engine使用ConnectionPooling连接数据库,然后再通过Dialect执行SQL语句。

#!/usr/bin/env python
# -*- coding:utf-8 -*-fromsqlalchemy importcreate_engineengine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)engine.execute(
"INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
)engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%s, %s)",
((555, "v1"),(666, "v1"),)
)
engine.execute(
"INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
id=999, name="v1"
)result =engine.execute('select * from ts_test')
result.fetchall()

 


事务操作

注:查看数据库连接:show status like 'Threads%';

步骤二:

使用 Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 进行数据库操作。Engine使用Schema Type创建一个特定的结构对象,之后通过SQL Expression Language将该对象转换成SQL语句,然后通过 ConnectionPooling 连接数据库,再然后通过 Dialect 执行SQL,并获取结果。

#!/usr/bin/env python
# -*- coding:utf-8 -*-fromsqlalchemy importcreate_engine, Table, Column, Integer, String, MetaData, ForeignKeymetadata =MetaData()user =Table('user', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)color =Table('color', metadata,
Column('id', Integer, primary_key=True),
Column('name', String(20)),
)
engine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)metadata.create_all(engine)
# metadata.clear()
# metadata.remove()

 


增删改查

更多内容详见:

    http://www.jianshu.com/p/e6bba189fcbd

    http://docs.sqlalchemy.org/en/latest/core/expression_api.html

注:SQLAlchemy无法修改表结构,如果需要可以使用SQLAlchemy开发者开源的另外一个软件Alembic来完成。

步骤三:

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

#!/usr/bin/env python
# -*- coding:utf-8 -*-fromsqlalchemy.ext.declarative importdeclarative_base
fromsqlalchemy importColumn, Integer, String
fromsqlalchemy.orm importsessionmaker
fromsqlalchemy importcreate_engineengine =create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)Base =declarative_base()classUser(Base):
__tablename__ ='users'
id=Column(Integer, primary_key=True)
name =Column(String(50))# 寻找Base的所有子类,按照子类的结构在数据库中生成对应的数据表信息
# Base.metadata.create_all(engine)Session =sessionmaker(bind=engine)
session =Session()# ########## 增 ##########
# u = User(id=2, name='sb')
# session.add(u)
# session.add_all([
#     User(id=3, name='sb'),
#     User(id=4, name='sb')
# ])
# session.commit()# ########## 删除 ##########
# session.query(User).filter(User.id > 2).delete()
# session.commit()# ########## 修改 ##########
# session.query(User).filter(User.id > 2).update({'cluster_id' : 0})
# session.commit()
# ########## 查 ##########
# ret = session.query(User).filter_by(name='sb').first()# ret = session.query(User).filter_by(name='sb').all()
# print ret# ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
# print ret# ret = session.query(User.name.label('name_label')).all()
# print ret,type(ret)# ret = session.query(User).order_by(User.id).all()
# print ret# ret = session.query(User).order_by(User.id)[1:3]
# print ret
# session.commit()

 

 

  

  

  

转载于:https://www.cnblogs.com/wudalang/p/5700242.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/573313.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

php symbol table,gdb方式遍历EG(symbol_table) 哈希表的key_PHP教程

Sara Golemon写过一篇文章&#xff0c;里面提到&#xff1a;“是否存在特别的地方可以找到GLOBALS数组&#xff1f;”答案是“存在”&#xff0c;就是EG(symbol_table)&#xff0d;Executor Globals结构&#xff0c;她也给出了找的具体实例&#xff0c;如下PHP_FUNCTION(confir…

常用快捷键归纳

CtrlX     剪贴 CtrlC    复制 CtrlV    粘贴 CtrlD    删除对应行 CtrlA    全选 CtrlZ    撤销 Alt↑    两行代码互换 CtrlAltX J 运行代码 CtrlshiftO 添加缺少的包&#xff0c; Ctrlshift↓  向下复制本行代码转载于:https://www.cnblogs.c…

php edm 系统,edm.php

// ----------------------------------------------------------------------// | 科创众达// ----------------------------------------------------------------------// | Copyright (c) 2011 http://ctrlcoo.com All rights reserved.// -------------------------------…

jQuery选择id属性带有.点符号元素的方法

如果jquery要选择的元素id中带有点符号&#xff0c;在选择时需要在点前面加上两个反斜杠&#xff0c;如&#xff1a;$("#address\\.street").text("Enter this field");<div id"address.street"> http://www.jb51.net </div> 转载于…

单账户登录踢人 php,踢人下线

前言 在java的世界里&#xff0c;有很多优秀的权限认证框架&#xff0c;如Apache Shiro、Spring Security 等等。这些框架背景强大&#xff0c;历史悠久&#xff0c;其生态也比较齐全。 但同时这些框架也并非十分完美&#xff0c;在前后台分离已成标配的互联网时代&#xff0c;…

python 基础,包括列表,元组,字典,字符串,set集合,while循环,for循环,运算符。...

1.continue 的作用&#xff1a;跳出一次循环&#xff0c;进行下一次循环2.break 跳出不再循环3.常量 &#xff08;全是大写&#xff09;NAME cjk 一般改了会出错4.python的第三方库&#xff0c;先安装再导入。装模块&#xff1a;pip 命令例如&#xff1a;pip install pandas5…

10054 java,为什么Socket.Receive在远程主机断开连接时抛出SocketException(10054)?

我以前用C编写套接字程序&#xff0c;无法理解为什么会发生这种情况 .我的服务器在接收调用时阻塞&#xff0c;当它返回0时&#xff0c;我打破了while循环并关闭了线程 .public class MyServer {public MyServer() {}public void Init() {ThreadPool.QueueUserWorkItem(StartLi…

SQL 截取字符

select SUBSTRING(123,abcdefg,charindex(,,123,abcdefg,0)1,LEN(123,abcdefg)-charindex(,,123,abcdefg,0)) select SUBSTRING(123,abcdefg,0,charindex(,,123,abcdefg,0))转载于:https://www.cnblogs.com/kunEssay/p/5726672.html

hdoj 1004 学习思路

hdoj 1004题目大概讲的是&#xff0c;将输入的字符串根据输入次数多少&#xff0c;输出出现次数最多的字符串。 题目逻辑很简单&#xff0c;就是需要选择相应的数据结构&#xff0c;看了别人提交的discuss&#xff0c;明显发现可以使用多种数据结构解这道题。 其实我本是打算用…

oracle 关闭数据库实列,Oracle 11g 数据库启动和关闭

Oracles11数据库的启动状态Oracle11g在启动的时候必须经过三个状态&#xff1a;NOMOUNT,MOUNT,OPEN。NOMOUNT: 此状态下只打开数据库实例&#xff0c;读取参数文件。MOUNT: 根据参数文件信息&#xff0c;打开控制文件。读取控制文件中的各种信息&#xff0c;如数据文件位置&…

JavaScript进阶(下)

指定分隔符连接数组元素join() join()方法用于把数组中的所有元素放入一个字符串。元素是通过指定的分隔符进行分隔的。 语法&#xff1a; arrayObject.join(分隔符) 参数说明: 注意&#xff1a;返回一个字符串&#xff0c;该字符串把数组中的各个元素串起来&#xff0c;用<…

oracle报无效月份 注册表,在oracle中插入时间时出现“无效的月份”解决方法

这个问题是我曾经在使用中遇到的&#xff0c;在网上搜了一下&#xff0c;发现很多人都遇到过&#xff0c;并且也说明了很多解决方法。引起这个问题是有很多种可能的&#xff0c;现在我将在网上收集的资料结合我自己的解决经验总结一下&#xff0c;希望对大家有帮助。我用的是or…

oracle 用户禁止登录,[转] oracle限制用户在某个时间段内禁止登录数据库

原文: http://blog.itpub.net/29371470/viewspace-1081319/ [oraclerhel ~]$ sqlplus / as sysdba SQL*Plus: Release 10.2.0.5.0 - Production on Sat Feb 8 12:51:15 2014 Copyright (c) 1982, 2010, Oracle. All Rights Reserved. Connected to: Oracle Database 10g Enter…

ongl 表达式

struts.xml简单配置 <!-- &#xff08;默认false&#xff09;设置ognl表达式是否支持静态方法 --><constant name"struts.ognl.allowStaticMethodAccess" value"true"></constant><package name"ognl" namespace"/ogn…

oracle dd if=/dev/zero of=/dev,【转】dd if=/dev/zero of=的含义是什么?Linux 下的dd命令使用详解...

一、dd命令的解释dd&#xff1a;用指定大小的块拷贝一个文件&#xff0c;并在拷贝的同时进行指定的转换。注意&#xff1a;指定数字的地方若以下列字符结尾&#xff0c;则乘以相应的数字&#xff1a;b512&#xff1b;c1&#xff1b;k1024&#xff1b;w2参数注释&#xff1a;1. …

Python开发-- Lesson 2--Python数据类型(2016/07/30)

1、文件操作 python中对文件、文件夹&#xff08;文件操作函数&#xff09;的操作需要涉及到os模块和shutil模块。 得到当前工作目录&#xff0c;即当前Python脚本工作的目录路径: os.getcwd() 返回指定目录下的所有文件和目录名:os.listdir() 函数用来删除一个文件:os.remove(…

oracle什么是重复组,规范化:“重复组”是什么意思?

扬帆大鱼英语的价值一次又一次地重复。这是重复组吗&#xff1f;不。在SUBJECT_MODULE中英语的多次出现不是重复组&#xff0c;甚至不是人们误认为重复组的两件事中的任何一个。它们也不是冗余或缺乏规范化的证据。这样的多个外观可能与冗余或规范化有关&#xff0c;但是在没有…

清除浮动php,CSS清除浮动

今天看到一篇文章关于清除浮动的&#xff0c;突然间脑袋短路了&#xff0c;咦&#xff1f;为什么要清除浮动&#xff1f;原谅我的无知&#xff0c;搜了下原来是这样&#xff0c;又倒腾出原来的笔记&#xff0c;唉&#xff0c;本来就有记录啊&#xff0c;而且也会经常用到&#…

Linux下使用Speedtest测试网速

导读Speedtest是用来测试网络性能的开源软件&#xff0c;在Linux下面安装Speedtest可以用来测试网络出口的上传和下载速度&#xff0c;帮助排查网络方面导致的故障。Speedtest介绍由于公司几个项目用户访问的时候响应较慢&#xff0c;项目本身没问题&#xff0c;服务及调用的接…

oracle leg函数,oracle对象 约束索引 游标 函数

约束视图:视图是存储在数据库中的查询的SQL 语句,视图是一个虚拟表&#xff0c;其内容由查询定义。视图就如同一张表一样&#xff0c;对表能够进行的一般操作都可以应用于视图&#xff0c;例如查询&#xff0c;插入&#xff0c;修改&#xff0c;删除操作等。当对通过视图看到的…